# Run in dev mode using:
#
#     fastapi dev app.py
#
# After starting the server, you can navigate to http://localhost:8000 to see
# the web interface.
#
# Note: This requires the user to have the fastapi CLI tool installed.
# The user should be in the same directory as `app.py` as well as `index.html`.

import asyncio
from contextlib import asynccontextmanager, suppress
import logging
import random
import re
import socket

from fastapi import FastAPI, Request
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# One asyncio.Queue per connected SSE client; `sse_endpoint` adds and removes
# entries, `notify_clients` fans messages out to them.
clients = set()

# Address the background UDP listener binds to.
UDP_LISTEN_ADDRESS = ("0.0.0.0", 5001)
# Buffer size handed to recvfrom(); datagrams longer than this do not fit and
# may be truncated or rejected depending on the platform.
UDP_MAX_DATAGRAM_BYTES = 1024

# Line terminators recognised by the Server-Sent Events wire format.
_SSE_LINE_BREAK = re.compile(r"\r\n|\r|\n")


# Broadcast function to notify all SSE clients
async def notify_clients(message: str) -> None:
    """Queue *message* for delivery to every currently connected SSE client."""
    # Iterate over a snapshot so that a client connecting or disconnecting
    # while this coroutine is suspended cannot change the set mid-iteration.
    for queue in tuple(clients):
        await queue.put(message)


def _resolve_if_pending(future: asyncio.Future) -> None:
    """Complete *future* with ``None`` unless it is already done or cancelled."""
    if not future.done():
        future.set_result(None)


async def sock_recvfrom(nonblocking_sock, *pos, loop, **kw):
    """Await one datagram from a non-blocking socket.

    Positional and keyword arguments are forwarded unchanged to
    ``socket.recvfrom``; its return value is returned as-is.

    Args:
        nonblocking_sock: A socket already switched to non-blocking mode.
        *pos: Positional arguments for ``recvfrom`` (e.g. the buffer size).
        loop: Event loop used to wait for the socket to become readable.
        **kw: Keyword arguments for ``recvfrom``.

    NOTE: relies on ``loop.add_reader``, which selector-based event loops
    provide; the Windows proactor loop does not implement it.
    """
    while True:
        try:
            return nonblocking_sock.recvfrom(*pos, **kw)
        except BlockingIOError:
            # Nothing to read yet: park until the loop reports readability.
            fd = nonblocking_sock.fileno()
            readable = loop.create_future()
            # The guarded callback avoids InvalidStateError if the waiter was
            # cancelled just before the reader callback runs.
            loop.add_reader(fd, _resolve_if_pending, readable)
            try:
                await readable
            finally:
                loop.remove_reader(fd)


# Background task: UDP listener
async def udp_listener() -> None:
    """Receive UDP datagrams forever and broadcast each one to SSE clients.

    Runs until cancelled. The listening socket is closed when the coroutine
    exits for any reason.
    """
    loop = asyncio.get_running_loop()
    # `with` guarantees the port is released on cancellation or error.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as listen_sock:
        listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_sock.bind(UDP_LISTEN_ADDRESS)
        listen_sock.setblocking(False)
        while True:
            data, _addr = await sock_recvfrom(
                listen_sock, UDP_MAX_DATAGRAM_BYTES, loop=loop
            )
            # A single malformed datagram must not kill the listener, so
            # undecodable bytes are replaced instead of raising.
            message = data.decode(errors="replace")
            await notify_clients(message)


def _log_listener_failure(task: asyncio.Task) -> None:
    """Report an unexpected (non-cancellation) exit of the UDP listener task."""
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error("UDP listener stopped unexpectedly", exc_info=error)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the UDP listener for the lifetime of the application."""
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-run.
    listener = asyncio.create_task(udp_listener())
    listener.add_done_callback(_log_listener_failure)
    try:
        yield
    finally:
        # Stop the listener on shutdown so its socket is released cleanly.
        if not listener.done():
            listener.cancel()
            with suppress(asyncio.CancelledError):
                await listener


app = FastAPI(lifespan=lifespan)

# Shared outgoing UDP socket used by the POST handlers below. It is left
# unbound (the bind below stays disabled).
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sock.bind(("0.0.0.0", 5002))

SERVER_IP = "127.0.0.1"
SERVER_PORT = 5000


@app.get("/")
async def read_index():
    """Serve the web interface from `index.html` in the working directory."""
    return FileResponse('index.html')


@app.post("/facialexpressions")
def read_item(weights: list[float]):
    """Forward the posted weights as one ';'-separated UDP datagram."""
    msg = ';'.join(str(w) for w in weights)
    print(len(weights), msg)
    sock.sendto(msg.encode('utf-8'), (SERVER_IP, SERVER_PORT))
    return {"status": "ok"}


class Word(BaseModel):
    """Request body for POST /word."""

    # Host the UDP message is sent to.
    target: str
    # Forwarded verbatim in the CHARADE message; meaning is defined by the
    # receiver (not visible here).
    lastWordStatus: int
    timeSeconds: float
    word: str


class WordList(BaseModel):
    """Request body for POST /shuffle."""

    words: list[str]


@app.post("/word")
def read_word(word: Word):
    """Send a `CHARADE:` UDP message describing *word* to `word.target`."""
    msg = f"CHARADE:{word.lastWordStatus};{word.timeSeconds};{word.word}"
    print(msg)
    # Same destination port as /facialexpressions; only the host differs.
    sock.sendto(msg.encode('utf-8'), (word.target, SERVER_PORT))
    return {"status": "ok"}


@app.post("/shuffle")
def shuffle_words(word_list: WordList):
    """Return the posted words in random order; the input list is not mutated."""
    shuffled = word_list.words.copy()
    random.shuffle(shuffled)
    return {"status": "ok", "shuffled_words": shuffled}


def _format_sse_update(message: str) -> str:
    """Encode *message* as one SSE ``update`` event.

    Every line of the message gets its own ``data:`` field. Interpolating a
    multi-line payload into a single field would end the event early and let
    the payload inject arbitrary fields or events into the stream.
    Single-line messages produce exactly ``event: update\\ndata: <msg>\\n\\n``.
    """
    data_fields = "\n".join(
        f"data: {line}" for line in _SSE_LINE_BREAK.split(message)
    )
    return f"event: update\n{data_fields}\n\n"


# SSE endpoint
@app.get("/news")
async def sse_endpoint(request: Request):
    """Stream every broadcast message to the client as Server-Sent Events."""
    queue = asyncio.Queue()
    clients.add(queue)

    async def event_generator():
        try:
            while True:
                if await request.is_disconnected():
                    break
                message = await queue.get()
                yield _format_sse_update(message)
        finally:
            # discard() keeps cleanup from raising if the queue is already gone.
            clients.discard(queue)

    return StreamingResponse(event_generator(), media_type="text/event-stream")