Skip to content

Commit

Permalink
Merge pull request #658 from bancorprotocol/batch-get-logs
Browse files Browse the repository at this point in the history
feat: get logs batching
  • Loading branch information
platonfloria authored May 21, 2024
2 parents f1e0e06 + 6769d81 commit 62cd989
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 16 deletions.
13 changes: 13 additions & 0 deletions fastlane_bot/config/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@
SECTA_V3_NAME = "secta_v3"
METAVAULT_V3_NAME = "metavault_v3"
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

BLOCK_CHUNK_SIZE_MAP = {
"ethereum": 0,
"polygon": 0,
"polygon_zkevm": 0,
"arbitrum_one": 0,
"optimism": 0,
"coinbase_base": 0,
"fantom": 5000,
"mantle": 0,
"linea": 0,
"sei": 0,
}
56 changes: 46 additions & 10 deletions fastlane_bot/events/event_gatherer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
from itertools import chain
from typing import Dict
from typing import Dict, List

import asyncio
import nest_asyncio

from web3 import AsyncWeb3
from web3.contract import Contract

from fastlane_bot.config.constants import BLOCK_CHUNK_SIZE_MAP
from .interfaces.subscription import Subscription
from .exchanges.base import Exchange

Expand All @@ -21,6 +22,7 @@ class EventGatherer:

def __init__(
self,
blockchain: str,
w3: AsyncWeb3,
exchanges: Dict[str, Exchange],
event_contracts: Dict[str, Contract],
Expand All @@ -30,6 +32,7 @@ def __init__(
manager: The Manager object
w3: The connected AsyncWeb3 object.
"""
self._blockchain = blockchain
self._w3 = w3
self._subscriptions = []
unique_topics = set()
Expand All @@ -48,16 +51,49 @@ def get_all_events(self, from_block: int, to_block: int):
from_block_ = 0
else:
from_block_ = from_block
coroutines.append(self._get_events_for_topic(from_block_, to_block, sub))
coroutines.append(self._get_events_for_subscription(from_block_, to_block, sub))
results = asyncio.get_event_loop().run_until_complete(asyncio.gather(*coroutines))
return list(chain.from_iterable(results))

async def _get_events_for_topic(self, from_block: int, to_block: int, subscription: Subscription):
events = await self._w3.eth.get_logs(filter_params={
"fromBlock": from_block,
"toBlock": to_block,
"topics": [subscription.topic]
})
return [subscription.parse_log(event) for event in events]
async def _get_events_for_subscription(self, from_block: int, to_block: int, subscription: Subscription):
return [subscription.parse_log(log) for log in await self._get_logs_for_topics(from_block, to_block, [subscription.topic])]

async def _get_logs_for_topics(self, from_block: int, to_block: int, topics: List[str]):
chunk_size = BLOCK_CHUNK_SIZE_MAP[self._blockchain]
if chunk_size > 0:
return await self._get_logs_iterative(from_block, to_block, topics, chunk_size)
else:
return await self._get_logs_recursive(from_block, to_block, topics)

async def _get_logs_iterative(self, from_block: int, to_block: int, topics: List[str], chunk_size: int) -> list:
block_numbers = list(range(from_block, to_block + 1, chunk_size)) + [to_block + 1]
log_lists = await asyncio.gather([
self._w3.eth.get_logs(filter_params={
"fromBlock": r[0],
"toBlock": r[1],
"topics": topics
})
for r in zip(block_numbers, map(lambda n: n - 1, block_numbers[1:]))
])
return [log for log_list in log_lists for log in log_list]

async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List[str]) -> list:
if from_block <= to_block:
try:
return await self._w3.eth.get_logs(filter_params={
"fromBlock": from_block,
"toBlock": to_block,
"topics": topics
})
except Exception as e:
assert "eth_getLogs" in str(e), str(e)
if from_block < to_block:
mid_block = (from_block + to_block) // 2
log_lists = await asyncio.gather(
self._get_logs_recursive(from_block, mid_block, topics),
self._get_logs_recursive(mid_block + 1, to_block, topics)
)
return [log for log_list in log_lists for log in log_list]
else:
raise e
raise Exception(f"Illegal log query range: {from_block} -> {to_block}")
7 changes: 6 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,12 @@ def run(mgr, args, tenderly_uri=None) -> None:
mainnet_uri = mgr.cfg.w3.provider.endpoint_uri
handle_static_pools_update(mgr)

event_gatherer = EventGatherer(w3=mgr.w3_async, exchanges=mgr.exchanges, event_contracts=mgr.event_contracts)
event_gatherer = EventGatherer(
blockchain=mgr.cfg.network.NETWORK,
w3=mgr.w3_async,
exchanges=mgr.exchanges,
event_contracts=mgr.event_contracts
)

pool_finder = PoolFinder(
carbon_forks=mgr.cfg.network.CARBON_V1_FORKS,
Expand Down
13 changes: 8 additions & 5 deletions run_blockchain_terraformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,11 +680,14 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis
return get_logs(fromBlock=start_block, toBlock=end_block)
except Exception as e:
assert "eth_getLogs" in str(e), str(e)
mid_block = (start_block + end_block) // 2
event_list_1 = get_events_recursive(get_logs, start_block, mid_block)
event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block)
return event_list_1 + event_list_2
return []
if start_block < end_block:
mid_block = (start_block + end_block) // 2
event_list_1 = get_events_recursive(get_logs, start_block, mid_block)
event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block)
return event_list_1 + event_list_2
else:
raise e
raise Exception(f"Illegal log query range: {start_block} -> {end_block}")


def get_uni_v3_pools(
Expand Down

0 comments on commit 62cd989

Please sign in to comment.