Skip to content

File

Video file source for WebRTC - handles uploaded video files.

ThreadedVideoFileTrack

Bases: MediaStreamTrack

Video track that decodes frames from a file in a background thread.

Uses a dedicated thread with a queue to avoid deadlocks with the event loop.

Source code in inference/core/interfaces/webrtc_worker/sources/file.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class ThreadedVideoFileTrack(MediaStreamTrack):
    """Video track that decodes frames from a file in a background thread.

    Uses a dedicated thread with a queue to avoid deadlocks with the event loop.
    """

    kind = "video"

    def __init__(self, filepath: str, queue_size: int = 60):
        # TODO: add parameter queue size in settings
        super().__init__()
        self._queue = queue.Queue(maxsize=queue_size)
        self._stop_event = threading.Event()
        self._decode_thread = threading.Thread(
            target=_decode_worker,
            args=(filepath, self._queue, self._stop_event),
            daemon=True,
        )
        self._decode_thread.start()

    async def recv(self) -> VideoFrame:
        while True:
            try:
                data = self._queue.get_nowait()
                break
            except queue.Empty:
                await asyncio.sleep(0.001)

        if data is None:
            self.stop()
            raise MediaStreamError("End of video file")
        if isinstance(data, dict):
            logger.error("[ThreadedVideoTrack] Decode error: %s", data)
            self.stop()
            raise MediaStreamError(data.get("error", "Unknown decode error"))

        return data

    def stop(self):
        super().stop()
        self._stop_event.set()

VideoFileUploadHandler

Handles video file uploads via data channel.

Protocol: [chunk_index:u32][total_chunks:u32][payload] Auto-completes when all chunks received.

Source code in inference/core/interfaces/webrtc_worker/sources/file.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class VideoFileUploadHandler:
    """Handles video file uploads via data channel.

    Protocol: [chunk_index:u32][total_chunks:u32][payload]
    Auto-completes when all chunks received.
    """

    def __init__(self):
        self._chunks: Dict[int, bytes] = {}
        self._total_chunks: Optional[int] = None
        self._temp_file_path: Optional[str] = None
        self._state = VideoFileUploadState.IDLE
        self.upload_complete_event = asyncio.Event()

    @property
    def temp_file_path(self) -> Optional[str]:
        return self._temp_file_path

    def handle_chunk(self, chunk_index: int, total_chunks: int, data: bytes) -> None:
        """Handle a chunk. Auto-completes when all chunks received."""
        # TODO: we need to refactor this...
        if self._total_chunks is None:
            self._total_chunks = total_chunks
            self._state = VideoFileUploadState.UPLOADING

        self._chunks[chunk_index] = data

        if len(self._chunks) == total_chunks:
            self._write_to_temp_file()
            self._state = VideoFileUploadState.COMPLETE
            self.upload_complete_event.set()

    def _write_to_temp_file(self) -> None:
        """Reassemble chunks and write to temp file."""
        import tempfile

        # TODO: we need to refactor this...
        with tempfile.NamedTemporaryFile(mode="wb", suffix=".mp4", delete=False) as f:
            for i in range(self._total_chunks):
                f.write(self._chunks[i])
            self._temp_file_path = f.name

        self._chunks.clear()

    def try_start_processing(self) -> Optional[str]:
        """Check if upload complete and transition to PROCESSING. Returns path or None."""
        if self._state == VideoFileUploadState.COMPLETE:
            self._state = VideoFileUploadState.PROCESSING
            return self._temp_file_path
        return None

    async def cleanup(self) -> None:
        """Clean up temp file."""
        # TODO: we need to refactor this...
        if self._temp_file_path:
            import os

            path = self._temp_file_path
            self._temp_file_path = None
            try:
                await asyncio.to_thread(os.unlink, path)
            except Exception:
                pass

cleanup() async

Clean up temp file.

Source code in inference/core/interfaces/webrtc_worker/sources/file.py
161
162
163
164
165
166
167
168
169
170
171
172
async def cleanup(self) -> None:
    """Clean up temp file."""
    # TODO: we need to refactor this...
    if self._temp_file_path:
        import os

        path = self._temp_file_path
        self._temp_file_path = None
        try:
            await asyncio.to_thread(os.unlink, path)
        except Exception:
            pass

handle_chunk(chunk_index, total_chunks, data)

Handle a chunk. Auto-completes when all chunks received.

Source code in inference/core/interfaces/webrtc_worker/sources/file.py
128
129
130
131
132
133
134
135
136
137
138
139
140
def handle_chunk(self, chunk_index: int, total_chunks: int, data: bytes) -> None:
    """Handle a chunk. Auto-completes when all chunks received."""
    # TODO: we need to refactor this...
    if self._total_chunks is None:
        self._total_chunks = total_chunks
        self._state = VideoFileUploadState.UPLOADING

    self._chunks[chunk_index] = data

    if len(self._chunks) == total_chunks:
        self._write_to_temp_file()
        self._state = VideoFileUploadState.COMPLETE
        self.upload_complete_event.set()

try_start_processing()

Check if upload complete and transition to PROCESSING. Returns path or None.

Source code in inference/core/interfaces/webrtc_worker/sources/file.py
154
155
156
157
158
159
def try_start_processing(self) -> Optional[str]:
    """Check if upload complete and transition to PROCESSING. Returns path or None."""
    if self._state == VideoFileUploadState.COMPLETE:
        self._state = VideoFileUploadState.PROCESSING
        return self._temp_file_path
    return None