diff --git a/src/ape/api/providers.py b/src/ape/api/providers.py index 21b251e743..4a2878b474 100644 --- a/src/ape/api/providers.py +++ b/src/ape/api/providers.py @@ -598,6 +598,35 @@ def get_transaction_trace( # type: ignore[empty-body] Iterator(:class:`~ape.type.trace.TraceFrame`): Transaction execution trace. """ + @raises_not_implemented + def poll_blocks( # type: ignore[empty-body] + self, + stop_block: Optional[int] = None, + required_confirmations: Optional[int] = None, + new_block_timeout: Optional[int] = None, + ) -> Iterator[BlockAPI]: # type: ignore[empty-body] + """ + Poll new blocks. + **NOTE**: When a chain reorganization occurs, this method logs an error and + yields the missed blocks, even if they were previously yielded with different + block numbers. + **NOTE**: This is a daemon method; it does not terminate unless an exception occurs + or a ``stop`` is given. + Args: + start_block (Optional[int]): The block number to start with. Defaults to the pending + block number. + stop_block (Optional[int]): Optionally set a future block number to stop at. + Defaults to never-ending. + required_confirmations (Optional[int]): The amount of confirmations to wait + before yielding the block. The more confirmations, the less likely a reorg will occur. + Defaults to the network's configured required confirmations. + new_block_timeout (Optional[float]): The amount of time to wait for a new block before + timing out. Defaults to 10 seconds for local networks or ``50 * block_time`` for live + networks. + Returns: + Iterator[:class:`~ape.api.providers.BlockAPI`] + """ + @raises_not_implemented def get_call_tree(self, txn_hash: str) -> CallTreeNode: # type: ignore[empty-body] """ @@ -1316,6 +1345,76 @@ def _find_txn_by_account_and_nonce( stop_block, ) + def poll_blocks(self, stop_block: Optional[int] = None, required_confirmations: Optional[int] = None, new_block_timeout: Optional[int] = None) -> Iterator[BlockAPI]: + filter = self.web3.eth.filter("latest") + network_name = self.network.name + block_time = self.network.block_time + timeout = ( + ( + 10.0 + if network_name == LOCAL_NETWORK_NAME or network_name.endswith("-fork") + else 50 * block_time + ) + if new_block_timeout is None + else new_block_timeout + ) + + if required_confirmations is None: + required_confirmations = self.network_confirmations + + last_yielded_height = None + + while True: + if stop_block is not None and last_yielded_height is not None and last_yielded_height >= stop_block: + break + changes = self.web3.eth.get_filter_changes(filter.filter_id) + for new_block_hash in changes: + block = self.web3.eth.get_block(new_block_hash) + confirmed_block_number = block.number - required_confirmations + if last_yielded_height and confirmed_block_number < last_yielded_height: + num_blocks_behind = last_yielded_height - confirmed_block_number + if num_blocks_behind > required_confirmations: + logger.error(f"{num_blocks_behind} Block reorganization detected. Try adjusting the required network confirmations") + else: + logger.warning(f"{num_blocks_behind} Block reorganization detected. Reorg is within the required network confirmations") + last_yielded_height = confirmed_block_number + continue + confirmed_block = self.web3.eth.get_block(confirmed_block_number) + start_time = time.time() + while confirmed_block is None: + if time.time() - start_time > timeout: + raise RuntimeError( + f"Timed out waiting for block {confirmed_block_number} to be available." + ) + logger.warning( + f"Block {confirmed_block_number} not found. Waiting 1 seconds." + ) + time.sleep(1) + confirmed_block = self.web3.eth.get_block(confirmed_block_number) + if last_yielded_height and confirmed_block.number < last_yielded_height: + num_blocks_behind = last_yielded_height - confirmed_block.number + logger.error(f"{num_blocks_behind} Block reorganization detected. Try adjusting the required network confirmations") + last_yielded_height = confirmed_block.number + yield confirmed_block + + def poll_logs( + self, + stop_block: Optional[int] = None, + address: Optional[AddressType] = None, + topics: Optional[List[Union[str, List[str]]]] = None, + required_confirmations: Optional[int] = None, + new_block_timeout: Optional[int] = None) -> Iterator[ContractLog]: + required_confirmations = ( + required_confirmations or self.provider.network.required_confirmations + ) + for block in self.poll_blocks(stop_block, required_confirmations, new_block_timeout): + yield from self.web3.eth.get_logs({ + "fromBlock": block.number, + "toBlock": block.number, + "address": address, + "topics": topics + }) + def block_ranges(self, start=0, stop=None, page=None): if stop is None: stop = self.chain_manager.blocks.height