__main__.py
                        
                             · 1.9 KiB · Python
                        
                    
                    
                      
                        Surowy
                      
                      
                        
                          
                        
                    
                    
                
                
            import ipaddress
import logging
import time
import ovh
try:
    import tomllib
except ModuleNotFoundError:
    import tomli as tomllib
from pathlib import Path
from discord_webhook import DiscordWebhook
# Configure logging at INFO level; the logger.debug() calls in this file are
# therefore not shown unless the level is lowered.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ddos-notifier")
# IPs for which an "in mitigation" notification has been sent and which have
# not yet been reported as out of mitigation. Mutated by
# check_ips_in_mitigation().
mitigation_list = []
def send_webhook_message(webhook_url, msg):
    """Post ``msg`` to the Discord webhook at ``webhook_url``.

    The message is sent under the fixed username "ovh-servers-mitigation".
    """
    DiscordWebhook(
        content=msg,
        username="ovh-servers-mitigation",
        url=webhook_url,
    ).execute()
def check_ips_in_mitigation(webhook_url, client, ips):
    """Query the mitigation state of each IP and notify on state changes.

    For every address, ``/ip/{ip}/mitigation/{ip}`` is requested through
    ``client``. A successful response is treated as "in mitigation"; an
    ``ovh.exceptions.ResourceNotFoundError`` is treated as "not in
    mitigation". A Discord message is sent only when an IP changes state,
    which is tracked in the module-level ``mitigation_list``.

    Args:
        webhook_url: Discord webhook URL handed to ``send_webhook_message``.
        client: Object exposing ``get(path)``, used for the API lookups.
        ips: Iterable of IP address strings to check.

    Any exception other than ``ResourceNotFoundError`` propagates to the
    caller.
    """
    for ip in ips:
        logger.debug(f"Checking {ip}")
        try:
            # Keep only the API call inside the ``try`` so that a
            # ResourceNotFoundError can only ever come from this lookup and
            # not from the notification code below.
            client.get(f"/ip/{ip}/mitigation/{ip}")
        except ovh.exceptions.ResourceNotFoundError:
            # No mitigation resource: report only if it was active before.
            if ip in mitigation_list:
                mitigation_list.remove(ip)
                send_webhook_message(
                    webhook_url, f"{ip} is out of mitigation, DDOS over."
                )
                logger.info(f"{ip} is no longer being ddosed")
        else:
            # Lookup succeeded: report only the first time it is seen.
            if ip not in mitigation_list:
                mitigation_list.append(ip)
                send_webhook_message(
                    webhook_url, f"{ip} is in mitigation! DDOS detected."
                )
                logger.info(f"{ip} is being ddosed")
def _conn_from_toml():
    """Load and return the parsed ``config.toml`` located next to this file.

    Returns:
        The mapping produced by ``tomllib.load`` for the configuration file.

    Raises:
        FileNotFoundError: If ``config.toml`` is not a regular file in this
            module's directory. This is a subclass of ``OSError``, so code
            that catches ``OSError`` keeps working.
    """
    filepath = Path(__file__).parent.resolve() / "config.toml"
    if not filepath.is_file():
        # Give the operator an actionable message instead of a bare path.
        raise FileNotFoundError(f"Configuration file not found: {filepath}")
    # tomllib requires the file to be opened in binary mode.
    with filepath.open("rb") as config_file:
        return tomllib.load(config_file)
def main():
    """Load the configuration and poll the account's IPv4 addresses forever.

    The ``ovh`` table of the configuration is passed as keyword arguments to
    ``ovh.Client``; the Discord webhook URL is read from
    ``discord.webhook``. The IP list is fetched once at start-up and then
    checked every 30 seconds.
    """
    config = _conn_from_toml()
    api = ovh.Client(**config["ovh"])

    # Keep only the part before the first "/" of each "/ip" entry
    # (presumably dropping a prefix length), then retain IPv4 addresses only.
    addresses = [entry.split("/")[0] for entry in api.get("/ip")]
    ips = [
        address
        for address in addresses
        if ipaddress.ip_address(address).version == 4
    ]
    logger.info(ips)

    webhook_url = config["discord"]["webhook"]
    while True:
        check_ips_in_mitigation(webhook_url, api, ips)
        time.sleep(30)
# Start the monitor only when this file is executed as a script, not when it
# is imported.
if __name__ == "__main__":
    main()
                | 1 | import ipaddress | 
| 2 | import logging | 
| 3 | import time | 
| 4 | |
| 5 | import ovh | 
| 6 | |
| 7 | try: | 
| 8 | import tomllib | 
| 9 | except ModuleNotFoundError: | 
| 10 | import tomli as tomllib | 
| 11 | |
| 12 | from pathlib import Path | 
| 13 | |
| 14 | from discord_webhook import DiscordWebhook | 
| 15 | |
| 16 | logging.basicConfig(level=logging.INFO) | 
| 17 | logger = logging.getLogger("ddos-notifier") | 
| 18 | |
| 19 | |
| 20 | mitigation_list = [] | 
| 21 | |
| 22 | |
| 23 | def send_webhook_message(webhook_url, msg): | 
| 24 | webhook = DiscordWebhook( | 
| 25 | url=webhook_url, username="ovh-servers-mitigation", content=msg | 
| 26 | ) | 
| 27 | webhook.execute() | 
| 28 | |
| 29 | |
| 30 | def check_ips_in_mitigation(webhook_url, client, ips): | 
| 31 | for ip in ips: | 
| 32 | logger.debug(f"Checking {ip}") | 
| 33 | try: | 
| 34 | _ = client.get(f"/ip/{ip}/mitigation/{ip}") | 
| 35 | |
| 36 | if ip not in mitigation_list: | 
| 37 | mitigation_list.append(ip) | 
| 38 | send_webhook_message( | 
| 39 | webhook_url, f"{ip} is in mitigation! DDOS detected." | 
| 40 | ) | 
| 41 | logger.info(f"{ip} is being ddosed") | 
| 42 | except ovh.exceptions.ResourceNotFoundError: | 
| 43 | if ip in mitigation_list: | 
| 44 | mitigation_list.remove(ip) | 
| 45 | send_webhook_message( | 
| 46 | webhook_url, f"{ip} is out of mitigation, DDOS over." | 
| 47 | ) | 
| 48 | logger.info(f"{ip} is no longer being ddosed") | 
| 49 | |
| 50 | |
| 51 | def _conn_from_toml(): | 
| 52 | filepath = Path(__file__).parent.resolve() / "config.toml" | 
| 53 | if not filepath.exists(): | 
| 54 | raise OSError(filepath) | 
| 55 | with open(filepath, "rb") as f: | 
| 56 | return tomllib.load(f) | 
| 57 | |
| 58 | |
| 59 | def main(): | 
| 60 | conn = _conn_from_toml() | 
| 61 | client = ovh.Client( | 
| 62 | **conn["ovh"], | 
| 63 | ) | 
| 64 | |
| 65 | ips = list( | 
| 66 | filter( | 
| 67 | lambda ip: ipaddress.ip_address(ip).version == 4, | 
| 68 | [ip.split("/")[0] for ip in client.get("/ip")], | 
| 69 | ) | 
| 70 | ) | 
| 71 | logger.info(ips) | 
| 72 | |
| 73 | webhook_url = conn["discord"]["webhook"] | 
| 74 | while True: | 
| 75 | check_ips_in_mitigation(webhook_url, client, ips) | 
| 76 | time.sleep(30) | 
| 77 | |
| 78 | |
| 79 | if __name__ == "__main__": | 
| 80 | main() | 
| 81 |