Skip to content

Sources

Stream source abstractions for WebRTC SDK.

This module defines the StreamSource interface and concrete implementations for different video streaming sources (webcam, RTSP, video files, manual frames).

ManualSource

Bases: StreamSource

Stream source for manually sent frames.

This source allows the user to programmatically send frames to be processed by the workflow using the send() method.

Source code in inference_sdk/webrtc/sources.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class ManualSource(StreamSource):
    """Stream source fed by frames the caller pushes explicitly.

    Instead of capturing from a device, this source exposes send(),
    which queues BGR frames onto a special outgoing video track.
    """

    def __init__(self):
        """Set up the source with no track; one is created on connection setup."""
        # Populated by configure_peer_connection(); frames cannot be sent before then.
        self._track: Optional[_ManualTrack] = None

    async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
        """Attach a track that accepts programmatically queued frames."""
        track = _ManualTrack()  # special track fed via send()
        self._track = track
        pc.addTrack(track)

    def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
        """Tell the server this session operates in manual (push) mode."""
        params: Dict[str, Any] = {"manual_mode": True}
        return params

    def send(self, frame: np.ndarray) -> None:
        """Queue a frame for processing by the workflow.

        Args:
            frame: BGR numpy array (H, W, 3) uint8

        Raises:
            RuntimeError: If session not started
        """
        track = self._track
        if not track:
            raise RuntimeError("Session not started. Use within 'with' context.")
        track.queue_frame(frame)

__init__()

Initialize manual source.

Source code in inference_sdk/webrtc/sources.py
342
343
344
def __init__(self):
    """Set up the source with no track; one is created on connection setup."""
    # Populated by configure_peer_connection(); frames cannot be sent before then.
    self._track: Optional[_ManualTrack] = None

configure_peer_connection(pc) async

Create manual track and add it to the peer connection.

Source code in inference_sdk/webrtc/sources.py
346
347
348
349
350
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
    """Attach a track that accepts programmatically queued frames."""
    track = _ManualTrack()  # special track fed via send()
    self._track = track
    pc.addTrack(track)

get_initialization_params(config)

Return manual mode flag.

Source code in inference_sdk/webrtc/sources.py
352
353
354
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
    """Tell the server this session operates in manual (push) mode."""
    params: Dict[str, Any] = {"manual_mode": True}
    return params

send(frame)

Send a frame to be processed by the workflow.

Parameters:

Name Type Description Default
frame ndarray

BGR numpy array (H, W, 3) uint8

required

Raises:

Type Description
RuntimeError

If session not started

Source code in inference_sdk/webrtc/sources.py
356
357
358
359
360
361
362
363
364
365
366
367
def send(self, frame: np.ndarray) -> None:
    """Queue a frame for processing by the workflow.

    Args:
        frame: BGR numpy array (H, W, 3) uint8

    Raises:
        RuntimeError: If session not started
    """
    track = self._track
    if not track:
        raise RuntimeError("Session not started. Use within 'with' context.")
    track.queue_frame(frame)

RTSPSource

Bases: StreamSource

Stream source for RTSP camera streams.

This source doesn't create a local track - instead, the server captures the RTSP stream and sends processed video back to the client.

Source code in inference_sdk/webrtc/sources.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class RTSPSource(StreamSource):
    """Stream source backed by an RTSP camera feed.

    No local track is created; the server captures the RTSP stream
    itself and streams the processed video back down to the client.
    """

    def __init__(self, url: str):
        """Validate and store the RTSP URL.

        Args:
            url: RTSP URL (e.g., "rtsp://camera.local/stream")
                Credentials can be included: "rtsp://user:pass@host/stream"
        """
        valid = url.startswith("rtsp://") or url.startswith("rtsps://")
        if not valid:
            raise InvalidParameterError(
                f"Invalid RTSP URL: {url}. Must start with rtsp:// or rtsps://"
            )
        self.url = url

    async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
        """Register a receive-only transceiver; the server streams video to us."""
        # No local track here - the processed video arrives from the server.
        pc.addTransceiver("video", direction="recvonly")

    def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
        """Pass the RTSP URL along so the server can open the capture."""
        return dict(rtsp_url=self.url)

__init__(url)

Initialize RTSP source.

Parameters:

Name Type Description Default
url str

RTSP URL (e.g., "rtsp://camera.local/stream") Credentials can be included: "rtsp://user:pass@host/stream"

required
Source code in inference_sdk/webrtc/sources.py
191
192
193
194
195
196
197
198
199
200
201
202
def __init__(self, url: str):
    """Validate and store the RTSP URL.

    Args:
        url: RTSP URL (e.g., "rtsp://camera.local/stream")
            Credentials can be included: "rtsp://user:pass@host/stream"
    """
    valid = url.startswith("rtsp://") or url.startswith("rtsps://")
    if not valid:
        raise InvalidParameterError(
            f"Invalid RTSP URL: {url}. Must start with rtsp:// or rtsps://"
        )
    self.url = url

configure_peer_connection(pc) async

Add receive-only video transceiver (server sends video to us).

Source code in inference_sdk/webrtc/sources.py
204
205
206
207
208
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
    """Register a receive-only transceiver; the server streams video to us."""
    # No local track here - the processed video arrives from the server.
    pc.addTransceiver("video", direction="recvonly")

get_initialization_params(config)

Return RTSP URL for server to capture.

Source code in inference_sdk/webrtc/sources.py
210
211
212
213
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
    """Pass the RTSP URL along so the server can open the capture."""
    return dict(rtsp_url=self.url)

StreamSource

Bases: ABC

Base interface for all stream sources.

A StreamSource is responsible for:

1. Configuring the RTCPeerConnection (adding tracks or transceivers)
2. Providing initialization parameters for the server
3. Cleaning up resources when done

Source code in inference_sdk/webrtc/sources.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class StreamSource(ABC):
    """Common contract implemented by every stream source.

    Concrete sources take care of:
    1. Configuring the RTCPeerConnection (adding tracks or transceivers)
    2. Providing initialization parameters for the server
    3. Cleaning up resources when done
    """

    @abstractmethod
    async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
        """Prepare the peer connection for this kind of source.

        Implementations decide here whether to attach a local track
        (webcam, video file, manual), register a receive-only transceiver
        (RTSP), or perform any other connection setup.

        Args:
            pc: The RTCPeerConnection to configure
        """

    @abstractmethod
    def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
        """Build the source-specific payload for /initialise_webrtc_worker.

        Args:
            config: Stream configuration with stream_output, data_output, etc.

        Returns:
            Dictionary of parameters specific to this source type, e.g.
            {"rtsp_url": "rtsp://..."} for RTSP, merged output lists for a
            video file, or {} for webcam/manual sources.
        """

    async def cleanup(self) -> None:
        """Release any resources held by this source.

        The base implementation is a no-op; override when cleanup is needed.
        """

cleanup() async

Cleanup resources when session ends.

Default implementation does nothing. Override if cleanup is needed.

Source code in inference_sdk/webrtc/sources.py
67
68
69
70
71
72
async def cleanup(self) -> None:
    """Release any resources held by this source.

    The base implementation is a no-op; override when cleanup is needed.
    """

configure_peer_connection(pc) abstractmethod async

Configure the peer connection for this source type.

This is where the source decides: - Whether to add a local track (webcam, video file, manual) - Whether to add a receive-only transceiver (RTSP) - Any other peer connection configuration

Parameters:

Name Type Description Default
pc RTCPeerConnection

The RTCPeerConnection to configure

required
Source code in inference_sdk/webrtc/sources.py
37
38
39
40
41
42
43
44
45
46
47
48
49
@abstractmethod
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
    """Prepare the peer connection for this kind of source.

    Implementations decide here whether to attach a local track
    (webcam, video file, manual), register a receive-only transceiver
    (RTSP), or perform any other connection setup.

    Args:
        pc: The RTCPeerConnection to configure
    """

get_initialization_params(config) abstractmethod

Get parameters to send to server in /initialise_webrtc_worker payload.

Parameters:

Name Type Description Default
config StreamConfig

Stream configuration with stream_output, data_output, etc.

required

Returns:

Name Type Description
Dict[str, Any]

Dictionary of parameters specific to this source type.

Examples:

  • RTSP: {"rtsp_url": "rtsp://..."}
  • Video file: {"stream_output": [], "data_output": [...]}
  • Webcam/Manual: {} (empty, no server-side source)
Source code in inference_sdk/webrtc/sources.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@abstractmethod
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
    """Build the source-specific payload for /initialise_webrtc_worker.

    Args:
        config: Stream configuration with stream_output, data_output, etc.

    Returns:
        Dictionary of parameters specific to this source type, e.g.
        {"rtsp_url": "rtsp://..."} for RTSP, merged output lists for a
        video file, or {} for webcam/manual sources.
    """

VideoFileSource

Bases: StreamSource

Stream source for video files.

Uploads video file via datachannel to the server, which processes it and streams results back. This is more efficient than frame-by-frame streaming for pre-recorded video files.

Supports two output modes:

- Datachannel mode (default): Frames received as base64 JSON via datachannel. Higher bandwidth but includes all workflow output data inline.
- Video track mode: Frames received via WebRTC video track with hardware-accelerated codec (H.264/VP8). Lower bandwidth, workflow data sent separately.

Source code in inference_sdk/webrtc/sources.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
class VideoFileSource(StreamSource):
    """Stream source for video files.

    Uploads video file via datachannel to the server, which processes it
    and streams results back. This is more efficient than frame-by-frame
    streaming for pre-recorded video files.

    Supports two output modes:
    - Datachannel mode (default): Frames received as base64 JSON via datachannel.
      Higher bandwidth but includes all workflow output data inline.
    - Video track mode: Frames received via WebRTC video track with hardware-
      accelerated codec (H.264/VP8). Lower bandwidth, workflow data sent separately.
    """

    def __init__(
        self,
        path: str,
        on_upload_progress: Optional[UploadProgressCallback] = None,
        use_datachannel_frames: bool = True,
        realtime_processing: bool = False,
    ):
        """Initialize video file source.

        Args:
            path: Path to video file (any format supported by FFmpeg)
            on_upload_progress: Optional callback called during upload with
                (uploaded_chunks, total_chunks). Use to track upload progress.
            use_datachannel_frames: If enabled, frames are received through the
                datachannel. It consumes much more network bandwidth, but it
                provides guaranteed in-order and high quality delivery of the
                frames. If False, frames are received via WebRTC video track
                with hardware-accelerated codec (lower bandwidth).
            realtime_processing: If True, process frames at original video FPS
                (throttled playback for live preview). If False (default),
                process all frames as fast as possible (batch mode).
        """
        self.path = path
        self.on_upload_progress = on_upload_progress
        self.use_datachannel_frames = use_datachannel_frames
        self.realtime_processing = realtime_processing
        self._upload_channel: Optional["RTCDataChannel"] = None
        self._uploader: Optional[VideoFileUploader] = None
        # Note: _upload_started is created lazily in configure_peer_connection()
        # to avoid Python 3.9 issue where asyncio.Event binds to wrong event loop
        self._upload_started: Optional[asyncio.Event] = None

    async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
        """Configure peer connection for video file upload.

        Creates video_upload datachannel for file transfer. In video track mode,
        also adds a receive-only transceiver for processed video output.
        """
        # Create event in the async context to bind to correct event loop (Python 3.9 compat)
        self._upload_started = asyncio.Event()

        # Create upload channel - server will create VideoFileUploadHandler
        self._upload_channel = pc.createDataChannel("video_upload")

        # Add receive-only transceiver for video track output mode (when not using datachannel)
        if not self.use_datachannel_frames:
            pc.addTransceiver("video", direction="recvonly")

        # Setup channel open handler to signal upload can start
        @self._upload_channel.on("open")
        def on_open() -> None:
            self._upload_started.set()

    def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
        """Return params for video file processing mode.

        In datachannel mode (default), merges stream_output into data_output
        so frames are received as base64 via the inference datachannel.
        In video track mode, preserves stream_output for video track rendering.
        """
        params: Dict[str, Any] = {
            "webrtc_realtime_processing": self.realtime_processing,
            "video_file_upload": True,  # Signal to server that video will be uploaded
        }

        if not self.use_datachannel_frames:
            # Video track mode: keep stream_output for video track rendering
            return params

        # Datachannel mode (default): merge stream_output into data_output
        data_output = list(config.data_output or [])
        if config.stream_output:
            for field in config.stream_output:
                if field and field not in data_output:
                    data_output.append(field)

        params["stream_output"] = []  # No video track
        params["data_output"] = data_output  # Receive frames via data channel
        return params

    async def start_upload(self) -> None:
        """Start uploading the video file.

        Called by session after connection is established.
        Uses self.on_upload_progress if provided.

        Raises:
            RuntimeError: If configure_peer_connection() was never called,
                so no upload channel exists.
        """
        # Fail fast with a clear error instead of an AttributeError on the
        # None event when the peer connection was never configured.
        if self._upload_started is None or self._upload_channel is None:
            raise RuntimeError("Upload channel not configured")

        # Wait for channel to open before streaming the file
        await self._upload_started.wait()

        self._uploader = VideoFileUploader(self.path, self._upload_channel)
        await self._uploader.upload(on_progress=self.on_upload_progress)

    async def cleanup(self) -> None:
        """No cleanup needed - upload channel is managed by peer connection."""

__init__(path, on_upload_progress=None, use_datachannel_frames=True, realtime_processing=False)

Initialize video file source.

Parameters:

Name Type Description Default
path str

Path to video file (any format supported by FFmpeg)

required
on_upload_progress Optional[UploadProgressCallback]

Optional callback called during upload with (uploaded_chunks, total_chunks). Use to track upload progress.

None
use_datachannel_frames bool

If enabled, frames are received through the datachannel. It consumes much more network bandwidth, but it provides guaranteed in-order and high quality delivery of the frames. If False, frames are received via WebRTC video track with hardware-accelerated codec (lower bandwidth).

True
realtime_processing bool

If True, process frames at original video FPS (throttled playback for live preview). If False (default), process all frames as fast as possible (batch mode).

False
Source code in inference_sdk/webrtc/sources.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def __init__(
    self,
    path: str,
    on_upload_progress: Optional[UploadProgressCallback] = None,
    use_datachannel_frames: bool = True,
    realtime_processing: bool = False,
):
    """Record upload settings for a video file source.

    Args:
        path: Path to video file (any format supported by FFmpeg)
        on_upload_progress: Optional callback invoked during upload with
            (uploaded_chunks, total_chunks) for progress tracking.
        use_datachannel_frames: When True (default), processed frames come
            back over the datachannel: higher bandwidth, but guaranteed
            in-order, full-quality delivery. When False, frames arrive on
            a WebRTC video track using a hardware-accelerated codec
            (lower bandwidth).
        realtime_processing: If True, process frames at original video FPS
            (throttled playback for live preview). If False (default),
            process all frames as fast as possible (batch mode).
    """
    self.path = path
    self.on_upload_progress = on_upload_progress
    self.use_datachannel_frames = use_datachannel_frames
    self.realtime_processing = realtime_processing
    self._upload_channel: Optional["RTCDataChannel"] = None
    self._uploader: Optional[VideoFileUploader] = None
    # Created lazily in configure_peer_connection() so the asyncio.Event
    # binds to the running loop (Python 3.9 compatibility).
    self._upload_started: Optional[asyncio.Event] = None

cleanup() async

No cleanup needed - upload channel is managed by peer connection.

Source code in inference_sdk/webrtc/sources.py
326
327
328
async def cleanup(self) -> None:
    """Nothing to release; the peer connection owns the upload channel."""

configure_peer_connection(pc) async

Configure peer connection for video file upload.

Creates video_upload datachannel for file transfer. In video track mode, also adds a receive-only transceiver for processed video output.

Source code in inference_sdk/webrtc/sources.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
    """Prepare the peer connection for a video file upload session.

    Opens the "video_upload" datachannel for the file transfer and, when
    frames will come back over a video track instead of the datachannel,
    registers a receive-only transceiver for the processed output.
    """
    # Bind the event to the running loop (Python 3.9 compat), not at __init__ time.
    self._upload_started = asyncio.Event()

    # The server responds to this channel by creating a VideoFileUploadHandler.
    channel = pc.createDataChannel("video_upload")
    self._upload_channel = channel

    # Video-track output mode only: ask the server for a recvonly video leg.
    if not self.use_datachannel_frames:
        pc.addTransceiver("video", direction="recvonly")

    # Once the channel opens, the upload may begin.
    @channel.on("open")
    def on_open() -> None:
        self._upload_started.set()

get_initialization_params(config)

Return params for video file processing mode.

In datachannel mode (default), merges stream_output into data_output so frames are received as base64 via the inference datachannel. In video track mode, preserves stream_output for video track rendering.

Source code in inference_sdk/webrtc/sources.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
    """Assemble the server payload for video-file processing.

    In datachannel mode (default) stream_output is folded into
    data_output so frames arrive as base64 over the inference
    datachannel; in video track mode stream_output is left untouched so
    the server renders frames onto the video track.
    """
    params: Dict[str, Any] = {
        "webrtc_realtime_processing": self.realtime_processing,
        "video_file_upload": True,  # announce the upcoming upload to the server
    }

    if not self.use_datachannel_frames:
        # Video track mode: stream_output stays as configured.
        return params

    # Datachannel mode: append any stream_output fields missing from data_output.
    merged = list(config.data_output or [])
    for name in config.stream_output or []:
        if name and name not in merged:
            merged.append(name)

    params["stream_output"] = []  # suppress the video track
    params["data_output"] = merged  # frames come back via the data channel
    return params

start_upload() async

Start uploading the video file.

Called by session after connection is established. Uses self.on_upload_progress if provided.

Source code in inference_sdk/webrtc/sources.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
async def start_upload(self) -> None:
    """Start uploading the video file.

    Called by session after connection is established.
    Uses self.on_upload_progress if provided.

    Raises:
        RuntimeError: If configure_peer_connection() was never called,
            so no upload channel exists.
    """
    # Fail fast with a clear error instead of an AttributeError on the
    # None event when the peer connection was never configured.
    if self._upload_started is None or self._upload_channel is None:
        raise RuntimeError("Upload channel not configured")

    # Wait for channel to open before streaming the file
    await self._upload_started.wait()

    self._uploader = VideoFileUploader(self.path, self._upload_channel)
    await self._uploader.upload(on_progress=self.on_upload_progress)

WebcamSource

Bases: StreamSource

Stream source for local webcam/USB camera.

This source creates a local video track that captures frames from a webcam device using OpenCV and sends them to the server.

Source code in inference_sdk/webrtc/sources.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class WebcamSource(StreamSource):
    """Stream source for a local webcam/USB camera.

    Creates a local video track that reads frames from the camera via
    OpenCV and forwards them to the server.
    """

    def __init__(
        self, device_id: int = 0, resolution: Optional[Tuple[int, int]] = None
    ):
        """Store capture settings; the device is opened on connection setup.

        Args:
            device_id: Camera device index (0 for default camera)
            resolution: Optional (width, height) tuple to set camera resolution
        """
        self.device_id = device_id
        self.resolution = resolution
        # Both populated in configure_peer_connection().
        self._track: Optional[_WebcamVideoTrack] = None
        self._declared_fps: Optional[float] = None

    async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
        """Open the camera track, record its FPS, and attach it."""
        track = _WebcamVideoTrack(self.device_id, self.resolution)
        # Remember the camera's reported FPS so it can be declared to the server.
        self._declared_fps = track.get_declared_fps()
        self._track = track
        pc.addTrack(track)

    def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
        """Report the camera's declared FPS to the server when known."""
        if self._declared_fps:
            return {"declared_fps": self._declared_fps}
        return {}

    async def cleanup(self) -> None:
        """Release the webcam device if a track was created."""
        track = self._track
        if track:
            track.release()
__init__(device_id=0, resolution=None)

Initialize webcam source.

Parameters:

Name Type Description Default
device_id int

Camera device index (0 for default camera)

0
resolution Optional[Tuple[int, int]]

Optional (width, height) tuple to set camera resolution

None
Source code in inference_sdk/webrtc/sources.py
146
147
148
149
150
151
152
153
154
155
156
157
158
def __init__(
    self, device_id: int = 0, resolution: Optional[Tuple[int, int]] = None
):
    """Store capture settings; the device is opened on connection setup.

    Args:
        device_id: Camera device index (0 for default camera)
        resolution: Optional (width, height) tuple to set camera resolution
    """
    self.device_id = device_id
    self.resolution = resolution
    # Both populated in configure_peer_connection().
    self._track: Optional[_WebcamVideoTrack] = None
    self._declared_fps: Optional[float] = None

cleanup() async

Release webcam resources.

Source code in inference_sdk/webrtc/sources.py
178
179
180
181
async def cleanup(self) -> None:
    """Release the webcam device if a track was created."""
    track = self._track
    if track:
        track.release()

configure_peer_connection(pc) async

Create webcam video track and add it to the peer connection.

Source code in inference_sdk/webrtc/sources.py
160
161
162
163
164
165
166
167
168
169
async def configure_peer_connection(self, pc: RTCPeerConnection) -> None:
    """Open the camera track, record its FPS, and attach it."""
    track = _WebcamVideoTrack(self.device_id, self.resolution)
    # Remember the camera's reported FPS so it can be declared to the server.
    self._declared_fps = track.get_declared_fps()
    self._track = track
    pc.addTrack(track)

get_initialization_params(config)

Return FPS if available.

Source code in inference_sdk/webrtc/sources.py
171
172
173
174
175
176
def get_initialization_params(self, config: "StreamConfig") -> Dict[str, Any]:
    """Report the camera's declared FPS to the server when known."""
    if self._declared_fps:
        return {"declared_fps": self._declared_fps}
    return {}