InferencePipeline on RTSP Stream¶
The Roboflow Inference Pipeline is a drop-in replacement for the Hosted Inference API that can be deployed on your own hardware. The Inference Pipeline interface is made for streaming and is likely the best route to go for real time use cases. It is an asynchronous interface that can consume many different video sources including local devices (like webcams), RTSP video streams, video files, etc. With this interface, you define the source of a video stream and sinks.
We have optimized Inference Pipeline to get maximum performance from the NVIDIA Jetson line of edge-AI devices. We have done this by specifically tailoring the drivers, libraries, and binaries specifically to its CPU and GPU architectures.
Let's begin!
Install required packages¶
In this cookbook, we'll leverage two Python packages - inference
and supervision
!pip install inference supervision==0.18.0
Imports¶
from inference.core.interfaces.stream.inference_pipeline import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes
import supervision as sv
import pandas as pd
from collections import defaultdict
import cv2
import numpy as np
import time
Run Inference Pipeline with COCO Model Aliases & Native FPS Monitor¶
# Create an instance of FPSMonitor
fps_monitor = sv.FPSMonitor()
REGISTERED_ALIASES = {
"yolov8n-640": "coco/3",
"yolov8n-1280": "coco/9",
"yolov8m-640": "coco/8"
}
API_KEY = "API_KEY"
RTSP_STREAM = "RTSP_URL"
# Example alias
alias = "yolov8n-640"
# Function to resolve an alias to the actual model ID
def resolve_roboflow_model_alias(model_id: str) -> str:
return REGISTERED_ALIASES.get(model_id, model_id)
# Resolve the alias to get the actual model ID
model_name = resolve_roboflow_model_alias(alias)
# Modify the render_boxes function to enable displaying statistics
def on_prediction(predictions, video_frame):
render_boxes(
predictions=predictions,
video_frame=video_frame,
fps_monitor=fps_monitor, # Pass the FPS monitor object
display_statistics=True, # Enable displaying statistics
)
pipeline = InferencePipeline.init(
model_id= model_name,
video_reference=RTSP_STREAM,
on_prediction=on_prediction,
api_key=API_KEY,
confidence=0.5,
)
pipeline.start()
pipeline.join()
Time in Zone with Bytetrack using Supervision, save data to CSV¶
#ByteTrack & Supervision
tracker = sv.ByteTrack()
annotator = sv.BoxAnnotator()
frame_count = defaultdict(int)
colors = sv.ColorPalette.default()
#define polygon zone of interest
polygons = [
np.array([
[390, 543],[1162, 503],[1510, 711],[410, 819],[298, 551],[394, 543]
])
]
#create zones, zone_annotator, and box_annotator based on polygon zone of interest
zones = [
sv.PolygonZone(
polygon=polygon,
)
for polygon
in polygons
]
zone_annotators = [
sv.PolygonZoneAnnotator(
zone=zone,
color=colors.by_idx(index),
thickness=4,
text_thickness=8,
text_scale=4
)
for index, zone
in enumerate(zones)
]
box_annotators = [
sv.BoxAnnotator(
color=colors.by_idx(index),
thickness=4,
text_thickness=4,
text_scale=2
)
for index
in range(len(polygons))
]
#columns for csv output
columns = ['trackerID', 'class_id', 'frame_count','entry_timestamp','exit_timestamp','time_in_zone']
frame_count_df = pd.DataFrame(columns=columns)
# Define a dictionary to store the first detection timestamp for each tracker_id
first_detection_timestamps = {}
last_detection_timestamps = {}
def render(predictions: dict, video_frame) -> None:
detections = sv.Detections.from_inference(predictions)
detections = tracker.update_with_detections(detections)
for zone, zone_annotator, box_annotator in zip(zones, zone_annotators, box_annotators):
mask = zone.trigger(detections=detections)
detections_filtered = detections[mask]
image = box_annotator.annotate(scene=video_frame.image, detections=detections, skip_label=False)
image = zone_annotator.annotate(scene=image)
for tracker_id, class_id in zip(detections_filtered.tracker_id, detections_filtered.class_id):
frame_count[tracker_id] += 1
# Check if tracker_id is not in first_detection_timestamps, if not, add the timestamp
if tracker_id not in first_detection_timestamps:
first_detection_timestamps[tracker_id] = time.time()
last_detection_timestamps[tracker_id] = time.time()
time_difference = last_detection_timestamps[tracker_id] - first_detection_timestamps[tracker_id]
# Add data to the DataFrame
frame_count_df.loc[tracker_id] = [tracker_id, class_id, frame_count[tracker_id], first_detection_timestamps[tracker_id],last_detection_timestamps[tracker_id], time_difference]
frame_count_df.to_csv('demo.csv', index=False)
cv2.imshow("Prediction", image)
cv2.waitKey(1)
#Initialize & Deploy InferencePipeline
pipeline = InferencePipeline.init(
model_id="coco/8",
video_reference="RTSP_URL",
on_prediction=render,
api_key = 'API_KEY',
confidence=0.5,
)
pipeline.start()
pipeline.join()