#!/usr/bin/env python3 # MIT License # # Copyright (c) 2025 Onyx and Iris # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import argparse import socket class Packet: MAGIC = bytearray([0xFF, 0xFF, 0xFF, 0xFF]) class Request(Packet): """Class used to encode a request packet.""" def __init__(self, password: str): self.password = password def _header(self) -> bytes: return Packet.MAGIC + b"rcon" def encode(self, cmd: str) -> bytes: return self._header() + f" {self.password} {cmd}".encode() class ResponseBuilder(Packet): """Class used to build and decode a response packet. The response may be built from multiple fragments. """ def __init__(self): self._fragments = bytearray() def _header(self) -> bytes: return Packet.MAGIC + b"print\n" def is_valid_fragment(self, fragment: bytes) -> bool: return fragment.startswith(self._header()) def add_fragment(self, fragment: bytes): self._fragments.extend(fragment.removeprefix(self._header())) def build(self) -> str: return self._fragments.decode() def send(sock: socket.socket, request: Request, host: str, port: int, cmd: str) -> str: """Send a single rcon command to the server and return the response. Args: sock (socket.socket): UDP socket object request (Request): reference to a Request object host (str): hostname or IP address port (int): port number cmd (str): the rcon command to send Returns: str: the response from the server """ sock.sendto(request.encode(cmd), (host, port)) response_builder = ResponseBuilder() while True: try: data, _ = sock.recvfrom(4096) except socket.timeout: break if response_builder.is_valid_fragment(data): response_builder.add_fragment(data) return response_builder.build() def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--host", default="localhost") parser.add_argument("--port", type=int, default=27960) parser.add_argument("--password", default="secret") parser.add_argument("--timeout", type=float, default=0.2) parser.add_argument("--interactive", action="store_true") parser.add_argument("cmd", nargs="?", default="status") args = parser.parse_args() return args def main(): """Fire command in one-shot mode or interactive mode.""" args = parse_args() with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.settimeout(args.timeout) request = Request(args.password) if not args.interactive: if resp := send(sock, request, args.host, args.port, args.cmd): print(resp) return while cmd := input("cmd: ").strip(): if cmd == "Q": break if resp := send(sock, request, args.host, args.port, cmd): print(resp) if __name__ == "__main__": main()