Skip to content

Commit

Permalink
fix: lint + long lines
Browse files Browse the repository at this point in the history
  • Loading branch information
z80dev committed Sep 26, 2023
1 parent d818ecf commit 0e59efd
Showing 1 changed file with 77 additions and 21 deletions.
98 changes: 77 additions & 21 deletions src/ape/api/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,17 @@ def poll_blocks( # type: ignore[empty-body]
stop_block: Optional[int] = None,
required_confirmations: Optional[int] = None,
new_block_timeout: Optional[int] = None,
) -> Iterator[BlockAPI]: # type: ignore[empty-body]
) -> Iterator[BlockAPI]: # type: ignore[empty-body]
"""
Poll new blocks.
**NOTE**: When a chain reorganization occurs, this method logs an error and
yields the missed blocks, even if they were previously yielded with different
block numbers.
**NOTE**: This is a daemon method; it does not terminate unless an exception occurs
or a ``stop`` is given.
Args:
start_block (Optional[int]): The block number to start with. Defaults to the pending
block number.
Expand All @@ -623,10 +626,44 @@ def poll_blocks( # type: ignore[empty-body]
new_block_timeout (Optional[float]): The amount of time to wait for a new block before
timing out. Defaults to 10 seconds for local networks or ``50 * block_time`` for live
networks.
Returns:
Iterator[:class:`~ape.api.providers.BlockAPI`]
"""

@raises_not_implemented
def poll_logs( # type: ignore[empty-body]
self,
stop_block: Optional[int] = None,
address: Optional[AddressType] = None,
topics: Optional[List[Union[str, List[str]]]] = None,
required_confirmations: Optional[int] = None,
new_block_timeout: Optional[int] = None,
) -> Iterator[ContractLog]: # type: ignore[empty-body]
"""
Poll new blocks. Optionally set a start block to include historical blocks.
**NOTE**: This is a daemon method; it does not terminate unless an exception occurrs.
Usage example::
for new_log in contract.MyEvent.poll_logs():
print(f"New event log found: block_number={new_log.block_number}")
Args:
stop_block (Optional[int]): Optionally set a future block number to stop at.
Defaults to never-ending.
required_confirmations (Optional[int]): The amount of confirmations to wait
before yielding the block. The more confirmations, the less likely a reorg will occur.
Defaults to the network's configured required confirmations.
new_block_timeout (Optional[int]): The amount of time to wait for a new block before
quitting. Defaults to 10 seconds for local networks or ``50 * block_time`` for live
networks.
Returns:
Iterator[:class:`~ape.types.ContractLog`]
"""

@raises_not_implemented
def get_call_tree(self, txn_hash: str) -> CallTreeNode: # type: ignore[empty-body]
"""
Expand Down Expand Up @@ -1345,7 +1382,12 @@ def _find_txn_by_account_and_nonce(
stop_block,
)

def poll_blocks(self, stop_block: Optional[int] = None, required_confirmations: Optional[int] = None, new_block_timeout: Optional[int] = None) -> Iterator[BlockAPI]:
def poll_blocks(
self,
stop_block: Optional[int] = None,
required_confirmations: Optional[int] = None,
new_block_timeout: Optional[int] = None,
) -> Iterator[BlockAPI]:
filter = self.web3.eth.filter("latest")
network_name = self.network.name
block_time = self.network.block_time
Expand All @@ -1365,7 +1407,11 @@ def poll_blocks(self, stop_block: Optional[int] = None, required_confirmations:
last_yielded_height = None

while True:
if stop_block is not None and last_yielded_height is not None and last_yielded_height >= stop_block:
if (
stop_block is not None
and last_yielded_height is not None
and last_yielded_height >= stop_block
):
break
changes = self.web3.eth.get_filter_changes(filter.filter_id)
for new_block_hash in changes:
Expand All @@ -1374,9 +1420,15 @@ def poll_blocks(self, stop_block: Optional[int] = None, required_confirmations:
if last_yielded_height and confirmed_block_number < last_yielded_height:
num_blocks_behind = last_yielded_height - confirmed_block_number
if num_blocks_behind > required_confirmations:
logger.error(f"{num_blocks_behind} Block reorganization detected. Try adjusting the required network confirmations")
logger.error(
f"{num_blocks_behind} Block reorganization detected. "
+ "Try adjusting the required network confirmations"
)
else:
logger.warning(f"{num_blocks_behind} Block reorganization detected. Reorg is within the required network confirmations")
logger.warning(
f"{num_blocks_behind} Block reorganization detected. "
+ "Reorg is within the required network confirmations"
)
last_yielded_height = confirmed_block_number
continue
confirmed_block = self.web3.eth.get_block(confirmed_block_number)
Expand All @@ -1386,34 +1438,38 @@ def poll_blocks(self, stop_block: Optional[int] = None, required_confirmations:
raise RuntimeError(
f"Timed out waiting for block {confirmed_block_number} to be available."
)
logger.warning(
f"Block {confirmed_block_number} not found. Waiting 1 seconds."
)
logger.warning(f"Block {confirmed_block_number} not found. Waiting 1 seconds.")
time.sleep(1)
confirmed_block = self.web3.eth.get_block(confirmed_block_number)
if last_yielded_height and confirmed_block.number < last_yielded_height:
num_blocks_behind = last_yielded_height - confirmed_block.number
logger.error(f"{num_blocks_behind} Block reorganization detected. Try adjusting the required network confirmations")
logger.error(
f"{num_blocks_behind} Block reorganization detected. "
+ "Try adjusting the required network confirmations"
)
last_yielded_height = confirmed_block.number
yield self.network.ecosystem.decode_block(confirmed_block)

def poll_logs(
self,
stop_block: Optional[int] = None,
address: Optional[AddressType] = None,
topics: Optional[List[Union[str, List[str]]]] = None,
required_confirmations: Optional[int] = None,
new_block_timeout: Optional[int] = None) -> Iterator[ContractLog]:
self,
stop_block: Optional[int] = None,
address: Optional[AddressType] = None,
topics: Optional[List[Union[str, List[str]]]] = None,
required_confirmations: Optional[int] = None,
new_block_timeout: Optional[int] = None,
) -> Iterator[ContractLog]:
required_confirmations = (
required_confirmations or self.provider.network.required_confirmations
)
for block in self.poll_blocks(stop_block, required_confirmations, new_block_timeout):
for log in self.web3.eth.get_logs({
"fromBlock": block.number,
"toBlock": block.number,
"address": address,
"topics": topics
}):
for log in self.web3.eth.get_logs(
{
"fromBlock": block.number,
"toBlock": block.number,
"address": address,
"topics": topics,
}
):
yield ContractLog.parse_obj(log)

def block_ranges(self, start=0, stop=None, page=None):
Expand Down

0 comments on commit 0e59efd

Please sign in to comment.