onyx_online revised this gist . Go to revision
1 file changed, 1 insertion, 1 deletion
sendtext.py
@@ -130,7 +130,7 @@ def conn_from_toml(filepath: str = "config.toml") -> dict: | |||
130 | 130 | return {} | |
131 | 131 | ||
132 | 132 | try: | |
133 | - | with open(filepath, "rb") as f: | |
133 | + | with open(pn, "rb") as f: | |
134 | 134 | return tomllib.load(f) | |
135 | 135 | except tomllib.TOMLDecodeError as e: | |
136 | 136 | raise ValueError(f"Error decoding TOML file: {e}") from e |
onyx_online revised this gist . Go to revision
1 file changed, 1 insertion, 1 deletion
sendtext.py
@@ -125,7 +125,7 @@ def conn_from_toml(filepath: str = "config.toml") -> dict: | |||
125 | 125 | pn = Path(filepath) | |
126 | 126 | if not pn.exists(): | |
127 | 127 | logger.info( | |
128 | - | "no config.toml found, using defaults: localhost:6980 streamname: Command1" | |
128 | + | f"no {pn} found, using defaults: localhost:6980 streamname: Command1" | |
129 | 129 | ) | |
130 | 130 | return {} | |
131 | 131 |
onyx_online revised this gist . Go to revision
1 file changed, 13 insertions, 2 deletions
sendtext.py
@@ -28,6 +28,7 @@ import socket | |||
28 | 28 | import sys | |
29 | 29 | import time | |
30 | 30 | from dataclasses import dataclass | |
31 | + | from pathlib import Path | |
31 | 32 | ||
32 | 33 | import tomllib | |
33 | 34 | ||
@@ -121,8 +122,18 @@ def conn_from_toml(filepath: str = "config.toml") -> dict: | |||
121 | 122 | streamname = "Command1" | |
122 | 123 | """ | |
123 | 124 | ||
124 | - | with open(filepath, "rb") as f: | |
125 | - | return tomllib.load(f) | |
125 | + | pn = Path(filepath) | |
126 | + | if not pn.exists(): | |
127 | + | logger.info( | |
128 | + | "no config.toml found, using defaults: localhost:6980 streamname: Command1" | |
129 | + | ) | |
130 | + | return {} | |
131 | + | ||
132 | + | try: | |
133 | + | with open(filepath, "rb") as f: | |
134 | + | return tomllib.load(f) | |
135 | + | except tomllib.TOMLDecodeError as e: | |
136 | + | raise ValueError(f"Error decoding TOML file: {e}") from e | |
126 | 137 | ||
127 | 138 | ||
128 | 139 | def parse_args() -> argparse.Namespace: |
onyx_online revised this gist . Go to revision
1 file changed, 3 insertions, 3 deletions
sendtext.py
@@ -90,7 +90,7 @@ class VbanSendText: | |||
90 | 90 | def __exit__(self, exc_type, exc_value, traceback): | |
91 | 91 | self._sock.close() | |
92 | 92 | ||
93 | - | def send(self, text: str): | |
93 | + | def sendtext(self, text: str): | |
94 | 94 | """ | |
95 | 95 | Sends a text message to the specified host and port. | |
96 | 96 | Args: | |
@@ -172,7 +172,7 @@ def main(config: dict): | |||
172 | 172 | ||
173 | 173 | with VbanSendText(**config) as vban: | |
174 | 174 | if args.text: | |
175 | - | vban.send(args.text) | |
175 | + | vban.sendtext(args.text) | |
176 | 176 | return | |
177 | 177 | ||
178 | 178 | for line in args.input_file: | |
@@ -181,7 +181,7 @@ def main(config: dict): | |||
181 | 181 | break | |
182 | 182 | ||
183 | 183 | logger.debug(f"Sending {line}") | |
184 | - | vban.send(line) | |
184 | + | vban.sendtext(line) | |
185 | 185 | ||
186 | 186 | ||
187 | 187 | if __name__ == "__main__": |
onyx_online revised this gist . Go to revision
1 file changed, 192 insertions
sendtext.py(file created)
@@ -0,0 +1,192 @@ | |||
1 | + | #!/usr/bin/env python3 | |
2 | + | ||
3 | + | # MIT License | |
4 | + | # | |
5 | + | # Copyright (c) 2025 Onyx and Iris | |
6 | + | # | |
7 | + | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
8 | + | # of this software and associated documentation files (the "Software"), to deal | |
9 | + | # in the Software without restriction, including without limitation the rights | |
10 | + | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
11 | + | # copies of the Software, and to permit persons to whom the Software is | |
12 | + | # furnished to do so, subject to the following conditions: | |
13 | + | # | |
14 | + | # The above copyright notice and this permission notice shall be included in all | |
15 | + | # copies or substantial portions of the Software. | |
16 | + | # | |
17 | + | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
18 | + | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
19 | + | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
20 | + | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
21 | + | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
22 | + | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
23 | + | # SOFTWARE. | |
24 | + | ||
25 | + | import argparse | |
26 | + | import logging | |
27 | + | import socket | |
28 | + | import sys | |
29 | + | import time | |
30 | + | from dataclasses import dataclass | |
31 | + | ||
32 | + | import tomllib | |
33 | + | ||
34 | + | logger = logging.getLogger(__name__) | |
35 | + | ||
36 | + | ||
37 | + | @dataclass | |
38 | + | class RTHeader: | |
39 | + | name: str | |
40 | + | bps_index: int | |
41 | + | channel: int | |
42 | + | VBAN_PROTOCOL_TXT = 0x40 | |
43 | + | framecounter: bytes = (0).to_bytes(4, "little") | |
44 | + | ||
45 | + | def __sr(self): | |
46 | + | return (RTHeader.VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little") | |
47 | + | ||
48 | + | def __nbc(self): | |
49 | + | return (self.channel).to_bytes(1, "little") | |
50 | + | ||
51 | + | def build(self) -> bytes: | |
52 | + | header = "VBAN".encode("utf-8") | |
53 | + | header += self.__sr() | |
54 | + | header += (0).to_bytes(1, "little") | |
55 | + | header += self.__nbc() | |
56 | + | header += (0x10).to_bytes(1, "little") | |
57 | + | header += self.name.encode() + bytes(16 - len(self.name)) | |
58 | + | header += RTHeader.framecounter | |
59 | + | return header | |
60 | + | ||
61 | + | ||
62 | + | class RequestPacket: | |
63 | + | def __init__(self, header: RTHeader): | |
64 | + | self.header = header | |
65 | + | ||
66 | + | def encode(self, text: str) -> bytes: | |
67 | + | return self.header.build() + text.encode("utf-8") | |
68 | + | ||
69 | + | ||
70 | + | class VbanSendText: | |
71 | + | def __init__(self, **kwargs): | |
72 | + | defaultkwargs = { | |
73 | + | "host": "localhost", | |
74 | + | "port": 6980, | |
75 | + | "streamname": "Command1", | |
76 | + | "bps_index": 0, | |
77 | + | "channel": 0, | |
78 | + | "delay": 0.02, | |
79 | + | } | |
80 | + | defaultkwargs.update(kwargs) | |
81 | + | self.__dict__.update(defaultkwargs) | |
82 | + | self._request = RequestPacket( | |
83 | + | RTHeader(self.streamname, self.bps_index, self.channel) | |
84 | + | ) | |
85 | + | ||
86 | + | def __enter__(self): | |
87 | + | self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
88 | + | return self | |
89 | + | ||
90 | + | def __exit__(self, exc_type, exc_value, traceback): | |
91 | + | self._sock.close() | |
92 | + | ||
93 | + | def send(self, text: str): | |
94 | + | """ | |
95 | + | Sends a text message to the specified host and port. | |
96 | + | Args: | |
97 | + | text (str): The text message to be sent. | |
98 | + | """ | |
99 | + | ||
100 | + | self._sock.sendto(self._request.encode(text), (self.host, self.port)) | |
101 | + | self._request.header.framecounter = ( | |
102 | + | int.from_bytes(self._request.header.framecounter, "little") + 1 | |
103 | + | ).to_bytes(4, "little") | |
104 | + | logger.debug( | |
105 | + | f"framecounter: {int.from_bytes(self._request.header.framecounter, 'little')}" | |
106 | + | ) | |
107 | + | time.sleep(self.delay) | |
108 | + | ||
109 | + | ||
110 | + | def conn_from_toml(filepath: str = "config.toml") -> dict: | |
111 | + | """ | |
112 | + | Reads a TOML configuration file and returns its contents as a dictionary. | |
113 | + | Args: | |
114 | + | filepath (str): The path to the TOML file. Defaults to "config.toml". | |
115 | + | Returns: | |
116 | + | dict: The contents of the TOML file as a dictionary. | |
117 | + | Example: | |
118 | + | # config.toml | |
119 | + | host = "localhost" | |
120 | + | port = 6980 | |
121 | + | streamname = "Command1" | |
122 | + | """ | |
123 | + | ||
124 | + | with open(filepath, "rb") as f: | |
125 | + | return tomllib.load(f) | |
126 | + | ||
127 | + | ||
128 | + | def parse_args() -> argparse.Namespace: | |
129 | + | """ | |
130 | + | Parse command-line arguments. | |
131 | + | Returns: | |
132 | + | argparse.Namespace: Parsed command-line arguments. | |
133 | + | Command-line arguments: | |
134 | + | --log-level (str): Set the logging level. Choices are "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Default is "INFO". | |
135 | + | --config (str): Path to config file. Default is "config.toml". | |
136 | + | -i, --input-file (argparse.FileType): Input file to read from. Default is sys.stdin. | |
137 | + | text (str, optional): Text to send. | |
138 | + | """ | |
139 | + | ||
140 | + | parser = argparse.ArgumentParser(description="Send text to VBAN") | |
141 | + | parser.add_argument( | |
142 | + | "--log-level", | |
143 | + | type=str, | |
144 | + | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], | |
145 | + | default="INFO", | |
146 | + | help="Set the logging level", | |
147 | + | ) | |
148 | + | parser.add_argument( | |
149 | + | "--config", type=str, default="config.toml", help="Path to config file" | |
150 | + | ) | |
151 | + | parser.add_argument( | |
152 | + | "-i", | |
153 | + | "--input-file", | |
154 | + | type=argparse.FileType("r"), | |
155 | + | default=sys.stdin, | |
156 | + | ) | |
157 | + | parser.add_argument("text", nargs="?", type=str, help="Text to send") | |
158 | + | return parser.parse_args() | |
159 | + | ||
160 | + | ||
161 | + | def main(config: dict): | |
162 | + | """ | |
163 | + | Main function to send text using VbanSendText. | |
164 | + | Args: | |
165 | + | config (dict): Configuration dictionary for VbanSendText. | |
166 | + | Behavior: | |
167 | + | - If 'args.text' is provided, sends the text and returns. | |
168 | + | - Otherwise, reads lines from 'args.input_file', strips whitespace, and sends each line. | |
169 | + | - Stops reading and sending if a line equals "Q". | |
170 | + | - Logs each line being sent at the debug level. | |
171 | + | """ | |
172 | + | ||
173 | + | with VbanSendText(**config) as vban: | |
174 | + | if args.text: | |
175 | + | vban.send(args.text) | |
176 | + | return | |
177 | + | ||
178 | + | for line in args.input_file: | |
179 | + | line = line.strip() | |
180 | + | if line.upper() == "Q": | |
181 | + | break | |
182 | + | ||
183 | + | logger.debug(f"Sending {line}") | |
184 | + | vban.send(line) | |
185 | + | ||
186 | + | ||
187 | + | if __name__ == "__main__": | |
188 | + | args = parse_args() | |
189 | + | ||
190 | + | logging.basicConfig(level=args.log_level) | |
191 | + | ||
192 | + | main(conn_from_toml(args.config)) |