From e6116e2a73299990d2f613508afb269fd63c35f6 Mon Sep 17 00:00:00 2001 From: Alex Toker Date: Wed, 19 Jul 2023 09:06:28 +0300 Subject: [PATCH] Adressing Gal's comments --- storey/redis_driver.py | 48 ++++++++++++------------------------------ 1 file changed, 13 insertions(+), 35 deletions(-) diff --git a/storey/redis_driver.py b/storey/redis_driver.py index 27b2ae68..fab29c4e 100644 --- a/storey/redis_driver.py +++ b/storey/redis_driver.py @@ -412,19 +412,7 @@ async def _save_key(self, container, table_path, key, aggr_item, partitioned_by_ await self._fetch_state_by_key(aggr_item, container, table_path, key) async def _get_all_fields(self, redis_key: str): - try: - cursor = 0 - values = {} - # Get all the fields except the internal ones - while True: - cursor, v = await RedisDriver.asyncify(self.redis.hscan)( - redis_key, cursor, match=f"[^{self.INTERFNAL_FIELD_PREFIX}]*" - ) - values.update(v) - if cursor == 0: - break - except redis.ResponseError as e: - raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}") + values = await self.redis_hscan(redis_key, f"[^{self.INTERFNAL_FIELD_PREFIX}]*") res = { RedisDriver.convert_to_str(key): RedisDriver.convert_redis_value_to_python_obj(val) for key, val in values.items() @@ -502,18 +490,7 @@ async def _load_aggregates_by_key(self, container, table_path, key): # path, and "key," followed by ":aggr_" redis_key_prefix = self._make_key(container, table_path, key) redis_key = self._static_data_key(redis_key_prefix) - try: - cursor = 0 - values = {} - while True: - cursor, v = await RedisDriver.asyncify(self.redis.hscan)( - redis_key, cursor, match=f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*" - ) - values.update(v) - if cursor == 0: - break - except redis.ResponseError as e: - raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}") + values = await self.redis_hscan(redis_key, f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*") for aggr_key, value in values.items(): # Build an attribute for this aggregation in the format that Storey @@ -546,26 +523,27 @@ async def _load_aggregates_by_key(self, container, table_path, key): additional_data_to_return = additional_data if additional_data else None return aggregations_to_return, additional_data_to_return - async def _fetch_state_by_key(self, aggr_item, container, table_path, key): - redis_key_prefix = self._make_key(container, table_path, key) - aggregations = {} - # Aggregation Redis keys start with the Redis key prefix for this Storey container, table - # path, and "key," followed by ":aggr_" - redis_key_prefix = self._make_key(container, table_path, key) - redis_key = self._static_data_key(redis_key_prefix) + async def redis_hscan(self, redis_key, match): try: cursor = 0 values = {} while True: - cursor, v = await RedisDriver.asyncify(self.redis.hscan)( - redis_key, cursor, match=f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*" - ) + cursor, v = await RedisDriver.asyncify(self.redis.hscan)(redis_key, cursor, match=match) values.update(v) if cursor == 0: break except redis.ResponseError as e: raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}") + return values + async def _fetch_state_by_key(self, aggr_item, container, table_path, key): + redis_key_prefix = self._make_key(container, table_path, key) + aggregations = {} + # Aggregation Redis keys start with the Redis key prefix for this Storey container, table + # path, and "key," followed by ":aggr_" + redis_key_prefix = self._make_key(container, table_path, key) + redis_key = self._static_data_key(redis_key_prefix) + values = await self.redis_hscan(redis_key, f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*") for aggr_key, value in values.items(): # Build an attribute for this aggregation in the format that Storey # expects to receive from this method. The feature and aggregation