diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..b669720 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +indent_style = space +indent_size = 4 \ No newline at end of file diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..9d17836 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,7 @@ +# Handle line endings automatically for files detected as text +# and leave all files detected as binary untouched. +* text=auto + +# Never modify line endings of our bash scripts +*.sh text eol=lf +*.py text eof=lf \ No newline at end of file diff --git a/.gitignore b/.gitignore index 9d7e44d..82aa1e9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,7 @@ tests/__pycache__ oracleservice/__pycache__ *.swp + +prompt.py +*.cmd +*.json diff --git a/oracleservice/oracle.py b/oracleservice/oracle.py index 119d809..72bcf92 100644 --- a/oracleservice/oracle.py +++ b/oracleservice/oracle.py @@ -1,15 +1,23 @@ from dataclasses import dataclass, field +from prometheus_metrics import metrics_exporter from service_parameters import ServiceParameters from substrateinterface.exceptions import BlockNotFound, SubstrateRequestException -from substrateinterface.utils.ss58 import ss58_decode +from substrateinterface import Keypair from substrate_interface_utils import SubstrateInterfaceUtils from utils import create_provider from web3.exceptions import BadFunctionCallOutput +from web3 import Account from websocket._exceptions import WebSocketConnectionClosedException from websockets.exceptions import ConnectionClosedError, InvalidMessage import logging +import signal +import socket +import threading as th import time +import os +import sys + logger = logging.getLogger(__name__) @@ -17,15 +25,20 @@ @dataclass class Oracle: """A class that contains all the logic of the oracle's work""" - priv_key: str + account: Account service_params: ServiceParameters - account = None default_mode_started: bool = False failure_reqs_count: dict = field(default_factory=dict) last_era_reported: dict = field(default_factory=dict) - nonce: int = 0 + previous_era_id: int = -1 undesirable_urls: set = field(default_factory=set) + watchdog: th.Timer = field(init=False) + + def __post_init__(self): + self._create_watchdog() + if os.name != 'nt': + signal.signal(signal.SIGALRM, self._close_connection_to_relaychain) def start_default_mode(self): """Start of the Oracle default mode""" @@ -36,6 +49,7 @@ def start_default_mode(self): logger.info("Starting default mode") + metrics_exporter.agent.info({'relay_chain_node_address': self.service_params.substrate.url}) if self.service_params.substrate.url not in self.failure_reqs_count: self.failure_reqs_count[self.service_params.substrate.url] = 0 if self.service_params.w3.provider.endpoint_uri not in self.failure_reqs_count: @@ -44,19 +58,21 @@ def start_default_mode(self): self.failure_reqs_count[self.service_params.substrate.url] += 1 self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] += 1 - self.account = self.service_params.w3.eth.account.from_key(self.priv_key) - self.nonce = self.service_params.w3.eth.get_transaction_count(self.account.address) + with metrics_exporter.para_exceptions_count.count_exceptions(): + self._restore_state() self._start_era_monitoring() def start_recovery_mode(self): """Start of the Oracle recovery mode.""" logger.info("Starting recovery mode") + metrics_exporter.is_recovery_mode_active.set(True) self.default_mode_started = False self._recover_connection_to_relaychain() self._recover_connection_to_parachain() + metrics_exporter.is_recovery_mode_active.set(False) logger.info("Recovery mode is completed") def _recover_connection_to_relaychain(self): @@ -68,6 +84,7 @@ def _recover_connection_to_relaychain(self): try: if self.failure_reqs_count[self.service_params.substrate.url] > self.service_params.max_num_of_failure_reqs: self.undesirable_urls.add(self.service_params.substrate.url) + self.service_params.substrate.websocket.shutdown() self.service_params.substrate = SubstrateInterfaceUtils.create_interface( urls=self.service_params.ws_urls_relay, ss58_format=self.service_params.ss58_format, @@ -75,10 +92,12 @@ def _recover_connection_to_relaychain(self): timeout=self.service_params.timeout, undesirable_urls=self.undesirable_urls, ) + metrics_exporter.agent.info({'relay_chain_node_address': self.service_params.substrate.url}) break except ( BadFunctionCallOutput, + BrokenPipeError, ConnectionClosedError, ConnectionRefusedError, ConnectionResetError, @@ -109,6 +128,7 @@ def _recover_connection_to_parachain(self): except ( BadFunctionCallOutput, + BrokenPipeError, ConnectionClosedError, ConnectionRefusedError, ConnectionResetError, @@ -127,6 +147,18 @@ def _recover_connection_to_parachain(self): else: self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] = 1 + def _restore_state(self): + """Restore the state after starting the default mode""" + stash_accounts = self._get_stash_accounts() + for stash_acc in stash_accounts: + (era_id, is_reported) = self.service_params.w3.eth.contract( + address=self.service_params.contract_address, + abi=self.service_params.abi + ).functions.isReportedLastEra(self.account.address, stash_acc).call() + + stash = Keypair(public_key=stash_acc, ss58_format=self.service_params.ss58_format) + self.last_era_reported[stash.public_key] = era_id if is_reported else era_id - 1 + def _get_stash_accounts(self) -> list: """Get list of stash accounts and the last era reported using 'getStashAccounts' function""" return self.service_params.w3.eth.contract( @@ -142,54 +174,111 @@ def _start_era_monitoring(self): subscription_handler=self._handle_era_change, ) + def _handle_watchdog_tick(self): + """Start the timer for SIGALRM and end the thread""" + if os.name != 'nt': + signal.alarm(self.service_params.era_duration_in_seconds + self.service_params.watchdog_delay) + sys.exit() + + def _close_connection_to_relaychain(self, sig: int = signal.SIGINT, frame=None): + """Close connection to relaychain node, increase failure requests counter and exit""" + self.failure_reqs_count[self.service_params.substrate.url] += 1 + logger.debug(f"Closing connection to relaychain node: {self.service_params.substrate.url}") + try: + self.service_params.substrate.websocket.sock.shutdown(socket.SHUT_RDWR) + except ( + AttributeError, + OSError, + ) as exc: + logger.warning(exc) + + if sig == signal.SIGALRM: + raise BrokenPipeError + + sys.exit() + + def _create_watchdog(self): + """Create watchdog as a Timer""" + self.watchdog = th.Timer(1, self._handle_watchdog_tick) + + def _wait_in_two_blocks(self, tx_receipt: dict): + """Wait for two blocks based on information from web3""" + if 'blockNumber' not in tx_receipt: + logger.error("The block number in transaction receipt was not found") + return + + logger.debug("Waiting in two blocks") + while True: + current_block = self.service_params.w3.eth.get_block('latest') + if current_block is not None and 'number' in current_block: + if current_block['number'] > tx_receipt['blockNumber']: + break + time.sleep(1) + def _handle_era_change(self, era, update_nr: int, subscription_id: str): """ Read the staking parameters for each stash account separately from the block where the era value is changed, generate the transaction body, sign and send to the parachain. """ - eraId = era.value['index'] - logger.info(f"Active era index: {eraId}, start timestamp: {era.value['start']}") + self.watchdog.cancel() + self._create_watchdog() + self.watchdog.start() + + era_id = era.value['index'] + if era_id <= self.previous_era_id: + logger.info(f"Skip sporadic new era event {era_id}") + return + logger.info(f"Active era index: {era_id}, start timestamp: {era.value['start']}") + metrics_exporter.active_era_id.set(era.value['index']) + metrics_exporter.total_stashes_free_balance.set(0) self.failure_reqs_count[self.service_params.substrate.url] += 1 - stash_accounts = self._get_stash_accounts() + with metrics_exporter.para_exceptions_count.count_exceptions(): + stash_accounts = self._get_stash_accounts() self.failure_reqs_count[self.service_params.substrate.url] -= 1 if not stash_accounts: - logger.info("No stake accounts found: waiting for the next era") + logger.info("No stash accounts found: waiting for the next era") + self.previous_era_id = era_id return - block_hash = self._find_start_block(era.value['index']) - if block_hash is None: - logger.error("Can't find the required block") - raise BlockNotFound + with metrics_exporter.relay_exceptions_count.count_exceptions(): + block_hash, block_number = self._find_start_block(era.value['index']) + if block_hash is None: + logger.error("Can't find the required block") + raise BlockNotFound + logger.info(f"Block hash: {block_hash}") + metrics_exporter.previous_era_change_block_number.set(block_number) - for stash_acc, stash_eraId in stash_accounts: + for stash_acc in stash_accounts: self.failure_reqs_count[self.service_params.substrate.url] += 1 - stash_acc = '0x' + stash_acc.hex() - logger.info(f"Current stash is {stash_acc}; era is {stash_eraId}") - if eraId < stash_eraId: - logger.info(f"Current era less than the specified era for stash '{stash_acc}': skipping current era") - continue - if stash_acc in self.last_era_reported and self.last_era_reported[stash_acc] >= era.value['index']: - logger.info(f"The report has already been sent for stash {stash_acc}") + stash = Keypair(public_key=stash_acc, ss58_format=self.service_params.ss58_format) + + if self.last_era_reported.get(stash.public_key, 0) >= era_id: + logger.info(f"The report has already been sent for stash {stash.ss58_address}") continue - staking_parameters = self._read_staking_parameters(stash_acc, block_hash) + staking_parameters = self._read_staking_parameters(stash, block_hash) self.failure_reqs_count[self.service_params.substrate.url] -= 1 + logger.info("The parameters are read. Preparing the transaction body.") logger.debug(';'.join([ - f"stash: {stash_acc}", - f"era: {eraId}", + f"stash: {stash.ss58_address}", + f"era: {era_id}", f"staking parameters: {staking_parameters}", f"Relay chain failure requests counter: {self.failure_reqs_count[self.service_params.substrate.url]}", f"Parachain failure requests counter: {self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri]}", ])) - tx = self._create_tx(eraId, staking_parameters) - self._sign_and_send_to_para(tx, stash_acc, eraId) - self.last_era_reported[stash_acc] = eraId + with metrics_exporter.para_exceptions_count.count_exceptions(): + tx = self._create_tx(era_id, staking_parameters) + self._sign_and_send_to_para(tx, stash, era_id) + self.last_era_reported[stash.public_key] = era_id logger.info("Waiting for the next era") + metrics_exporter.last_era_reported.set(era.value['index'] - 1) + self.previous_era_id = era_id + self.failure_reqs_count[self.service_params.substrate.url] = 0 self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] = 0 if self.service_params.w3.provider.endpoint_uri in self.undesirable_urls: @@ -197,26 +286,29 @@ def _handle_era_change(self, era, update_nr: int, subscription_id: str): def _find_start_block(self, era_id: int) -> str: """Find the hash of the block at which the era change occurs""" - block_number = era_id * self.service_params.era_duration + self.service_params.initial_block_number + block_number = era_id * self.service_params.era_duration_in_blocks + self.service_params.initial_block_number try: block_hash = self.service_params.substrate.get_block_hash(block_number) - logger.info(f"Block hash: {block_hash} number: {block_number}") - return block_hash + logger.info(f"Block hash: {block_hash}. Block number: {block_number}") + return block_hash, block_number except SubstrateRequestException: - return None + return None, None - def _read_staking_parameters(self, stash: str, block_hash: str = None) -> dict: + def _read_staking_parameters(self, stash: Keypair, block_hash: str = None) -> dict: """Read staking parameters from specific block or from the head""" if block_hash is None: block_hash = self.service_params.substrate.get_chain_head() - stash_free_balance = self._get_stash_free_balance(stash) - staking_ledger_result = self._get_ledger_data(block_hash, stash) + with metrics_exporter.relay_exceptions_count.count_exceptions(): + stash_free_balance = self._get_stash_free_balance(stash) + stake_status = self._get_stake_status(stash, block_hash) + staking_ledger_result = self._get_ledger_data(block_hash, stash) + if staking_ledger_result is None: return { - 'stashAccount': stash, - 'controllerAccount': stash, + 'stashAccount': stash.public_key, + 'controllerAccount': stash.public_key, 'stakeStatus': 3, # this value means that stake status is None 'activeBalance': 0, 'totalBalance': 0, @@ -225,42 +317,45 @@ def _read_staking_parameters(self, stash: str, block_hash: str = None) -> dict: 'stashBalance': stash_free_balance, } - stake_status = self._get_stake_status(stash, block_hash) + controller = staking_ledger_result['controller'] - for controller, controller_info in staking_ledger_result.items(): - unlocking_values = [{'balance': elem['value'], 'era': elem['era']} for elem in controller_info.value['unlocking']] + return { + 'stashAccount': stash.public_key, + 'controllerAccount': controller.public_key, + 'stakeStatus': stake_status, + 'activeBalance': staking_ledger_result['active'], + 'totalBalance': staking_ledger_result['total'], + 'unlocking': [{'balance': elem['value'], 'era': elem['era']} for elem in staking_ledger_result['unlocking']], + 'claimedRewards': [], # put aside until storage proof has been implemented // staking_ledger_result['claimedRewards'], + 'stashBalance': stash_free_balance, + } - return { - 'stashAccount': '0x' + ss58_decode(controller_info.value['stash']), - 'controllerAccount': '0x' + ss58_decode(controller), - 'stakeStatus': stake_status, - 'activeBalance': controller_info.value['active'], - 'totalBalance': controller_info.value['total'], - 'unlocking': unlocking_values, - 'claimedRewards': controller_info.value['claimedRewards'], - 'stashBalance': stash_free_balance, - } - - def _get_ledger_data(self, block_hash: str, stash: str) -> dict: + def _get_ledger_data(self, block_hash: str, stash: Keypair) -> dict: """Get ledger data using stash account address""" controller = SubstrateInterfaceUtils.get_controller(self.service_params.substrate, stash, block_hash) if controller.value is None: return None - ledger = SubstrateInterfaceUtils.get_ledger(self.service_params.substrate, controller.value, block_hash) + controller = Keypair(ss58_address=controller.value) - return {controller.value: ledger} + ledger = SubstrateInterfaceUtils.get_ledger(self.service_params.substrate, controller, block_hash) - def _get_stash_free_balance(self, stash: str) -> dict: + result = {'controller': controller, 'stash': stash} + result.update(ledger.value) + + return result + + def _get_stash_free_balance(self, stash: Keypair) -> int: """Get stash accounts free balances""" - account = SubstrateInterfaceUtils.get_account(self.service_params.substrate, stash) + account_info = SubstrateInterfaceUtils.get_account(self.service_params.substrate, stash) + metrics_exporter.total_stashes_free_balance.inc(account_info.value['data']['free']) - return account.value['data']['free'] + return account_info.value['data']['free'] - def _get_stake_status(self, stash: str, block_hash: str = None) -> int: + def _get_stake_status(self, stash: Keypair, block_hash: str = None) -> int: """ Get stash account status. - 0 - Chill, 1 - Nominator, 2 - Validator + 0 - Idle, 1 - Nominator, 2 - Validator, 3 - None """ if block_hash is None: block_hash = self.service_params.substrate.get_chain_head() @@ -271,10 +366,10 @@ def _get_stake_status(self, stash: str, block_hash: str = None) -> int: nominators = set(nominator.value for nominator, _ in staking_nominators) validators = set(validator for validator in staking_validators.value) - if stash in nominators: + if stash.ss58_address in nominators: return 1 - if stash in validators: + if stash.ss58_address in validators: return 2 return 0 @@ -282,7 +377,6 @@ def _get_stake_status(self, stash: str, block_hash: str = None) -> int: def _create_tx(self, era_id: int, staking_parameters: dict) -> dict: """Create a transaction body using the staking parameters, era id and parachain balance""" nonce = self.service_params.w3.eth.get_transaction_count(self.account.address) - logger.warning(f"nonce self:{self.nonce} chain:{nonce}") return self.service_params.w3.eth.contract( address=self.service_params.contract_address, @@ -290,11 +384,10 @@ def _create_tx(self, era_id: int, staking_parameters: dict) -> dict: ).functions.reportRelay( era_id, staking_parameters, - ).buildTransaction({'from': self.account.address, 'gas': self.service_params.gas_limit, 'nonce': self.nonce}) + ).buildTransaction({'from': self.account.address, 'gas': self.service_params.gas_limit, 'nonce': nonce}) - def _sign_and_send_to_para(self, tx: dict, stash: str, eraId: int): + def _sign_and_send_to_para(self, tx: dict, stash: Keypair, era_id: int) -> bool: """Sign transaction and send to parachain""" - try: self.service_params.w3.eth.call(dict((k, v) for k, v in tx.items() if v)) @@ -303,22 +396,25 @@ def _sign_and_send_to_para(self, tx: dict, stash: str, eraId: int): msg = exc.args[0]["message"] if isinstance(exc.args[0], dict) else str(exc) self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] += 1 - logger.warning(f"Report for '{stash}' era {eraId} probably will fail with {msg}") + logger.warning(f"Report for '{stash.ss58_address}' era {era_id} probably will fail with {msg}") return False - tx_signed = self.service_params.w3.eth.account.sign_transaction(tx, private_key=self.priv_key) + tx_signed = self.service_params.w3.eth.account.sign_transaction(tx, private_key=self.account.privateKey) self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] += 1 + logger.info(f"Sending a transaction for stash {stash.ss58_address}") tx_hash = self.service_params.w3.eth.send_raw_transaction(tx_signed.rawTransaction) tx_receipt = self.service_params.w3.eth.wait_for_transaction_receipt(tx_hash) - self.nonce += 1 logger.debug(f"Transaction receipt: {tx_receipt}") if tx_receipt.status == 1: - logger.info(f"The report for stash '{stash}' era {eraId} was sent successfully") + logger.info(f"The report for stash '{stash.ss58_address}' era {era_id} was sent successfully") + metrics_exporter.tx_success.observe(1) + metrics_exporter.time_elapsed_until_last_report.set(time.time()) self.failure_reqs_count[self.service_params.w3.provider.endpoint_uri] -= 1 - time.sleep(30) + self._wait_in_two_blocks(tx_receipt) return True else: - logger.warning(f"Transaction status for stash '{stash}' era {eraId} is reverted") + logger.warning(f"Transaction status for stash '{stash.ss58_address}' era {era_id} is reverted") + metrics_exporter.tx_revert.observe(1) return False diff --git a/oracleservice/prometheus_metrics.py b/oracleservice/prometheus_metrics.py new file mode 100644 index 0000000..451d7ec --- /dev/null +++ b/oracleservice/prometheus_metrics.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from prometheus_client import Counter, Gauge, Histogram, Info + + +@dataclass +class MetricsExporter: + """Prometheus metrics that the Oracle collects""" + agent = Info('agent', 'the address of the connected relay chain node') + + is_recovery_mode_active = Gauge('is_recovery_mode_active', '1, if the recovery mode, otherwise - the default mode') + + active_era_id = Gauge('active_era_id', 'active era index') + last_era_reported = Gauge('last_era_reported', 'the last era that the Oracle has reported') + previous_era_change_block_number = Gauge('previous_era_change_block_number', 'block number of the previous era change') + time_elapsed_until_last_report = Gauge('time_elapsed_until_last_report', + 'the time elapsed until the last report from the unix epoch in seconds') + + total_stashes_free_balance = Gauge('total_stashes_free_balance', 'total free balance of all stash accounts') + + tx_revert = Histogram('tx_revert', 'reverted transactions') + tx_success = Histogram('tx_success', 'successful transactions') + + para_exceptions_count = Counter('para_exceptions_count', 'parachain exceptions count') + relay_exceptions_count = Counter('relay_exceptions_count', 'relay chain exceptions count') + + +metrics_exporter = MetricsExporter() diff --git a/oracleservice/service_parameters.py b/oracleservice/service_parameters.py index 5d7f6f3..b086a5f 100644 --- a/oracleservice/service_parameters.py +++ b/oracleservice/service_parameters.py @@ -9,11 +9,13 @@ class ServiceParameters: abi: list = field(default_factory=list) gas_limit: int = 10000000 - era_duration: int = 30 + era_duration_in_blocks: int = 30 + era_duration_in_seconds: int = 180 initial_block_number: int = 1 max_num_of_failure_reqs: int = 10 timeout: int = 60 + watchdog_delay: int = 5 stash_accounts: list = field(default_factory=list) diff --git a/oracleservice/start.py b/oracleservice/start.py index d493d59..77b286c 100755 --- a/oracleservice/start.py +++ b/oracleservice/start.py @@ -2,11 +2,11 @@ from functools import partial from log import init_log from oracle import Oracle +from prometheus_client import start_http_server from service_parameters import ServiceParameters -from substrateinterface import SubstrateInterface from substrateinterface.exceptions import BlockNotFound from substrate_interface_utils import SubstrateInterfaceUtils -from utils import create_provider, get_abi, remove_invalid_urls +from utils import create_provider, get_abi, remove_invalid_urls, stop_signal_handler from utils import check_abi, check_contract_address, check_log_level, perform_sanity_checks from web3.exceptions import ABIFunctionNotFound, BadFunctionCallOutput, TimeExhausted, ValidationError from websocket._exceptions import WebSocketConnectionClosedException @@ -17,24 +17,17 @@ import signal import sys + logger = logging.getLogger(__name__) +DEFAULT_ERA_DURATION_IN_BLOCKS = 30 +DEFAULT_ERA_DURATION_IN_SECONDS = 180 DEFAULT_GAS_LIMIT = 10000000 +DEFAULT_INITIAL_BLOCK_NUMBER = 1 DEFAULT_MAX_NUMBER_OF_FAILURE_REQUESTS = 10 +DEFAULT_PROMETHEUS_METRICS_PORT = 8000 DEFAULT_TIMEOUT = 60 -DEFAULT_ERA_DURATION = 30 -DEFAULT_INITIAL_BLOCK_NUMBER = 1 - - -def stop_signal_handler(sig: int, frame, substrate: SubstrateInterface = None): - """Handle signal, close substrate interface websocket connection, if it is open, and terminate the process""" - logger.debug(f"Receiving signal: {sig}") - if substrate is not None: - logger.debug("Closing substrate interface websocket connection") - substrate.websocket.shutdown() - logger.debug("Connection closed") - - sys.exit() +DEFAULT_WATCHDOG_DELAY = 5 def main(): @@ -43,6 +36,10 @@ def main(): check_log_level(log_level) init_log(stdout_level=log_level) + prometheus_metrics_port = int(os.getenv('PROMETHEUS_METRICS_PORT', DEFAULT_PROMETHEUS_METRICS_PORT)) + logger.info(f"Starting the prometheus server on port {prometheus_metrics_port}") + start_http_server(prometheus_metrics_port) + ws_url_relay = os.getenv('WS_URL_RELAY', 'ws://localhost:9951/').split(',') ws_url_para = os.getenv('WS_URL_PARA', 'ws://localhost:10055/').split(',') ss58_format = int(os.getenv('SS58_FORMAT', 2)) @@ -61,8 +58,10 @@ def main(): DEFAULT_MAX_NUMBER_OF_FAILURE_REQUESTS, )) timeout = int(os.getenv('TIMEOUT', DEFAULT_TIMEOUT)) + watchdog_delay = int(os.getenv('WATCHDOG_DELAY', DEFAULT_WATCHDOG_DELAY)) - era_duration = int(os.getenv('ERA_DURATION', DEFAULT_ERA_DURATION)) + era_duration_in_blocks = int(os.getenv('ERA_DURATION_IN_BLOCKS', DEFAULT_ERA_DURATION_IN_BLOCKS)) + era_duration_in_seconds = int(os.getenv('ERA_DURATION_IN_SECONDS', DEFAULT_ERA_DURATION_IN_SECONDS)) initial_block_number = int(os.getenv('INITIAL_BLOCK_NUMBER', DEFAULT_INITIAL_BLOCK_NUMBER)) oracle_private_key = os.getenv('ORACLE_PRIVATE_KEY') @@ -72,13 +71,15 @@ def main(): perform_sanity_checks( abi_path=abi_path, contract_address=contract_address, - era_duration=era_duration, + era_duration_in_blocks=era_duration_in_blocks, + era_duration_in_seconds=era_duration_in_seconds, gas_limit=gas_limit, initial_block_number=initial_block_number, max_number_of_failure_requests=max_number_of_failure_requests, para_id=para_id, private_key=oracle_private_key, timeout=timeout, + watchdog_delay=watchdog_delay, ws_url_para=ws_url_para, ws_url_relay=ws_url_relay, ) @@ -90,15 +91,38 @@ def main(): w3 = create_provider(ws_url_para, timeout) substrate = SubstrateInterfaceUtils.create_interface(ws_url_relay, ss58_format, type_registry_preset) - signal.signal(signal.SIGTERM, partial(stop_signal_handler, substrate=substrate)) - signal.signal(signal.SIGINT, partial(stop_signal_handler, substrate=substrate)) - check_contract_address(w3, contract_address) - check_abi(w3, contract_address, abi, w3.eth.account.from_key(oracle_private_key).address) + oracle = w3.eth.account.from_key(oracle_private_key) + check_abi(w3, contract_address, abi, oracle.address) + + service_params = ServiceParameters( + abi=abi, + contract_address=contract_address, + era_duration_in_blocks=era_duration_in_blocks, + era_duration_in_seconds=era_duration_in_seconds, + gas_limit=gas_limit, + initial_block_number=initial_block_number, + max_num_of_failure_reqs=max_number_of_failure_requests, + para_id=para_id, + ss58_format=ss58_format, + substrate=substrate, + timeout=timeout, + type_registry_preset=type_registry_preset, + watchdog_delay=watchdog_delay, + ws_urls_relay=ws_url_relay, + ws_urls_para=ws_url_para, + w3=w3, + ) + + oracle = Oracle(account=oracle, service_params=service_params) except ( ABIFunctionNotFound, FileNotFoundError, + InvalidMessage, + IsADirectoryError, + OSError, + OverflowError, ValueError, ) as exc: sys.exit(exc) @@ -106,24 +130,8 @@ def main(): except KeyboardInterrupt: sys.exit() - service_params = ServiceParameters( - abi=abi, - contract_address=contract_address, - era_duration=era_duration, - gas_limit=gas_limit, - initial_block_number=initial_block_number, - max_num_of_failure_reqs=max_number_of_failure_requests, - para_id=para_id, - ss58_format=ss58_format, - substrate=substrate, - timeout=timeout, - type_registry_preset=type_registry_preset, - ws_urls_relay=ws_url_relay, - ws_urls_para=ws_url_para, - w3=w3, - ) - - oracle = Oracle(priv_key=oracle_private_key, service_params=service_params) + signal.signal(signal.SIGTERM, partial(stop_signal_handler, substrate=substrate, timer=oracle.watchdog)) + signal.signal(signal.SIGINT, partial(stop_signal_handler, substrate=substrate, timer=oracle.watchdog)) while True: try: @@ -138,6 +146,7 @@ def main(): except ( BadFunctionCallOutput, BlockNotFound, + BrokenPipeError, ConnectionClosedError, ConnectionRefusedError, ConnectionResetError, diff --git a/oracleservice/substrate_interface_utils.py b/oracleservice/substrate_interface_utils.py index 95fa606..1787aa5 100644 --- a/oracleservice/substrate_interface_utils.py +++ b/oracleservice/substrate_interface_utils.py @@ -1,5 +1,4 @@ from substrateinterface import Keypair, SubstrateInterface -from substrateinterface.utils.ss58 import is_valid_ss58_address from websocket._exceptions import WebSocketAddressException from websockets.exceptions import InvalidStatusCode @@ -66,38 +65,6 @@ def create_interface( logger.info(f"Timeout: {timeout} seconds") time.sleep(timeout) - def get_parachain_balance(substrate: SubstrateInterface, para_id: int = 1000, block_hash: str = None) -> int: - """Get parachain balance using parachain id""" - if not block_hash: - block_hash = substrate.get_chain_head() - - para_addr = SubstrateInterfaceUtils.get_parachain_address(para_id, substrate.ss58_format) - result = substrate.query( - module='System', - storage_function='Account', - params=[para_addr.ss58_address], - ) - - if result is None: - logger.warning(f"{para_id} is gone") - return 0 - - return result.value['data']['free'] - - def get_active_era(substrate: SubstrateInterface, block_hash: str = None): - """Get active era from specific block or head""" - if block_hash: - return substrate.query( - module='Staking', - storage_function='ActiveEra', - block_hash=block_hash, - ) - - return substrate.query( - module='Staking', - storage_function='ActiveEra', - ) - def get_validators(substrate: SubstrateInterface, block_hash: str = None): """Get list of validators using 'Validators' storage function from 'Session' module""" if block_hash is None: @@ -126,47 +93,35 @@ def get_nominators(substrate: SubstrateInterface, block_hash: str = None): storage_function='Nominators', ) - def get_account(substrate: SubstrateInterface, stash: str): + def get_account(substrate: SubstrateInterface, stash: Keypair): """Get account using 'Account' storage function from 'System' module""" return substrate.query( module='System', storage_function='Account', - params=[stash], + params=[stash.ss58_address], ) - def get_ledger(substrate: SubstrateInterface, controller: str, block_hash: str = None): + def get_ledger(substrate: SubstrateInterface, controller: Keypair, block_hash: str = None): """Get ledger using 'Ledger' storage function from 'Staking' module""" - if block_hash is None: - return substrate.query( - module='Staking', - storage_function='Ledger', - params=[controller], - ) return substrate.query( module='Staking', storage_function='Ledger', - params=[controller], + params=[controller.ss58_address], block_hash=block_hash, ) - def get_controller(substrate: SubstrateInterface, stash: str, block_hash: str = None): + def get_controller(substrate: SubstrateInterface, stash: Keypair, block_hash: str = None): """Get controller using 'Bonded' storage function from 'Staking' module""" - if block_hash is None: - return substrate.query( - module='Staking', - storage_function='Bonded', - params=[stash], - ) return substrate.query( module='Staking', storage_function='Bonded', - params=[stash], + params=[stash.ss58_address], block_hash=block_hash, ) - def get_parachain_address(_para_id: int, ss58_format: int) -> Keypair: + def get_parachain_address(_para_id: int) -> str: """Get parachain address using parachain id with ss58 format provided""" prefix = b'para' para_addr = bytearray(prefix) @@ -176,19 +131,4 @@ def get_parachain_address(_para_id: int, ss58_format: int) -> Keypair: _para_id = _para_id >> 8 para_addr.append(_para_id & 0xFF) - return Keypair(public_key=para_addr.ljust(32, b'\0').hex(), ss58_format=ss58_format) - - def remove_invalid_ss58_addresses(ss58_format, addresses: [str]) -> [str]: - """Check if given value is a valid SS58 formatted address""" - checked_addresses = [] - - for addr in addresses: - if is_valid_ss58_address(addr, ss58_format): - checked_addresses.append(addr) - else: - logger.warning(f"Invalid address {addr} removed from the list") - - if not len(checked_addresses): - raise ValueError("No valid ss58 addresses founded or ss58 format is invalid") - - return checked_addresses + return para_addr.ljust(32, b'\0').hex() diff --git a/oracleservice/utils.py b/oracleservice/utils.py index c68bbfe..8cb9d7b 100644 --- a/oracleservice/utils.py +++ b/oracleservice/utils.py @@ -1,4 +1,5 @@ from os.path import exists +from substrateinterface import SubstrateInterface from web3 import Web3 from web3.auto import w3 from web3.exceptions import ABIFunctionNotFound @@ -6,6 +7,9 @@ import json import logging +import socket +import sys +import threading as th import time import urllib @@ -21,15 +25,43 @@ ) NON_NEGATIVE_PARAMETERS = ( - 'ERA_DURATION', + 'ERA_DURATION_IN_BLOCKS', + 'ERA_DURATION_IN_SECONDS', 'GAS_LIMIT', 'INITIAL_BLOCK_NUMBER', 'MAX_NUMBER_OF_FAILURE_REQUESTS', 'PARA_ID', 'TIMEOUT', + 'WATCHDOG_DELAY', ) +def stop_signal_handler(sig: int, frame, substrate: SubstrateInterface = None, timer: th.Timer = None): + """Handle signal, close substrate interface websocket connection, if it is open, stop timer thread and terminate the process""" + logger.debug(f"Receiving signal: {sig}") + if substrate is not None: + logger.debug("Closing substrate interface websocket connection") + try: + substrate.websocket.sock.shutdown(socket.SHUT_RDWR) + except ( + AttributeError, + OSError, + ) as exc: + logger.warning(exc) + else: + logger.debug(f"Connection to relaychain node {substrate.url} is closed") + + if timer is not None: + try: + if timer.is_alive(): + timer.cancel() + + except Exception as exc: + logger.warning(exc) + + sys.exit() + + def create_provider(urls: list, timeout: int = 60, undesirable_urls: set = set()) -> Web3: """Create web3 websocket provider with one of the nodes given in the list""" provider = None @@ -139,24 +171,28 @@ def remove_invalid_urls(urls: [str]) -> [str]: def perform_sanity_checks( abi_path: str, contract_address: str, - era_duration: int, + era_duration_in_blocks: int, + era_duration_in_seconds: int, gas_limit: int, initial_block_number: int, max_number_of_failure_requests: int, para_id: int, private_key: str, timeout: int, + watchdog_delay: int, ws_url_relay: [str], ws_url_para: [str], ): """Check the parameters passed to the Oracle""" try: - assert era_duration > 0 - assert initial_block_number >= 0 - assert timeout >= 0 + assert era_duration_in_blocks > 0 + assert era_duration_in_seconds > 0 assert gas_limit > 0 + assert initial_block_number >= 0 assert max_number_of_failure_requests >= 0 assert para_id >= 0 + assert timeout >= 0 + assert watchdog_delay >= 0 except AssertionError: raise ValueError(f"The following parameters must be non-negative: {NON_NEGATIVE_PARAMETERS}") diff --git a/requirements.txt b/requirements.txt index 06a851f..16f07b3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -substrate-interface==0.13.12 -web3==5.23.0 flake8==3.9.2 +prometheus-client==0.11.0 pytest==6.2.4 +substrate-interface==0.13.12 +web3==5.23.0 diff --git a/tests/test_parachain_account.py b/tests/test_parachain_account.py index 006faff..76c6e7d 100644 --- a/tests/test_parachain_account.py +++ b/tests/test_parachain_account.py @@ -1,9 +1,12 @@ from oracleservice.substrate_interface_utils import SubstrateInterfaceUtils +from substrateinterface import Keypair def test_show_correct_parachain_address(): para_id = 1000 ss58_format = 2 - para_addr = 'F7fq1jSNVTPfJmaHaXCMtatT1EZefCUsa7rRiQVNR5efcah' - assert para_addr == SubstrateInterfaceUtils.get_parachain_address(para_id, ss58_format).ss58_address + para_addr_expected = 'F7fq1jSNVTPfJmaHaXCMtatT1EZefCUsa7rRiQVNR5efcah' + para_addr_actual = Keypair(public_key=SubstrateInterfaceUtils.get_parachain_address(para_id), ss58_format=ss58_format) + + assert para_addr_expected == para_addr_actual.ss58_address