diff --git a/.env.example b/.env.example index 643a4726..026bd61b 100644 --- a/.env.example +++ b/.env.example @@ -41,6 +41,28 @@ PERCENTAGE_CHANGE_THRESHOLD="0.005" # percentage difference >= 0.5% triggers a r #LP_PULSE_NETWORK_URL="https://rpc.v4.testnet.pulsechain.com" -#PLS_CURRENCY_SOURCES="usdc,dai" -#PLS_ADDR_SOURCES="0xb7f1f5a3b79b0664184bb0a5893aa9359615171b,0xa2d510bf42d2b9766db186f44a902228e76ef262" -#PLS_LPS_ORDER="WPLS/USDC,WPLS/tDAI" +### testnet sources +# LP_PULSE_NETWORK_URL="https://rpc.v4.testnet.pulsechain.com" +# PLS_CURRENCY_SOURCES="usdc,dai" +# PLS_ADDR_SOURCES="0xb7f1f5a3b79b0664184bb0a5893aa9359615171b,0xa2d510bf42d2b9766db186f44a902228e76ef262" +# PLS_LPS_ORDER="WPLS/USDC,WPLS/tDAI" + +### FetchRNG settings + +# Etherscan API key is required in order to report using the included RNG +# This is required for reporting to RNG feeds +# ETHERSCAN_API_KEY= + +### Settings for interval based RNG feeds +## Interval time with will be used to calculate the timestamp +# INTERVAL=300 + +# Start time syncs the interval reporter in order to sync with consuming smart contracts +# START_TIME=1653350400 + +# For RNG managed feeds, this is the name of the RNG feed +# FETCH_RNG_NAME=test + + + + diff --git a/src/telliot_feeds/cli/commands/report.py b/src/telliot_feeds/cli/commands/report.py index df1b838b..ceef214e 100644 --- a/src/telliot_feeds/cli/commands/report.py +++ b/src/telliot_feeds/cli/commands/report.py @@ -2,6 +2,7 @@ from typing import Optional import os +import os import click from chained_accounts import find_accounts from click.core import Context @@ -26,6 +27,7 @@ from telliot_feeds.queries.query_catalog import query_catalog from telliot_feeds.reporters.flashbot import FlashbotsReporter from telliot_feeds.reporters.rng_interval import RNGReporter +from telliot_feeds.reporters.rng_interval_custom import RNGCustomReporter from telliot_feeds.reporters.fetch_360 import Fetch360Reporter from telliot_feeds.reporters.fetch_flex import 
FetchFlexReporter from telliot_feeds.utils.cfg import check_endpoint @@ -371,6 +373,7 @@ async def report( continue_reporting_on_validator_unreachable: bool, ) -> None: """Report values to Fetch oracle""" + ctx.obj["ACCOUNT_NAME"] = account_str ctx.obj["SIGNATURE_ACCOUNT_NAME"] = signature_account @@ -534,24 +537,33 @@ async def report( **common_reporter_kwargs, ) # type: ignore else: - reporter = FetchFlexReporter(**{ - **common_reporter_kwargs, - "continue_reporting_on_dispute": continue_reporting_on_dispute, - "price_validation_method": price_validation_method, - "price_validation_consensus": price_validation_consensus, - "continue_reporting_on_validator_unreachable": continue_reporting_on_validator_unreachable, - "use_estimate_fee": use_estimate_fee, - "use_gas_api": use_gas_api, - "force_nonce": force_nonce, - "tx_timeout": tx_timeout, - }) # type: ignore + if getattr(chosen_feed.query, 'is_custom_rng', False): + common_reporter_kwargs["wait_period"] = int(os.getenv('REPORT_INTERVAL', "300")) + reporter = RNGCustomReporter(**{ + **common_reporter_kwargs, + }) # type: ignore + else: + reporter = FetchFlexReporter(**{ + **common_reporter_kwargs, + "continue_reporting_on_dispute": continue_reporting_on_dispute, + "price_validation_method": price_validation_method, + "price_validation_consensus": price_validation_consensus, + "continue_reporting_on_validator_unreachable": continue_reporting_on_validator_unreachable, + "use_estimate_fee": use_estimate_fee, + "use_gas_api": use_gas_api, + "force_nonce": force_nonce, + "tx_timeout": tx_timeout, + }) # type: ignore os.environ["PRICE_VALIDATION_METHOD"] = price_validation_method if submit_once: + if chosen_feed.query.asset == 'validated-feed': await reporter.managed_feed_report(submit_once=True) - else: - _, _ = await reporter.report_once() + return + + _, _ = await reporter.report_once() + else: await reporter.report() diff --git a/src/telliot_feeds/feeds/__init__.py b/src/telliot_feeds/feeds/__init__.py index 
7ec06912..22ffea78 100644 --- a/src/telliot_feeds/feeds/__init__.py +++ b/src/telliot_feeds/feeds/__init__.py @@ -55,6 +55,7 @@ from telliot_feeds.feeds.sushi_usd_feed import sushi_usd_median_feed from telliot_feeds.feeds.fetch_rng_feed import fetch_rng_feed from telliot_feeds.feeds.fetch_rng_manual_feed import fetch_rng_manual_feed +from telliot_feeds.feeds.fetch_rng_custom_feed import fetch_rng_custom_feed from telliot_feeds.feeds.fetch_usd_feed import fetch_usd_median_feed from telliot_feeds.feeds.twap_manual_feed import twap_30d_example_manual_feed from telliot_feeds.feeds.twap_manual_feed import twap_manual_feed @@ -66,7 +67,6 @@ from telliot_feeds.feeds.xdai_usd_feed import xdai_usd_median_feed from telliot_feeds.feeds.yfi_usd_feed import yfi_usd_median_feed - CATALOG_FEEDS = { "ampleforth-custom": ampl_usd_vwap_feed, "ampleforth-uspce": uspce_feed, @@ -89,6 +89,7 @@ "diva-protocol-example": diva_manual_feed, "string-query-example": string_query_feed, "fetch-rng-example": fetch_rng_feed, + "fetch-rng-custom": fetch_rng_custom_feed, "twap-eth-usd-example": twap_30d_example_manual_feed, "pls-usd-spot": pls_usd_feed, "pls-usd-spot-twap": pls_usd_twap_feed, @@ -135,6 +136,7 @@ "TWAP": twap_manual_feed, "DailyVolatility": daily_volatility_manual_feed, "FetchRNG": fetch_rng_feed, + "FetchRNGCustom": fetch_rng_custom_feed, "FetchRNGManualResponse": fetch_rng_manual_feed, "AmpleforthCustomSpotPrice": ampl_usd_vwap_feed, "AmpleforthUSPCE": uspce_feed, diff --git a/src/telliot_feeds/feeds/fetch_rng_custom_feed.py b/src/telliot_feeds/feeds/fetch_rng_custom_feed.py new file mode 100644 index 00000000..768b5010 --- /dev/null +++ b/src/telliot_feeds/feeds/fetch_rng_custom_feed.py @@ -0,0 +1,26 @@ +"""Datafeed for pseudorandom number from hashing multiple blockhashes together.""" +from typing import Optional + +import os +import chained_accounts +from telliot_core.model.endpoints import RPCEndpoint + +from telliot_feeds.datafeed import DataFeed +from 
telliot_feeds.queries.fetch_rng_custom import FetchRNGCustom +from telliot_feeds.sources.custom_blockhash_aggregator import FetchRNGCustomManualSource + +FETCH_RNG_NAME = os.getenv('FETCH_RNG_NAME', "custom") +START_TIME = int(os.getenv('START_TIME', 1653350400)) # 2022-5-24 00:00:00 GMT + +local_source = FetchRNGCustomManualSource() + +fetch_rng_custom_feed = DataFeed(source=local_source, query=FetchRNGCustom(name=FETCH_RNG_NAME, interval=START_TIME)) + +async def assemble_rng_datafeed( + timestamp: int +) -> Optional[DataFeed[float]]: + """Assembles a FetchRNG custom datafeed for the given start timestamp.""" + local_source.set_timestamp(timestamp) + feed = DataFeed(source=local_source, query=FetchRNGCustom(name=FETCH_RNG_NAME, interval=START_TIME)) + + return feed diff --git a/src/telliot_feeds/feeds/fetch_rng_feed.py b/src/telliot_feeds/feeds/fetch_rng_feed.py index 3b64c6e3..9a88ef2f 100644 --- a/src/telliot_feeds/feeds/fetch_rng_feed.py +++ b/src/telliot_feeds/feeds/fetch_rng_feed.py @@ -9,7 +9,7 @@ from telliot_feeds.sources.blockhash_aggregator import FetchRNGManualSource local_source = FetchRNGManualSource() -local_source.set_timestamp(1660567612) + fetch_rng_feed = DataFeed(source=local_source, query=FetchRNG(timestamp=local_source.timestamp)) diff --git a/src/telliot_feeds/queries/fetch_rng_custom.py b/src/telliot_feeds/queries/fetch_rng_custom.py new file mode 100644 index 00000000..7d5d99b0 --- /dev/null +++ b/src/telliot_feeds/queries/fetch_rng_custom.py @@ -0,0 +1,64 @@ +import logging +from dataclasses import field, dataclass +from hexbytes import HexBytes +from telliot_feeds.dtypes.value_type import ValueType +from telliot_feeds.queries.abi_query import AbiQuery + +from eth_abi import decode_abi +from eth_abi import encode_abi + +logger = logging.getLogger(__name__) + +@dataclass +class FetchRNGCustomReturnType(ValueType): + + abi_type: str = "(bytes32,ufixed256x18)" + + def encode(self, value: tuple) -> bytes: + """An encoder for Fetch RNG 
response type + + Encodes a tuple of float values. + """ + if len(value) != 2 or not isinstance(value[0], HexBytes) or not isinstance(value[1], int): + raise ValueError("Invalid response type") + + return encode_abi(["bytes32", "uint256"], value) + + def decode(self, bytes_val: bytes) -> tuple: + """A decoder for for Fetch RNG response type + + Decodes a tuple of float values. + """ + return decode_abi(["bytes32", "uint256"], bytes_val) + +@dataclass +class FetchRNGCustom(AbiQuery): + """Returns a pseudorandom number generated from hashing together blockhashes from multiple chains. + + Attributes: + name: + name of the custom rng feed + timestamp: + time at which to take the most recent blockhashes (example: 1647624359) + """ + is_custom_rng: bool = field(default=True, init=False) + is_managed_feed: bool = field(default=True, init=False) + + name: str + interval: int + + #: ABI used for encoding/decoding parameters + abi = [ + {"name": "name", "type": "string"}, + {"name": "interval", "type": "uint256"} + ] + + @property + def value_type(self) -> ValueType: + """Data type returned for a FetchRNG query. 
+ + - `bytes32`: 32 bytes hexadecimal value + - `packed`: false + """ + #return ValueType(abi_type="bytes32", packed=False) + return FetchRNGCustomReturnType() \ No newline at end of file diff --git a/src/telliot_feeds/queries/query_catalog.py b/src/telliot_feeds/queries/query_catalog.py index 2b8ed0f9..b4ca7083 100644 --- a/src/telliot_feeds/queries/query_catalog.py +++ b/src/telliot_feeds/queries/query_catalog.py @@ -12,6 +12,11 @@ from telliot_feeds.queries.snapshot import Snapshot from telliot_feeds.queries.string_query import StringQuery from telliot_feeds.queries.fetch_rng import FetchRNG +from telliot_feeds.queries.fetch_rng_custom import FetchRNGCustom +import os + +START_TIME = int(os.getenv('START_TIME', "1653350400")) # 2022-5-24 00:00:00 GMT +FETCH_RNG_NAME = os.getenv('FETCH_RNG_NAME', "custom") #default feed name to custom """Main instance of the Query Catalog.""" query_catalog = Catalog() @@ -324,3 +329,10 @@ q=MimicryCollectionStat( collectionAddress="0x5180db8F5c931aaE63c74266b211F580155ecac8", chainId=1, metric=0), ) + +query_catalog.add_entry( + tag="fetch-rng-custom", + title="Fetch RNG Custom", + q=FetchRNGCustom( + name=FETCH_RNG_NAME, interval=START_TIME) +) \ No newline at end of file diff --git a/src/telliot_feeds/reporters/interval.py b/src/telliot_feeds/reporters/interval.py index ea87a78f..a46c36d1 100644 --- a/src/telliot_feeds/reporters/interval.py +++ b/src/telliot_feeds/reporters/interval.py @@ -721,7 +721,12 @@ async def report(self, report_count: Optional[int] = None) -> None: """Submit values to Fetch oracles on an interval.""" datafeed = await self.fetch_datafeed() - asset = datafeed.query.asset + + try: + asset = getattr(datafeed.query, 'asset', '') + except Exception: + asset = '' + logger.warning("Could not fetch datafeed") if asset == "validated-feed": await self.managed_feed_report() diff --git a/src/telliot_feeds/reporters/rng_interval.py b/src/telliot_feeds/reporters/rng_interval.py index 2bc1d8f2..dd887998 100644 ---
a/src/telliot_feeds/reporters/rng_interval.py +++ b/src/telliot_feeds/reporters/rng_interval.py @@ -3,6 +3,7 @@ """ import calendar import time +import os from typing import Any from typing import Optional @@ -13,15 +14,14 @@ from telliot_feeds.feeds.fetch_rng_feed import assemble_rng_datafeed from telliot_feeds.queries.fetch_rng import FetchRNG from telliot_feeds.reporters.reporter_autopay_utils import get_feed_tip -from telliot_feeds.reporters.fetch_360 import Fetch360Reporter +from telliot_feeds.reporters.fetch_flex import FetchFlexReporter from telliot_feeds.utils.log import get_logger logger = get_logger(__name__) -INTERVAL = 60 * 30 # 30 minutes -START_TIME = 1653350400 # 2022-5-24 00:00:00 GMT - +INTERVAL = int(os.getenv('REPORT_INTERVAL', "300")) # 5 minutes +START_TIME = int(os.getenv('START_TIME', "1653350400")) # 2022-5-24 00:00:00 GMT def get_next_timestamp() -> int: """get next target timestamp""" @@ -30,9 +30,9 @@ return target_ts -class RNGReporter(Fetch360Reporter): +class RNGReporter(FetchFlexReporter): """Reports FetchRNG values at a fixed interval to FetchFlex - on Polygon.""" + on Pulsechain.""" async def fetch_datafeed(self) -> Optional[DataFeed[Any]]: status = ResponseStatus() @@ -53,7 +53,7 @@ logger.info(status.error) return None - datafeed = await assemble_rng_datafeed(timestamp=rng_timestamp, node=self.endpoint, account=self.account) + datafeed = await assemble_rng_datafeed(timestamp=rng_timestamp) if datafeed is None: msg = "Unable to assemble RNG datafeed" error_status(note=msg, log=logger.warning) @@ -77,4 +77,4 @@ return None tip += feed_tip logger.debug(f"Current tip for RNG query: {tip}") - return datafeed + return datafeed \ No newline at end of file diff --git a/src/telliot_feeds/reporters/rng_interval_custom.py b/src/telliot_feeds/reporters/rng_interval_custom.py new file mode 100644 index
00000000..4ff95ca2 --- /dev/null +++ b/src/telliot_feeds/reporters/rng_interval_custom.py @@ -0,0 +1,87 @@ +"""FetchRNG auto submitter. +submits FetchRNG values at a fixed time interval in managed feeds +""" +import calendar +import time +import os +import codecs +from typing import Any +from typing import Optional +from typing import Tuple + +from eth_abi import decode_abi + +from telliot_core.utils.response import error_status +from telliot_core.utils.response import ResponseStatus + +from telliot_feeds.datafeed import DataFeed +from telliot_feeds.feeds.fetch_rng_custom_feed import assemble_rng_datafeed +from telliot_feeds.queries.fetch_rng_custom import FetchRNGCustom +from telliot_feeds.reporters.reporter_autopay_utils import get_feed_tip +from telliot_feeds.reporters.fetch_flex import FetchFlexReporter +from telliot_feeds.utils.log import get_logger + + +logger = get_logger(__name__) + +INTERVAL = int(os.getenv('REPORT_INTERVAL', "300")) # 5 minutes +START_TIME = int(os.getenv('START_TIME', "1653350400")) # 2022-5-24 00:00:00 GMT +FETCH_RNG_NAME = os.getenv('FETCH_RNG_NAME', "custom") #default feed name to custom + +def get_next_timestamp() -> int: + """get next target timestamp""" + now = calendar.timegm(time.gmtime()) + target_ts = START_TIME + (now - START_TIME) // INTERVAL * INTERVAL + return target_ts + + +class RNGCustomReporter(FetchFlexReporter): + """Reports FetchRNG values at a fixed interval to FetchFlex + on Pulsechain.""" + + async def fetch_datafeed(self) -> Optional[DataFeed[Any]]: + status = ResponseStatus() + + rng_timestamp = get_next_timestamp() + query = FetchRNGCustom(FETCH_RNG_NAME, START_TIME) + timestamp_reported, read_status = await self.check_if_timestamp_reported(query.query_id, rng_timestamp) + + logger.info(f"rng_timestamp (timestamp interval): {rng_timestamp}") + logger.info(f"Generated query id: {codecs.encode(query.query_id, 'hex')}") + + if not read_status.ok: + status.error = "Unable to check if timestamp was reported: " + 
read_status.error # error won't be none # noqa: E501 + logger.error(status.error) + status.e = read_status.e + return None + + if timestamp_reported: + status.ok = False + status.error = f"Latest timestamp in interval {rng_timestamp} already reported" + logger.info(status.error) + return None + + datafeed = await assemble_rng_datafeed(timestamp=rng_timestamp) + if datafeed is None: + msg = "Unable to assemble RNG datafeed" + error_status(note=msg, log=logger.warning) + return None + + logger.info(f"Datafeed: {datafeed}") + + self.datafeed = datafeed + return datafeed + + + async def ensure_profitable(self, datafeed: DataFeed[Any]) -> ResponseStatus: + """This Reporter does not check for profitability""" + status = ResponseStatus() + status.ok = True + return status + + + async def check_if_timestamp_reported(self, query_id: bytes, rng_timestamp: int) -> Tuple[bool, ResponseStatus]: + value, read_status = await self.oracle.read(func_name="getCurrentValue", _queryId=query_id) + rng_hash, timestamp = decode_abi(["bytes32", "uint256"], value) + return (rng_timestamp == timestamp), read_status + diff --git a/src/telliot_feeds/sources/blockhash_aggregator.py b/src/telliot_feeds/sources/blockhash_aggregator.py index de1d4b86..6a33e714 100644 --- a/src/telliot_feeds/sources/blockhash_aggregator.py +++ b/src/telliot_feeds/sources/blockhash_aggregator.py @@ -1,5 +1,8 @@ import asyncio import time +import os + +from collections import deque from dataclasses import dataclass from datetime import datetime from datetime import timezone @@ -44,7 +47,7 @@ def get_mainnet_web3() -> Any: return None -def block_num_from_timestamp(timestamp: int) -> Optional[int]: +def pls_block_num_from_timestamp(timestamp: int) -> Optional[int]: with requests.Session() as s: s.mount("https://", adapter) try: @@ -84,6 +87,46 @@ def block_num_from_timestamp(timestamp: int) -> Optional[int]: return result +def eth_block_num_from_timestamp(timestamp: int) -> Optional[int]: + with requests.Session() as 
s: + s.mount("https://", adapter) + try: + rsp = s.get( + "https://api.etherscan.io/api" + "?module=block" + "&action=getblocknobytime" + f"&timestamp={timestamp}" + "&closest=after" + f"&apikey={os.getenv('ETHERSCAN_API_KEY')}" + ) + except requests.exceptions.ConnectTimeout: + logger.error("Connection timeout getting Eth block num from timestamp") + return None + except requests.exceptions.RequestException as e: + logger.error(f"Etherscan API error: {e}") + return None + + try: + this_block = rsp.json() + except JSONDecodeError: + logger.error("Etherscan API returned invalid JSON") + return None + + try: + if this_block["status"] != "1": + logger.error(f"Etherscan API returned error: {this_block['message']}") + return None + except KeyError: + logger.error("Etherscan API returned JSON without status") + return None + + try: + result = int(this_block["result"]) + except ValueError: + logger.error("Etherscan API returned invalid block number") + return None + + return result async def get_pls_hash(timestamp: int) -> Optional[str]: """Fetches next Pulsechain blockhash after timestamp from API.""" @@ -102,7 +145,7 @@ logger.error(f"BTC Timestamp {timestamp} is older than current PLS block timestamp {this_block['timestamp']}") return None - block_num = block_num_from_timestamp(timestamp) + block_num = pls_block_num_from_timestamp(timestamp) if block_num is None: logger.warning("Unable to retrieve block number from Pulsescan API") return None @@ -154,11 +197,54 @@ break if block["time"] < timestamp: - logger.warning("Blockchain.info API returned no blocks after timestamp") + logger.warning(f"Blockchain.info API returned no blocks after timestamp time is {block['time']}") return None, None logger.info(f"Using BTC block number {block['height']}") return str(block["hash"]), block["time"] +async def get_eth_hash(timestamp: int) ->
Tuple[Optional[str], Optional[int]]: + """Fetches next Ethereum blockhash after timestamp from API.""" + + blockNumber = eth_block_num_from_timestamp(timestamp) + + if blockNumber is None: + logger.warning("Could not get Ethereum block number") + return None, None + + with requests.Session() as s: + s.mount("https://", adapter) + try: + rsp = s.get( + "https://api.etherscan.io/api" + "?module=proxy" + "&action=eth_getBlockByNumber" + f"&tag={hex(blockNumber)}" + "&boolean=false" + f"&apikey={os.getenv('ETHERSCAN_API_KEY')}" + ) + except requests.exceptions.ConnectTimeout: + logger.error("Connection timeout getting Eth block num from timestamp") + return None, None + except requests.exceptions.RequestException as e: + logger.error(f"Etherscan API error: {e}") + return None, None + + try: + this_block = rsp.json() + except JSONDecodeError: + logger.error("Etherscan API returned invalid JSON") + return None, None + + try: + if this_block["id"] != 1: + logger.error(f"Etherscan API returned error: {this_block['message']}") + return None, None + except KeyError: + logger.error("Etherscan API returned JSON without status") + return None, None + + logger.info(f"Using ETH block number {blockNumber}") + return str(this_block["result"]["hash"]), int(this_block["result"]["timestamp"], 16) @dataclass class FetchRNGManualSource(DataSource[Any]): @@ -223,6 +309,10 @@ async def fetch_new_datapoint(self) -> OptionalDataPoint[bytes]: Returns: Current time-stamped value """ + + """ To avoid caching issues, we delete the previously reported values """ + #self._history = deque(maxlen=self.max_datapoints) + if not self.is_valid_timestamp(self.timestamp): try: timestamp = self.parse_user_val() @@ -232,28 +322,31 @@ else: timestamp = self.timestamp - btc_hash, btc_timestamp = await get_btc_hash(timestamp) + eth_hash, eth_timestamp = await get_eth_hash(timestamp) - if btc_hash is None: - logger.warning("Unable to retrieve Bitcoin
blockhash") + if eth_hash is None: + logger.warning("Unable to retrieve Ethereum blockhash") return None, None - if btc_timestamp is None: - logger.warning("Unable to retrieve Bitcoin timestamp") + if eth_timestamp is None: + logger.warning("Unable to retrieve Ethereum timestamp") return None, None - pls_hash = await get_pls_hash(btc_timestamp) + pls_hash = await get_pls_hash(eth_timestamp) if pls_hash is None: logger.warning("Unable to retrieve Pulsechain blockhash") return None, None - data = Web3.solidityKeccak(["string", "string"], [pls_hash, btc_hash]) + logger.debug(f"using ETH blockhash { eth_hash }") + logger.debug(f"using PLS blockhash { pls_hash }") + + data = Web3.solidityKeccak(["string", "string"], [pls_hash, eth_hash]) dt = datetime.fromtimestamp(self.timestamp, tz=timezone.utc) datapoint = (data, dt) self.store_datapoint(datapoint) + logger.info(datapoint) logger.info(f"Stored random number for timestamp {timestamp}: {data.hex()}") return datapoint - if __name__ == "__main__": s = FetchRNGManualSource() v, t = asyncio.run(s.fetch_new_datapoint())