forked from nucypher/nucypher-contracts
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Script to survey infractions before reporting on-chain
- Loading branch information
Showing
3 changed files
with
193 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from ape import project, chain, Contract | ||
from ape.logging import logger | ||
|
||
# Address of the deployed InfractionCollector contract | ||
contract_address = "0x63d2A01f006D553a2348386355f8FC3028CDf3bB" | ||
deployment_block = 61059570 | ||
|
||
# Load the contract using the project's ABI | ||
infraction_collector = Contract(contract_address) | ||
|
||
|
||
# Define a function to collect InfractionReported events | ||
def main(): | ||
# Get all InfractionReported events from the contract | ||
events = infraction_collector.InfractionReported.range(deployment_block) | ||
|
||
# Iterate through the events and print the details | ||
for event in events: | ||
print(event) | ||
ritual_id = event['ritualId'] | ||
staking_provider = event['stakingProvider'] | ||
infraction_type = event['infractionType'] | ||
logger.info(f"Ritual ID: {ritual_id}, Staking Provider: {staking_provider}, Infraction Type: {infraction_type}") | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from datetime import datetime, timedelta | ||
|
||
|
||
class StakingProvider: | ||
def __init__(self, provider_id): | ||
self.provider_id = provider_id | ||
self.violations = 0 | ||
self.penalties = [] | ||
|
||
def report_violation(self): | ||
self.violations += 1 | ||
self.apply_penalty() | ||
|
||
def apply_penalty(self): | ||
now = datetime.now() | ||
penalty_duration = timedelta(days=90) # 3 months | ||
penalty = None | ||
|
||
if self.violations == 1: | ||
penalty = {"withholding": 0.30, "end_date": now + penalty_duration} | ||
elif self.violations == 2: | ||
penalty = {"withholding": 0.60, "end_date": now + penalty_duration} | ||
elif self.violations == 3: | ||
penalty = {"withholding": 0.90, "end_date": now + penalty_duration} | ||
elif self.violations >= 4: | ||
penalty = {"slashing": "TBD", "effective_date": now} | ||
|
||
if penalty: | ||
self.penalties.append(penalty) | ||
|
||
def current_penalty(self): | ||
now = datetime.now() | ||
active_penalties = [ | ||
penalty for penalty in self.penalties | ||
if "end_date" in penalty and penalty["end_date"] > now | ||
] | ||
return active_penalties[-1] if active_penalties else None | ||
|
||
def is_slashing_applicable(self): | ||
return any("slashing" in penalty for penalty in self.penalties) | ||
|
||
|
||
# Example usage: | ||
provider = StakingProvider("provider_1") | ||
|
||
# Simulate violations | ||
provider.report_violation() # 1st violation | ||
print(provider.current_penalty()) # Should show 30% withholding | ||
|
||
provider.report_violation() # 2nd violation | ||
print(provider.current_penalty()) # Should show 60% withholding | ||
|
||
provider.report_violation() # 3rd violation | ||
print(provider.current_penalty()) # Should show 90% withholding | ||
|
||
provider.report_violation() # 4th violation | ||
print(provider.is_slashing_applicable()) # Should indicate slashing is applicable |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
from collections import Counter, defaultdict | ||
from enum import IntEnum | ||
|
||
import click | ||
from ape import networks, project | ||
from ape.cli import ConnectedProviderCommand, network_option | ||
|
||
from deployment.constants import SUPPORTED_TACO_DOMAINS | ||
from deployment.registry import contracts_from_registry | ||
from deployment.utils import registry_filepath_from_domain | ||
|
||
# Define RitualState as an IntEnum for clarity and ease of use | ||
RitualState = IntEnum( | ||
"RitualState", | ||
[ | ||
"NON_INITIATED", | ||
"DKG_AWAITING_TRANSCRIPTS", | ||
"DKG_AWAITING_AGGREGATIONS", | ||
"DKG_TIMEOUT", | ||
"DKG_INVALID", | ||
"ACTIVE", | ||
"EXPIRED", | ||
], | ||
start=0, | ||
) | ||
|
||
# Define end states and successful end states | ||
END_STATES = [ | ||
RitualState.DKG_TIMEOUT, | ||
RitualState.DKG_INVALID, | ||
RitualState.ACTIVE, | ||
RitualState.EXPIRED, | ||
] | ||
|
||
SUCCESSFUL_END_STATES = [RitualState.ACTIVE, RitualState.EXPIRED] | ||
|
||
|
||
def calculate_penalty(offense_count): | ||
"""Calculate penalty percentage based on the number of offenses.""" | ||
if offense_count == 1: | ||
return '30% withholding for 3 months' | ||
elif offense_count == 2: | ||
return '60% withholding for 3 months' | ||
elif offense_count == 3: | ||
return '90% withholding for 3 months' | ||
elif offense_count >= 4: | ||
return "Slashing (TBD)" | ||
return 0 | ||
|
||
|
||
@click.command(cls=ConnectedProviderCommand) | ||
@network_option(required=True) | ||
@click.option( | ||
"--domain", | ||
"-d", | ||
help="TACo domain", | ||
type=click.Choice(SUPPORTED_TACO_DOMAINS), | ||
required=True, | ||
) | ||
@click.option( | ||
"--from-ritual", | ||
"-r", | ||
help="Ritual ID to start from", | ||
default=0, | ||
type=int, | ||
) | ||
def cli(network, domain, from_ritual): | ||
registry_filepath = registry_filepath_from_domain(domain=domain) | ||
contracts = contracts_from_registry( | ||
registry_filepath, chain_id=networks.active_provider.chain_id | ||
) | ||
coordinator = project.Coordinator.at(contracts["Coordinator"].address) | ||
last_ritual_id = coordinator.numberOfRituals() | ||
|
||
counter = Counter() | ||
offenders = defaultdict(list) | ||
provider_offense_count = defaultdict(int) | ||
|
||
for ritual_id in range(from_ritual, last_ritual_id - 1): | ||
ritual_state = coordinator.getRitualState(ritual_id) | ||
|
||
if ritual_state in SUCCESSFUL_END_STATES: | ||
counter['ok'] += 1 | ||
print(f"Ritual ID: {ritual_id} OK") | ||
continue | ||
|
||
ritual = coordinator.rituals(ritual_id) | ||
participants = coordinator.getParticipants(ritual_id) | ||
missing_transcripts = len(participants) - ritual.totalTranscripts | ||
missing_aggregates = len(participants) - ritual.totalAggregations | ||
|
||
if missing_transcripts or missing_aggregates: | ||
issue = 'transcripts' if missing_transcripts else 'aggregates' | ||
counter[f'missing_{issue}'] += 1 | ||
print(f"(!) Ritual {ritual_id} missing " | ||
f"{missing_transcripts or missing_aggregates}/{len(participants)} {issue}") | ||
|
||
for participant in participants: | ||
if not participant.transcript: | ||
offenders[ritual_id].append(participant.provider) | ||
provider_offense_count[participant.provider] += 1 | ||
print(f"\t{participant.provider} (!) Missing transcript") | ||
|
||
print(f"Total rituals: {last_ritual_id - from_ritual}") | ||
print("Provider Offense Count and Penalties") | ||
for provider, count in provider_offense_count.items(): | ||
penalty = calculate_penalty(count) | ||
print(f"\t{provider}: {count} offenses, Penalty: {penalty}") | ||
|
||
|
||
if __name__ == "__main__": | ||
cli() |