Files
vplatform/backend/app/services/tasks.py
2026-04-09 08:47:37 +00:00

125 lines
4.1 KiB
Python

import threading
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import Dict, List, Optional
from app.schemas import TaskCreateRequest, TaskResult
from app.services.media import generate_cover, generate_hls, transcode_to_mp4
from app.services.storage import get_upload_path
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    The value is truncated to whole seconds, e.g. ``2026-04-09T08:47:37Z``.
    """
    # Imported here because the module header only imports ``datetime`` itself.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead, then drop tzinfo so ``isoformat()`` does not append
    # "+00:00" in front of the literal "Z" suffix.
    current = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return current.isoformat() + "Z"
@dataclass
class TaskRecord:
    """Mutable in-memory state of a single processing task.

    The first three fields are required; the remaining ones default to the
    state of a freshly queued task with an empty result and no error.
    """

    # Identifier of this task.
    task_id: str
    # Identifier of the uploaded file the task operates on.
    file_id: str
    # The creation request, kept so the worker can read the requested options.
    options: TaskCreateRequest
    # Lifecycle state; TaskManager moves it from "queued" to "processing" and
    # then to "completed" or "failed".
    status: str = field(default="queued")
    # Output URLs and log lines collected while the task runs.
    result: TaskResult = field(default_factory=TaskResult)
    # Failure message; stays None unless the task fails.
    error: Optional[str] = field(default=None)
    # Timestamps as produced by now_iso(); both are taken at construction.
    created_at: str = field(default_factory=now_iso)
    updated_at: str = field(default_factory=now_iso)

    def touch(self) -> None:
        """Set ``updated_at`` to the current time."""
        self.updated_at = now_iso()
class TaskManager:
    """In-memory task registry served by one background worker thread.

    ``create_task`` stores a ``TaskRecord`` and enqueues its id. After
    ``start`` has been called, a daemon thread takes ids off the queue and
    runs them one at a time through ``_process_task``. ``self.lock`` guards
    the registry and the status/log/error updates made by the helper methods.
    """

    def __init__(self) -> None:
        """Create an empty registry and a worker thread that is not yet started."""
        self.tasks: Dict[str, TaskRecord] = {}
        self.queue: "Queue[str]" = Queue()
        self.lock = threading.Lock()
        self.worker = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_started = False

    def start(self) -> None:
        """Start the worker thread; later calls are no-ops.

        The check and the start run under ``self.lock`` so two concurrent
        callers cannot both reach ``Thread.start()``, which raises
        ``RuntimeError`` when invoked a second time on the same thread.
        """
        with self.lock:
            if self.worker_started:
                return
            self.worker.start()
            self.worker_started = True

    def create_task(self, payload: TaskCreateRequest) -> TaskRecord:
        """Register a new queued task for ``payload`` and enqueue it.

        Returns:
            The newly created record, whose id is a 12-character hex string.
        """
        task_id = uuid.uuid4().hex[:12]
        record = TaskRecord(task_id=task_id, file_id=payload.file_id, options=payload)
        with self.lock:
            self.tasks[task_id] = record
        # Enqueue only after the record is registered so the worker can find it.
        self.queue.put(task_id)
        return record

    def get_task(self, task_id: str) -> Optional[TaskRecord]:
        """Return the record for ``task_id``, or ``None`` if it is unknown."""
        with self.lock:
            return self.tasks.get(task_id)

    def list_tasks(self) -> List[TaskRecord]:
        """Return all records ordered by ``created_at``, newest first."""
        # Snapshot under the lock so a concurrent create_task cannot resize
        # the dict mid-iteration; sort outside it to keep the critical
        # section short.
        with self.lock:
            snapshot = list(self.tasks.values())
        return sorted(snapshot, key=lambda task: task.created_at, reverse=True)

    def _update_status(self, task: TaskRecord, status: str) -> None:
        """Set ``task.status`` and refresh its ``updated_at`` timestamp."""
        with self.lock:
            task.status = status
            task.touch()

    def _append_log(self, task: TaskRecord, message: str) -> None:
        """Append ``message`` to the task's result log and refresh ``updated_at``."""
        with self.lock:
            task.result.log.append(message)
            task.touch()

    def _fail_task(self, task: TaskRecord, error: str) -> None:
        """Mark ``task`` as failed with ``error`` and refresh ``updated_at``."""
        with self.lock:
            task.status = "failed"
            task.error = error
            task.touch()

    @staticmethod
    def _describe_error(exc: BaseException) -> str:
        """Return a non-empty message for ``exc``.

        Exceptions raised without arguments stringify to ``""``; fall back to
        the class name so a failed task never carries an empty error.
        """
        return str(exc) or type(exc).__name__

    def _run_task(self, task: TaskRecord) -> None:
        """Process ``task`` and record any exception as a task failure."""
        try:
            self._process_task(task)
        except Exception as exc:  # noqa: BLE001
            # Deliberately broad: one bad task must not kill the worker thread.
            self._fail_task(task, self._describe_error(exc))

    def _worker_loop(self) -> None:
        """Consume task ids from the queue forever, one task at a time.

        Ids with no matching record are skipped. ``task_done()`` is called
        exactly once per dequeued id, whatever the outcome.
        """
        while True:
            task_id = self.queue.get()
            try:
                task = self.get_task(task_id)
                if task is not None:
                    self._run_task(task)
            finally:
                self.queue.task_done()

    def _process_task(self, task: TaskRecord) -> None:
        """Run the steps requested in ``task.options`` and record their URLs.

        The status is set to "processing" once the source file is confirmed
        to exist and to "completed" after the last step.

        Raises:
            FileNotFoundError: If the path returned by ``get_upload_path``
                for ``task.file_id`` does not exist.
        """
        source_path = get_upload_path(task.file_id)
        if not source_path.exists():
            raise FileNotFoundError(f"uploaded file not found: {task.file_id}")

        self._update_status(task, "processing")
        # Input for the cover and HLS steps; replaced by the MP4 output when
        # transcoding is requested.
        working_source: Path = source_path

        if task.options.transcode_mp4:
            self._append_log(task, "Transcoding to MP4 with optional clipping and watermark")
            mp4_path = transcode_to_mp4(
                source=source_path,
                task_id=task.task_id,
                clip_seconds=task.options.clip_seconds,
                watermark_text=task.options.watermark_text,
            )
            task.result.mp4_url = f"/media/processed/{mp4_path.name}"
            working_source = mp4_path

        if task.options.generate_cover:
            self._append_log(task, "Generating cover image")
            cover_path = generate_cover(working_source, task.task_id)
            task.result.cover_url = f"/media/covers/{cover_path.name}"

        if task.options.generate_hls:
            self._append_log(task, "Generating HLS playlist")
            playlist = generate_hls(working_source, task.task_id)
            task.result.hls_url = f"/media/hls/{task.task_id}/{playlist.name}"

        self._append_log(task, "Task completed")
        self._update_status(task, "completed")
# Module-level TaskManager instance. Constructing it does not start the
# worker thread; start() must be called before queued tasks are processed.
task_manager = TaskManager()