diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index 6694135f5..2cc90c5fa 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -1,12 +1,14 @@ import asyncio from itertools import chain from typing import Dict, List +from traceback import format_exc import nest_asyncio from web3 import AsyncWeb3 from web3.contract import Contract +from fastlane_bot.config import Config from fastlane_bot.config.constants import BLOCK_CHUNK_SIZE_MAP from .interfaces.subscription import Subscription from .exchanges.base import Exchange @@ -22,7 +24,7 @@ class EventGatherer: def __init__( self, - blockchain: str, + config: Config, w3: AsyncWeb3, exchanges: Dict[str, Exchange], event_contracts: Dict[str, Contract], @@ -32,7 +34,7 @@ def __init__( manager: The Manager object w3: The connected AsyncWeb3 object. """ - self._blockchain = blockchain + self._config = config self._w3 = w3 self._subscriptions = [] unique_topics = set() @@ -59,7 +61,7 @@ async def _get_events_for_subscription(self, from_block: int, to_block: int, sub return [subscription.parse_log(log) for log in await self._get_logs_for_topics(from_block, to_block, [subscription.topic])] async def _get_logs_for_topics(self, from_block: int, to_block: int, topics: List[str]): - chunk_size = BLOCK_CHUNK_SIZE_MAP[self._blockchain] + chunk_size = BLOCK_CHUNK_SIZE_MAP[self._config.network.NETWORK] if chunk_size > 0: return await self._get_logs_iterative(from_block, to_block, topics, chunk_size) else: @@ -86,13 +88,16 @@ async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List "topics": topics }) except Exception as e: - if from_block < to_block: - mid_block = (from_block + to_block) // 2 - log_lists = await asyncio.gather( - self._get_logs_recursive(from_block, mid_block, topics), - self._get_logs_recursive(mid_block + 1, to_block, topics) - ) - return [log for log_list in log_lists for log in log_list] + if "eth_getLogs" in str(e): + if from_block < to_block: + mid_block = (from_block + to_block) // 2 + log_lists = await asyncio.gather( + self._get_logs_recursive(from_block, mid_block, topics), + self._get_logs_recursive(mid_block + 1, to_block, topics) + ) + return [log for log_list in log_lists for log in log_list] + else: + raise e else: - raise e + self._config.logger.error(f"Unexpected exception in EventGatherer: {format_exc()}") raise Exception(f"Illegal log query range: {from_block} -> {to_block}") diff --git a/main.py b/main.py index 4c3dffe51..dc402b112 100644 --- a/main.py +++ b/main.py @@ -307,7 +307,7 @@ def run(mgr, args, tenderly_uri=None) -> None: handle_static_pools_update(mgr) event_gatherer = EventGatherer( - blockchain=mgr.cfg.network.NETWORK, + config=mgr.cfg, w3=mgr.w3_async, exchanges=mgr.exchanges, event_contracts=mgr.event_contracts diff --git a/run_blockchain_terraformer.py b/run_blockchain_terraformer.py index ee35ceb99..df514dbee 100644 --- a/run_blockchain_terraformer.py +++ b/run_blockchain_terraformer.py @@ -679,6 +679,7 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis try: return get_logs(fromBlock=start_block, toBlock=end_block) except Exception as e: + assert "eth_getLogs" in str(e), str(e) if start_block < end_block: mid_block = (start_block + end_block) // 2 event_list_1 = get_events_recursive(get_logs, start_block, mid_block)