sendtext.py
· 6.2 KiB · Python
Raw
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Onyx and Iris
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import logging
import socket
import sys
import time
from dataclasses import dataclass
from pathlib import Path
import tomllib
logger = logging.getLogger(__name__)
@dataclass
class RTHeader:
name: str
bps_index: int
channel: int
VBAN_PROTOCOL_TXT = 0x40
framecounter: bytes = (0).to_bytes(4, "little")
def __sr(self):
return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
def __nbc(self):
return (self.channel).to_bytes(1, "little")
def build(self) -> bytes:
header = "VBAN".encode("utf-8")
header += self.__sr()
header += (0).to_bytes(1, "little")
header += self.__nbc()
header += (0x10).to_bytes(1, "little")
header += self.name.encode() + bytes(16 - len(self.name))
header += RTHeader.framecounter
return header
class RequestPacket:
def __init__(self, header: RTHeader):
self.header = header
def encode(self, text: str) -> bytes:
return self.header.build() + text.encode("utf-8")
class VbanSendText:
def __init__(self, **kwargs):
defaultkwargs = {
"host": "localhost",
"port": 6980,
"streamname": "Command1",
"bps_index": 0,
"channel": 0,
"delay": 0.02,
}
defaultkwargs.update(kwargs)
self.__dict__.update(defaultkwargs)
self._request = RequestPacket(
RTHeader(self.streamname, self.bps_index, self.channel)
)
def __enter__(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._sock.close()
def sendtext(self, text: str):
"""
Sends a text message to the specified host and port.
Args:
text (str): The text message to be sent.
"""
self._sock.sendto(self._request.encode(text), (self.host, self.port))
self._request.header.framecounter = (
int.from_bytes(self._request.header.framecounter, "little") + 1
).to_bytes(4, "little")
logger.debug(
f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}"
)
time.sleep(self.delay)
def conn_from_toml(filepath: str = "config.toml") -> dict:
"""
Reads a TOML configuration file and returns its contents as a dictionary.
Args:
filepath (str): The path to the TOML file. Defaults to "config.toml".
Returns:
dict: The contents of the TOML file as a dictionary.
Example:
# config.toml
host = "localhost"
port = 6980
streamname = "Command1"
"""
pn = Path(filepath)
if not pn.exists():
logger.info(
f"no {pn} found, using defaults: localhost:6980 streamname: Command1"
)
return {}
try:
with open(pn, "rb") as f:
return tomllib.load(f)
except tomllib.TOMLDecodeError as e:
raise ValueError(f"Error decoding TOML file: {e}") from e
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments.
Returns:
argparse.Namespace: Parsed command-line arguments.
Command-line arguments:
--log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO".
--config (str): Path to config file. Default is "config.toml".
-i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin.
text (str, optional): Text to send.
"""
parser = argparse.ArgumentParser(description="Send text to VBAN")
parser.add_argument(
"--log-level",
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
help="Set the logging level",
)
parser.add_argument(
"--config", type=str, default="config.toml", help="Path to config file"
)
parser.add_argument(
"-i",
"--input-file",
type=argparse.FileType("r"),
default=sys.stdin,
)
parser.add_argument("text", nargs="?", type=str, help="Text to send")
return parser.parse_args()
def main(config: dict):
"""
Main function to send text using VbanSendText.
Args:
config (dict): Configuration dictionary for VbanSendText.
Behavior:
- If 'args.text' is provided, sends the text and returns.
- Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line.
- Stops reading and sending if a line equals "Q".
- Logs each line being sent at the debug level.
"""
with VbanSendText(**config) as vban:
if args.text:
vban.sendtext(args.text)
return
for line in args.input_file:
line = line.strip()
if line.upper() == "Q":
break
logger.debug(f"Sending {line}")
vban.sendtext(line)
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(level=args.log_level)
main(conn_from_toml(args.config))
1 | #!/usr/bin/env python3 |
2 | |
3 | # MIT License |
4 | # |
5 | # Copyright (c) 2025 Onyx and Iris |
6 | # |
7 | # Permission is hereby granted, free of charge, to any person obtaining a copy |
8 | # of this software and associated documentation files (the "Software"), to deal |
9 | # in the Software without restriction, including without limitation the rights |
10 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
11 | # copies of the Software, and to permit persons to whom the Software is |
12 | # furnished to do so, subject to the following conditions: |
13 | # |
14 | # The above copyright notice and this permission notice shall be included in all |
15 | # copies or substantial portions of the Software. |
16 | # |
17 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
18 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
19 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
20 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
21 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
22 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
23 | # SOFTWARE. |
24 | |
25 | import argparse |
26 | import logging |
27 | import socket |
28 | import sys |
29 | import time |
30 | from dataclasses import dataclass |
31 | from pathlib import Path |
32 | |
33 | import tomllib |
34 | |
35 | logger = logging.getLogger(__name__) |
36 | |
37 | |
38 | @dataclass |
39 | class RTHeader: |
40 | name: str |
41 | bps_index: int |
42 | channel: int |
43 | VBAN_PROTOCOL_TXT = 0x40 |
44 | framecounter: bytes = (0).to_bytes(4, "little") |
45 | |
46 | def __sr(self): |
47 | return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") |
48 | |
49 | def __nbc(self): |
50 | return (self.channel).to_bytes(1, "little") |
51 | |
52 | def build(self) -> bytes: |
53 | header = "VBAN".encode("utf-8") |
54 | header += self.__sr() |
55 | header += (0).to_bytes(1, "little") |
56 | header += self.__nbc() |
57 | header += (0x10).to_bytes(1, "little") |
58 | header += self.name.encode() + bytes(16 - len(self.name)) |
59 | header += RTHeader.framecounter |
60 | return header |
61 | |
62 | |
63 | class RequestPacket: |
64 | def __init__(self, header: RTHeader): |
65 | self.header = header |
66 | |
67 | def encode(self, text: str) -> bytes: |
68 | return self.header.build() + text.encode("utf-8") |
69 | |
70 | |
71 | class VbanSendText: |
72 | def __init__(self, **kwargs): |
73 | defaultkwargs = { |
74 | "host": "localhost", |
75 | "port": 6980, |
76 | "streamname": "Command1", |
77 | "bps_index": 0, |
78 | "channel": 0, |
79 | "delay": 0.02, |
80 | } |
81 | defaultkwargs.update(kwargs) |
82 | self.__dict__.update(defaultkwargs) |
83 | self._request = RequestPacket( |
84 | RTHeader(self.streamname, self.bps_index, self.channel) |
85 | ) |
86 | |
87 | def __enter__(self): |
88 | self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
89 | return self |
90 | |
91 | def __exit__(self, exc_type, exc_value, traceback): |
92 | self._sock.close() |
93 | |
94 | def sendtext(self, text: str): |
95 | """ |
96 | Sends a text message to the specified host and port. |
97 | Args: |
98 | text (str): The text message to be sent. |
99 | """ |
100 | |
101 | self._sock.sendto(self._request.encode(text), (self.host, self.port)) |
102 | self._request.header.framecounter = ( |
103 | int.from_bytes(self._request.header.framecounter, "little") + 1 |
104 | ).to_bytes(4, "little") |
105 | logger.debug( |
106 | f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" |
107 | ) |
108 | time.sleep(self.delay) |
109 | |
110 | |
111 | def conn_from_toml(filepath: str = "config.toml") -> dict: |
112 | """ |
113 | Reads a TOML configuration file and returns its contents as a dictionary. |
114 | Args: |
115 | filepath (str): The path to the TOML file. Defaults to "config.toml". |
116 | Returns: |
117 | dict: The contents of the TOML file as a dictionary. |
118 | Example: |
119 | # config.toml |
120 | host = "localhost" |
121 | port = 6980 |
122 | streamname = "Command1" |
123 | """ |
124 | |
125 | pn = Path(filepath) |
126 | if not pn.exists(): |
127 | logger.info( |
128 | f"no {pn} found, using defaults: localhost:6980 streamname: Command1" |
129 | ) |
130 | return {} |
131 | |
132 | try: |
133 | with open(pn, "rb") as f: |
134 | return tomllib.load(f) |
135 | except tomllib.TOMLDecodeError as e: |
136 | raise ValueError(f"Error decoding TOML file: {e}") from e |
137 | |
138 | |
139 | def parse_args() -> argparse.Namespace: |
140 | """ |
141 | Parse command-line arguments. |
142 | Returns: |
143 | argparse.Namespace: Parsed command-line arguments. |
144 | Command-line arguments: |
145 | --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". |
146 | --config (str): Path to config file. Default is "config.toml". |
147 | -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. |
148 | text (str, optional): Text to send. |
149 | """ |
150 | |
151 | parser = argparse.ArgumentParser(description="Send text to VBAN") |
152 | parser.add_argument( |
153 | "--log-level", |
154 | type=str, |
155 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], |
156 | default="INFO", |
157 | help="Set the logging level", |
158 | ) |
159 | parser.add_argument( |
160 | "--config", type=str, default="config.toml", help="Path to config file" |
161 | ) |
162 | parser.add_argument( |
163 | "-i", |
164 | "--input-file", |
165 | type=argparse.FileType("r"), |
166 | default=sys.stdin, |
167 | ) |
168 | parser.add_argument("text", nargs="?", type=str, help="Text to send") |
169 | return parser.parse_args() |
170 | |
171 | |
172 | def main(config: dict): |
173 | """ |
174 | Main function to send text using VbanSendText. |
175 | Args: |
176 | config (dict): Configuration dictionary for VbanSendText. |
177 | Behavior: |
178 | - If 'args.text' is provided, sends the text and returns. |
179 | - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. |
180 | - Stops reading and sending if a line equals "Q". |
181 | - Logs each line being sent at the debug level. |
182 | """ |
183 | |
184 | with VbanSendText(**config) as vban: |
185 | if args.text: |
186 | vban.sendtext(args.text) |
187 | return |
188 | |
189 | for line in args.input_file: |
190 | line = line.strip() |
191 | if line.upper() == "Q": |
192 | break |
193 | |
194 | logger.debug(f"Sending {line}") |
195 | vban.sendtext(line) |
196 | |
197 | |
198 | if __name__ == "__main__": |
199 | args = parse_args() |
200 | |
201 | logging.basicConfig(level=args.log_level) |
202 | |
203 | main(conn_from_toml(args.config)) |
204 |