import os from typing import Dict, List from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from app.config import COVER_DIR, HLS_DIR, PROCESSED_DIR, STORAGE_DIR, UPLOAD_DIR from app.schemas import TaskCreateRequest, TaskResponse, UploadResponse from app.services.storage import get_upload_path, save_upload from app.services.tasks import task_manager def get_allowed_origins() -> List[str]: origins = os.getenv("CORS_ORIGINS", "http://localhost:5173,http://localhost:8080") return [origin.strip() for origin in origins.split(",") if origin.strip()] app = FastAPI(title="VPlatform API", version="0.1.0") app.add_middleware( CORSMiddleware, allow_origins=get_allowed_origins(), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/media/uploads", StaticFiles(directory=UPLOAD_DIR), name="uploads") app.mount("/media/processed", StaticFiles(directory=PROCESSED_DIR), name="processed") app.mount("/media/hls", StaticFiles(directory=HLS_DIR), name="hls") app.mount("/media/covers", StaticFiles(directory=COVER_DIR), name="covers") app.mount("/media", StaticFiles(directory=STORAGE_DIR), name="media") @app.on_event("startup") def start_worker() -> None: task_manager.start() @app.get("/api/health") def health() -> Dict[str, str]: return {"status": "ok"} @app.post("/api/upload", response_model=UploadResponse) async def upload_video(file: UploadFile = File(...)) -> UploadResponse: if not file.filename: raise HTTPException(status_code=400, detail="missing filename") file_id, original_name = save_upload(file) return UploadResponse( file_id=file_id, original_name=original_name, file_url=f"/media/uploads/{file_id}", ) @app.post("/api/tasks", response_model=TaskResponse) def create_task(payload: TaskCreateRequest) -> TaskResponse: if not get_upload_path(payload.file_id).exists(): raise HTTPException(status_code=404, detail="uploaded file not found") record = task_manager.create_task(payload) return TaskResponse(**record.__dict__) @app.get("/api/tasks", response_model=List[TaskResponse]) def list_tasks() -> List[TaskResponse]: return [TaskResponse(**task.__dict__) for task in task_manager.list_tasks()] @app.get("/api/tasks/{task_id}", response_model=TaskResponse) def get_task(task_id: str) -> TaskResponse: task = task_manager.get_task(task_id) if task is None: raise HTTPException(status_code=404, detail="task not found") return TaskResponse(**task.__dict__)