Skip to content

Redis queue

RedisQueue ΒΆ

Store and forget, keys with specified hash tag are handled by external service

Source code in inference/usage_tracking/redis_queue.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class RedisQueue:
    """
    Store and forget, keys with specified hash tag are handled by external service
    """

    def __init__(
        self,
        hash_tag: str = "UsageCollector",
        redis_cache: Optional[RedisCache] = None,
    ):
        # prefix must contain hash-tag to avoid CROSSLOT errors when using mget
        # hash-tag is common part of the key wrapped within '{}'
        # removing hash-tag will cause clients utilizing mget to fail
        self._prefix: str = f"{{{hash_tag}}}:{time.time()}:{uuid4().hex[:5]}"
        self._redis_cache: RedisCache = redis_cache or cache
        self._increment: int = 0
        self._lock: Lock = Lock()

    def put(self, payload: Any):
        if not isinstance(payload, str):
            try:
                payload = json.dumps(payload)
            except Exception as exc:
                logger.error("Failed to parse payload '%s' to JSON - %s", payload, exc)
                return
        with self._lock:
            try:
                self._increment += 1
                redis_key = f"{self._prefix}:{self._increment}"
                # https://redis.io/docs/latest/develop/interact/transactions/
                redis_pipeline = self._redis_cache.client.pipeline()
                redis_pipeline.set(
                    name=redis_key,
                    value=payload,
                )
                redis_pipeline.zadd(
                    name="UsageCollector",
                    mapping={redis_key: time.time()},
                )
                results = redis_pipeline.execute()
                if not all(results):
                    # TODO: partial insert, retry
                    logger.error(
                        "Failed to store payload and sorted set (partial insert): %s",
                        results,
                    )
            except Exception as exc:
                logger.error("Failed to store usage records '%s', %s", payload, exc)

    @staticmethod
    def full() -> bool:
        return False

    def empty(self) -> bool:
        return True

    def get_nowait(self) -> List[Dict[str, Any]]:
        return []