#!/usr/bin/env python3 # MIT License # # Copyright (c) 2025 Onyx and Iris # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import argparse import logging import socket import sys import time from dataclasses import dataclass from pathlib import Path import tomllib logger = logging.getLogger(__name__) @dataclass class RTHeader: name: str bps_index: int channel: int VBAN_PROTOCOL_TXT = 0x40 framecounter: bytes = (0).to_bytes(4, "little") def __sr(self): return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") def __nbc(self): return (self.channel).to_bytes(1, "little") def build(self) -> bytes: header = "VBAN".encode("utf-8") header += self.__sr() header += (0).to_bytes(1, "little") header += self.__nbc() header += (0x10).to_bytes(1, "little") header += self.name.encode() + bytes(16 - len(self.name)) header += RTHeader.framecounter return header class RequestPacket: def __init__(self, header: RTHeader): self.header = header def encode(self, text: str) -> bytes: return self.header.build() + text.encode("utf-8") class VbanSendText: def __init__(self, **kwargs): defaultkwargs = { "host": "localhost", "port": 6980, "streamname": "Command1", "bps_index": 0, "channel": 0, "delay": 0.02, } defaultkwargs.update(kwargs) self.__dict__.update(defaultkwargs) self._request = RequestPacket( RTHeader(self.streamname, self.bps_index, self.channel) ) def __enter__(self): self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) return self def __exit__(self, exc_type, exc_value, traceback): self._sock.close() def sendtext(self, text: str): """ Sends a text message to the specified host and port. Args: text (str): The text message to be sent. """ self._sock.sendto(self._request.encode(text), (self.host, self.port)) self._request.header.framecounter = ( int.from_bytes(self._request.header.framecounter, "little") + 1 ).to_bytes(4, "little") logger.debug( f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" ) time.sleep(self.delay) def conn_from_toml(filepath: str = "config.toml") -> dict: """ Reads a TOML configuration file and returns its contents as a dictionary. Args: filepath (str): The path to the TOML file. Defaults to "config.toml". Returns: dict: The contents of the TOML file as a dictionary. Example: # config.toml host = "localhost" port = 6980 streamname = "Command1" """ pn = Path(filepath) if not pn.exists(): logger.info( f"no {pn} found, using defaults: localhost:6980 streamname: Command1" ) return {} try: with open(filepath, "rb") as f: return tomllib.load(f) except tomllib.TOMLDecodeError as e: raise ValueError(f"Error decoding TOML file: {e}") from e def parse_args() -> argparse.Namespace: """ Parse command-line arguments. Returns: argparse.Namespace: Parsed command-line arguments. Command-line arguments: --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". --config (str): Path to config file. Default is "config.toml". -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. text (str, optional): Text to send. """ parser = argparse.ArgumentParser(description="Send text to VBAN") parser.add_argument( "--log-level", type=str, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="INFO", help="Set the logging level", ) parser.add_argument( "--config", type=str, default="config.toml", help="Path to config file" ) parser.add_argument( "-i", "--input-file", type=argparse.FileType("r"), default=sys.stdin, ) parser.add_argument("text", nargs="?", type=str, help="Text to send") return parser.parse_args() def main(config: dict): """ Main function to send text using VbanSendText. Args: config (dict): Configuration dictionary for VbanSendText. Behavior: - If 'args.text' is provided, sends the text and returns. - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. - Stops reading and sending if a line equals "Q". - Logs each line being sent at the debug level. """ with VbanSendText(**config) as vban: if args.text: vban.sendtext(args.text) return for line in args.input_file: line = line.strip() if line.upper() == "Q": break logger.debug(f"Sending {line}") vban.sendtext(line) if __name__ == "__main__": args = parse_args() logging.basicConfig(level=args.log_level) main(conn_from_toml(args.config))