Data representations in Workflows¶
Many frameworks enforce standard data types for developers to work with, and the Workflows ecosystem is no
exception. While the kind in a Workflow represents a high-level abstraction of the data
being passed through, it is important to understand the specific data types that are provided to the
WorkflowBlock.run(...) method when building Workflow blocks. That is exactly what this page covers.
Batch¶
When a Workflow block declares batch processing, it uses a special container type called Batch.
All batch-oriented parameters are wrapped with Batch[X], where X is the data type:

```python
from inference.core.workflows.execution_engine.entities.base import Batch
from inference.core.workflows.prototypes.block import BlockResult


# run method of a Workflow block
def run(self, x: Batch[int], y: Batch[float]) -> BlockResult:
    pass
```
The Batch type functions similarly to a Python list, with one key difference: it is read-only. You cannot modify
its elements, nor can you add or remove elements. However, several useful operations are available:
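To make the read-only contract concrete, here is a minimal pure-Python stand-in (a hypothetical illustration, not the actual `Batch` implementation) of a container that supports iteration, indexing, and length, but rejects mutation:

```python
from typing import Generic, Iterator, List, TypeVar

T = TypeVar("T")


class ReadOnlyBatch(Generic[T]):
    """Hypothetical stand-in mimicking the read-only contract of Batch."""

    def __init__(self, elements: List[T]):
        self._elements = list(elements)

    def __iter__(self) -> Iterator[T]:
        return iter(self._elements)

    def __getitem__(self, index: int) -> T:
        return self._elements[index]

    def __len__(self) -> int:
        return len(self._elements)

    # no __setitem__, append, or remove - mutation attempts raise TypeError
```

Because no mutating methods are defined, `batch[0] = 42` raises a `TypeError`, which is the behaviour you should expect from batch inputs inside a block.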
Iteration through elements¶
```python
from inference.core.workflows.execution_engine.entities.base import Batch


def iterate(batch: Batch[int]) -> None:
    for element in batch:
        print(element)
```
Zipping multiple batches¶
Do not worry about batch alignment
The Execution Engine ensures that batches provided to the run method are of equal size, preventing any loss of elements due to unequal batch sizes during iteration.

```python
from inference.core.workflows.execution_engine.entities.base import Batch


def zip_batches(batch_1: Batch[int], batch_2: Batch[int]) -> None:
    for element_1, element_2 in zip(batch_1, batch_2):
        print(element_1, element_2)
```
Getting batch element indices¶
This returns a list of tuples where each tuple represents the position of the batch element in potentially nested batch structures.
```python
from inference.core.workflows.execution_engine.entities.base import Batch


def discover_indices(batch: Batch[int]) -> None:
    for index in batch.indices:
        print(index)  # e.g., (0,) for a 1D batch, (1, 3) for a 2D nested batch
```
Iterating while retrieving both elements and their indices¶
```python
from inference.core.workflows.execution_engine.entities.base import Batch


def iterate_with_indices(batch: Batch[int]) -> None:
    for index, element in batch.iter_with_indices():
        print(index, element)
```
Additional methods of the Batch container¶
The Batch interface exposes other methods, such as remove_by_indices(...) and broadcast(...), but these
are not intended for use within Workflow blocks. They are primarily used
by the Execution Engine when providing data to the block.
WorkflowImageData¶
WorkflowImageData is a dataclass that encapsulates an image along with its metadata, providing useful methods to
manipulate the image representation within a Workflow block.
Some users may expect an np.ndarray to be provided directly by the Execution Engine when an image
kind is declared. While that would be a convenient and straightforward approach, it introduces limitations, such as:

- Lack of metadata: with only an np.ndarray, there is no way to attach metadata such as data lineage or the image's location within the original file (e.g., when working with cropped images).
- Inability to cache multiple representations: if multiple blocks need to serialize and send the image over HTTP, WorkflowImageData allows caching of different image representations, such as base64-encoded versions, improving efficiency.
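The caching idea can be sketched in plain Python. This is a hypothetical stand-in, not the real WorkflowImageData internals: the point is that an expensive representation is computed once on first access and reused afterwards:

```python
import base64
from functools import cached_property


class CachedImage:
    """Illustrative sketch: compute the base64 representation once, reuse it after."""

    def __init__(self, raw_bytes: bytes):
        self._raw_bytes = raw_bytes

    @cached_property
    def base64_image(self) -> str:
        # expensive encoding performed only on the first access;
        # subsequent accesses return the cached string
        return base64.b64encode(self._raw_bytes).decode("ascii")
```

With this pattern, several blocks that serialize the same image over HTTP share one encoding instead of re-encoding each time.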
Video Metadata
Since Execution Engine v1.2.0, video_metadata has been added to WorkflowImageData. This
object holds the context of video processing and is only relevant for video-processing
blocks. Other blocks may ignore its existence unless they create an output image (covered in the next section).
Operating on WorkflowImageData is fairly simple once you understand its interface. Here are some of the key
methods and properties:

```python
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData


def operate_on_image(workflow_image: WorkflowImageData) -> None:
    # Getting an np.ndarray representing the image.
    numpy_image = workflow_image.numpy_image

    # Getting a base64-encoded JPEG representation of the image, ideal for API transmission.
    base64_image: str = workflow_image.base64_image

    # Converting the image into a format compatible with inference models.
    inference_format = workflow_image.to_inference_format()

    # Accessing metadata related to the parent image.
    parent_metadata = workflow_image.parent_metadata
    print(parent_metadata.parent_id)  # parent identifier
    origin_coordinates = parent_metadata.origin_coordinates  # optional object with coordinates
    print(
        origin_coordinates.left_top_x, origin_coordinates.left_top_y,
        origin_coordinates.origin_width, origin_coordinates.origin_height,
    )

    # The same is available for root metadata (the oldest ancestor of the image - the Workflow input image).
    root_metadata = workflow_image.workflow_root_ancestor_metadata

    # Retrieving the `VideoMetadata` object - see the usage guide section below.
    # If `workflow_image` was not provided with `VideoMetadata`, a default metadata
    # object is created when the property is accessed.
    video_metadata = workflow_image.video_metadata
```
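One common use of origin_coordinates is translating a point from crop-local coordinates back into the parent image frame. The helper below is a hypothetical illustration (not part of the library); it simply applies the crop's top-left offset:

```python
from typing import Tuple


def crop_to_parent_coordinates(
    x: int, y: int, left_top_x: int, left_top_y: int
) -> Tuple[int, int]:
    # a point (x, y) in crop-local coordinates maps into the parent image
    # by shifting it with the crop's top-left corner offset
    return x + left_top_x, y + left_top_y
```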
Below you can find an example showcasing how to preserve metadata while transforming an image:
```python
import numpy as np

from inference.core.workflows.execution_engine.entities.base import WorkflowImageData


def transform_image(image: WorkflowImageData) -> WorkflowImageData:
    transformed_image = some_transformation(image.numpy_image)
    # `WorkflowImageData` exposes a helper method returning a new object with
    # the updated image but preserved metadata. Metadata preservation should
    # only be used when the output image is compatible regarding data lineage
    # (the predecessor-successor relation for images). Lineage is not preserved
    # for cropping or for merging images without a common predecessor -
    # implementation tips for those cases follow below.
    return WorkflowImageData.copy_and_replace(
        origin_image_data=image,
        numpy_image=transformed_image,
    )


def some_transformation(image: np.ndarray) -> np.ndarray:
    ...
```
Images cropping
When your block increases dimensionality and provides output of image kind, that usually means cropping the
image. In such cases the input image's video_metadata should be dropped: keeping it rarely makes sense,
because downstream video-processing blocks will not work correctly on dynamically created crops.
Below you can find a sketch of an implementation for that operation:
```python
from typing import List, Tuple

from inference.core.workflows.execution_engine.entities.base import WorkflowImageData


def crop_images(
    image: WorkflowImageData,
    crops: List[Tuple[str, int, int, int, int]],
) -> List[WorkflowImageData]:
    result_crops = []
    original_image = image.numpy_image
    for crop_id, x_min, y_min, x_max, y_max in crops:
        cropped_image = original_image[y_min:y_max, x_min:x_max]
        if not cropped_image.size:
            # discarding empty crops
            continue
        result_crop = WorkflowImageData.create_crop(
            origin_image_data=image,
            crop_identifier=crop_id,
            cropped_image=cropped_image,
            offset_x=x_min,
            offset_y=y_min,
        )
        result_crops.append(result_crop)
    return result_crops
```
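The slicing and empty-crop check used above can be exercised with plain Python lists (rows indexed by y, columns by x). This is a toy stand-in for the numpy slicing in the block, useful for seeing the coordinate convention in isolation:

```python
from typing import List, Optional


def crop_region(
    image: List[List[int]], x_min: int, y_min: int, x_max: int, y_max: int
) -> Optional[List[List[int]]]:
    # rows correspond to y, columns to x - mirrors numpy's image[y_min:y_max, x_min:x_max]
    cropped = [row[x_min:x_max] for row in image[y_min:y_max]]
    if not cropped or not cropped[0]:
        # empty crops yield no result, analogous to discarding zero-size crops
        return None
    return cropped
```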
In some cases you may want to preserve video_metadata. One example is a block that
produces crops based on fixed coordinates (such as a single video feed with multiple fixed Regions of
Interest, each fed to an individual tracker) - then you want the resulting crops to be processed in the context
of the video, as if they were produced by separate cameras. To adjust the behaviour of the create_crop(...)
method, simply add preserve_video_metadata=True:

```python
result_crop = WorkflowImageData.create_crop(
    origin_image_data=image,
    crop_identifier=crop_id,
    cropped_image=cropped_image,
    offset_x=x_min,
    offset_y=y_min,
    preserve_video_metadata=True,
)
```
Merging images without common predecessor
If a common parent_metadata cannot be identified for the images you are merging, you should denote that
"a new" image appears in the Workflow. To do so:

```python
import numpy as np

from inference.core.workflows.execution_engine.entities.base import \
    WorkflowImageData, ImageParentMetadata


def merge_images(image_1: WorkflowImageData, image_2: WorkflowImageData) -> WorkflowImageData:
    merged_image = some_merging_operation(
        image_1=image_1.numpy_image,
        image_2=image_2.numpy_image,
    )
    new_parent_metadata = ImageParentMetadata(
        # this is just one of the options for creating an id, yet a sensible one
        parent_id=f"{image_1.parent_metadata.parent_id} + {image_2.parent_metadata.parent_id}"
    )
    return WorkflowImageData(
        parent_metadata=new_parent_metadata,
        numpy_image=merged_image,
    )


def some_merging_operation(image_1: np.ndarray, image_2: np.ndarray) -> np.ndarray:
    ...
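As a toy stand-in for `some_merging_operation` (which the example above deliberately leaves abstract), a horizontal merge of two equal-height images could look like this. Plain lists of rows are used here instead of np.ndarray to keep the sketch self-contained:

```python
from typing import List


def merge_side_by_side(
    image_1: List[List[int]], image_2: List[List[int]]
) -> List[List[int]]:
    # naive horizontal merge: rows of two equal-height images are concatenated
    if len(image_1) != len(image_2):
        raise ValueError("images must have the same height")
    return [row_1 + row_2 for row_1, row_2 in zip(image_1, image_2)]
```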
VideoMetadata¶
Deprecation
The video_metadata kind is deprecated - we advise against using it in new
blocks. The VideoMetadata data representation became a member of WorkflowImageData
in Execution Engine v1.2.0 (inference release v0.23.0).
VideoMetadata is a dataclass that provides the following metadata about a video frame and its video source:

```python
from inference.core.workflows.execution_engine.entities.base import VideoMetadata


def inspect_video_metadata(video_metadata: VideoMetadata) -> None:
    # Identifier string for the video. To be treated as opaque.
    print(video_metadata.video_identifier)

    # Sequential number of the frame.
    print(video_metadata.frame_number)

    # The timestamp of the video frame. When processing video, it is suggested
    # that blocks rely on `fps` and `frame_number`, as real-world time-elapse
    # will not match time-elapse in a video file.
    print(video_metadata.frame_timestamp)

    # FPS value, if it could be retrieved (optional).
    print(video_metadata.fps)

    # Measured FPS of a live stream (optional).
    print(video_metadata.measured_fps)

    # Flag telling whether the frame comes from a video file or a stream;
    # None if it cannot be determined.
    print(video_metadata.comes_from_video_file)
```
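Following the advice above to rely on `fps` and `frame_number` rather than wall-clock timestamps, video-time elapse can be derived with a small helper. This function is a hypothetical illustration, not part of the library, and assumes frame numbering starts at 0:

```python
def frame_time_seconds(frame_number: int, fps: float) -> float:
    # position of the frame on the video timeline, in seconds,
    # assuming frame numbering starts at 0 and fps is constant
    if fps <= 0:
        raise ValueError("fps must be positive")
    return frame_number / fps
```

For example, frame 30 of a 30 FPS video sits exactly one second into the footage, regardless of how fast the pipeline actually processes frames.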