import threading import uuid from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from queue import Queue from typing import Dict, List, Optional from app.schemas import TaskCreateRequest, TaskResult from app.services.media import generate_cover, generate_hls, transcode_to_mp4 from app.services.storage import get_upload_path def now_iso() -> str: return datetime.utcnow().replace(microsecond=0).isoformat() + "Z" @dataclass class TaskRecord: task_id: str file_id: str options: TaskCreateRequest status: str = "queued" result: TaskResult = field(default_factory=TaskResult) error: Optional[str] = None created_at: str = field(default_factory=now_iso) updated_at: str = field(default_factory=now_iso) def touch(self) -> None: self.updated_at = now_iso() class TaskManager: def __init__(self) -> None: self.tasks: Dict[str, TaskRecord] = {} self.queue: Queue = Queue() self.lock = threading.Lock() self.worker = threading.Thread(target=self._worker_loop, daemon=True) self.worker_started = False def start(self) -> None: if not self.worker_started: self.worker.start() self.worker_started = True def create_task(self, payload: TaskCreateRequest) -> TaskRecord: task_id = uuid.uuid4().hex[:12] record = TaskRecord(task_id=task_id, file_id=payload.file_id, options=payload) with self.lock: self.tasks[task_id] = record self.queue.put(task_id) return record def get_task(self, task_id: str) -> Optional[TaskRecord]: return self.tasks.get(task_id) def list_tasks(self) -> List[TaskRecord]: return sorted(self.tasks.values(), key=lambda task: task.created_at, reverse=True) def _update_status(self, task: TaskRecord, status: str) -> None: with self.lock: task.status = status task.touch() def _append_log(self, task: TaskRecord, message: str) -> None: with self.lock: task.result.log.append(message) task.touch() def _fail_task(self, task: TaskRecord, error: str) -> None: with self.lock: task.status = "failed" task.error = error task.touch() def _worker_loop(self) -> None: while True: task_id = self.queue.get() task = self.tasks.get(task_id) if task is None: self.queue.task_done() continue try: self._process_task(task) except Exception as exc: # noqa: BLE001 self._fail_task(task, str(exc)) finally: self.queue.task_done() def _process_task(self, task: TaskRecord) -> None: source_path = get_upload_path(task.file_id) if not source_path.exists(): raise FileNotFoundError(f"uploaded file not found: {task.file_id}") self._update_status(task, "processing") working_source: Path = source_path if task.options.transcode_mp4: self._append_log(task, "Transcoding to MP4 with optional clipping and watermark") mp4_path = transcode_to_mp4( source=source_path, task_id=task.task_id, clip_seconds=task.options.clip_seconds, watermark_text=task.options.watermark_text, ) task.result.mp4_url = f"/media/processed/{mp4_path.name}" working_source = mp4_path if task.options.generate_cover: self._append_log(task, "Generating cover image") cover_path = generate_cover(working_source, task.task_id) task.result.cover_url = f"/media/covers/{cover_path.name}" if task.options.generate_hls: self._append_log(task, "Generating HLS playlist") playlist = generate_hls(working_source, task.task_id) task.result.hls_url = f"/media/hls/{task.task_id}/{playlist.name}" self._append_log(task, "Task completed") self._update_status(task, "completed") task_manager = TaskManager()