1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
from enum import Enum
from typing import Annotated
from fastapi import APIRouter, Depends, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from modules.database import cursor, conn
router = APIRouter()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = form_data.username
return {"access_token": user, "token_type": "bearer"}
class AuthResult(Enum):
SUCCESS = 0
NOT_FOUND = 1
TOKEN_MISMATCH = 2
async def auth_agent(callsign: str, token: str) -> AuthResult:
cursor.execute("SELECT token_hash from agents WHERE callsign = ?", (callsign,))
row = cursor.fetchone()
if row is None:
return AuthResult.NOT_FOUND
return AuthResult.SUCCESS if pwd_context.verify(token, row[0]) else AuthResult.TOKEN_MISMATCH
@router.post("/{callsign}/init", status_code=201)
async def init_agent(callsign: str, token: Annotated[str, Depends(oauth2_scheme)], response: Response):
result = await auth_agent(callsign, token)
if result == AuthResult.SUCCESS:
response.status_code = 200
return '{"result": "Agent already registered."}'
if result == AuthResult.TOKEN_MISMATCH:
response.status_code = 400
return '{"error": "Agent already registered but with a different token."}'
# TODO: test token on spacetraders api
token_hash = pwd_context.hash(token)
cursor.execute("INSERT INTO agents (callsign, token_hash) VALUES (?, ?)", (callsign, token_hash))
conn.commit()
return '{"result": "Agent successfully registered."}'
|