from enum import Enum from typing import Annotated from fastapi import APIRouter, Depends, Response from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from passlib.context import CryptContext from ..modules.database import cursor, conn router = APIRouter() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") @router.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = form_data.username return {"access_token": user, "token_type": "bearer"} class AuthResult(Enum): SUCCESS = 0 NOT_FOUND = 1 TOKEN_MISMATCH = 2 async def auth_agent(callsign: str, token: str) -> AuthResult: cursor.execute("SELECT token_hash from agents WHERE callsign = ?", (callsign,)) row = cursor.fetchone() if row is None: return AuthResult.NOT_FOUND return AuthResult.SUCCESS if pwd_context.verify(token, row[0]) else AuthResult.TOKEN_MISMATCH @router.post("/{callsign}/init", status_code=201) async def init_agent(callsign: str, token: Annotated[str, Depends(oauth2_scheme)], response: Response): result = await auth_agent(callsign, token) if result == AuthResult.SUCCESS: response.status_code = 200 return '{"result": "Agent already registered."}' if result == AuthResult.TOKEN_MISMATCH: response.status_code = 400 return '{"error": "Agent already registered but with a different token."}' # TODO: test token on spacetraders api token_hash = pwd_context.hash(token) cursor.execute("INSERT INTO agents (callsign, token_hash) VALUES (?, ?)", (callsign, token_hash)) conn.commit() return '{"result": "Agent successfully registered."}'