dependencies.py
· 1.5 KiB · Python
Originalformat
from fastapi import Request, Header, Depends
import niquests
from fastapi import HTTPException
import logging
import hashlib
import hmac
from settings import settings
from ipaddress import ip_address, ip_network
from typing import Annotated
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_request_client(request: Request) -> niquests.AsyncSession:
    """Return the shared HTTP session stored on the application state.

    Reads ``request.app.state.request_client``; the attribute must already be
    set on the application when this dependency is resolved.
    """
    app_state = request.app.state
    return app_state.request_client
async def validate_source_ip(
    request_client: Annotated[niquests.AsyncSession, Depends(get_request_client)],
    x_real_ip: str = Header(...),
) -> None:
    """Reject requests whose ``X-Real-IP`` is outside GitHub's webhook networks.

    The allowed networks are read from the ``hooks`` list returned by
    ``https://api.github.com/meta`` on every call.

    Args:
        request_client: Shared HTTP session used to query the GitHub meta API.
        x_real_ip: Value of the ``X-Real-IP`` request header.

    Raises:
        HTTPException: 403 when the header is not a valid IP address, or when
            the address is not contained in any of the advertised networks.
    """
    # NOTE(review): X-Real-IP is client-controlled unless a trusted reverse
    # proxy overwrites it -- confirm the deployment guarantees that.
    error_detail = f"IP address {x_real_ip} is not allowed"

    # Parse the header once, before any network I/O: a malformed value would
    # otherwise escape as an unhandled ValueError (HTTP 500).
    try:
        source_ip = ip_address(x_real_ip)
    except ValueError:
        logger.debug("Received request from invalid IP: %s", x_real_ip)
        raise HTTPException(status_code=403, detail=error_detail) from None

    # NOTE(review): this lookup runs for every webhook delivery; consider
    # caching the result, since the GitHub API may rate-limit repeated calls.
    resp = await request_client.get("https://api.github.com/meta")
    allowed_ips = resp.json().get("hooks", [])

    if not any(source_ip in ip_network(valid_ip) for valid_ip in allowed_ips):
        logger.debug("Received request from invalid IP: %s", x_real_ip)
        raise HTTPException(status_code=403, detail=error_detail)
async def validate_webhook_secret(
    request: Request, x_hub_signature: str = Header(...)
) -> None:
    """Verify the ``X-Hub-Signature`` header against the raw request body.

    The expected value is ``"sha1="`` followed by the hex HMAC-SHA1 of the
    body, keyed with ``settings.GITHUB_WEBHOOK_SECRET``.

    Args:
        request: Incoming request; its raw body is the signed message.
        x_hub_signature: Value of the ``X-Hub-Signature`` request header.

    Raises:
        HTTPException: 400 when the header value is empty, 403 when the
            signature does not match.
    """
    if not x_hub_signature:
        logger.debug("Missing X-Hub-Signature header")
        raise HTTPException(status_code=400, detail="Missing X-Hub-Signature header")

    # NOTE(review): GitHub also sends an SHA-256 based X-Hub-Signature-256
    # header; consider migrating to it -- confirm with the webhook settings.
    mac = hmac.new(
        settings.GITHUB_WEBHOOK_SECRET.encode(),
        msg=await request.body(),
        digestmod=hashlib.sha1,
    )
    expected_signature = "sha1=" + mac.hexdigest()

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters. The header is client-supplied, so compare bytes to
    # turn such input into a clean 403 instead of an unhandled error.
    if not hmac.compare_digest(
        expected_signature.encode(), x_hub_signature.encode()
    ):
        logger.debug("Invalid HMAC signature")
        raise HTTPException(status_code=403, detail="Invalid HMAC signature")
| 1 | from fastapi import Request, Header, Depends |
| 2 | import niquests |
| 3 | from fastapi import HTTPException |
| 4 | import logging |
| 5 | import hashlib |
| 6 | import hmac |
| 7 | from settings import settings |
| 8 | from ipaddress import ip_address, ip_network |
| 9 | from typing import Annotated |
| 10 | |
| 11 | logger = logging.getLogger(__name__) |
| 12 | |
| 13 | |
| 14 | def get_request_client(request: Request) -> niquests.AsyncSession: |
| 15 | return request.app.state.request_client |
| 16 | |
| 17 | |
| 18 | async def validate_source_ip( |
| 19 | request_client: Annotated[niquests.AsyncSession, Depends(get_request_client)], |
| 20 | x_real_ip: str = Header(...), |
| 21 | ) -> None: |
| 22 | resp = await request_client.get("https://api.github.com/meta") |
| 23 | allowed_ips = resp.json().get("hooks", []) |
| 24 | if not any( |
| 25 | ip_address(x_real_ip) in ip_network(valid_ip) for valid_ip in allowed_ips |
| 26 | ): |
| 27 | logger.debug(f"Received request from invalid IP: {x_real_ip}") |
| 28 | ERR_MSG = f"IP address {x_real_ip} is not allowed" |
| 29 | raise HTTPException(status_code=403, detail=ERR_MSG) |
| 30 | |
| 31 | |
| 32 | async def validate_webhook_secret( |
| 33 | request: Request, x_hub_signature: str = Header(...) |
| 34 | ) -> None: |
| 35 | if not x_hub_signature: |
| 36 | logger.debug("Missing X-Hub-Signature header") |
| 37 | raise HTTPException(status_code=400, detail="Missing X-Hub-Signature header") |
| 38 | |
| 39 | mac = hmac.new( |
| 40 | settings.GITHUB_WEBHOOK_SECRET.encode(), |
| 41 | msg=await request.body(), |
| 42 | digestmod=hashlib.sha1, |
| 43 | ) |
| 44 | |
| 45 | if not hmac.compare_digest("sha1=" + mac.hexdigest(), x_hub_signature): |
| 46 | logger.debug("Invalid HMAC signature") |
| 47 | raise HTTPException(status_code=403, detail="Invalid HMAC signature") |
| 48 |
handlers.py
· 1.2 KiB · Python
Originalformat
from niquests import AsyncSession
from settings import settings
import logging
from fastapi import HTTPException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Discord rejects embeds whose fields exceed these character limits.
_DISCORD_TITLE_LIMIT = 256
_DISCORD_DESCRIPTION_LIMIT = 4096


def _truncate_embed_text(text, limit):
    """Return ``text`` cut to at most ``limit`` characters; ``None`` passes through."""
    if text is None or len(text) <= limit:
        return text
    # Reserve one character for the ellipsis that marks the cut.
    return text[: limit - 1] + "…"


async def issues(payload, request_client: AsyncSession):
    """Forward a GitHub ``issues`` event to Discord as an embed.

    Only the actions ``opened``, ``closed``, ``reopened`` and ``deleted`` are
    forwarded; any other action returns without sending anything.

    Args:
        payload: Decoded webhook payload. The ``action``, ``sender``,
            ``repository`` and ``issue`` entries are read.
        request_client: HTTP session handed to ``send_discord_webhook``.
    """
    action = payload["action"]
    if action not in ("opened", "closed", "reopened", "deleted"):
        return

    issue = payload["issue"]
    sender = payload["sender"]
    title = (
        f"[{payload['repository']['full_name']}] Issue {action.capitalize()}: "
        f"#{issue['number']} {issue['title']}"
    )

    embed = {
        "author": {
            "name": sender["login"],
            "icon_url": sender["avatar_url"],
        },
        # Long issue titles/bodies would otherwise make Discord reject the
        # whole message, so clamp both to the embed limits.
        "title": _truncate_embed_text(title, _DISCORD_TITLE_LIMIT),
        "url": issue["html_url"],
        "description": _truncate_embed_text(
            issue.get("body"), _DISCORD_DESCRIPTION_LIMIT
        ),
    }
    data = {
        "username": "Github-OpenGist",
        "embeds": [embed],
    }
    await send_discord_webhook(request_client=request_client, data=data)
async def send_discord_webhook(request_client: AsyncSession, data):
    """POST ``data`` as JSON to ``settings.DISCORD_WEBHOOK_URL``.

    Args:
        request_client: HTTP session used for the request.
        data: JSON-serialisable webhook body.

    Raises:
        HTTPException: 400 when Discord answers with a non-success status.
    """
    resp = await request_client.post(settings.DISCORD_WEBHOOK_URL, json=data)

    # Discord answers 204 by default, but 200 (with the created message) when
    # the webhook URL carries ``?wait=true``; both mean the message was sent.
    if resp.status_code not in (200, 204):
        logger.warning(
            "Discord webhook failed with status code %s", resp.status_code
        )
        # NOTE(review): 400 is kept for backward compatibility; a 502 would
        # describe an upstream failure more accurately.
        raise HTTPException(status_code=400, detail="Failed to send Discord webhook")

    logger.info("Discord webhook sent with status code %s", resp.status_code)
| 1 | from niquests import AsyncSession |
| 2 | from settings import settings |
| 3 | import logging |
| 4 | from fastapi import HTTPException |
| 5 | |
| 6 | logger = logging.getLogger(__name__) |
| 7 | |
| 8 | |
| 9 | async def issues(payload, request_client: AsyncSession): |
| 10 | if payload["action"] not in ["opened", "closed", "reopened", "deleted"]: |
| 11 | return |
| 12 | |
| 13 | embed = { |
| 14 | "author": { |
| 15 | "name": payload["sender"]["login"], |
| 16 | "icon_url": payload["sender"]["avatar_url"], |
| 17 | }, |
| 18 | "title": f"[{payload['repository']['full_name']}] Issue {payload['action'].capitalize()}: #{payload['issue']['number']} {payload['issue']['title']}", |
| 19 | "url": payload["issue"]["html_url"], |
| 20 | "description": payload["issue"].get("body"), |
| 21 | } |
| 22 | data = { |
| 23 | "username": "Github-OpenGist", |
| 24 | "embeds": [embed], |
| 25 | } |
| 26 | await send_discord_webhook(request_client=request_client, data=data) |
| 27 | |
| 28 | |
| 29 | async def send_discord_webhook(request_client: AsyncSession, data): |
| 30 | resp = await request_client.post(settings.DISCORD_WEBHOOK_URL, json=data) |
| 31 | if resp.status_code != 204: |
| 32 | raise HTTPException(status_code=400, detail="Failed to send Discord webhook") |
| 33 | logger.info(f"Discord webhook sent with status code {resp.status_code}") |
| 34 |
main.py
· 1.3 KiB · Python
Originalformat
from fastapi import FastAPI, Request, Depends, Header
from contextlib import asynccontextmanager
import niquests
import logging
import handlers
from dependencies import (
get_request_client,
validate_source_ip,
validate_webhook_secret,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared HTTP session on startup and close it on shutdown.

    The session is stored as ``app.state.request_client`` for the lifetime of
    the application.

    Args:
        app: The FastAPI application whose state holds the session.
    """
    app.state.request_client = niquests.AsyncSession()
    try:
        yield
    finally:
        # Close the session even when an exception is raised into the
        # generator at the yield point, so the connection pool is not leaked.
        await app.state.request_client.close()
# Application instance; `lifespan` sets up and tears down the shared HTTP client.
app = FastAPI(lifespan=lifespan)
@app.post(
"/github/{repo_name}",
dependencies=[
Depends(validate_source_ip),
Depends(validate_webhook_secret),
],
)
async def github_webhook(
repo_name: str,
request: Request,
client: niquests.AsyncSession = Depends(get_request_client),
x_real_ip: str = Header(...),
x_github_event: str = Header(...),
x_hub_signature: str = Header(...),
):
payload = await request.json()
match x_github_event:
case "ping":
return {"message": "pong"}
case "issues":
await handlers.issues(payload, client)
case _:
logger.debug(f"unhandled event {x_github_event}")
return {
"message": f"received webhook for {repo_name}: {x_github_event}",
"status": "success",
}
| 1 | from fastapi import FastAPI, Request, Depends, Header |
| 2 | from contextlib import asynccontextmanager |
| 3 | import niquests |
| 4 | import logging |
| 5 | |
| 6 | import handlers |
| 7 | from dependencies import ( |
| 8 | get_request_client, |
| 9 | validate_source_ip, |
| 10 | validate_webhook_secret, |
| 11 | ) |
| 12 | |
| 13 | logger = logging.getLogger(__name__) |
| 14 | |
| 15 | |
| 16 | @asynccontextmanager |
| 17 | async def lifespan(app: FastAPI): |
| 18 | app.state.request_client = niquests.AsyncSession() |
| 19 | yield |
| 20 | await app.state.request_client.close() |
| 21 | |
| 22 | |
| 23 | app = FastAPI(lifespan=lifespan) |
| 24 | |
| 25 | |
| 26 | @app.post( |
| 27 | "/github/{repo_name}", |
| 28 | dependencies=[ |
| 29 | Depends(validate_source_ip), |
| 30 | Depends(validate_webhook_secret), |
| 31 | ], |
| 32 | ) |
| 33 | async def github_webhook( |
| 34 | repo_name: str, |
| 35 | request: Request, |
| 36 | client: niquests.AsyncSession = Depends(get_request_client), |
| 37 | x_real_ip: str = Header(...), |
| 38 | x_github_event: str = Header(...), |
| 39 | x_hub_signature: str = Header(...), |
| 40 | ): |
| 41 | payload = await request.json() |
| 42 | match x_github_event: |
| 43 | case "ping": |
| 44 | return {"message": "pong"} |
| 45 | case "issues": |
| 46 | await handlers.issues(payload, client) |
| 47 | case _: |
| 48 | logger.debug(f"unhandled event {x_github_event}") |
| 49 | return { |
| 50 | "message": f"received webhook for {repo_name}: {x_github_event}", |
| 51 | "status": "success", |
| 52 | } |
| 53 |
settings.py
· 295 B · Python
Originalformat
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Configuration values for the application.

    ``model_config`` points the settings loader at a UTF-8 encoded ``.env``
    file.
    """

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Shared secret for GitHub webhook deliveries.
    GITHUB_WEBHOOK_SECRET: str
    # Discord webhook URL that notifications are posted to.
    DISCORD_WEBHOOK_URL: str
# Single shared settings instance, created when this module is imported.
settings = Settings()
| 1 | from pydantic_settings import BaseSettings, SettingsConfigDict |
| 2 | |
| 3 | |
| 4 | class Settings(BaseSettings): |
| 5 | """Application settings.""" |
| 6 | |
| 7 | GITHUB_WEBHOOK_SECRET: str |
| 8 | DISCORD_WEBHOOK_URL: str |
| 9 | |
| 10 | model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") |
| 11 | |
| 12 | |
| 13 | settings = Settings() |
| 14 |