Bases: Generic[V]
Base cache capable of hashing compound payloads using a list of injected hash functions.
Each hash function must produce a stable hashing string.
On `get_hash_key(...)`, each function is invoked on the keyword argument it is registered for
(use named args only!); the output strings are joined and the MD5 digest of the result is calculated.
The cache is size-bounded: each entry lives until `cache_size` new entries appear.
Raises `WorkflowEnvironmentConfigurationError` when `get_hash_key(...)` is not
provided with params corresponding to all hash functions.
Thread safe thanks to a thread lock on `get(...)` and `cache(...)`.
Source code in inference/core/workflows/execution_engine/v1/compiler/cache.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class BasicWorkflowsCache(Generic[V]):
    """
    Base cache which is capable of hashing compound payloads based on
    list of injected hash functions. Hash functions are to produce stable hashing strings.
    Each function is invoked on `get_hash_key(...)` kwarg (use named args only!),
    output strings are joined with `<|>` and md5 value is calculated.

    Cache is size bounded, each entry lives until `cache_size` new entries appear.
    Storing a value under a key that is already cached overwrites the value in place
    and does not count as a new entry.

    Raises `WorkflowEnvironmentConfigurationError` when `get_hash_key(...)` is not
    provided with params corresponding to all hash functions.

    Thread safe thanks to thread lock on `get(...)` and `cache(...)`.
    """

    def __init__(
        self,
        cache_size: int,
        hash_functions: List[Tuple[str, Callable[[Any], str]]],
    ):
        """
        Args:
            cache_size: Maximum number of entries kept; values below 1 are
                clamped to 1.
            hash_functions: Ordered `(kwarg_name, function)` pairs. Each function
                turns the value passed under `kwarg_name` into a string chunk
                of the hash key.
        """
        # Keys in insertion order - the left-most key is the next one to be evicted.
        self._keys_buffer = deque(maxlen=max(cache_size, 1))
        self._cache: Dict[str, V] = {}
        self._hash_functions = hash_functions
        self._cache_lock = Lock()

    def get_hash_key(self, **kwargs) -> str:
        """
        Build the cache key for a compound payload.

        Args:
            **kwargs: Must contain a value for every name registered in
                `hash_functions`; names that are not registered are ignored.

        Returns:
            Hex md5 digest of the hash chunks joined with `<|>`.

        Raises:
            WorkflowEnvironmentConfigurationError: When a registered name is
                missing from `kwargs`.
        """
        hash_chunks = []
        for key_name, hashing_function in self._hash_functions:
            if key_name not in kwargs:
                raise WorkflowEnvironmentConfigurationError(
                    public_message="Cache is miss configured.",
                    context="workflows_cache | hash_key_generation",
                )
            hash_chunks.append(hashing_function(kwargs[key_name]))
        return hashlib.md5("<|>".join(hash_chunks).encode("utf-8")).hexdigest()

    def get(self, key: str) -> Optional[V]:
        """Return the value stored under `key`, or `None` when it is not cached."""
        with self._cache_lock:
            return self._cache.get(key)

    def cache(self, key: str, value: V) -> None:
        """
        Store `value` under `key`, evicting the oldest entry when the cache is full.

        Args:
            key: Cache key, typically produced by `get_hash_key(...)`.
            value: Value to store.
        """
        with self._cache_lock:
            if key in self._cache:
                # Key is already tracked in the eviction buffer. Appending it again
                # would leave a duplicate there, and evicting that duplicate later
                # would try to delete an entry that is already gone (KeyError).
                self._cache[key] = value
                return None
            if len(self._keys_buffer) == self._keys_buffer.maxlen:
                to_pop = self._keys_buffer.popleft()
                del self._cache[to_pop]
            self._keys_buffer.append(key)
            self._cache[key] = value
|