Store-and-forget queue: keys with the specified hash tag are handled by an external service.
Source code in inference/usage_tracking/redis_queue.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class RedisQueue:
    """
    Store-and-forget queue backed by Redis.

    Payloads are written under keys that share a common hash tag; keys with
    that hash tag are handled by an external service. Nothing is read back
    through this object: ``full`` always returns False, ``empty`` always
    returns True and ``get_nowait`` always returns an empty list.
    """

    def __init__(
        self,
        hash_tag: str = "UsageCollector",
        redis_cache: Optional[RedisCache] = None,
    ):
        """
        Prepare the per-instance key prefix, counter and lock.

        Args:
            hash_tag: Text wrapped in '{}' at the start of every key written
                by this instance.
            redis_cache: Object whose ``client`` attribute is used to reach
                Redis. The module-level ``cache`` is used when this is
                omitted (or falsy).
        """
        # The hash-tag is the common part of the key wrapped within '{}'.
        # It must stay in the prefix: it avoids CROSSLOT errors when using
        # mget, and removing it will cause clients utilizing mget to fail.
        created_at = time.time()
        unique_suffix = uuid4().hex[:5]
        self._prefix: str = f"{{{hash_tag}}}:{created_at}:{unique_suffix}"
        self._redis_cache: RedisCache = redis_cache if redis_cache else cache
        # Counter appended to the prefix so each stored payload gets its own key.
        self._increment: int = 0
        # Held for the whole store operation in put().
        self._lock: Lock = Lock()

    def put(self, payload: Any):
        """
        Store one payload in Redis and register its key in a sorted set.

        A payload that is not already a string is serialised with
        ``json.dumps``; if that fails the error is logged and the payload is
        dropped. Failures while talking to Redis are logged as well, never
        raised to the caller.

        Args:
            payload: A string stored as-is, or any other object to be
                serialised to JSON first.
        """
        if isinstance(payload, str):
            serialized = payload
        else:
            try:
                serialized = json.dumps(payload)
            except Exception as exc:
                logger.error("Failed to parse payload '%s' to JSON - %s", payload, exc)
                return

        with self._lock:
            try:
                self._increment += 1
                key = f"{self._prefix}:{self._increment}"
                # Both commands are queued on one pipeline and sent together:
                # https://redis.io/docs/latest/develop/interact/transactions/
                pipe = self._redis_cache.client.pipeline()
                pipe.set(name=key, value=serialized)
                # NOTE(review): the sorted-set name is the literal
                # "UsageCollector", not the hash_tag given to __init__ -
                # confirm this is intended for non-default hash tags.
                pipe.zadd(name="UsageCollector", mapping={key: time.time()})
                outcomes = pipe.execute()
                if any(not outcome for outcome in outcomes):
                    # TODO: partial insert, retry
                    logger.error(
                        "Failed to store payload and sorted set (partial insert): %s",
                        outcomes,
                    )
            except Exception as exc:
                logger.error("Failed to store usage records '%s', %s", serialized, exc)

    @staticmethod
    def full() -> bool:
        """Always return False."""
        return False

    def empty(self) -> bool:
        """Always return True."""
        return True

    def get_nowait(self) -> List[Dict[str, Any]]:
        """Always return an empty list."""
        return []
|