rcon.py
· 4.0 KiB · Python
Brut
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2025 Onyx and Iris
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import socket
class Packet:
MAGIC = bytearray([0xFF, 0xFF, 0xFF, 0xFF])
class Request(Packet):
"""Class used to encode a request packet."""
def __init__(self, password: str):
self.password = password
def _header(self) -> bytes:
return Packet.MAGIC + b"rcon"
def encode(self, cmd: str) -> bytes:
return self._header() + f" {self.password} {cmd}".encode()
class ResponseBuilder(Packet):
"""Class used to build and decode a response packet.
The response may be built from multiple fragments.
"""
def __init__(self):
self._fragments = bytearray()
def _header(self) -> bytes:
return Packet.MAGIC + b"print\n"
def is_valid_fragment(self, fragment: bytes) -> bool:
return fragment.startswith(self._header())
def add_fragment(self, fragment: bytes):
self._fragments.extend(fragment.removeprefix(self._header()))
def build(self) -> str:
return self._fragments.decode()
def send(sock: socket.socket, request: Request, host: str, port: int, cmd: str) -> str:
"""Send a single rcon command to the server and return the response.
Args:
sock (socket.socket): UDP socket object
request (Request): reference to a Request object
host (str): hostname or IP address
port (int): port number
cmd (str): the rcon command to send
Returns:
str: the response from the server
"""
sock.sendto(request.encode(cmd), (host, port))
response_builder = ResponseBuilder()
while True:
try:
data, _ = sock.recvfrom(4096)
except socket.timeout:
break
if response_builder.is_valid_fragment(data):
response_builder.add_fragment(data)
return response_builder.build()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", type=int, default=27960)
parser.add_argument("--password", default="secret")
parser.add_argument("--timeout", type=float, default=0.2)
parser.add_argument("--interactive", action="store_true")
parser.add_argument("cmd", nargs="?", default="status")
args = parser.parse_args()
return args
def main():
"""Fire command in one-shot mode or interactive mode."""
args = parse_args()
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(args.timeout)
request = Request(args.password)
if not args.interactive:
if resp := send(sock, request, args.host, args.port, args.cmd):
print(resp)
return
while cmd := input("cmd: ").strip():
if cmd == "Q":
break
if resp := send(sock, request, args.host, args.port, cmd):
print(resp)
if __name__ == "__main__":
main()
1 | #!/usr/bin/env python3 |
2 | |
3 | # MIT License |
4 | # |
5 | # Copyright (c) 2025 Onyx and Iris |
6 | # |
7 | # Permission is hereby granted, free of charge, to any person obtaining a copy |
8 | # of this software and associated documentation files (the "Software"), to deal |
9 | # in the Software without restriction, including without limitation the rights |
10 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
11 | # copies of the Software, and to permit persons to whom the Software is |
12 | # furnished to do so, subject to the following conditions: |
13 | # |
14 | # The above copyright notice and this permission notice shall be included in all |
15 | # copies or substantial portions of the Software. |
16 | # |
17 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
18 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
19 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
20 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
21 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
22 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
23 | # SOFTWARE. |
24 | |
25 | import argparse |
26 | import socket |
27 | |
28 | |
29 | class Packet: |
30 | MAGIC = bytearray([0xFF, 0xFF, 0xFF, 0xFF]) |
31 | |
32 | |
33 | class Request(Packet): |
34 | """Class used to encode a request packet.""" |
35 | |
36 | def __init__(self, password: str): |
37 | self.password = password |
38 | |
39 | def _header(self) -> bytes: |
40 | return Packet.MAGIC + b"rcon" |
41 | |
42 | def encode(self, cmd: str) -> bytes: |
43 | return self._header() + f" {self.password} {cmd}".encode() |
44 | |
45 | |
46 | class ResponseBuilder(Packet): |
47 | """Class used to build and decode a response packet. |
48 | The response may be built from multiple fragments. |
49 | """ |
50 | |
51 | def __init__(self): |
52 | self._fragments = bytearray() |
53 | |
54 | def _header(self) -> bytes: |
55 | return Packet.MAGIC + b"print\n" |
56 | |
57 | def is_valid_fragment(self, fragment: bytes) -> bool: |
58 | return fragment.startswith(self._header()) |
59 | |
60 | def add_fragment(self, fragment: bytes): |
61 | self._fragments.extend(fragment.removeprefix(self._header())) |
62 | |
63 | def build(self) -> str: |
64 | return self._fragments.decode() |
65 | |
66 | |
67 | def send(sock: socket.socket, request: Request, host: str, port: int, cmd: str) -> str: |
68 | """Send a single rcon command to the server and return the response. |
69 | |
70 | Args: |
71 | sock (socket.socket): UDP socket object |
72 | request (Request): reference to a Request object |
73 | host (str): hostname or IP address |
74 | port (int): port number |
75 | cmd (str): the rcon command to send |
76 | |
77 | Returns: |
78 | str: the response from the server |
79 | """ |
80 | sock.sendto(request.encode(cmd), (host, port)) |
81 | |
82 | response_builder = ResponseBuilder() |
83 | while True: |
84 | try: |
85 | data, _ = sock.recvfrom(4096) |
86 | except socket.timeout: |
87 | break |
88 | |
89 | if response_builder.is_valid_fragment(data): |
90 | response_builder.add_fragment(data) |
91 | |
92 | return response_builder.build() |
93 | |
94 | |
95 | def parse_args() -> argparse.Namespace: |
96 | parser = argparse.ArgumentParser() |
97 | parser.add_argument("--host", default="localhost") |
98 | parser.add_argument("--port", type=int, default=27960) |
99 | parser.add_argument("--password", default="secret") |
100 | parser.add_argument("--timeout", type=float, default=0.2) |
101 | parser.add_argument("--interactive", action="store_true") |
102 | parser.add_argument("cmd", nargs="?", default="status") |
103 | args = parser.parse_args() |
104 | return args |
105 | |
106 | |
107 | def main(): |
108 | """Fire command in one-shot mode or interactive mode.""" |
109 | args = parse_args() |
110 | |
111 | with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: |
112 | sock.settimeout(args.timeout) |
113 | |
114 | request = Request(args.password) |
115 | |
116 | if not args.interactive: |
117 | if resp := send(sock, request, args.host, args.port, args.cmd): |
118 | print(resp) |
119 | return |
120 | |
121 | while cmd := input("cmd: ").strip(): |
122 | if cmd == "Q": |
123 | break |
124 | |
125 | if resp := send(sock, request, args.host, args.port, cmd): |
126 | print(resp) |
127 | |
128 | |
129 | if __name__ == "__main__": |
130 | main() |
131 |