Skip to content

Config

InferenceSDKDeprecationWarning

Bases: Warning

Class used for warning of deprecated features in the Inference SDK

Source code in inference_sdk/config.py
106
107
108
109
class InferenceSDKDeprecationWarning(Warning):
    """Warning category raised for deprecated features of the Inference SDK."""

RemoteProcessingTimeCollector

Thread-safe collector for GPU processing times from remote execution responses.

A single instance is shared across all threads handling a single request. Each entry stores a model_id alongside the processing time.

Uses threading.Lock (not asyncio.Lock) because add() is only called from synchronous worker threads (ThreadPoolExecutor). The middleware reads via drain() after await call_next() returns, at which point all worker threads have completed — so there is no contention in the async context.

Source code in inference_sdk/config.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class RemoteProcessingTimeCollector:
    """Thread-safe accumulator of GPU processing times reported by remote
    execution responses.

    One instance is shared by every thread that serves a single request;
    each recorded entry pairs a model_id with its processing time.

    A threading.Lock (rather than asyncio.Lock) is sufficient here: add()
    runs only in synchronous ThreadPoolExecutor worker threads, and the
    middleware calls drain() only after `await call_next()` has returned —
    by then all workers are done, so the async side never contends.
    """

    def __init__(self):
        # Guards _entries; entries are (model_id, processing_time) tuples.
        self._lock = threading.Lock()
        self._entries: list = []

    def add(self, processing_time: float, model_id: str = "unknown") -> None:
        # Called from worker threads; append under the lock.
        with self._lock:
            self._entries.append((model_id, processing_time))

    def drain(self) -> list:
        """Atomically return all entries and clear the internal list."""
        with self._lock:
            drained, self._entries = self._entries, []
        return drained

    def has_data(self) -> bool:
        # Truthiness of the list under the lock — True iff anything recorded.
        with self._lock:
            return bool(self._entries)

    def summarize(self, max_detail_bytes: int = 4096) -> Tuple[float, Optional[str]]:
        """Atomically drain entries and return (total_time, entries_json_or_none).

        Returns the total processing time and a JSON string of individual entries.
        If the JSON exceeds max_detail_bytes, the detail string is omitted (None).
        """
        drained = self.drain()
        total_time = sum(elapsed for _, elapsed in drained)
        serialized = json.dumps([{"m": model, "t": elapsed} for model, elapsed in drained])
        # Keep the detail payload only when it fits within the size budget.
        return total_time, (serialized if len(serialized) <= max_detail_bytes else None)

drain()

Atomically return all entries and clear the internal list.

Source code in inference_sdk/config.py
32
33
34
35
36
37
def drain(self) -> list:
    """Atomically return all entries and clear the internal list."""
    with self._lock:
        drained, self._entries = self._entries, []
    return drained

summarize(max_detail_bytes=4096)

Atomically drain entries and return (total_time, entries_json_or_none).

Returns the total processing time and a JSON string of individual entries. If the JSON exceeds max_detail_bytes, the detail string is omitted (None).

Source code in inference_sdk/config.py
43
44
45
46
47
48
49
50
51
52
53
54
def summarize(self, max_detail_bytes: int = 4096) -> Tuple[float, Optional[str]]:
    """Atomically drain entries and return (total_time, entries_json_or_none).

    Returns the total processing time and a JSON string of individual entries.
    If the JSON exceeds max_detail_bytes, the detail string is omitted (None).
    """
    drained = self.drain()
    total_time = sum(elapsed for _, elapsed in drained)
    serialized = json.dumps([{"m": model, "t": elapsed} for model, elapsed in drained])
    # Drop the detail payload when it would exceed the size budget.
    return total_time, (serialized if len(serialized) <= max_detail_bytes else None)