Skip to content

inference_cli API Reference

lib

Internal adapters for Docker container management, benchmarking, cloud deployment, and inference execution.

inference_cli.lib.container_adapter

Functions:

terminate_running_containers

terminate_running_containers(
    containers, interactive_mode=True
)

Parameters:

Name Type Description Default
containers List[Container]

List of containers to handle

required
interactive_mode bool

Flag to determine if user prompt should decide on container termination

True

Returns:

Type Description
bool

Boolean value that informs if there are containers that have not received SIGKILL as a result of the procedure.

Source code in inference_cli/lib/container_adapter.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def terminate_running_containers(
    containers: List[Container], interactive_mode: bool = True
) -> bool:
    """
    Kill the running containers, optionally letting the user decide one by one.

    Args:
        containers (List[Container]): List of containers to handle
        interactive_mode (bool): Flag to determine if user prompt should decide on container termination

    Returns: boolean value that informs if there are containers that have not received SIGKILL
        as a result of procedure (True when at least one running container was spared).
    """
    # Containers that are not running are ignored entirely.
    running = []
    for candidate in containers:
        if is_container_running(container=candidate):
            running.append(candidate)

    if interactive_mode:
        # Each running container is confirmed with the user, in the original order.
        selected = []
        for candidate in running:
            if ask_user_to_kill_container(candidate):
                selected.append(candidate)
    else:
        selected = running

    kill_containers(containers=selected)
    # Fewer killed than running means some containers were left alive.
    return len(running) > len(selected)

lib/enterprise/inference_compiler/core/compilation_handlers

inference_cli.lib.enterprise.inference_compiler.core.compilation_handlers.engine_builder

Classes

EngineBuilder

Parses an ONNX graph and builds a TensorRT engine from it.

Source code in inference_cli/lib/enterprise/inference_compiler/core/compilation_handlers/engine_builder.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class EngineBuilder:
    """
    Parses an ONNX graph and builds a TensorRT engine from it.

    Usage order implied by the methods: construct the builder, optionally attach a
    timing cache manager, call `create_network()` and finally `create_engine()`.
    """

    def __init__(
        self,
        workspace: int,
    ):
        """
        Create the TensorRT builder and its configuration.
        :param workspace: Workspace memory pool limit; the value is multiplied by 2**30
            before being handed to TensorRT.
        """
        self.trt_logger = InferenceTRTLogger()
        trt.init_libnvinfer_plugins(self.trt_logger, namespace="")
        self.builder = trt.Builder(self.trt_logger)
        self.config = self.builder.create_builder_config()
        self.config.set_memory_pool_limit(
            trt.MemoryPoolType.WORKSPACE, workspace * (2**30)
        )
        # Both are populated by create_network(); None until a model was parsed.
        self.network: Optional[trt.tensorrt.INetworkDefinition] = None
        self.parser: Optional[trt.OnnxParser] = None
        # Optional, attached through set_timing_cache_manager().
        self.cache_manager: Optional[TimingCacheManager] = None

    def set_timing_cache_manager(self, cache_manager: TimingCacheManager) -> None:
        """
        Attach the manager used by `create_engine()` to load and persist the timing cache.
        :param cache_manager: The timing cache manager to use.
        """
        self.cache_manager = cache_manager

    def create_network(self, onnx_path: str) -> None:
        """
        Parse the ONNX graph and create the corresponding TensorRT network definition.
        :param onnx_path: The path to the ONNX graph to load.
        :raises NetworkParsingError: When the ONNX parser rejects the file.
        """
        self.network = self.builder.create_network(0)
        self.parser = trt.OnnxParser(self.network, self.trt_logger)

        onnx_path = os.path.realpath(onnx_path)
        with open(onnx_path, "rb") as f:
            if not self.parser.parse(f.read()):
                LOGGER.error(f"Failed to load ONNX file: {onnx_path}")
                # Surface every parser error before giving up.
                for error in range(self.parser.num_errors):
                    LOGGER.error(self.parser.get_error(error))
                raise NetworkParsingError("Could not parse ONNX file")

        network_inputs = self._list_network_inputs()
        network_outputs = [
            self.network.get_output(i) for i in range(self.network.num_outputs)
        ]
        LOGGER.info("Network Description")
        for network_input in network_inputs:
            LOGGER.info(
                f"Input '{network_input.name}' with shape {network_input.shape} "
                f"and dtype {network_input.dtype}"
            )
        for network_output in network_outputs:
            LOGGER.info(
                f"Output '{network_output.name}' with shape {network_output.shape} "
                f"and dtype {network_output.dtype}"
            )

    def get_static_batch_size_of_input(self) -> int:
        """
        Return the first dimension of the single network input as an integer.
        :raises InvalidNetworkInputsError: When the dimension cannot be converted to int,
            or when the network does not have exactly one input.
        :raises TRTModelCompilationError: When called before `create_network()`.
        """
        network_input = self._get_image_input()
        try:
            # NOTE(review): TensorRT presumably reports dynamic dimensions as -1, which
            # int() accepts - confirm whether callers need a negative value rejected here.
            return int(network_input.shape[0])
        except ValueError as error:
            raise InvalidNetworkInputsError(
                f"Expected the input to have static batch size, detected shape: {network_input.shape}"
            ) from error

    def create_engine(
        self,
        engine_path: str,
        precision: Literal["fp32", "fp16"],
        input_size: Tuple[int, int],
        dynamic_batch_sizes: Optional[Tuple[int, int, int]] = None,
        trt_version_compatible: bool = False,
        same_compute_compatibility: bool = False,
    ) -> None:
        """
        Build the TensorRT engine for the parsed network and write it to disk.
        :param engine_path: Destination file; missing parent directories are created.
        :param precision: "fp16" enables the FP16 builder flag, other values set no
            precision flag.
        :param input_size: (height, width) used for the optimization profile shapes.
        :param dynamic_batch_sizes: Optional (min, opt, max) batch sizes; when given, the
            profile shapes are (batch, 3, height, width).
        :param trt_version_compatible: Sets the VERSION_COMPATIBLE builder flag.
        :param same_compute_compatibility: Sets the hardware compatibility level to
            SAME_COMPUTE_CAPABILITY.
        :raises QuantizationNotSupportedError: When fp16 is requested but the platform
            does not report fast FP16 support.
        :raises TRTModelCompilationError: When no network was parsed yet or the build
            returns no engine.
        :raises InvalidNetworkInputsError: When the network does not have exactly one input.
        """
        if self.cache_manager:
            # Seed the build with the stored timing cache.
            cache_bytes = self.cache_manager.get_cache_for_features()
            cache = self.config.create_timing_cache(cache_bytes)
            self.config.set_timing_cache(cache, ignore_mismatch=False)
        engine_path = os.path.abspath(engine_path)
        engine_dir = os.path.dirname(engine_path)
        os.makedirs(engine_dir, exist_ok=True)
        LOGGER.info(f"Building {precision} Engine in {engine_path}")
        network_input = self._get_image_input()
        input_name = network_input.name
        if precision == "fp16":
            if not self.builder.platform_has_fast_fp16:
                raise QuantizationNotSupportedError("FP16 quantization not supported")
            self.config.set_flag(trt.BuilderFlag.FP16)
        if trt_version_compatible:
            self.config.set_flag(trt.BuilderFlag.VERSION_COMPATIBLE)
        if same_compute_compatibility:
            self.config.hardware_compatibility_level = (
                trt.HardwareCompatibilityLevel.SAME_COMPUTE_CAPABILITY
            )
        # The profile is always registered; shapes are only set for dynamic batch sizes.
        profile = self.builder.create_optimization_profile()
        if dynamic_batch_sizes:
            bs_min, bs_opt, bs_max = dynamic_batch_sizes
            h, w = input_size
            profile.set_shape(
                input_name, (bs_min, 3, h, w), (bs_opt, 3, h, w), (bs_max, 3, h, w)
            )
        self.config.add_optimization_profile(profile)
        engine_bytes = self.builder.build_serialized_network(self.network, self.config)
        if engine_bytes is None:
            raise TRTModelCompilationError("Failed to create TRT engine")
        with open(engine_path, "wb") as f:
            LOGGER.info(f"Serializing engine to file: {engine_path}")
            f.write(engine_bytes)
        if self.cache_manager:
            # Persist whatever timing data the build produced for later runs.
            cache = self.config.get_timing_cache()
            self.cache_manager.save_cache_for_features(cache=cache.serialize())

    def _list_network_inputs(self) -> list:
        """Return all input tensors of the parsed network, in index order."""
        return [self.network.get_input(i) for i in range(self.network.num_inputs)]

    def _get_image_input(self) -> trt.ITensor:
        """
        Return the only input tensor of the parsed network.
        :raises TRTModelCompilationError: When no network was parsed yet.
        :raises InvalidNetworkInputsError: When the network has no inputs or more than one.
        """
        if self.network is None:
            raise TRTModelCompilationError(
                "Attempted to get network input before parsing the model"
            )
        network_inputs = self._list_network_inputs()
        if not network_inputs:
            # Previously reported as "multiple inputs", which was misleading.
            raise InvalidNetworkInputsError("Detected network without inputs")
        if len(network_inputs) > 1:
            raise InvalidNetworkInputsError("Detected network with multiple inputs")
        return network_inputs[0]
Methods:
create_network
create_network(onnx_path)

Parse the ONNX graph and create the corresponding TensorRT network definition.

Parameter `onnx_path`: The path to the ONNX graph to load.

Source code in inference_cli/lib/enterprise/inference_compiler/core/compilation_handlers/engine_builder.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_network(self, onnx_path: str) -> None:
    """
    Build the TensorRT network definition for an ONNX model and log its inputs/outputs.
    :param onnx_path: The path to the ONNX graph to load.
    :raises NetworkParsingError: When the ONNX parser rejects the file.
    """
    self.network = self.builder.create_network(0)
    self.parser = trt.OnnxParser(self.network, self.trt_logger)

    onnx_path = os.path.realpath(onnx_path)
    with open(onnx_path, "rb") as model_file:
        parsed_ok = self.parser.parse(model_file.read())
        if not parsed_ok:
            LOGGER.error(f"Failed to load ONNX file: {onnx_path}")
            # Report every parser error before raising.
            for error_index in range(self.parser.num_errors):
                LOGGER.error(self.parser.get_error(error_index))
            raise NetworkParsingError("Could not parse ONNX file")

    # Collect the tensors first, then describe them.
    input_tensors = []
    for index in range(self.network.num_inputs):
        input_tensors.append(self.network.get_input(index))
    output_tensors = []
    for index in range(self.network.num_outputs):
        output_tensors.append(self.network.get_output(index))

    LOGGER.info("Network Description")
    for tensor in input_tensors:
        LOGGER.info(
            f"Input '{tensor.name}' with shape {tensor.shape} and dtype {tensor.dtype}"
        )
    for tensor in output_tensors:
        LOGGER.info(
            f"Output '{tensor.name}' with shape {tensor.shape} and dtype {tensor.dtype}"
        )

lib/roboflow_cloud/data_staging

Data staging operations for uploading and managing data in the Roboflow cloud.

inference_cli.lib.roboflow_cloud.data_staging.api_operations

Functions:

create_images_batch_from_cloud_storage

create_images_batch_from_cloud_storage(
    bucket_path,
    batch_id,
    api_key,
    batch_name=None,
    ingest_id=None,
    notifications_url=None,
    notification_categories=None,
    presign_expiration_seconds=86400,
)

Create image batch from cloud storage by generating presigned URLs.

Parameters:

Name Type Description Default
bucket_path str

Cloud path with optional glob pattern (e.g., 's3://bucket/**/*.jpg')

required
batch_id str

Batch identifier

required
api_key str

Roboflow API key

required
presign_expiration_seconds int

Presigned URL expiration time (default: 24 hours)

86400

Internally calls trigger_images_references_ingest with generated presigned URLs.

Source code in inference_cli/lib/roboflow_cloud/data_staging/api_operations.py
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
def create_images_batch_from_cloud_storage(
    bucket_path: str,
    batch_id: str,
    api_key: str,
    batch_name: Optional[str] = None,
    ingest_id: Optional[str] = None,
    notifications_url: Optional[str] = None,
    notification_categories: Optional[List[str]] = None,
    presign_expiration_seconds: int = 86400,
) -> None:
    """
    Create image batch from cloud storage by generating presigned URLs.

    Args:
        bucket_path: Cloud path with optional glob pattern (e.g., 's3://bucket/**/*.jpg')
        batch_id: Batch identifier
        api_key: Roboflow API key
        batch_name: Forwarded unchanged to trigger_images_references_ingest
        ingest_id: Forwarded unchanged to trigger_images_references_ingest (the same
            value is used for every chunk)
        notifications_url: Forwarded to trigger_images_references_ingest; also decides
            which follow-up hint is printed
        notification_categories: Forwarded unchanged to trigger_images_references_ingest
        presign_expiration_seconds: Presigned URL expiration time (default: 24 hours)

    Raises:
        ImportError: When `fsspec` cannot be imported (chained to the original error).
        ValueError: When no image file matches the given path/pattern.

    Internally calls trigger_images_references_ingest with generated presigned URLs.
    """
    try:
        import fsspec
    except ImportError as error:
        # Chain explicitly so the root cause of the failed import stays visible.
        raise ImportError(
            "Cloud storage support requires additional dependencies. "
            "Install with: pip install 'inference-cli[cloud-storage]'"
        ) from error

    base_path, glob_pattern = _parse_bucket_path(bucket_path)
    protocol = base_path.split("://")[0]
    fs = fsspec.filesystem(protocol, **_get_fs_kwargs(protocol))

    # Stream and filter image files with progress
    image_files_generator = _list_and_filter_files_streaming(
        fs, base_path, glob_pattern, IMAGES_EXTENSIONS
    )

    # Generate presigned URLs in parallel (consumes generator and shows progress)
    references = _generate_presigned_urls_parallel(
        fs, image_files_generator, base_path, presign_expiration_seconds
    )

    if len(references) == 0:
        pattern_desc = glob_pattern if glob_pattern else "all image files"
        raise ValueError(
            f"No image files found matching pattern: {pattern_desc} in {base_path}\n"
            f"Supported extensions: {', '.join(IMAGES_EXTENSIONS)}\n"
            "Note: If you're getting connection errors, check your cloud credentials and network access."
        )

    if len(references) > MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST:
        # Ceiling division: number of requests needed to send all references.
        num_chunks = (
            len(references) + MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST - 1
        ) // MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST
        print(
            f"Files will be split into {num_chunks} chunks of up to {MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST} files each"
        )

    workspace = get_workspace(api_key=api_key)

    # Split into batches if needed
    ingest_parts = list(
        create_batches(
            sequence=references, batch_size=MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST
        )
    )
    if len(ingest_parts) > 1:
        print(
            f"Your ingest exceeds {MAX_IMAGE_REFERENCES_IN_INGEST_REQUEST} files - we split the ingest "
            f"into {len(ingest_parts)} chunks."
        )

    # Trigger ingest for each batch
    for batch_references in ingest_parts:
        response = trigger_images_references_ingest(
            workspace=workspace,
            batch_id=batch_id,
            references=batch_references,
            api_key=api_key,
            ingest_id=ingest_id,
            batch_name=batch_name,
            notifications_url=notifications_url,
            notification_categories=notification_categories,
        )
        print(f"Ingest triggered. Ingest ID: {response.ingest_id}")

    if notifications_url:
        print(f"Monitor updates that will be sent to: {notifications_url}")
        print(
            f"You can also use `inference rf-cloud data-staging list-ingest-details --batch-id {batch_id}` command "
            "to check progress."
        )
    else:
        print(
            f"Use `inference rf-cloud data-staging list-ingest-details --batch-id {batch_id}` "
            "command to watch the ingest progress. If you want automated updates - use `--notifications-url` option "
            "of this command."
        )

create_videos_batch_from_cloud_storage

create_videos_batch_from_cloud_storage(
    bucket_path,
    batch_id,
    api_key,
    batch_name=None,
    ingest_id=None,
    notifications_url=None,
    notification_categories=None,
    presign_expiration_seconds=86400,
)

Create video batch from cloud storage by generating presigned URLs.

Parameters:

Name Type Description Default
bucket_path str

Cloud path with optional glob pattern (e.g., 's3://bucket/**/*.mp4')

required
batch_id str

Batch identifier

required
api_key str

Roboflow API key

required
presign_expiration_seconds int

Presigned URL expiration time (default: 24 hours)

86400

Internally calls trigger_videos_references_ingest with generated presigned URLs.

Source code in inference_cli/lib/roboflow_cloud/data_staging/api_operations.py
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
def create_videos_batch_from_cloud_storage(
    bucket_path: str,
    batch_id: str,
    api_key: str,
    batch_name: Optional[str] = None,
    ingest_id: Optional[str] = None,
    notifications_url: Optional[str] = None,
    notification_categories: Optional[List[str]] = None,
    presign_expiration_seconds: int = 86400,
) -> None:
    """
    Create video batch from cloud storage by generating presigned URLs.

    Args:
        bucket_path: Cloud path with optional glob pattern (e.g., 's3://bucket/**/*.mp4')
        batch_id: Batch identifier
        api_key: Roboflow API key
        batch_name: Forwarded unchanged to trigger_videos_references_ingest
        ingest_id: Forwarded unchanged to trigger_videos_references_ingest
        notifications_url: Forwarded to trigger_videos_references_ingest; also decides
            which follow-up hint is printed
        notification_categories: Forwarded unchanged to trigger_videos_references_ingest
        presign_expiration_seconds: Presigned URL expiration time (default: 24 hours)

    Raises:
        ImportError: When `fsspec` cannot be imported (chained to the original error).
        ValueError: When no video file matches the given path/pattern.

    Internally calls trigger_videos_references_ingest with generated presigned URLs.
    """
    try:
        import fsspec
    except ImportError as error:
        # Chain explicitly so the root cause of the failed import stays visible.
        raise ImportError(
            "Cloud storage support requires additional dependencies. "
            "Install with: pip install 'inference-cli[cloud-storage]'"
        ) from error

    base_path, glob_pattern = _parse_bucket_path(bucket_path)
    protocol = base_path.split("://")[0]
    fs = fsspec.filesystem(protocol, **_get_fs_kwargs(protocol))

    # Stream and filter video files with progress
    video_files_generator = _list_and_filter_files_streaming(
        fs, base_path, glob_pattern, VIDEOS_EXTENSIONS
    )

    # Generate presigned URLs in parallel (consumes generator and shows progress)
    references = _generate_presigned_urls_parallel(
        fs, video_files_generator, base_path, presign_expiration_seconds
    )

    if len(references) == 0:
        pattern_desc = glob_pattern if glob_pattern else "all video files"
        raise ValueError(
            f"No video files found matching pattern: {pattern_desc} in {base_path}\n"
            f"Supported extensions: {', '.join(VIDEOS_EXTENSIONS)}\n"
            "Note: If you're getting connection errors, check your cloud credentials and network access."
        )

    print(f"Found {len(references)} video files")
    if len(references) > SUGGESTED_MAX_VIDEOS_IN_BATCH:
        # Only a warning - the ingest below is still sent as a single request.
        print(
            f"Warning: Found {len(references)} videos. Suggested max is {SUGGESTED_MAX_VIDEOS_IN_BATCH} videos per batch."
        )

    workspace = get_workspace(api_key=api_key)

    # Trigger ingest directly with the list of references
    response = trigger_videos_references_ingest(
        workspace=workspace,
        batch_id=batch_id,
        references=references,
        api_key=api_key,
        ingest_id=ingest_id,
        batch_name=batch_name,
        notifications_url=notifications_url,
        notification_categories=notification_categories,
    )
    print(f"Ingest triggered. Ingest ID: {response.ingest_id}")

    if notifications_url:
        print(f"Monitor updates that will be sent to: {notifications_url}")
        print(
            f"You can also use `inference rf-cloud data-staging list-ingest-details --batch-id {batch_id}` command "
            "to check progress."
        )
    else:
        print(
            f"Use `inference rf-cloud data-staging list-ingest-details --batch-id {batch_id}` "
            "command to watch the ingest progress. If you want automated updates - use `--notifications-url` option "
            "of this command."
        )

lib/workflows

Workflow execution adapters for local images, remote images, and video sources.

inference_cli.lib.workflows.local_image_adapter