Skip to content

Utils

detect_image_output(workflow_output)

Detect the first available image output field in workflow output.

Source code in inference/core/interfaces/webrtc_worker/utils.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def detect_image_output(
    workflow_output: Dict[str, Union[WorkflowImageData, Any]],
) -> Optional[str]:
    """Detect the first available image output field in workflow output."""
    for output_name in workflow_output.keys():
        if (
            get_frame_from_workflow_output(
                workflow_output=workflow_output,
                frame_output_key=output_name,
            )
            is not None
        ):
            return output_name
    return None

get_cv2_rotation_code(rotation)

Get OpenCV rotation code to correct a given rotation.

Parameters:

Name Type Description Default
rotation int

Rotation angle in degrees from metadata

required

Returns:

Type Description
Optional[int]

cv2 rotation constant or None if no correction needed

Source code in inference/core/interfaces/webrtc_worker/utils.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def get_cv2_rotation_code(rotation: int) -> Optional[int]:
    """Get OpenCV rotation code to correct a given rotation.

    Args:
        rotation: Rotation angle in degrees from metadata

    Returns:
        cv2 rotation constant or None if no correction needed
    """
    # The displaymatrix rotation indicates how the video is rotated.
    # To correct it, we apply the OPPOSITE rotation.
    if rotation in (-90, 270):
        return cv.ROTATE_90_CLOCKWISE
    elif rotation in (90, -270):
        return cv.ROTATE_90_COUNTERCLOCKWISE
    elif rotation in (180, -180):
        return cv.ROTATE_180
    return None

get_video_fps(filepath)

Detect video FPS from container metadata.

Parameters:

Name Type Description Default
filepath str

Path to the video file

required

Returns:

Type Description
Optional[float]

FPS as float, or None if detection fails

Source code in inference/core/interfaces/webrtc_worker/utils.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def get_video_fps(filepath: str) -> Optional[float]:
    """Detect video FPS from container metadata.

    Args:
        filepath: Path to the video file

    Returns:
        FPS as float, or None if detection fails
    """
    import json
    import subprocess

    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "v:0",
                "-show_entries",
                "stream=r_frame_rate,avg_frame_rate",
                "-of",
                "json",
                filepath,
            ],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            streams = data.get("streams", [])
            if streams:
                stream = streams[0]
                # Prefer avg_frame_rate (actual average) over r_frame_rate (container rate)
                for rate_key in ["avg_frame_rate", "r_frame_rate"]:
                    rate_str = stream.get(rate_key, "0/1")
                    if "/" in rate_str:
                        num, den = rate_str.split("/")
                        if int(den) != 0:
                            fps = int(num) / int(den)
                            if fps > 0:
                                logger.info(
                                    "Video FPS detected: %.2f from %s", fps, rate_key
                                )
                                return fps
        else:
            logger.warning("ffprobe FPS detection failed: %s", result.stderr.strip())
    except FileNotFoundError:
        logger.warning("ffprobe not available for FPS detection")
    except subprocess.TimeoutExpired:
        logger.warning("ffprobe timed out during FPS detection")
    except Exception as e:
        logger.warning("ffprobe FPS detection failed: %s", e)

    return None

get_video_rotation(filepath)

Detect video rotation from metadata (displaymatrix or rotate tag).

Parameters:

Name Type Description Default
filepath str

Path to the video file

required

Returns:

Type Description
int

Rotation in degrees (-90, 0, 90, 180, 270) or 0 if not found.

int

Negative values indicate counter-clockwise rotation.

Source code in inference/core/interfaces/webrtc_worker/utils.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def get_video_rotation(filepath: str) -> int:
    """Detect video rotation from metadata (displaymatrix or rotate tag).

    Args:
        filepath: Path to the video file

    Returns:
        Rotation in degrees (-90, 0, 90, 180, 270) or 0 if not found.
        Negative values indicate counter-clockwise rotation.
    """
    import json
    import subprocess

    try:
        # Use -show_streams which is compatible with all ffprobe versions
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "v:0",
                "-show_streams",
                "-of",
                "json",
                filepath,
            ],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            streams = data.get("streams", [])
            if streams:
                stream = streams[0]
                # Check displaymatrix side_data first
                for sd in stream.get("side_data_list", []):
                    if "rotation" in sd:
                        rotation = int(sd["rotation"])
                        logger.info("Video rotation detected: %d°", rotation)
                        return rotation
                # Fall back to rotate tag in stream tags
                rotate_str = stream.get("tags", {}).get("rotate", "0")
                rotation = int(rotate_str)
                if rotation != 0:
                    logger.info("Video rotation detected: %d°", rotation)
                    return rotation
        else:
            logger.warning("ffprobe failed: %s", result.stderr.strip())
    except FileNotFoundError:
        logger.warning("ffprobe not available")
    except subprocess.TimeoutExpired:
        logger.warning("ffprobe timed out")
    except Exception as e:
        logger.warning("ffprobe rotation detection failed: %s", e)

    return 0

parse_video_file_chunk(message)

Parse video file chunk message.

Returns: (chunk_index, total_chunks, payload)

Source code in inference/core/interfaces/webrtc_worker/utils.py
196
197
198
199
200
201
202
203
204
def parse_video_file_chunk(message: bytes) -> Tuple[int, int, bytes]:
    """Parse video file chunk message.

    Returns: (chunk_index, total_chunks, payload)
    """
    if len(message) < VIDEO_FILE_HEADER_SIZE:
        raise ValueError(f"Message too short: {len(message)} bytes")
    chunk_index, total_chunks = struct.unpack("<II", message[:8])
    return chunk_index, total_chunks, message[8:]

rotate_video_frame(frame, rotation_code)

Apply rotation to a video frame using OpenCV.

Parameters:

Name Type Description Default
frame VideoFrame

Input VideoFrame

required
rotation_code int

cv2 rotation constant (ROTATE_90_CLOCKWISE, etc.)

required

Returns:

Type Description
VideoFrame

Rotated VideoFrame

Source code in inference/core/interfaces/webrtc_worker/utils.py
382
383
384
385
386
387
388
389
390
391
392
393
394
def rotate_video_frame(frame: VideoFrame, rotation_code: int) -> VideoFrame:
    """Apply rotation to a video frame using OpenCV.

    Args:
        frame: Input VideoFrame
        rotation_code: cv2 rotation constant (ROTATE_90_CLOCKWISE, etc.)

    Returns:
        Rotated VideoFrame
    """
    img = frame.to_ndarray(format="bgr24")
    img = cv.rotate(img, rotation_code)
    return VideoFrame.from_ndarray(img, format="bgr24")