"""Poll the OVH API for IPs under DDoS mitigation and notify a Discord webhook."""

import ipaddress
import logging
import time

import ovh

try:
    import tomllib
except ModuleNotFoundError:
    # Fallback for interpreters that do not ship ``tomllib``.
    import tomli as tomllib

from pathlib import Path

from discord_webhook import DiscordWebhook

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ddos-notifier")

# Seconds to wait between two polling passes over the IP list.
POLL_INTERVAL_SECONDS = 30

# IPs for which a "mitigation started" message has been sent and no
# "mitigation ended" message has been sent yet.
mitigation_list = []


def send_webhook_message(webhook_url, msg):
    """Post ``msg`` to the Discord webhook at ``webhook_url``.

    The message is sent under the username ``ovh-servers-mitigation``.
    """
    webhook = DiscordWebhook(
        url=webhook_url,
        username="ovh-servers-mitigation",
        content=msg,
    )
    webhook.execute()


def check_ips_in_mitigation(webhook_url, client, ips):
    """Check every IP once and announce mitigation state changes.

    For each address, ``/ip/{ip}/mitigation/{ip}`` is requested through
    ``client``. A successful response is treated as "in mitigation"; an
    ``ovh.exceptions.ResourceNotFoundError`` is treated as "not in
    mitigation". A webhook message is sent only when that state differs
    from what ``mitigation_list`` records, and ``mitigation_list`` is
    updated accordingly.

    Any other OVH API error is logged and the address is skipped for this
    pass, leaving its recorded state unchanged, so that a transient API
    failure does not stop the polling loop.

    Args:
        webhook_url: Discord webhook URL passed to ``send_webhook_message``.
        client: Object exposing ``get(path)`` (an ``ovh.Client`` in ``main``).
        ips: Iterable of IP address strings to check.
    """
    for ip in ips:
        logger.debug("Checking %s", ip)
        try:
            # Only the API call is guarded; the response body is not needed.
            client.get(f"/ip/{ip}/mitigation/{ip}")
        except ovh.exceptions.ResourceNotFoundError:
            # No mitigation resource for this address: it is not (or no
            # longer) being mitigated.
            if ip in mitigation_list:
                mitigation_list.remove(ip)
                send_webhook_message(
                    webhook_url, f"{ip} is out of mitigation, DDOS over."
                )
                logger.info("%s is no longer being ddosed", ip)
        except ovh.exceptions.APIError:
            # Must come after ResourceNotFoundError, which is a subclass.
            # The state is unknown here, so nothing is announced or changed.
            logger.exception("Could not query mitigation state for %s", ip)
        else:
            if ip not in mitigation_list:
                mitigation_list.append(ip)
                send_webhook_message(
                    webhook_url, f"{ip} is in mitigation! DDOS detected."
                )
                logger.info("%s is being ddosed", ip)


def _conn_from_toml():
    """Load and return the parsed ``config.toml`` located next to this file.

    Returns:
        The dictionary produced by ``tomllib.load``.

    Raises:
        OSError: If the file cannot be opened (``FileNotFoundError`` when it
            does not exist).
    """
    filepath = Path(__file__).parent.resolve() / "config.toml"
    # Open directly instead of testing ``exists()`` first: this avoids a
    # check-then-use race and yields an OSError carrying errno and filename.
    with filepath.open("rb") as f:
        return tomllib.load(f)


def main():
    """Read the configuration, list the account's IPv4 addresses and poll forever.

    The ``[ovh]`` table of the configuration is passed as keyword arguments
    to ``ovh.Client``; the webhook URL is read from ``discord.webhook``.
    """
    conn = _conn_from_toml()
    client = ovh.Client(**conn["ovh"])

    # Each "/ip" entry is split on "/" and only the part before it is kept;
    # entries whose address part is not IPv4 are dropped.
    addresses = (block.split("/")[0] for block in client.get("/ip"))
    ips = [
        address
        for address in addresses
        if ipaddress.ip_address(address).version == 4
    ]
    logger.info(ips)

    webhook_url = conn["discord"]["webhook"]
    while True:
        check_ips_in_mitigation(webhook_url, client, ips)
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()