Skip to content

Commit

Permalink
Adressing Gal's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
alxtkr77 committed Jul 19, 2023
1 parent 04b5bb3 commit e6116e2
Showing 1 changed file with 13 additions and 35 deletions.
48 changes: 13 additions & 35 deletions storey/redis_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,19 +412,7 @@ async def _save_key(self, container, table_path, key, aggr_item, partitioned_by_
await self._fetch_state_by_key(aggr_item, container, table_path, key)

async def _get_all_fields(self, redis_key: str):
try:
cursor = 0
values = {}
# Get all the fields except the internal ones
while True:
cursor, v = await RedisDriver.asyncify(self.redis.hscan)(
redis_key, cursor, match=f"[^{self.INTERFNAL_FIELD_PREFIX}]*"
)
values.update(v)
if cursor == 0:
break
except redis.ResponseError as e:
raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}")
values = await self.redis_hscan(redis_key, f"[^{self.INTERFNAL_FIELD_PREFIX}]*")
res = {
RedisDriver.convert_to_str(key): RedisDriver.convert_redis_value_to_python_obj(val)
for key, val in values.items()
Expand Down Expand Up @@ -502,18 +490,7 @@ async def _load_aggregates_by_key(self, container, table_path, key):
# path, and "key," followed by ":aggr_"
redis_key_prefix = self._make_key(container, table_path, key)
redis_key = self._static_data_key(redis_key_prefix)
try:
cursor = 0
values = {}
while True:
cursor, v = await RedisDriver.asyncify(self.redis.hscan)(
redis_key, cursor, match=f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*"
)
values.update(v)
if cursor == 0:
break
except redis.ResponseError as e:
raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}")
values = await self.redis_hscan(redis_key, f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*")

for aggr_key, value in values.items():
# Build an attribute for this aggregation in the format that Storey
Expand Down Expand Up @@ -546,26 +523,27 @@ async def _load_aggregates_by_key(self, container, table_path, key):
additional_data_to_return = additional_data if additional_data else None
return aggregations_to_return, additional_data_to_return

async def _fetch_state_by_key(self, aggr_item, container, table_path, key):
redis_key_prefix = self._make_key(container, table_path, key)
aggregations = {}
# Aggregation Redis keys start with the Redis key prefix for this Storey container, table
# path, and "key," followed by ":aggr_"
redis_key_prefix = self._make_key(container, table_path, key)
redis_key = self._static_data_key(redis_key_prefix)
async def redis_hscan(self, redis_key, match):
try:
cursor = 0
values = {}
while True:
cursor, v = await RedisDriver.asyncify(self.redis.hscan)(
redis_key, cursor, match=f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*"
)
cursor, v = await RedisDriver.asyncify(self.redis.hscan)(redis_key, cursor, match=match)
values.update(v)
if cursor == 0:
break
except redis.ResponseError as e:
raise RedisError(f"Failed to get key {redis_key}. Response error was: {e}")
return values

async def _fetch_state_by_key(self, aggr_item, container, table_path, key):
redis_key_prefix = self._make_key(container, table_path, key)
aggregations = {}
# Aggregation Redis keys start with the Redis key prefix for this Storey container, table
# path, and "key," followed by ":aggr_"
redis_key_prefix = self._make_key(container, table_path, key)
redis_key = self._static_data_key(redis_key_prefix)
values = await self.redis_hscan(redis_key, f"{self.AGGREGATION_ATTRIBUTE_PREFIX}*")
for aggr_key, value in values.items():
# Build an attribute for this aggregation in the format that Storey
# expects to receive from this method. The feature and aggregation
Expand Down

0 comments on commit e6116e2

Please sign in to comment.