sendtext.py
· 5.8 KiB · Python
Raw
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Onyx and Iris
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import logging
import socket
import sys
import time
from dataclasses import dataclass
import tomllib
logger = logging.getLogger(__name__)
@dataclass
class RTHeader:
name: str
bps_index: int
channel: int
VBAN_PROTOCOL_TXT = 0x40
framecounter: bytes = (0).to_bytes(4, "little")
def __sr(self):
return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
def __nbc(self):
return (self.channel).to_bytes(1, "little")
def build(self) -> bytes:
header = "VBAN".encode("utf-8")
header += self.__sr()
header += (0).to_bytes(1, "little")
header += self.__nbc()
header += (0x10).to_bytes(1, "little")
header += self.name.encode() + bytes(16 - len(self.name))
header += RTHeader.framecounter
return header
class RequestPacket:
def __init__(self, header: RTHeader):
self.header = header
def encode(self, text: str) -> bytes:
return self.header.build() + text.encode("utf-8")
class VbanSendText:
def __init__(self, **kwargs):
defaultkwargs = {
"host": "localhost",
"port": 6980,
"streamname": "Command1",
"bps_index": 0,
"channel": 0,
"delay": 0.02,
}
defaultkwargs.update(kwargs)
self.__dict__.update(defaultkwargs)
self._request = RequestPacket(
RTHeader(self.streamname, self.bps_index, self.channel)
)
def __enter__(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._sock.close()
def send(self, text: str):
"""
Sends a text message to the specified host and port.
Args:
text (str): The text message to be sent.
"""
self._sock.sendto(self._request.encode(text), (self.host, self.port))
self._request.header.framecounter = (
int.from_bytes(self._request.header.framecounter, "little") + 1
).to_bytes(4, "little")
logger.debug(
f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}"
)
time.sleep(self.delay)
def conn_from_toml(filepath: str = "config.toml") -> dict:
"""
Reads a TOML configuration file and returns its contents as a dictionary.
Args:
filepath (str): The path to the TOML file. Defaults to "config.toml".
Returns:
dict: The contents of the TOML file as a dictionary.
Example:
# config.toml
host = "localhost"
port = 6980
streamname = "Command1"
"""
with open(filepath, "rb") as f:
return tomllib.load(f)
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments.
Returns:
argparse.Namespace: Parsed command-line arguments.
Command-line arguments:
--log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO".
--config (str): Path to config file. Default is "config.toml".
-i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin.
text (str, optional): Text to send.
"""
parser = argparse.ArgumentParser(description="Send text to VBAN")
parser.add_argument(
"--log-level",
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
help="Set the logging level",
)
parser.add_argument(
"--config", type=str, default="config.toml", help="Path to config file"
)
parser.add_argument(
"-i",
"--input-file",
type=argparse.FileType("r"),
default=sys.stdin,
)
parser.add_argument("text", nargs="?", type=str, help="Text to send")
return parser.parse_args()
def main(config: dict):
"""
Main function to send text using VbanSendText.
Args:
config (dict): Configuration dictionary for VbanSendText.
Behavior:
- If 'args.text' is provided, sends the text and returns.
- Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line.
- Stops reading and sending if a line equals "Q".
- Logs each line being sent at the debug level.
"""
with VbanSendText(**config) as vban:
if args.text:
vban.send(args.text)
return
for line in args.input_file:
line = line.strip()
if line.upper() == "Q":
break
logger.debug(f"Sending {line}")
vban.send(line)
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(level=args.log_level)
main(conn_from_toml(args.config))
| 1 | #!/usr/bin/env python3 |
| 2 | |
| 3 | # MIT License |
| 4 | # |
| 5 | # Copyright (c) 2025 Onyx and Iris |
| 6 | # |
| 7 | # Permission is hereby granted, free of charge, to any person obtaining a copy |
| 8 | # of this software and associated documentation files (the "Software"), to deal |
| 9 | # in the Software without restriction, including without limitation the rights |
| 10 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 11 | # copies of the Software, and to permit persons to whom the Software is |
| 12 | # furnished to do so, subject to the following conditions: |
| 13 | # |
| 14 | # The above copyright notice and this permission notice shall be included in all |
| 15 | # copies or substantial portions of the Software. |
| 16 | # |
| 17 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 18 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 19 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 20 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 21 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 22 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| 23 | # SOFTWARE. |
| 24 | |
| 25 | import argparse |
| 26 | import logging |
| 27 | import socket |
| 28 | import sys |
| 29 | import time |
| 30 | from dataclasses import dataclass |
| 31 | |
| 32 | import tomllib |
| 33 | |
| 34 | logger = logging.getLogger(__name__) |
| 35 | |
| 36 | |
| 37 | @dataclass |
| 38 | class RTHeader: |
| 39 | name: str |
| 40 | bps_index: int |
| 41 | channel: int |
| 42 | VBAN_PROTOCOL_TXT = 0x40 |
| 43 | framecounter: bytes = (0).to_bytes(4, "little") |
| 44 | |
| 45 | def __sr(self): |
| 46 | return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") |
| 47 | |
| 48 | def __nbc(self): |
| 49 | return (self.channel).to_bytes(1, "little") |
| 50 | |
| 51 | def build(self) -> bytes: |
| 52 | header = "VBAN".encode("utf-8") |
| 53 | header += self.__sr() |
| 54 | header += (0).to_bytes(1, "little") |
| 55 | header += self.__nbc() |
| 56 | header += (0x10).to_bytes(1, "little") |
| 57 | header += self.name.encode() + bytes(16 - len(self.name)) |
| 58 | header += RTHeader.framecounter |
| 59 | return header |
| 60 | |
| 61 | |
| 62 | class RequestPacket: |
| 63 | def __init__(self, header: RTHeader): |
| 64 | self.header = header |
| 65 | |
| 66 | def encode(self, text: str) -> bytes: |
| 67 | return self.header.build() + text.encode("utf-8") |
| 68 | |
| 69 | |
| 70 | class VbanSendText: |
| 71 | def __init__(self, **kwargs): |
| 72 | defaultkwargs = { |
| 73 | "host": "localhost", |
| 74 | "port": 6980, |
| 75 | "streamname": "Command1", |
| 76 | "bps_index": 0, |
| 77 | "channel": 0, |
| 78 | "delay": 0.02, |
| 79 | } |
| 80 | defaultkwargs.update(kwargs) |
| 81 | self.__dict__.update(defaultkwargs) |
| 82 | self._request = RequestPacket( |
| 83 | RTHeader(self.streamname, self.bps_index, self.channel) |
| 84 | ) |
| 85 | |
| 86 | def __enter__(self): |
| 87 | self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 88 | return self |
| 89 | |
| 90 | def __exit__(self, exc_type, exc_value, traceback): |
| 91 | self._sock.close() |
| 92 | |
| 93 | def send(self, text: str): |
| 94 | """ |
| 95 | Sends a text message to the specified host and port. |
| 96 | Args: |
| 97 | text (str): The text message to be sent. |
| 98 | """ |
| 99 | |
| 100 | self._sock.sendto(self._request.encode(text), (self.host, self.port)) |
| 101 | self._request.header.framecounter = ( |
| 102 | int.from_bytes(self._request.header.framecounter, "little") + 1 |
| 103 | ).to_bytes(4, "little") |
| 104 | logger.debug( |
| 105 | f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" |
| 106 | ) |
| 107 | time.sleep(self.delay) |
| 108 | |
| 109 | |
| 110 | def conn_from_toml(filepath: str = "config.toml") -> dict: |
| 111 | """ |
| 112 | Reads a TOML configuration file and returns its contents as a dictionary. |
| 113 | Args: |
| 114 | filepath (str): The path to the TOML file. Defaults to "config.toml". |
| 115 | Returns: |
| 116 | dict: The contents of the TOML file as a dictionary. |
| 117 | Example: |
| 118 | # config.toml |
| 119 | host = "localhost" |
| 120 | port = 6980 |
| 121 | streamname = "Command1" |
| 122 | """ |
| 123 | |
| 124 | with open(filepath, "rb") as f: |
| 125 | return tomllib.load(f) |
| 126 | |
| 127 | |
| 128 | def parse_args() -> argparse.Namespace: |
| 129 | """ |
| 130 | Parse command-line arguments. |
| 131 | Returns: |
| 132 | argparse.Namespace: Parsed command-line arguments. |
| 133 | Command-line arguments: |
| 134 | --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". |
| 135 | --config (str): Path to config file. Default is "config.toml". |
| 136 | -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. |
| 137 | text (str, optional): Text to send. |
| 138 | """ |
| 139 | |
| 140 | parser = argparse.ArgumentParser(description="Send text to VBAN") |
| 141 | parser.add_argument( |
| 142 | "--log-level", |
| 143 | type=str, |
| 144 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], |
| 145 | default="INFO", |
| 146 | help="Set the logging level", |
| 147 | ) |
| 148 | parser.add_argument( |
| 149 | "--config", type=str, default="config.toml", help="Path to config file" |
| 150 | ) |
| 151 | parser.add_argument( |
| 152 | "-i", |
| 153 | "--input-file", |
| 154 | type=argparse.FileType("r"), |
| 155 | default=sys.stdin, |
| 156 | ) |
| 157 | parser.add_argument("text", nargs="?", type=str, help="Text to send") |
| 158 | return parser.parse_args() |
| 159 | |
| 160 | |
| 161 | def main(config: dict): |
| 162 | """ |
| 163 | Main function to send text using VbanSendText. |
| 164 | Args: |
| 165 | config (dict): Configuration dictionary for VbanSendText. |
| 166 | Behavior: |
| 167 | - If 'args.text' is provided, sends the text and returns. |
| 168 | - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. |
| 169 | - Stops reading and sending if a line equals "Q". |
| 170 | - Logs each line being sent at the debug level. |
| 171 | """ |
| 172 | |
| 173 | with VbanSendText(**config) as vban: |
| 174 | if args.text: |
| 175 | vban.send(args.text) |
| 176 | return |
| 177 | |
| 178 | for line in args.input_file: |
| 179 | line = line.strip() |
| 180 | if line.upper() == "Q": |
| 181 | break |
| 182 | |
| 183 | logger.debug(f"Sending {line}") |
| 184 | vban.send(line) |
| 185 | |
| 186 | |
| 187 | if __name__ == "__main__": |
| 188 | args = parse_args() |
| 189 | |
| 190 | logging.basicConfig(level=args.log_level) |
| 191 | |
| 192 | main(conn_from_toml(args.config)) |
| 193 |