sendtext.py
                        
                             · 6.2 KiB · Python
                        
                    
                    
                      
                        Eredeti
                      
                      
                        
                          
                        
                    
                    
                
                
            #!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Onyx and Iris
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import logging
import socket
import sys
import time
from dataclasses import dataclass
from pathlib import Path
import tomllib
logger = logging.getLogger(__name__)
@dataclass
class RTHeader:
    name: str
    bps_index: int
    channel: int
    VBAN_PROTOCOL_TXT = 0x40
    framecounter: bytes = (0).to_bytes(4, "little")
    def __sr(self):
        return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
    def __nbc(self):
        return (self.channel).to_bytes(1, "little")
    def build(self) -> bytes:
        header = "VBAN".encode("utf-8")
        header += self.__sr()
        header += (0).to_bytes(1, "little")
        header += self.__nbc()
        header += (0x10).to_bytes(1, "little")
        header += self.name.encode() + bytes(16 - len(self.name))
        header += RTHeader.framecounter
        return header
class RequestPacket:
    def __init__(self, header: RTHeader):
        self.header = header
    def encode(self, text: str) -> bytes:
        return self.header.build() + text.encode("utf-8")
class VbanSendText:
    def __init__(self, **kwargs):
        defaultkwargs = {
            "host": "localhost",
            "port": 6980,
            "streamname": "Command1",
            "bps_index": 0,
            "channel": 0,
            "delay": 0.02,
        }
        defaultkwargs.update(kwargs)
        self.__dict__.update(defaultkwargs)
        self._request = RequestPacket(
            RTHeader(self.streamname, self.bps_index, self.channel)
        )
    def __enter__(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        self._sock.close()
    def sendtext(self, text: str):
        """
        Sends a text message to the specified host and port.
        Args:
            text (str): The text message to be sent.
        """
        self._sock.sendto(self._request.encode(text), (self.host, self.port))
        self._request.header.framecounter = (
            int.from_bytes(self._request.header.framecounter, "little") + 1
        ).to_bytes(4, "little")
        logger.debug(
            f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}"
        )
        time.sleep(self.delay)
def conn_from_toml(filepath: str = "config.toml") -> dict:
    """
    Reads a TOML configuration file and returns its contents as a dictionary.
    Args:
        filepath (str): The path to the TOML file. Defaults to "config.toml".
    Returns:
        dict: The contents of the TOML file as a dictionary.
    Example:
        # config.toml
        host = "localhost"
        port = 6980
        streamname = "Command1"
    """
    pn = Path(filepath)
    if not pn.exists():
        logger.info(
            "no config.toml found, using defaults: localhost:6980 streamname: Command1"
        )
        return {}
    try:
        with open(filepath, "rb") as f:
            return tomllib.load(f)
    except tomllib.TOMLDecodeError as e:
        raise ValueError(f"Error decoding TOML file: {e}") from e
def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments.
    Returns:
        argparse.Namespace: Parsed command-line arguments.
    Command-line arguments:
        --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO".
        --config (str): Path to config file. Default is "config.toml".
        -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin.
        text (str, optional): Text to send.
    """
    parser = argparse.ArgumentParser(description="Send text to VBAN")
    parser.add_argument(
        "--log-level",
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="Set the logging level",
    )
    parser.add_argument(
        "--config", type=str, default="config.toml", help="Path to config file"
    )
    parser.add_argument(
        "-i",
        "--input-file",
        type=argparse.FileType("r"),
        default=sys.stdin,
    )
    parser.add_argument("text", nargs="?", type=str, help="Text to send")
    return parser.parse_args()
def main(config: dict):
    """
    Main function to send text using VbanSendText.
    Args:
        config (dict): Configuration dictionary for VbanSendText.
    Behavior:
        - If 'args.text' is provided, sends the text and returns.
        - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line.
        - Stops reading and sending if a line equals "Q".
        - Logs each line being sent at the debug level.
    """
    with VbanSendText(**config) as vban:
        if args.text:
            vban.sendtext(args.text)
            return
        for line in args.input_file:
            line = line.strip()
            if line.upper() == "Q":
                break
            logger.debug(f"Sending {line}")
            vban.sendtext(line)
if __name__ == "__main__":
    args = parse_args()
    logging.basicConfig(level=args.log_level)
    main(conn_from_toml(args.config))
                | 1 | #!/usr/bin/env python3 | 
| 2 | |
| 3 | # MIT License | 
| 4 | # | 
| 5 | # Copyright (c) 2025 Onyx and Iris | 
| 6 | # | 
| 7 | # Permission is hereby granted, free of charge, to any person obtaining a copy | 
| 8 | # of this software and associated documentation files (the "Software"), to deal | 
| 9 | # in the Software without restriction, including without limitation the rights | 
| 10 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | 
| 11 | # copies of the Software, and to permit persons to whom the Software is | 
| 12 | # furnished to do so, subject to the following conditions: | 
| 13 | # | 
| 14 | # The above copyright notice and this permission notice shall be included in all | 
| 15 | # copies or substantial portions of the Software. | 
| 16 | # | 
| 17 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | 
| 18 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | 
| 19 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | 
| 20 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | 
| 21 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 
| 22 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | 
| 23 | # SOFTWARE. | 
| 24 | |
| 25 | import argparse | 
| 26 | import logging | 
| 27 | import socket | 
| 28 | import sys | 
| 29 | import time | 
| 30 | from dataclasses import dataclass | 
| 31 | from pathlib import Path | 
| 32 | |
| 33 | import tomllib | 
| 34 | |
| 35 | logger = logging.getLogger(__name__) | 
| 36 | |
| 37 | |
| 38 | @dataclass | 
| 39 | class RTHeader: | 
| 40 | name: str | 
| 41 | bps_index: int | 
| 42 | channel: int | 
| 43 | VBAN_PROTOCOL_TXT = 0x40 | 
| 44 | framecounter: bytes = (0).to_bytes(4, "little") | 
| 45 | |
| 46 | def __sr(self): | 
| 47 | return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") | 
| 48 | |
| 49 | def __nbc(self): | 
| 50 | return (self.channel).to_bytes(1, "little") | 
| 51 | |
| 52 | def build(self) -> bytes: | 
| 53 | header = "VBAN".encode("utf-8") | 
| 54 | header += self.__sr() | 
| 55 | header += (0).to_bytes(1, "little") | 
| 56 | header += self.__nbc() | 
| 57 | header += (0x10).to_bytes(1, "little") | 
| 58 | header += self.name.encode() + bytes(16 - len(self.name)) | 
| 59 | header += RTHeader.framecounter | 
| 60 | return header | 
| 61 | |
| 62 | |
| 63 | class RequestPacket: | 
| 64 | def __init__(self, header: RTHeader): | 
| 65 | self.header = header | 
| 66 | |
| 67 | def encode(self, text: str) -> bytes: | 
| 68 | return self.header.build() + text.encode("utf-8") | 
| 69 | |
| 70 | |
| 71 | class VbanSendText: | 
| 72 | def __init__(self, **kwargs): | 
| 73 | defaultkwargs = { | 
| 74 | "host": "localhost", | 
| 75 | "port": 6980, | 
| 76 | "streamname": "Command1", | 
| 77 | "bps_index": 0, | 
| 78 | "channel": 0, | 
| 79 | "delay": 0.02, | 
| 80 | } | 
| 81 | defaultkwargs.update(kwargs) | 
| 82 | self.__dict__.update(defaultkwargs) | 
| 83 | self._request = RequestPacket( | 
| 84 | RTHeader(self.streamname, self.bps_index, self.channel) | 
| 85 | ) | 
| 86 | |
| 87 | def __enter__(self): | 
| 88 | self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | 
| 89 | return self | 
| 90 | |
| 91 | def __exit__(self, exc_type, exc_value, traceback): | 
| 92 | self._sock.close() | 
| 93 | |
| 94 | def sendtext(self, text: str): | 
| 95 | """ | 
| 96 | Sends a text message to the specified host and port. | 
| 97 | Args: | 
| 98 | text (str): The text message to be sent. | 
| 99 | """ | 
| 100 | |
| 101 | self._sock.sendto(self._request.encode(text), (self.host, self.port)) | 
| 102 | self._request.header.framecounter = ( | 
| 103 | int.from_bytes(self._request.header.framecounter, "little") + 1 | 
| 104 | ).to_bytes(4, "little") | 
| 105 | logger.debug( | 
| 106 | f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" | 
| 107 | ) | 
| 108 | time.sleep(self.delay) | 
| 109 | |
| 110 | |
| 111 | def conn_from_toml(filepath: str = "config.toml") -> dict: | 
| 112 | """ | 
| 113 | Reads a TOML configuration file and returns its contents as a dictionary. | 
| 114 | Args: | 
| 115 | filepath (str): The path to the TOML file. Defaults to "config.toml". | 
| 116 | Returns: | 
| 117 | dict: The contents of the TOML file as a dictionary. | 
| 118 | Example: | 
| 119 | # config.toml | 
| 120 | host = "localhost" | 
| 121 | port = 6980 | 
| 122 | streamname = "Command1" | 
| 123 | """ | 
| 124 | |
| 125 | pn = Path(filepath) | 
| 126 | if not pn.exists(): | 
| 127 | logger.info( | 
| 128 | "no config.toml found, using defaults: localhost:6980 streamname: Command1" | 
| 129 | ) | 
| 130 | return {} | 
| 131 | |
| 132 | try: | 
| 133 | with open(filepath, "rb") as f: | 
| 134 | return tomllib.load(f) | 
| 135 | except tomllib.TOMLDecodeError as e: | 
| 136 | raise ValueError(f"Error decoding TOML file: {e}") from e | 
| 137 | |
| 138 | |
| 139 | def parse_args() -> argparse.Namespace: | 
| 140 | """ | 
| 141 | Parse command-line arguments. | 
| 142 | Returns: | 
| 143 | argparse.Namespace: Parsed command-line arguments. | 
| 144 | Command-line arguments: | 
| 145 | --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". | 
| 146 | --config (str): Path to config file. Default is "config.toml". | 
| 147 | -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. | 
| 148 | text (str, optional): Text to send. | 
| 149 | """ | 
| 150 | |
| 151 | parser = argparse.ArgumentParser(description="Send text to VBAN") | 
| 152 | parser.add_argument( | 
| 153 | "--log-level", | 
| 154 | type=str, | 
| 155 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], | 
| 156 | default="INFO", | 
| 157 | help="Set the logging level", | 
| 158 | ) | 
| 159 | parser.add_argument( | 
| 160 | "--config", type=str, default="config.toml", help="Path to config file" | 
| 161 | ) | 
| 162 | parser.add_argument( | 
| 163 | "-i", | 
| 164 | "--input-file", | 
| 165 | type=argparse.FileType("r"), | 
| 166 | default=sys.stdin, | 
| 167 | ) | 
| 168 | parser.add_argument("text", nargs="?", type=str, help="Text to send") | 
| 169 | return parser.parse_args() | 
| 170 | |
| 171 | |
| 172 | def main(config: dict): | 
| 173 | """ | 
| 174 | Main function to send text using VbanSendText. | 
| 175 | Args: | 
| 176 | config (dict): Configuration dictionary for VbanSendText. | 
| 177 | Behavior: | 
| 178 | - If 'args.text' is provided, sends the text and returns. | 
| 179 | - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. | 
| 180 | - Stops reading and sending if a line equals "Q". | 
| 181 | - Logs each line being sent at the debug level. | 
| 182 | """ | 
| 183 | |
| 184 | with VbanSendText(**config) as vban: | 
| 185 | if args.text: | 
| 186 | vban.sendtext(args.text) | 
| 187 | return | 
| 188 | |
| 189 | for line in args.input_file: | 
| 190 | line = line.strip() | 
| 191 | if line.upper() == "Q": | 
| 192 | break | 
| 193 | |
| 194 | logger.debug(f"Sending {line}") | 
| 195 | vban.sendtext(line) | 
| 196 | |
| 197 | |
| 198 | if __name__ == "__main__": | 
| 199 | args = parse_args() | 
| 200 | |
| 201 | logging.basicConfig(level=args.log_level) | 
| 202 | |
| 203 | main(conn_from_toml(args.config)) | 
| 204 |