Skip to content

Infer

get_batch(redis, model_names)

Run a heuristic to select the best batch to infer on. Parameters: redis[Redis]: redis client; model_names[List[str]]: list of models with a nonzero number of requests. Returns: Tuple[List[Dict], str], where List[Dict] represents a batch of request dicts and str is the model id.

Source code in inference/enterprise/parallel/infer.py (lines 125–150)
def get_batch(redis: Redis, model_names: List[str]) -> Tuple[List[Dict], str]:
    """
    Pick one model's pending requests and dequeue them as a batch.

    redis[Redis]: redis client holding the per-model request queues
    model_names[List[str]]: models that currently have queued requests
    returns:
        Tuple[List[Dict], str]
        List[Dict] is the decoded batch of request dicts
        str is the id of the model the batch belongs to

    For every model, its batch size is looked up through
    RoboflowInferenceModel.model_metadata_from_memcache_endpoint (string
    values fall back to BATCH_SIZE). Up to that many entries are read, with
    scores, from the sorted set "infer:<model>". select_best_inference_batch
    chooses which model to serve. That model's entries are then removed from
    its sorted set, its field in the "requests" hash is decremented by the
    batch length, and the entries are decoded with orjson.
    """
    sizes = []
    for name in model_names:
        metadata = RoboflowInferenceModel.model_metadata_from_memcache_endpoint(name)
        size = metadata["batch_size"]
        # A string batch size cannot bound ZRANGE, so use the default instead.
        sizes.append(BATCH_SIZE if isinstance(size, str) else size)

    # One ZRANGE per model: the first `size` members (ascending score order),
    # returned together with their scores.
    candidates = []
    for name, size in zip(model_names, sizes):
        candidates.append(redis.zrange(f"infer:{name}", 0, size - 1, withscores=True))

    chosen = select_best_inference_batch(candidates, sizes)
    entries = candidates[chosen]
    model_id = model_names[chosen]
    members = [entry[0] for entry in entries]

    # NOTE(review): ZREM requires at least one member, so an empty selected
    # batch raises here. This is presumably unreachable because model_names only
    # lists models with pending requests — confirm. The ZRANGE reads above and
    # this ZREM are separate round trips, not a transaction; confirm that only
    # one worker consumes these queues at a time.
    redis.zrem(f"infer:{model_id}", *members)
    redis.hincrby("requests", model_id, -len(entries))

    decoded = [orjson.loads(member) for member in members]
    return decoded, model_id

write_infer_arrays_and_launch_postprocess(arrs, request, preproc_return_metadata)

Write inference results to shared memory and launch the postprocessing task

Source code in inference/enterprise/parallel/infer.py (lines 182–201)
def write_infer_arrays_and_launch_postprocess(
    arrs: Tuple[np.ndarray, ...],
    request: InferenceRequest,
    preproc_return_metadata: Dict,
):
    """Write inference results to shared memory and launch the postprocessing task.

    Each array in ``arrs`` is copied into its own newly created shared memory
    segment. A ``postprocess`` task is then enqueued via
    ``postprocess.s(...).delay()`` with:
      - a tuple of per-array metadata dicts (segment name, shape, dtype name),
      - ``request.dict()``,
      - ``preproc_return_metadata``, unchanged.

    Args:
        arrs: inference output arrays to publish. They may be empty or 0-d.
        request: the originating request; only its ``.dict()`` is forwarded.
        preproc_return_metadata: forwarded as-is to the postprocess task.

    Raises:
        Any error from creating the segments is re-raised after the segments
        created so far have been closed and unlinked.
    """
    # Create every segment up front. If a later creation fails (e.g. the shared
    # memory filesystem is full), unlink the ones already created so they are
    # not leaked until process exit, then re-raise.
    shms = []
    try:
        for arr in arrs:
            # SharedMemory rejects size=0, so reserve one byte for empty arrays.
            # The metadata below records the true shape/dtype, so the spare byte
            # is never part of the array view.
            shms.append(
                shared_memory.SharedMemory(create=True, size=max(arr.nbytes, 1))
            )
    except BaseException:
        for shm in shms:
            shm.close()
            shm.unlink()
        raise

    # NOTE(review): shm_manager is assumed to close these segments on exit (and
    # to clean them up on error), leaving them for the postprocess task to
    # read — TODO confirm.
    with shm_manager(*shms):
        shm_metadatas = []
        for arr, shm in zip(arrs, shms):
            # Copy through a temporary view so no numpy array keeps an export of
            # shm.buf alive once this loop ends; a live export can make closing
            # the segment fail. `[...]` (unlike `[:]`) also handles 0-d arrays.
            np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)[...] = arr
            shm_metadata = SharedMemoryMetadata(
                shm_name=shm.name, array_shape=arr.shape, array_dtype=arr.dtype.name
            )
            shm_metadatas.append(asdict(shm_metadata))

        postprocess.s(
            tuple(shm_metadatas), request.dict(), preproc_return_metadata
        ).delay()