import hashlib
import hmac
import logging
from ipaddress import ip_address, ip_network
from typing import Annotated

import niquests
from fastapi import Depends, Header, HTTPException, Request

from settings import settings

logger = logging.getLogger(__name__)


def get_request_client(request: Request) -> niquests.AsyncSession:
    """Return the HTTP client stored on ``request.app.state.request_client``."""
    return request.app.state.request_client


async def validate_source_ip(
    request_client: Annotated[niquests.AsyncSession, Depends(get_request_client)],
    x_real_ip: str = Header(...),
) -> None:
    """Reject requests whose ``X-Real-IP`` is not in GitHub's ``hooks`` list.

    The allowed networks are read from the ``hooks`` key of the JSON document
    served at ``https://api.github.com/meta``; a missing key is treated as an
    empty list, so every request is rejected in that case.

    Args:
        request_client: Async HTTP session used to query the GitHub meta API.
        x_real_ip: Value of the ``X-Real-IP`` request header.

    Raises:
        HTTPException: 403 when the header is not a valid IP address or the
            address is not contained in any allowed network.
    """
    err_msg = f"IP address {x_real_ip} is not allowed"

    # Parse the header once, up front. A malformed value would otherwise raise
    # ValueError inside the membership test and surface as a 500 response.
    try:
        client_ip = ip_address(x_real_ip)
    except ValueError:
        logger.debug("Received request from invalid IP: %s", x_real_ip)
        raise HTTPException(status_code=403, detail=err_msg) from None

    resp = await request_client.get("https://api.github.com/meta")
    allowed_ips = resp.json().get("hooks", [])

    if not any(client_ip in ip_network(valid_ip) for valid_ip in allowed_ips):
        logger.debug("Received request from invalid IP: %s", x_real_ip)
        raise HTTPException(status_code=403, detail=err_msg)


async def validate_webhook_secret(
    request: Request, x_hub_signature: str = Header(...)
) -> None:
    """Verify the ``X-Hub-Signature`` header against the raw request body.

    The expected value is ``"sha1="`` followed by the hex HMAC-SHA1 of the
    body, keyed with ``settings.GITHUB_WEBHOOK_SECRET``.

    Args:
        request: Incoming request whose raw body is signed.
        x_hub_signature: Value of the ``X-Hub-Signature`` request header.

    Raises:
        HTTPException: 400 when the header value is empty, 403 when the
            signature does not match.
    """
    if not x_hub_signature:
        logger.debug("Missing X-Hub-Signature header")
        raise HTTPException(status_code=400, detail="Missing X-Hub-Signature header")

    mac = hmac.new(
        settings.GITHUB_WEBHOOK_SECRET.encode(),
        msg=await request.body(),
        digestmod=hashlib.sha1,
    )
    expected = f"sha1={mac.hexdigest()}".encode()

    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, which would turn a bad header into a 500.
    if not hmac.compare_digest(expected, x_hub_signature.encode()):
        logger.debug("Invalid HMAC signature")
        raise HTTPException(status_code=403, detail="Invalid HMAC signature")