Inference pipeline
InferencePipeline
¶
Source code in inference/core/interfaces/stream/inference_pipeline.py
|
|
init(video_reference, model_id, on_prediction=None, api_key=None, max_fps=None, watchdog=None, status_update_handlers=None, source_buffer_filling_strategy=None, source_buffer_consumption_strategy=None, class_agnostic_nms=None, confidence=None, iou_threshold=None, max_candidates=None, max_detections=None, mask_decode_mode='accurate', tradeoff_factor=0.0, active_learning_enabled=None, video_source_properties=None, active_learning_target_dataset=None, batch_collection_timeout=None, sink_mode=SinkMode.ADAPTIVE)
classmethod
¶
This class creates the abstraction for making inferences from Roboflow models against video stream. It allows to choose model from Roboflow platform and run predictions against video streams - just by the price of specifying which model to use and what to do with predictions.
It allows to set the model post-processing parameters (via .init() or env) and intercept updates
related to state of pipeline via PipelineWatchDog
abstraction (although that is something probably
useful only for advanced use-cases).
For maximum efficiency, all separate chunks of processing: video decoding, inference, results dispatching are handled by separate threads.
Given that reference to stream is passed and connectivity is lost - it attempts to re-connect with delay.
Since version 0.9.11 it works not only for object detection models but is also compatible with stubs, classification, instance-segmentation and keypoint-detection models.
Since version 0.9.18, InferencePipeline
is capable of handling multiple video sources at once. If multiple
sources are provided - source multiplexing will happen. One of the change introduced in that release is switch
from get_video_frames_generator(...)
as video frames provider into multiplex_videos(...)
. For a single
video source, the behaviour of InferencePipeline
is remained unchanged when default parameters are used.
For multiple videos - frames are multiplexed, and we can adjust the pipeline behaviour using new configuration
options. batch_collection_timeout
is one of the new option - it is the parameter of multiplex_videos(...)
that dictates how long the batch frames collection process may wait for all sources to provide video frame.
It can be set infinite (None) or with specific value representing fraction of second. We advise that value to
be set in production solutions to avoid processing slow-down caused by source with unstable latency spikes.
For more information on multiplexing process - please visit multiplex_videos(...)
function docs.
Another change is the way on how sinks work. They can work in SinkMode.ADAPTIVE
- which means that
video frames and predictions will be either provided to sink as list of objects, or specific elements -
and the determining factor is number of sources (it will behave SEQUENTIAL for one source and BATCH if multiple
ones are provided). All old sinks were adjusted to work in both modes, custom ones should be migrated
to reflect changes in sink function signature.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_id
|
str
|
Name and version of model on the Roboflow platform (example: "my-model/3") |
required |
video_reference
|
Union[str, int, List[Union[str, int]]]
|
Reference of source or sources to be used to make
predictions against. It can be video file path, stream URL and device (like camera) id
(we handle whatever cv2 handles). It can also be a list of references (since v0.9.18) - and then
it will trigger parallel processing of multiple sources. It has some implication on sinks. See:
|
required |
on_prediction
|
Callable[AnyPrediction, VideoFrame], None]
|
Function to be called once prediction is ready - passing both decoded frame, their metadata and dict with standard Roboflow model prediction (different for specific types of models). |
None
|
api_key
|
Optional[str]
|
Roboflow API key - if not passed - will be looked in env under "ROBOFLOW_API_KEY" and "API_KEY" variables. API key, passed in some form is required. |
None
|
max_fps
|
Optional[Union[float, int]]
|
Specific value passed as this parameter will be used to
dictate max FPS of each video source.
The implementation details of this option has been changed in release |
None
|
watchdog
|
Optional[PipelineWatchDog]
|
Implementation of class that allows profiling of inference pipeline - if not given null implementation (doing nothing) will be used. |
None
|
status_update_handlers
|
Optional[List[Callable[[StatusUpdate], None]]]
|
List of handlers to intercept status updates of all elements of the pipeline. Should be used only if detailed inspection of pipeline behaviour in time is needed. Please point out that handlers should be possible to be executed fast - otherwise they will impair pipeline performance. All errors will be logged as warnings without re-raising. Default: None. |
None
|
source_buffer_filling_strategy
|
Optional[BufferFillingStrategy]
|
Parameter dictating strategy for
video stream decoding behaviour. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
source_buffer_consumption_strategy
|
Optional[BufferConsumptionStrategy]
|
Parameter dictating strategy for
video stream frames consumption. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
class_agnostic_nms
|
Optional[bool]
|
Parameter of model post-processing. If not given - value checked in env variable "CLASS_AGNOSTIC_NMS" with default "False" |
None
|
confidence
|
Optional[float]
|
Parameter of model post-processing. If not given - value checked in env variable "CONFIDENCE" with default "0.5" |
None
|
iou_threshold
|
Optional[float]
|
Parameter of model post-processing. If not given - value checked in env variable "IOU_THRESHOLD" with default "0.5" |
None
|
max_candidates
|
Optional[int]
|
Parameter of model post-processing. If not given - value checked in env variable "MAX_CANDIDATES" with default "3000" |
None
|
max_detections
|
Optional[int]
|
Parameter of model post-processing. If not given - value checked in env variable "MAX_DETECTIONS" with default "300" |
None
|
mask_decode_mode
|
Optional[str]
|
(Optional[str]): Parameter of model post-processing. If not given - model "accurate" is used. Applicable for instance segmentation models |
'accurate'
|
tradeoff_factor
|
Optional[float]
|
Parameter of model post-processing. If not 0.0 - model default is used. Applicable for instance segmentation models |
0.0
|
active_learning_enabled
|
Optional[bool]
|
Flag to enable / disable Active Learning middleware (setting it
true does not guarantee any data to be collected, as data collection is controlled by Roboflow backend -
it just enables middleware intercepting predictions). If not given, env variable
|
None
|
video_source_properties
|
Optional[Union[Dict[str, float], List[Optional[Dict[str, float]]]]]
|
Optional source properties to set up the video source, corresponding to cv2 VideoCapture properties
cv2.CAP_PROP_*. If not given, defaults for the video source will be used.
It is optional and if provided can be provided as single dict (applicable for all sources) or
as list of configs. Then the list must be of length of |
None
|
active_learning_target_dataset
|
Optional[str]
|
Parameter to be used when Active Learning data registration should happen against different dataset than the one pointed by model_id |
None
|
batch_collection_timeout
|
Optional[float]
|
Parameter of multiplex_videos(...) dictating how long process
to grab frames from multiple sources can wait for batch to be filled before yielding already collected
frames. Please set this value in PRODUCTION to avoid performance drops when specific sources shows
unstable latency. Visit |
None
|
sink_mode
|
SinkMode
|
Parameter that controls how video frames and predictions will be passed to sink
handler. With SinkMode.SEQUENTIAL - each frame and prediction triggers separate call for sink,
in case of SinkMode.BATCH - list of frames and predictions will be provided to sink, always aligned
in the order of video sources - with None values in the place of vide_frames / predictions that
were skipped due to |
ADAPTIVE
|
Other ENV variables involved in low-level configuration: * INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching * INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop * ACTIVE_LEARNING_ENABLED - controls Active Learning middleware if explicit parameter not given
Returns: Instance of InferencePipeline
Throws
- SourceConnectionError if source cannot be connected at start, however it attempts to reconnect always if connection to stream is lost.
Source code in inference/core/interfaces/stream/inference_pipeline.py
|
|
init_with_custom_logic(video_reference, on_video_frame, on_prediction=None, on_pipeline_start=None, on_pipeline_end=None, max_fps=None, watchdog=None, status_update_handlers=None, source_buffer_filling_strategy=None, source_buffer_consumption_strategy=None, video_source_properties=None, batch_collection_timeout=None, sink_mode=SinkMode.ADAPTIVE)
classmethod
¶
This class creates the abstraction for making inferences from given workflow against video stream.
The way of how InferencePipeline
works is displayed in InferencePipeline.init(...)
initialiser
method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video_reference
|
Union[str, int, List[Union[str, int]]]
|
Reference of source or sources to be used to make
predictions against. It can be video file path, stream URL and device (like camera) id
(we handle whatever cv2 handles). It can also be a list of references (since v0.9.18) - and then
it will trigger parallel processing of multiple sources. It has some implication on sinks. See:
|
required |
on_video_frame
|
Callable[[VideoFrame], AnyPrediction]
|
function supposed to make prediction (or do another
kind of custom processing according to your will). Accept |
required |
on_prediction
|
Callable[AnyPrediction, VideoFrame], None]
|
Function to be called
once prediction is ready - passing both decoded frame, their metadata and dict with output from your
custom callable |
None
|
on_pipeline_start
|
Optional[Callable[[], None]]
|
Optional (parameter-free) function to be called whenever pipeline starts |
None
|
on_pipeline_end
|
Optional[Callable[[], None]]
|
Optional (parameter-free) function to be called whenever pipeline ends |
None
|
max_fps
|
Optional[Union[float, int]]
|
Specific value passed as this parameter will be used to
dictate max FPS of each video source.
The implementation details of this option has been changed in release |
None
|
watchdog
|
Optional[PipelineWatchDog]
|
Implementation of class that allows profiling of inference pipeline - if not given null implementation (doing nothing) will be used. |
None
|
status_update_handlers
|
Optional[List[Callable[[StatusUpdate], None]]]
|
List of handlers to intercept status updates of all elements of the pipeline. Should be used only if detailed inspection of pipeline behaviour in time is needed. Please point out that handlers should be possible to be executed fast - otherwise they will impair pipeline performance. All errors will be logged as warnings without re-raising. Default: None. |
None
|
source_buffer_filling_strategy
|
Optional[BufferFillingStrategy]
|
Parameter dictating strategy for
video stream decoding behaviour. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
source_buffer_consumption_strategy
|
Optional[BufferConsumptionStrategy]
|
Parameter dictating strategy for
video stream frames consumption. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
video_source_properties
|
Optional[Union[Dict[str, float], List[Optional[Dict[str, float]]]]]
|
Optional source properties to set up the video source, corresponding to cv2 VideoCapture properties
cv2.CAP_PROP_*. If not given, defaults for the video source will be used.
It is optional and if provided can be provided as single dict (applicable for all sources) or
as list of configs. Then the list must be of length of |
None
|
batch_collection_timeout
|
Optional[float]
|
Parameter of multiplex_videos(...) dictating how long process
to grab frames from multiple sources can wait for batch to be filled before yielding already collected
frames. Please set this value in PRODUCTION to avoid performance drops when specific sources shows
unstable latency. Visit |
None
|
sink_mode
|
SinkMode
|
Parameter that controls how video frames and predictions will be passed to sink
handler. With SinkMode.SEQUENTIAL - each frame and prediction triggers separate call for sink,
in case of SinkMode.BATCH - list of frames and predictions will be provided to sink, always aligned
in the order of video sources - with None values in the place of vide_frames / predictions that
were skipped due to |
ADAPTIVE
|
Other ENV variables involved in low-level configuration: * INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching * INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
Returns: Instance of InferencePipeline
Throws
- SourceConnectionError if source cannot be connected at start, however it attempts to reconnect always if connection to stream is lost.
Source code in inference/core/interfaces/stream/inference_pipeline.py
662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 |
|
init_with_workflow(video_reference, workflow_specification=None, workspace_name=None, workflow_id=None, api_key=None, image_input_name='image', workflows_parameters=None, on_prediction=None, max_fps=None, watchdog=None, status_update_handlers=None, source_buffer_filling_strategy=None, source_buffer_consumption_strategy=None, video_source_properties=None, workflow_init_parameters=None, workflows_thread_pool_workers=4, cancel_thread_pool_tasks_on_exit=True, video_metadata_input_name='video_metadata', batch_collection_timeout=None, profiling_directory='./inference_profiling', use_workflow_definition_cache=True, serialize_results=False)
classmethod
¶
This class creates the abstraction for making inferences from given workflow against video stream.
The way of how InferencePipeline
works is displayed in InferencePipeline.init(...)
initializer
method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video_reference
|
Union[str, int, List[Union[str, int]]]
|
Reference of source to be used to make predictions
against. It can be video file path, stream URL and device (like camera) id
(we handle whatever cv2 handles). It can also be a list of references (since v0.13.0) - and then
it will trigger parallel processing of multiple sources. It has some implication on sinks. See:
|
required |
workflow_specification
|
Optional[dict]
|
Valid specification of workflow. See workflow docs.
It can be provided optionally, but if not given, both |
None
|
workspace_name
|
Optional[str]
|
When using registered workflows - Roboflow workspace name needs to be given. |
None
|
workflow_id
|
Optional[str]
|
When using registered workflows - Roboflow workflow id needs to be given. |
None
|
api_key
|
Optional[str]
|
Roboflow API key - if not passed - will be looked in env under "ROBOFLOW_API_KEY" and "API_KEY" variables. API key, passed in some form is required. |
None
|
image_input_name
|
str
|
Name of input image defined in |
'image'
|
workflows_parameters
|
Optional[Dict[str, Any]]
|
Dictionary with additional parameters that can be
defined within |
None
|
on_prediction
|
Callable[AnyPrediction, VideoFrame], None]
|
Function to be called once prediction is ready - passing both decoded frame, their metadata and dict with workflow output. |
None
|
max_fps
|
Optional[Union[float, int]]
|
Specific value passed as this parameter will be used to
dictate max FPS of each video source.
The implementation details of this option has been changed in release |
None
|
watchdog
|
Optional[PipelineWatchDog]
|
Implementation of class that allows profiling of inference pipeline - if not given null implementation (doing nothing) will be used. |
None
|
status_update_handlers
|
Optional[List[Callable[[StatusUpdate], None]]]
|
List of handlers to intercept status updates of all elements of the pipeline. Should be used only if detailed inspection of pipeline behaviour in time is needed. Please point out that handlers should be possible to be executed fast - otherwise they will impair pipeline performance. All errors will be logged as warnings without re-raising. Default: None. |
None
|
source_buffer_filling_strategy
|
Optional[BufferFillingStrategy]
|
Parameter dictating strategy for
video stream decoding behaviour. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
source_buffer_consumption_strategy
|
Optional[BufferConsumptionStrategy]
|
Parameter dictating strategy for
video stream frames consumption. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
video_source_properties
|
Optional[dict[str, float]]
|
Optional source properties to set up the video source, corresponding to cv2 VideoCapture properties cv2.CAP_PROP_*. If not given, defaults for the video source will be used. Example valid properties are: {"frame_width": 1920, "frame_height": 1080, "fps": 30.0} |
None
|
workflow_init_parameters
|
Optional[Dict[str, Any]]
|
Additional init parameters to be used by workflows Execution Engine to init steps of your workflow - may be required when running workflows with custom plugins. |
None
|
workflows_thread_pool_workers
|
int
|
Number of workers for workflows thread pool which is used by workflows blocks to run background tasks. |
4
|
cancel_thread_pool_tasks_on_exit
|
bool
|
Flag to decide if unstated background tasks should be canceled at the end of InferencePipeline processing. By default, when video file ends or pipeline is stopped, tasks that has not started will be cancelled. |
True
|
video_metadata_input_name
|
str
|
Name of input for video metadata defined in |
'video_metadata'
|
batch_collection_timeout
|
Optional[float]
|
Parameter of multiplex_videos(...) dictating how long process
to grab frames from multiple sources can wait for batch to be filled before yielding already collected
frames. Please set this value in PRODUCTION to avoid performance drops when specific sources shows
unstable latency. Visit |
None
|
profiling_directory
|
str
|
Directory where workflows profiler traces will be dumped. To enable profiling
export |
'./inference_profiling'
|
use_workflow_definition_cache
|
bool
|
Controls usage of cache for workflow definitions. Set this to False when you frequently modify definition saved in Roboflow app and want to fetch the newest version for the request. Only applies for Workflows definitions saved on Roboflow platform. |
True
|
serialize_results
|
bool
|
Boolean flag to decide if ExecutionEngine run should serialize workflow results for each frame. If that is set true, sinks will receive serialized workflow responses. |
False
|
Other ENV variables involved in low-level configuration: * INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching * INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
Returns: Instance of InferencePipeline
Throws
- SourceConnectionError if source cannot be connected at start, however it attempts to reconnect always if connection to stream is lost.
- ValueError if workflow specification not provided and registered workflow not pointed out
- NotImplementedError if workflow used against multiple videos which is not supported yet
- MissingApiKeyError - if API key is not provided in situation when retrieving workflow definition from Roboflow API is needed
Source code in inference/core/interfaces/stream/inference_pipeline.py
|
|
init_with_yolo_world(video_reference, classes, model_size='s', on_prediction=None, max_fps=None, watchdog=None, status_update_handlers=None, source_buffer_filling_strategy=None, source_buffer_consumption_strategy=None, class_agnostic_nms=None, confidence=None, iou_threshold=None, max_candidates=None, max_detections=None, video_source_properties=None, batch_collection_timeout=None, sink_mode=SinkMode.ADAPTIVE)
classmethod
¶
This class creates the abstraction for making inferences from YoloWorld against video stream.
The way of how InferencePipeline
works is displayed in InferencePipeline.init(...)
initializer
method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video_reference
|
Union[str, int, List[Union[str, int]]]
|
Reference of source or sources to be used to make
predictions against. It can be video file path, stream URL and device (like camera) id
(we handle whatever cv2 handles). It can also be a list of references (since v0.9.18) - and then
it will trigger parallel processing of multiple sources. It has some implication on sinks. See:
|
required |
classes
|
List[str]
|
List of classes to execute zero-shot detection against |
required |
model_size
|
str
|
version of model - to be chosen from |
's'
|
on_prediction
|
Callable[AnyPrediction, VideoFrame], None]
|
Function to be called once prediction is ready - passing both decoded frame, their metadata and dict with standard Roboflow Object Detection prediction. |
None
|
max_fps
|
Optional[Union[float, int]]
|
Specific value passed as this parameter will be used to
dictate max FPS of each video source.
The implementation details of this option has been changed in release |
None
|
watchdog
|
Optional[PipelineWatchDog]
|
Implementation of class that allows profiling of inference pipeline - if not given null implementation (doing nothing) will be used. |
None
|
status_update_handlers
|
Optional[List[Callable[[StatusUpdate], None]]]
|
List of handlers to intercept status updates of all elements of the pipeline. Should be used only if detailed inspection of pipeline behaviour in time is needed. Please point out that handlers should be possible to be executed fast - otherwise they will impair pipeline performance. All errors will be logged as warnings without re-raising. Default: None. |
None
|
source_buffer_filling_strategy
|
Optional[BufferFillingStrategy]
|
Parameter dictating strategy for
video stream decoding behaviour. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
source_buffer_consumption_strategy
|
Optional[BufferConsumptionStrategy]
|
Parameter dictating strategy for
video stream frames consumption. By default - tweaked to the type of source given.
Please find detailed explanation in docs of |
None
|
class_agnostic_nms
|
Optional[bool]
|
Parameter of model post-processing. If not given - value checked in env variable "CLASS_AGNOSTIC_NMS" with default "False" |
None
|
confidence
|
Optional[float]
|
Parameter of model post-processing. If not given - value checked in env variable "CONFIDENCE" with default "0.5" |
None
|
iou_threshold
|
Optional[float]
|
Parameter of model post-processing. If not given - value checked in env variable "IOU_THRESHOLD" with default "0.5" |
None
|
max_candidates
|
Optional[int]
|
Parameter of model post-processing. If not given - value checked in env variable "MAX_CANDIDATES" with default "3000" |
None
|
max_detections
|
Optional[int]
|
Parameter of model post-processing. If not given - value checked in env variable "MAX_DETECTIONS" with default "300" |
None
|
video_source_properties
|
Optional[Union[Dict[str, float], List[Optional[Dict[str, float]]]]]
|
Optional source properties to set up the video source, corresponding to cv2 VideoCapture properties
cv2.CAP_PROP_*. If not given, defaults for the video source will be used.
It is optional and if provided can be provided as single dict (applicable for all sources) or
as list of configs. Then the list must be of length of |
None
|
batch_collection_timeout
|
Optional[float]
|
Parameter of multiplex_videos(...) dictating how long process
to grab frames from multiple sources can wait for batch to be filled before yielding already collected
frames. Please set this value in PRODUCTION to avoid performance drops when specific sources shows
unstable latency. Visit |
None
|
sink_mode
|
SinkMode
|
Parameter that controls how video frames and predictions will be passed to sink
handler. With SinkMode.SEQUENTIAL - each frame and prediction triggers separate call for sink,
in case of SinkMode.BATCH - list of frames and predictions will be provided to sink, always aligned
in the order of video sources - with None values in the place of vide_frames / predictions that
were skipped due to |
ADAPTIVE
|
Other ENV variables involved in low-level configuration: * INFERENCE_PIPELINE_PREDICTIONS_QUEUE_SIZE - size of buffer for predictions that are ready for dispatching * INFERENCE_PIPELINE_RESTART_ATTEMPT_DELAY - delay for restarts on stream connection drop
Returns: Instance of InferencePipeline
Throws
- SourceConnectionError if source cannot be connected at start, however it attempts to reconnect always if connection to stream is lost.
Source code in inference/core/interfaces/stream/inference_pipeline.py
|
|