sendtext.py
· 5.8 KiB · Python
Raw
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Onyx and Iris
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import logging
import socket
import sys
import time
from dataclasses import dataclass
import tomllib
logger = logging.getLogger(__name__)
@dataclass
class RTHeader:
name: str
bps_index: int
channel: int
VBAN_PROTOCOL_TXT = 0x40
framecounter: bytes = (0).to_bytes(4, "little")
def __sr(self):
return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
def __nbc(self):
return (self.channel).to_bytes(1, "little")
def build(self) -> bytes:
header = "VBAN".encode("utf-8")
header += self.__sr()
header += (0).to_bytes(1, "little")
header += self.__nbc()
header += (0x10).to_bytes(1, "little")
header += self.name.encode() + bytes(16 - len(self.name))
header += RTHeader.framecounter
return header
class RequestPacket:
def __init__(self, header: RTHeader):
self.header = header
def encode(self, text: str) -> bytes:
return self.header.build() + text.encode("utf-8")
class VbanSendText:
def __init__(self, **kwargs):
defaultkwargs = {
"host": "localhost",
"port": 6980,
"streamname": "Command1",
"bps_index": 0,
"channel": 0,
"delay": 0.02,
}
defaultkwargs.update(kwargs)
self.__dict__.update(defaultkwargs)
self._request = RequestPacket(
RTHeader(self.streamname, self.bps_index, self.channel)
)
def __enter__(self):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return self
def __exit__(self, exc_type, exc_value, traceback):
self._sock.close()
def sendtext(self, text: str):
"""
Sends a text message to the specified host and port.
Args:
text (str): The text message to be sent.
"""
self._sock.sendto(self._request.encode(text), (self.host, self.port))
self._request.header.framecounter = (
int.from_bytes(self._request.header.framecounter, "little") + 1
).to_bytes(4, "little")
logger.debug(
f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}"
)
time.sleep(self.delay)
def conn_from_toml(filepath: str = "config.toml") -> dict:
"""
Reads a TOML configuration file and returns its contents as a dictionary.
Args:
filepath (str): The path to the TOML file. Defaults to "config.toml".
Returns:
dict: The contents of the TOML file as a dictionary.
Example:
# config.toml
host = "localhost"
port = 6980
streamname = "Command1"
"""
with open(filepath, "rb") as f:
return tomllib.load(f)
def parse_args() -> argparse.Namespace:
"""
Parse command-line arguments.
Returns:
argparse.Namespace: Parsed command-line arguments.
Command-line arguments:
--log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO".
--config (str): Path to config file. Default is "config.toml".
-i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin.
text (str, optional): Text to send.
"""
parser = argparse.ArgumentParser(description="Send text to VBAN")
parser.add_argument(
"--log-level",
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
help="Set the logging level",
)
parser.add_argument(
"--config", type=str, default="config.toml", help="Path to config file"
)
parser.add_argument(
"-i",
"--input-file",
type=argparse.FileType("r"),
default=sys.stdin,
)
parser.add_argument("text", nargs="?", type=str, help="Text to send")
return parser.parse_args()
def main(config: dict):
"""
Main function to send text using VbanSendText.
Args:
config (dict): Configuration dictionary for VbanSendText.
Behavior:
- If 'args.text' is provided, sends the text and returns.
- Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line.
- Stops reading and sending if a line equals "Q".
- Logs each line being sent at the debug level.
"""
with VbanSendText(**config) as vban:
if args.text:
vban.sendtext(args.text)
return
for line in args.input_file:
line = line.strip()
if line.upper() == "Q":
break
logger.debug(f"Sending {line}")
vban.sendtext(line)
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(level=args.log_level)
main(conn_from_toml(args.config))
1 | #!/usr/bin/env python3 |
2 | |
3 | # MIT License |
4 | # |
5 | # Copyright (c) 2025 Onyx and Iris |
6 | # |
7 | # Permission is hereby granted, free of charge, to any person obtaining a copy |
8 | # of this software and associated documentation files (the "Software"), to deal |
9 | # in the Software without restriction, including without limitation the rights |
10 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
11 | # copies of the Software, and to permit persons to whom the Software is |
12 | # furnished to do so, subject to the following conditions: |
13 | # |
14 | # The above copyright notice and this permission notice shall be included in all |
15 | # copies or substantial portions of the Software. |
16 | # |
17 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
18 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
19 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
20 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
21 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
22 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
23 | # SOFTWARE. |
24 | |
25 | import argparse |
26 | import logging |
27 | import socket |
28 | import sys |
29 | import time |
30 | from dataclasses import dataclass |
31 | |
32 | import tomllib |
33 | |
34 | logger = logging.getLogger(__name__) |
35 | |
36 | |
37 | @dataclass |
38 | class RTHeader: |
39 | name: str |
40 | bps_index: int |
41 | channel: int |
42 | VBAN_PROTOCOL_TXT = 0x40 |
43 | framecounter: bytes = (0).to_bytes(4, "little") |
44 | |
45 | def __sr(self): |
46 | return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") |
47 | |
48 | def __nbc(self): |
49 | return (self.channel).to_bytes(1, "little") |
50 | |
51 | def build(self) -> bytes: |
52 | header = "VBAN".encode("utf-8") |
53 | header += self.__sr() |
54 | header += (0).to_bytes(1, "little") |
55 | header += self.__nbc() |
56 | header += (0x10).to_bytes(1, "little") |
57 | header += self.name.encode() + bytes(16 - len(self.name)) |
58 | header += RTHeader.framecounter |
59 | return header |
60 | |
61 | |
62 | class RequestPacket: |
63 | def __init__(self, header: RTHeader): |
64 | self.header = header |
65 | |
66 | def encode(self, text: str) -> bytes: |
67 | return self.header.build() + text.encode("utf-8") |
68 | |
69 | |
70 | class VbanSendText: |
71 | def __init__(self, **kwargs): |
72 | defaultkwargs = { |
73 | "host": "localhost", |
74 | "port": 6980, |
75 | "streamname": "Command1", |
76 | "bps_index": 0, |
77 | "channel": 0, |
78 | "delay": 0.02, |
79 | } |
80 | defaultkwargs.update(kwargs) |
81 | self.__dict__.update(defaultkwargs) |
82 | self._request = RequestPacket( |
83 | RTHeader(self.streamname, self.bps_index, self.channel) |
84 | ) |
85 | |
86 | def __enter__(self): |
87 | self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
88 | return self |
89 | |
90 | def __exit__(self, exc_type, exc_value, traceback): |
91 | self._sock.close() |
92 | |
93 | def sendtext(self, text: str): |
94 | """ |
95 | Sends a text message to the specified host and port. |
96 | Args: |
97 | text (str): The text message to be sent. |
98 | """ |
99 | |
100 | self._sock.sendto(self._request.encode(text), (self.host, self.port)) |
101 | self._request.header.framecounter = ( |
102 | int.from_bytes(self._request.header.framecounter, "little") + 1 |
103 | ).to_bytes(4, "little") |
104 | logger.debug( |
105 | f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" |
106 | ) |
107 | time.sleep(self.delay) |
108 | |
109 | |
110 | def conn_from_toml(filepath: str = "config.toml") -> dict: |
111 | """ |
112 | Reads a TOML configuration file and returns its contents as a dictionary. |
113 | Args: |
114 | filepath (str): The path to the TOML file. Defaults to "config.toml". |
115 | Returns: |
116 | dict: The contents of the TOML file as a dictionary. |
117 | Example: |
118 | # config.toml |
119 | host = "localhost" |
120 | port = 6980 |
121 | streamname = "Command1" |
122 | """ |
123 | |
124 | with open(filepath, "rb") as f: |
125 | return tomllib.load(f) |
126 | |
127 | |
128 | def parse_args() -> argparse.Namespace: |
129 | """ |
130 | Parse command-line arguments. |
131 | Returns: |
132 | argparse.Namespace: Parsed command-line arguments. |
133 | Command-line arguments: |
134 | --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". |
135 | --config (str): Path to config file. Default is "config.toml". |
136 | -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. |
137 | text (str, optional): Text to send. |
138 | """ |
139 | |
140 | parser = argparse.ArgumentParser(description="Send text to VBAN") |
141 | parser.add_argument( |
142 | "--log-level", |
143 | type=str, |
144 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], |
145 | default="INFO", |
146 | help="Set the logging level", |
147 | ) |
148 | parser.add_argument( |
149 | "--config", type=str, default="config.toml", help="Path to config file" |
150 | ) |
151 | parser.add_argument( |
152 | "-i", |
153 | "--input-file", |
154 | type=argparse.FileType("r"), |
155 | default=sys.stdin, |
156 | ) |
157 | parser.add_argument("text", nargs="?", type=str, help="Text to send") |
158 | return parser.parse_args() |
159 | |
160 | |
161 | def main(config: dict): |
162 | """ |
163 | Main function to send text using VbanSendText. |
164 | Args: |
165 | config (dict): Configuration dictionary for VbanSendText. |
166 | Behavior: |
167 | - If 'args.text' is provided, sends the text and returns. |
168 | - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. |
169 | - Stops reading and sending if a line equals "Q". |
170 | - Logs each line being sent at the debug level. |
171 | """ |
172 | |
173 | with VbanSendText(**config) as vban: |
174 | if args.text: |
175 | vban.sendtext(args.text) |
176 | return |
177 | |
178 | for line in args.input_file: |
179 | line = line.strip() |
180 | if line.upper() == "Q": |
181 | break |
182 | |
183 | logger.debug(f"Sending {line}") |
184 | vban.sendtext(line) |
185 | |
186 | |
187 | if __name__ == "__main__": |
188 | args = parse_args() |
189 | |
190 | logging.basicConfig(level=args.log_level) |
191 | |
192 | main(conn_from_toml(args.config)) |
193 |