Skip to content

segment_anything2

SegmentAnything2

Bases: RoboflowCoreModel

SegmentAnything class for handling segmentation tasks.

Attributes:

Name Type Description
sam

The segmentation model.

predictor

The predictor for the segmentation model.

ort_session

ONNX runtime inference session.

embedding_cache

Cache for embeddings.

image_size_cache

Cache for image sizes.

embedding_cache_keys

Keys for the embedding cache.

Source code in inference/models/sam2/segment_anything2.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class SegmentAnything2(RoboflowCoreModel):
    """Segment Anything 2 (SAM2) image model with embedding and logits caches.

    Attributes:
        sam: The model returned by ``build_sam2``.
        predictor: ``SAM2ImagePredictor`` wrapping ``sam``.
        embedding_cache: Maps image id -> predictor features for that image.
        image_size_cache: Maps image id -> first two dimensions of the
            loaded image.
        embedding_cache_keys: Image ids in insertion order (oldest first);
            drives eviction from the two caches above.
        embedding_cache_size: Maximum number of cached embeddings.
        low_res_logits_cache: Maps the value returned by ``hash_prompt_set``
            to the cached low resolution logits and their prompt set.
        low_res_logits_cache_keys: Logits cache keys in insertion order
            (oldest first).
        low_res_logits_cache_size: Maximum number of cached logits entries.
        task_type: Set to ``"unsupervised-segmentation"``.

    """

    def __init__(
        self,
        *args,
        model_id: str = f"sam2/{SAM2_VERSION_ID}",
        low_res_logits_cache_size: int = SAM2_MAX_LOGITS_CACHE_SIZE,
        embedding_cache_size: int = SAM2_MAX_EMBEDDING_CACHE_SIZE,
        **kwargs,
    ):
        """Builds the SAM2 model, its predictor and the empty caches.

        Args:
            *args: Positional arguments forwarded to the base class.
            model_id: Model identifier forwarded to the base class.
            low_res_logits_cache_size: Maximum number of entries kept in the
                low resolution logits cache.
            embedding_cache_size: Maximum number of entries kept in the
                embedding cache.
            **kwargs: Keyword arguments forwarded to the base class.

        Raises:
            KeyError: If ``self.version_id`` is not one of the four known
                Hiera variants listed below.
        """
        super().__init__(*args, model_id=model_id, **kwargs)
        checkpoint = self.cache_file("weights.pt")
        # Maps the model version to the SAM2 config file passed to build_sam2.
        model_cfg = {
            "hiera_large": "sam2_hiera_l.yaml",
            "hiera_small": "sam2_hiera_s.yaml",
            "hiera_tiny": "sam2_hiera_t.yaml",
            "hiera_b_plus": "sam2_hiera_b+.yaml",
        }[self.version_id]

        self.sam = build_sam2(model_cfg, checkpoint, device=DEVICE)
        self.low_res_logits_cache_size = low_res_logits_cache_size
        self.embedding_cache_size = embedding_cache_size

        self.predictor = SAM2ImagePredictor(self.sam)

        self.embedding_cache = {}
        self.image_size_cache = {}
        self.embedding_cache_keys = []
        self.low_res_logits_cache: Dict[Tuple[str, str], LogitsCacheType] = {}
        self.low_res_logits_cache_keys = []

        self.task_type = "unsupervised-segmentation"

    def get_infer_bucket_file_list(self) -> List[str]:
        """Gets the list of files required for inference.

        Returns:
            List[str]: List of file names.
        """
        return ["weights.pt"]

    def embed_image(
        self,
        image: Optional[InferenceRequestImage],
        image_id: Optional[str] = None,
        **kwargs,
    ):
        """Returns the embedding for an image, computing and caching it if needed.

        A non-empty ``image_id`` that is already cached is answered without
        loading the image. Otherwise the image is loaded and, when no usable
        ``image_id`` was given (``None`` or empty string), an id is derived
        from the first 12 hex characters of the MD5 of the image bytes.

        Args:
            image: The image to embed, in a format accepted by
                ``preproc_image``. Not read when ``image_id`` hits the cache.
            image_id (Optional[str]): Identifier under which the embedding is
                cached. Defaults to None.
            **kwargs: Ignored; accepted so request dicts can be splatted in.

        Returns:
            Tuple: ``(embedding, image_size, image_id)`` where ``embedding``
            is the predictor's ``_features`` after ``set_image``,
            ``image_size`` is the first two dimensions of the loaded image
            and ``image_id`` is the id the entry is cached under.

        Notes:
            - The cache holds at most ``self.embedding_cache_size`` entries;
              beyond that the oldest inserted entry is removed. Cache hits do
              not refresh an entry's position.
        """
        if image_id and image_id in self.embedding_cache:
            return (
                self.embedding_cache[image_id],
                self.image_size_cache[image_id],
                image_id,
            )

        img_in = self.preproc_image(image)
        # Treat "" like None: caching under the shared key "" would hand the
        # first image's embedding to every later caller that passes "".
        if not image_id:
            image_id = hashlib.md5(img_in.tobytes()).hexdigest()[:12]

        # Second lookup covers ids that were just derived from the content.
        if image_id in self.embedding_cache:
            return (
                self.embedding_cache[image_id],
                self.image_size_cache[image_id],
                image_id,
            )

        with torch.inference_mode():
            self.predictor.set_image(img_in)
            embedding_dict = self.predictor._features

        self.embedding_cache[image_id] = embedding_dict
        self.image_size_cache[image_id] = img_in.shape[:2]
        if image_id in self.embedding_cache_keys:
            self.embedding_cache_keys.remove(image_id)
        self.embedding_cache_keys.append(image_id)
        if len(self.embedding_cache_keys) > self.embedding_cache_size:
            cache_key = self.embedding_cache_keys.pop(0)
            del self.embedding_cache[cache_key]
            del self.image_size_cache[cache_key]
        return (embedding_dict, img_in.shape[:2], image_id)

    def infer_from_request(self, request: Sam2InferenceRequest):
        """Performs inference based on the request type.

        Args:
            request (Sam2InferenceRequest): Either a ``Sam2EmbeddingRequest``
                or a ``Sam2SegmentationRequest``.

        Returns:
            ``Sam2EmbeddingResponse`` for embedding requests. For segmentation
            requests: the result of
            ``turn_segmentation_results_into_api_response`` when
            ``request.format`` is ``"json"``, or the bytes of a compressed
            NumPy archive (arrays ``masks`` and ``low_res_masks``) when it is
            ``"binary"``.

        Raises:
            ValueError: If the request type or the requested format is not
                supported.
        """
        t1 = perf_counter()
        if isinstance(request, Sam2EmbeddingRequest):
            _, _, image_id = self.embed_image(**request.dict())
            inference_time = perf_counter() - t1
            return Sam2EmbeddingResponse(time=inference_time, image_id=image_id)
        elif isinstance(request, Sam2SegmentationRequest):
            masks, scores, low_resolution_logits = self.segment_image(**request.dict())
            if request.format == "json":
                return turn_segmentation_results_into_api_response(
                    masks=masks,
                    scores=scores,
                    mask_threshold=self.predictor.mask_threshold,
                    inference_start_timestamp=t1,
                )
            elif request.format == "binary":
                binary_vector = BytesIO()
                np.savez_compressed(
                    binary_vector, masks=masks, low_res_masks=low_resolution_logits
                )
                binary_vector.seek(0)
                binary_data = binary_vector.getvalue()
                return binary_data
            else:
                raise ValueError(f"Invalid format {request.format}")

        else:
            raise ValueError(f"Invalid request type {type(request)}")

    def preproc_image(self, image: InferenceRequestImage):
        """Preprocesses an image.

        Args:
            image (InferenceRequestImage): The image to preprocess.

        Returns:
            np.array: The image as returned by ``load_image_rgb``.
        """
        np_image = load_image_rgb(image)
        return np_image

    def segment_image(
        self,
        image: Optional[InferenceRequestImage],
        image_id: Optional[str] = None,
        prompts: Optional[Union[Sam2PromptSet, dict]] = None,
        multimask_output: Optional[bool] = True,
        mask_input: Optional[Union[np.ndarray, List[List[List[float]]]]] = None,
        save_logits_to_cache: bool = False,
        load_logits_from_cache: bool = False,
        **kwargs,
    ):
        """Segments an image using prompts, an optional mask input and the caches.

        The embedding is obtained through ``embed_image`` (computed from
        ``image`` or read from the embedding cache via ``image_id``) and
        installed on the predictor before prediction.

        Args:
            image: The image to be segmented. May be None when ``image_id``
                refers to a cached embedding.
            image_id (Optional[str]): Identifier of a cached embedding.
            prompts (Optional[Union[Sam2PromptSet, dict]]): Prompt set, or a
                mapping of keyword arguments used to build one. Defaults to
                None, which results in an empty prompt set.
            multimask_output (bool): Flag to decide if multiple mask proposals
                are predicted (among which the most promising is returned).
            mask_input: Input low resolution logits for the image.
            save_logits_to_cache (bool): Store the resulting low resolution
                logits in the logits cache. Ignored when
                ``DISABLE_SAM2_LOGITS_CACHE`` is set.
            load_logits_from_cache (bool): When ``mask_input`` is None, try to
                load it from the logits cache. Ignored when
                ``DISABLE_SAM2_LOGITS_CACHE`` is set.
            **kwargs: Ignored; accepted so request dicts can be splatted in.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: Tuple of np.array, where:
                - first element is of size (prompt_set_size, h, w) and represents the mask with the
                    highest confidence for each prompt element
                - second element is of size (prompt_set_size, ) and represents the score for the most
                    confident mask of each prompt element
                - third element is of size (prompt_set_size, 256, 256) and represents the low
                    resolution logits for the most confident mask of each prompt element

        Raises:
            ValueError: If neither an image nor a cached ``image_id`` is given.
        """
        load_logits_from_cache = (
            load_logits_from_cache and not DISABLE_SAM2_LOGITS_CACHE
        )
        save_logits_to_cache = save_logits_to_cache and not DISABLE_SAM2_LOGITS_CACHE

        # Without an image the embedding must already be cached under image_id.
        if image is None:
            if not image_id:
                raise ValueError("Must provide either image or  cached image_id")
            if image_id not in self.embedding_cache:
                raise ValueError(
                    f"Image ID {image_id} not in embedding cache, must provide the image or embeddings"
                )

        with torch.inference_mode():
            embedding, original_image_size, image_id = self.embed_image(
                image=image, image_id=image_id
            )

            # Install the (possibly cached) embedding on the predictor so
            # predict() runs against this image.
            self.predictor._is_image_set = True
            self.predictor._features = embedding
            self.predictor._orig_hw = [original_image_size]
            self.predictor._is_batch = False

            prompt_set: Sam2PromptSet
            if prompts:
                # isinstance (not an exact type check) so dict subclasses are
                # also converted into a prompt set.
                if isinstance(prompts, dict):
                    prompt_set = Sam2PromptSet(**prompts)
                else:
                    prompt_set = prompts
                args = prompt_set.to_sam2_inputs()
            else:
                prompt_set = Sam2PromptSet()
                args = dict()

            if mask_input is None and load_logits_from_cache:
                mask_input = maybe_load_low_res_logits_from_cache(
                    image_id, prompt_set, self.low_res_logits_cache
                )

            args = pad_points(args)
            if not any(args.values()):
                # No usable prompt: fall back to a single point labelled -1.
                args = {"point_coords": [[0, 0]], "point_labels": [-1], "box": None}
            masks, scores, low_resolution_logits = self.predictor.predict(
                mask_input=mask_input,
                multimask_output=multimask_output,
                return_logits=True,
                normalize_coords=True,
                **args,
            )
            masks, scores, low_resolution_logits = choose_most_confident_sam_prediction(
                masks=masks,
                scores=scores,
                low_resolution_logits=low_resolution_logits,
            )

            if save_logits_to_cache:
                self.add_low_res_logits_to_cache(
                    low_resolution_logits, image_id, prompt_set
                )

            return masks, scores, low_resolution_logits

    def add_low_res_logits_to_cache(
        self, logits: np.ndarray, image_id: str, prompt_set: Sam2PromptSet
    ) -> None:
        """Stores low resolution logits for an image / prompt set pair.

        The entry is keyed by ``hash_prompt_set(image_id, prompt_set)``. When
        the cache grows beyond ``self.low_res_logits_cache_size`` the oldest
        inserted entry is removed.

        Args:
            logits: Low resolution logits to cache; a singleton axis is
                inserted at position 1 before storing.
            image_id: Identifier of the image the logits belong to.
            prompt_set: Prompt set that produced the logits.
        """
        # NOTE(review): the extra axis presumably matches the layout expected
        # for mask_input on a later call - confirm against the loader.
        logits = logits[:, None, :, :]
        prompt_id = hash_prompt_set(image_id, prompt_set)
        self.low_res_logits_cache[prompt_id] = {
            "logits": logits,
            "prompt_set": prompt_set,
        }
        # Re-inserting an existing key moves it to the newest position.
        if prompt_id in self.low_res_logits_cache_keys:
            self.low_res_logits_cache_keys.remove(prompt_id)
        self.low_res_logits_cache_keys.append(prompt_id)
        if len(self.low_res_logits_cache_keys) > self.low_res_logits_cache_size:
            cache_key = self.low_res_logits_cache_keys.pop(0)
            del self.low_res_logits_cache[cache_key]

__init__(*args, model_id=f'sam2/{SAM2_VERSION_ID}', low_res_logits_cache_size=SAM2_MAX_LOGITS_CACHE_SIZE, embedding_cache_size=SAM2_MAX_EMBEDDING_CACHE_SIZE, **kwargs)

Initializes the SegmentAnything.

Parameters:

Name Type Description Default
*args

Variable length argument list.

()
**kwargs

Arbitrary keyword arguments.

{}
Source code in inference/models/sam2/segment_anything2.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def __init__(
    self,
    *args,
    model_id: str = f"sam2/{SAM2_VERSION_ID}",
    low_res_logits_cache_size: int = SAM2_MAX_LOGITS_CACHE_SIZE,
    embedding_cache_size: int = SAM2_MAX_EMBEDDING_CACHE_SIZE,
    **kwargs,
):
    """Builds the SAM2 model, its predictor and the empty caches.

    Args:
        *args: Positional arguments forwarded to the base class.
        model_id: Model identifier forwarded to the base class.
        low_res_logits_cache_size: Maximum number of entries kept in the
            low resolution logits cache.
        embedding_cache_size: Maximum number of entries kept in the
            embedding cache.
        **kwargs: Keyword arguments forwarded to the base class.

    Raises:
        KeyError: If ``self.version_id`` has no entry in the config table.
    """
    super().__init__(*args, model_id=model_id, **kwargs)
    weights_path = self.cache_file("weights.pt")

    # Model version -> SAM2 config file handed to build_sam2.
    config_by_version = {
        "hiera_large": "sam2_hiera_l.yaml",
        "hiera_small": "sam2_hiera_s.yaml",
        "hiera_tiny": "sam2_hiera_t.yaml",
        "hiera_b_plus": "sam2_hiera_b+.yaml",
    }
    config_name = config_by_version[self.version_id]

    self.sam = build_sam2(config_name, weights_path, device=DEVICE)
    self.low_res_logits_cache_size = low_res_logits_cache_size
    self.embedding_cache_size = embedding_cache_size
    self.predictor = SAM2ImagePredictor(self.sam)

    # Embedding cache: two dicts keyed by image id plus an insertion-order list.
    self.embedding_cache = {}
    self.image_size_cache = {}
    self.embedding_cache_keys = []

    # Logits cache: one dict plus an insertion-order list.
    self.low_res_logits_cache: Dict[Tuple[str, str], LogitsCacheType] = {}
    self.low_res_logits_cache_keys = []

    self.task_type = "unsupervised-segmentation"

embed_image(image, image_id=None, **kwargs)

Embeds an image and caches the result if an image_id is provided. If the image has been embedded before and cached, the cached result will be returned.

Parameters:

Name Type Description Default
image Any

The image to be embedded. The format should be compatible with the preproc_image method.

required
image_id Optional[str]

An identifier for the image. If provided, the embedding result will be cached with this ID. Defaults to None.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Type Description

Tuple: A 3-tuple (embedding, image_size, image_id) where the first element is the embedding of the image, the second element is the shape (height, width) of the processed image, and the third element is the ID under which the embedding is cached.

Notes
  • Embeddings and image sizes are cached to improve performance on repeated requests for the same image.
  • The cache has a maximum size defined by the embedding_cache_size constructor argument (default SAM2_MAX_EMBEDDING_CACHE_SIZE). When the cache exceeds this size, the oldest entries are removed.
Example

>>> img_array = ...  # some image array
>>> embedding, image_size, image_id = embed_image(img_array, image_id="sample123")
>>> image_size, image_id
((224, 224), 'sample123')

Source code in inference/models/sam2/segment_anything2.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def embed_image(
    self,
    image: Optional[InferenceRequestImage],
    image_id: Optional[str] = None,
    **kwargs,
):
    """
    Returns the embedding for an image, computing and caching it if needed.

    A non-empty ``image_id`` that is already cached is answered without loading
    the image. Otherwise the image is loaded and, when no usable ``image_id``
    was given (``None`` or empty string), an id is derived from the first 12
    hex characters of the MD5 of the image bytes.

    Args:
        image (Any): The image to be embedded, in a format accepted by the
            preproc_image method. Not read when ``image_id`` hits the cache.
        image_id (Optional[str]): Identifier under which the embedding is
            cached. Defaults to None.
        **kwargs: Ignored; accepted so request dicts can be splatted in.

    Returns:
        Tuple: ``(embedding, image_size, image_id)`` where ``embedding`` is the
        predictor's ``_features`` after ``set_image``, ``image_size`` is the
        first two dimensions of the loaded image and ``image_id`` is the id
        the entry is cached under.

    Notes:
        - Embeddings and image sizes are cached to improve performance on
          repeated requests for the same image.
        - The cache holds at most ``self.embedding_cache_size`` entries; beyond
          that the oldest inserted entry is removed. Cache hits do not refresh
          an entry's position.

    Example:
        >>> img_array = ... # some image array
        >>> embedding, size, image_id = model.embed_image(img_array, image_id="sample123")
    """
    if image_id and image_id in self.embedding_cache:
        return (
            self.embedding_cache[image_id],
            self.image_size_cache[image_id],
            image_id,
        )

    img_in = self.preproc_image(image)
    # Treat "" like None: caching under the shared key "" would hand the first
    # image's embedding to every later caller that passes "".
    if not image_id:
        image_id = hashlib.md5(img_in.tobytes()).hexdigest()[:12]

    # Second lookup covers ids that were just derived from the content.
    if image_id in self.embedding_cache:
        return (
            self.embedding_cache[image_id],
            self.image_size_cache[image_id],
            image_id,
        )

    with torch.inference_mode():
        self.predictor.set_image(img_in)
        embedding_dict = self.predictor._features

    image_size = img_in.shape[:2]
    self.embedding_cache[image_id] = embedding_dict
    self.image_size_cache[image_id] = image_size
    if image_id in self.embedding_cache_keys:
        self.embedding_cache_keys.remove(image_id)
    self.embedding_cache_keys.append(image_id)
    if len(self.embedding_cache_keys) > self.embedding_cache_size:
        # Evict the oldest inserted entry from both caches.
        cache_key = self.embedding_cache_keys.pop(0)
        del self.embedding_cache[cache_key]
        del self.image_size_cache[cache_key]
    return (embedding_dict, image_size, image_id)

get_infer_bucket_file_list()

Gets the list of files required for inference.

Returns:

Type Description
List[str]

List[str]: List of file names.

Source code in inference/models/sam2/segment_anything2.py
102
103
104
105
106
107
108
def get_infer_bucket_file_list(self) -> List[str]:
    """Lists the files that must be available for inference.

    Returns:
        List[str]: A single-element list holding the weights file name.
    """
    required_files = ["weights.pt"]
    return required_files

infer_from_request(request)

Performs inference based on the request type.

Parameters:

Name Type Description Default
request Sam2InferenceRequest

The inference request.

required

Returns:

Type Description

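Sam2EmbeddingResponse for embedding requests. For segmentation requests: the response built by turn_segmentation_results_into_api_response when the request format is "json", or the bytes of a compressed NumPy archive (arrays masks and low_res_masks) when the request format is "binary".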

Source code in inference/models/sam2/segment_anything2.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def infer_from_request(self, request: Sam2InferenceRequest):
    """Dispatches an embedding or segmentation request to the matching method.

    Args:
        request (Sam2InferenceRequest): Either a ``Sam2EmbeddingRequest`` or a
            ``Sam2SegmentationRequest``.

    Returns:
        ``Sam2EmbeddingResponse`` for embedding requests. For segmentation
        requests: the result of ``turn_segmentation_results_into_api_response``
        when ``request.format`` is ``"json"``, or the bytes of a compressed
        NumPy archive (arrays ``masks`` and ``low_res_masks``) when it is
        ``"binary"``.

    Raises:
        ValueError: If the request type or the requested format is not supported.
    """
    started_at = perf_counter()

    if isinstance(request, Sam2EmbeddingRequest):
        _embedding, _image_size, image_id = self.embed_image(**request.dict())
        elapsed = perf_counter() - started_at
        return Sam2EmbeddingResponse(time=elapsed, image_id=image_id)

    if not isinstance(request, Sam2SegmentationRequest):
        raise ValueError(f"Invalid request type {type(request)}")

    masks, scores, low_resolution_logits = self.segment_image(**request.dict())

    if request.format == "json":
        return turn_segmentation_results_into_api_response(
            masks=masks,
            scores=scores,
            mask_threshold=self.predictor.mask_threshold,
            inference_start_timestamp=started_at,
        )

    if request.format == "binary":
        # getvalue() returns the whole buffer regardless of stream position.
        archive = BytesIO()
        np.savez_compressed(
            archive, masks=masks, low_res_masks=low_resolution_logits
        )
        return archive.getvalue()

    raise ValueError(f"Invalid format {request.format}")

preproc_image(image)

Preprocesses an image.

Parameters:

Name Type Description Default
image InferenceRequestImage

The image to preprocess.

required

Returns:

Type Description

np.array: The preprocessed image.

Source code in inference/models/sam2/segment_anything2.py
210
211
212
213
214
215
216
217
218
219
220
def preproc_image(self, image: InferenceRequestImage):
    """Loads the request image through ``load_image_rgb``.

    Args:
        image (InferenceRequestImage): The image to preprocess.

    Returns:
        np.array: The image as returned by ``load_image_rgb``.
    """
    return load_image_rgb(image)

segment_image(image, image_id=None, prompts=None, multimask_output=True, mask_input=None, save_logits_to_cache=False, load_logits_from_cache=False, **kwargs)

Segments an image based on provided embeddings, points, masks, or cached results. If embeddings are not directly provided, the function can derive them from the input image or cache.

Parameters:

Name Type Description Default
image Any

The image to be segmented.

required
image_id Optional[str]

A cached identifier for the image. Useful for accessing cached embeddings or masks.

None
prompts Optional[List[Sam2Prompt]]

List of prompts to use for segmentation. Defaults to None.

None
mask_input Optional[Union[ndarray, List[List[List[float]]]]]

Input low_res_logits for the image.

None
multimask_output Optional[bool]

(bool): Flag to decide whether multiple mask proposals are predicted (among which the most promising will be returned).

True
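save_logits_to_cache bool

Flag to decide whether the low resolution logits of this prediction are stored in the logits cache. Ignored when DISABLE_SAM2_LOGITS_CACHE is set.

False
load_logits_from_cache bool

Flag to decide whether cached logits from prior prompting are used as mask input when mask_input is not provided. Ignored when DISABLE_SAM2_LOGITS_CACHE is set.

False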
**kwargs

Additional keyword arguments.

{}

Returns:

Type Description

Tuple[np.ndarray, np.ndarray, np.ndarray]: Tuple of np.array, where: - the first element is of size (prompt_set_size, h, w) and represents the mask with the highest confidence for each prompt element - the second element is of size (prompt_set_size, ) and represents the score of the most confident mask of each prompt element - the third element is of size (prompt_set_size, 256, 256) and represents the low resolution logits of the most confident mask of each prompt element

Raises:

Type Description
ValueError

If necessary inputs are missing or inconsistent.

Notes
  • Embeddings, segmentations, and low-resolution logits can be cached to improve performance on repeated requests for the same image.
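  • The embedding cache holds at most embedding_cache_size entries (default SAM2_MAX_EMBEDDING_CACHE_SIZE) and the logits cache at most low_res_logits_cache_size entries (default SAM2_MAX_LOGITS_CACHE_SIZE). When a cache exceeds its size, the oldest entries are removed.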
Source code in inference/models/sam2/segment_anything2.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def segment_image(
    self,
    image: Optional[InferenceRequestImage],
    image_id: Optional[str] = None,
    prompts: Optional[Union[Sam2PromptSet, dict]] = None,
    multimask_output: Optional[bool] = True,
    mask_input: Optional[Union[np.ndarray, List[List[List[float]]]]] = None,
    save_logits_to_cache: bool = False,
    load_logits_from_cache: bool = False,
    **kwargs,
):
    """
    Segments an image using prompts, an optional mask input and the caches.

    The embedding is obtained through ``embed_image`` (computed from ``image``
    or read from the embedding cache via ``image_id``) and installed on the
    predictor before prediction.

    Args:
        image (Any): The image to be segmented. May be None when ``image_id``
            refers to a cached embedding.
        image_id (Optional[str]): Identifier of a cached embedding.
        prompts (Optional[Union[Sam2PromptSet, dict]]): Prompt set, or a mapping
            of keyword arguments used to build one. Defaults to None, which
            results in an empty prompt set.
        multimask_output (bool): Flag to decide if multiple mask proposals are
            predicted (among which the most promising is returned).
        mask_input (Optional[Union[np.ndarray, List[List[List[float]]]]]): Input
            low resolution logits for the image.
        save_logits_to_cache (bool): Store the resulting low resolution logits
            in the logits cache. Ignored when ``DISABLE_SAM2_LOGITS_CACHE`` is set.
        load_logits_from_cache (bool): When ``mask_input`` is None, try to load
            it from the logits cache. Ignored when ``DISABLE_SAM2_LOGITS_CACHE``
            is set.
        **kwargs: Ignored; accepted so request dicts can be splatted in.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: Tuple of np.array, where:
            - first element is of size (prompt_set_size, h, w) and represents the mask with the
                highest confidence for each prompt element
            - second element is of size (prompt_set_size, ) and represents the score for the most
                confident mask of each prompt element
            - third element is of size (prompt_set_size, 256, 256) and represents the low
                resolution logits for the most confident mask of each prompt element

    Raises:
        ValueError: If neither an image nor a cached ``image_id`` is given.

    Notes:
        - Embeddings and low-resolution logits can be cached to improve
          performance on repeated requests for the same image.
    """
    load_logits_from_cache = (
        load_logits_from_cache and not DISABLE_SAM2_LOGITS_CACHE
    )
    save_logits_to_cache = save_logits_to_cache and not DISABLE_SAM2_LOGITS_CACHE

    # Without an image the embedding must already be cached under image_id.
    if image is None:
        if not image_id:
            raise ValueError("Must provide either image or  cached image_id")
        if image_id not in self.embedding_cache:
            raise ValueError(
                f"Image ID {image_id} not in embedding cache, must provide the image or embeddings"
            )

    with torch.inference_mode():
        embedding, original_image_size, image_id = self.embed_image(
            image=image, image_id=image_id
        )

        # Install the (possibly cached) embedding on the predictor so predict()
        # runs against this image.
        self.predictor._is_image_set = True
        self.predictor._features = embedding
        self.predictor._orig_hw = [original_image_size]
        self.predictor._is_batch = False

        prompt_set: Sam2PromptSet
        if prompts:
            # isinstance (not an exact type check) so dict subclasses are also
            # converted into a prompt set.
            if isinstance(prompts, dict):
                prompt_set = Sam2PromptSet(**prompts)
            else:
                prompt_set = prompts
            args = prompt_set.to_sam2_inputs()
        else:
            prompt_set = Sam2PromptSet()
            args = dict()

        if mask_input is None and load_logits_from_cache:
            mask_input = maybe_load_low_res_logits_from_cache(
                image_id, prompt_set, self.low_res_logits_cache
            )

        args = pad_points(args)
        if not any(args.values()):
            # No usable prompt: fall back to a single point labelled -1.
            args = {"point_coords": [[0, 0]], "point_labels": [-1], "box": None}
        masks, scores, low_resolution_logits = self.predictor.predict(
            mask_input=mask_input,
            multimask_output=multimask_output,
            return_logits=True,
            normalize_coords=True,
            **args,
        )
        masks, scores, low_resolution_logits = choose_most_confident_sam_prediction(
            masks=masks,
            scores=scores,
            low_resolution_logits=low_resolution_logits,
        )

        if save_logits_to_cache:
            self.add_low_res_logits_to_cache(
                low_resolution_logits, image_id, prompt_set
            )

        return masks, scores, low_resolution_logits

choose_most_confident_sam_prediction(masks, scores, low_resolution_logits)

This function post-processes SAM2 inference output and chooses the most confident mask regardless of the multimask_output parameter value. Args: masks: np array with values 0.0 and 1.0 representing predicted masks of size (prompt_set_size, proposed_masks, h, w) or (proposed_masks, h, w) - depending on prompt set size - unfortunately, prompt_set_size=1 causes a squeeze operation in the SAM2 library, so to handle inference uniformly, we need to compensate with this function. scores: array of size (prompt_set_size, proposed_masks) or (proposed_masks, ) depending on prompt set size - this array gives the confidence score for each mask proposal. low_resolution_logits: array of size (prompt_set_size, proposed_masks, 256, 256) or (proposed_masks, 256, 256) - depending on prompt set size. These low resolution logits can be passed to a subsequent iteration as mask input. Returns: Tuple of np.array, where: - the first element is of size (prompt_set_size, h, w) and represents the mask with the highest confidence for each prompt element - the second element is of size (prompt_set_size, ) and represents the score of the most confident mask of each prompt element - the third element is of size (prompt_set_size, 256, 256) and represents the low resolution logits of the most confident mask of each prompt element

Source code in inference/models/sam2/segment_anything2.py
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def choose_most_confident_sam_prediction(
    masks: np.ndarray,
    scores: np.ndarray,
    low_resolution_logits: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Reduce SAM2 predictions to a single mask proposal per prompt element.

    The SAM2 library squeezes away the leading prompt dimension when the prompt
    set has exactly one element. To treat both layouts uniformly, inputs whose
    `masks` array is 3-dimensional get that leading dimension restored before
    the per-prompt selection runs.

    Args:
        masks: array of size (prompt_set_size, proposed_masks, h, w) or
            (proposed_masks, h, w) holding 0.0 / 1.0 mask values.
        scores: array of size (prompt_set_size, proposed_masks) or
            (proposed_masks, ) with the confidence score of each proposal.
        low_resolution_logits: array of size
            (prompt_set_size, proposed_masks, 256, 256) or
            (proposed_masks, 256, 256); these logits can be fed to a
            subsequent iteration as mask input.

    Returns:
        Tuple of three np.ndarray stacked over prompt elements:
            - selected masks, of size (prompt_set_size, h, w)
            - selected scores, of size (prompt_set_size, )
            - selected low resolution logits, of size (prompt_set_size, 256, 256)
    """
    if masks.ndim == 3:
        # Single-prompt output was squeezed upstream: restore the prompt axis.
        masks, scores, low_resolution_logits = (
            np.expand_dims(array, axis=0)
            for array in (masks, scores, low_resolution_logits)
        )
    # The actual "most confident" choice is delegated to the per-element helper.
    picks = [
        choose_most_confident_prompt_set_element_prediction(
            mask=element_masks,
            score=element_scores,
            low_resolution_logit=element_logits,
        )
        for element_masks, element_scores, element_logits in zip(
            masks, scores, low_resolution_logits
        )
    ]
    best_masks = [pick[0] for pick in picks]
    best_scores = [pick[1] for pick in picks]
    best_logits = [pick[2] for pick in picks]
    return np.asarray(best_masks), np.asarray(best_scores), np.asarray(best_logits)

find_prior_prompt_in_cache(initial_prompt_set, image_id, cache)

Searches the cache to see whether any previously used prompt set is a subset of this one.

Source code in inference/models/sam2/segment_anything2.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def find_prior_prompt_in_cache(
    initial_prompt_set: Sam2PromptSet,
    image_id: str,
    cache: Dict[Tuple[str, str], LogitsCacheType],
) -> Optional[np.ndarray]:
    """
    Look up cached logits produced by an earlier, smaller version of this prompt set.

    Only cache entries whose key starts with `image_id` are considered, and they
    are visited in reverse of the cache's iteration order. An entry qualifies when
    `is_prompt_strict_subset` accepts its prompt set against `initial_prompt_set`.

    Args:
        initial_prompt_set: the prompt set for which a prior mask is wanted.
        image_id: identifier of the image the prompts refer to.
        cache: mapping from (image_id, prompt hash) keys to dicts holding
            "logits" and "prompt_set" entries.

    Returns:
        The logits of the first qualifying entry with exactly one point fewer than
        `initial_prompt_set`; otherwise the logits of the qualifying entry with the
        most points (on ties, the entry visited later wins); `None` if nothing
        qualifies.
    """
    entries_for_image = [
        entry for key, entry in cache.items() if key[0] == image_id
    ]
    best_size = 0
    best_logits: Optional[np.ndarray] = None
    target_size = initial_prompt_set.num_points() - 1
    for entry in reversed(entries_for_image):
        candidate_logits = entry["logits"]
        candidate_prompts: Sam2PromptSet = entry["prompt_set"]
        if not is_prompt_strict_subset(candidate_prompts, initial_prompt_set):
            continue

        candidate_size = candidate_prompts.num_points()
        if candidate_size == target_size:
            # One point fewer is the closest possible predecessor: stop searching.
            return candidate_logits
        if candidate_size >= best_size:
            best_size, best_logits = candidate_size, candidate_logits

    return best_logits

hash_prompt_set(image_id, prompt_set)

Computes unique hash from a prompt set.

Source code in inference/models/sam2/segment_anything2.py
342
343
344
345
346
def hash_prompt_set(image_id: str, prompt_set: Sam2PromptSet) -> Tuple[str, str]:
    """
    Build a cache key for a prompt set.

    Returns:
        A pair of `image_id` and the first 12 hex characters of the MD5 digest
        of the UTF-8 encoded `str(prompt_set)`.
    """
    serialized = str(prompt_set).encode("utf-8")
    digest = hashlib.md5(serialized).hexdigest()
    return image_id, digest[:12]

maybe_load_low_res_logits_from_cache(image_id, prompt_set, cache)

Loads prior masks from the cache by searching over possible prior prompts.

Source code in inference/models/sam2/segment_anything2.py
349
350
351
352
353
354
355
356
357
358
359
def maybe_load_low_res_logits_from_cache(
    image_id: str,
    prompt_set: Sam2PromptSet,
    cache: Dict[Tuple[str, str], LogitsCacheType],
) -> Optional[np.ndarray]:
    """
    Load prior mask logits from the cache by searching over possible prior prompts.

    Returns `None` without touching the cache when `prompt_set.prompts` is empty;
    otherwise returns whatever `find_prior_prompt_in_cache` finds (which may
    also be `None`).
    """
    if prompt_set.prompts:
        return find_prior_prompt_in_cache(prompt_set, image_id, cache)
    return None

pad_points(args)

Pad arguments to be passed to sam2 model with not_a_point label (-1). This is necessary when there are multiple prompts per image so that a tensor can be created.

Also pads empty point lists with a dummy non-point entry.

Source code in inference/models/sam2/segment_anything2.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def pad_points(args: Dict[str, Any]) -> Dict[str, Any]:
    """
    Pad arguments to be passed to sam2 model with not_a_point label (-1).
    This is necessary when there are multiple prompts per image so that a tensor can be created.

    Also pads empty point lists with a dummy non-point entry.

    Args:
        args: dict with "point_coords" and "point_labels" entries. Each is either
            `None` or a list holding one list per prompt. The input is not
            modified; a deep copy is padded and returned.

    Returns:
        A deep copy of `args` in which every prompt's coordinate list is padded
        with `[0, 0]` and every label list is padded with `-1`, up to the length
        of the longest coordinate list (at least 1). An empty list of prompts is
        returned unchanged.

    Raises:
        ValueError: if "point_labels" is given while "point_coords" is `None`.
    """
    args = copy.deepcopy(args)
    point_coords = args["point_coords"]
    point_labels = args["point_labels"]
    if point_coords is None:
        if point_labels is not None:
            raise ValueError(
                "Can't have point labels without corresponding point coordinates"
            )
        return args

    # `default=0` keeps an empty list of prompts from crashing `max()`; the outer
    # `max(..., 1)` guarantees every prompt ends up with at least one entry.
    longest = max((len(prompt) for prompt in point_coords), default=0)
    max_len = max(longest, 1)
    for prompt in point_coords:
        # A fresh [0, 0] per padded slot, so padded points never share a list.
        prompt.extend([0, 0] for _ in range(max_len - len(prompt)))
    for label in point_labels:
        label.extend([-1] * (max_len - len(label)))
    return args