Skip to content

Commit

Permalink
Merge pull request #3 from mixbytes/feature_prometheus_metrics
Browse files Browse the repository at this point in the history
Added prometheus metrics,
New watchdog timer based on alarm signal.
  • Loading branch information
ddulesov authored Sep 17, 2021
2 parents b263bfc + b6d8315 commit 365a163
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 188 deletions.
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 4
7 changes: 7 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Handle line endings automatically for files detected as text
# and leave all files detected as binary untouched.
* text=auto

# Never modify line endings of our bash scripts
*.sh text eol=lf
*.py text eof=lf
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ tests/__pycache__
oracleservice/__pycache__

*.swp

prompt.py
*.cmd
*.json
238 changes: 167 additions & 71 deletions oracleservice/oracle.py

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions oracleservice/prometheus_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dataclasses import dataclass
from prometheus_client import Counter, Gauge, Histogram, Info


@dataclass
class MetricsExporter:
"""Prometheus metrics that the Oracle collects"""
agent = Info('agent', 'the address of the connected relay chain node')

is_recovery_mode_active = Gauge('is_recovery_mode_active', '1, if the recovery mode, otherwise - the default mode')

active_era_id = Gauge('active_era_id', 'active era index')
last_era_reported = Gauge('last_era_reported', 'the last era that the Oracle has reported')
previous_era_change_block_number = Gauge('previous_era_change_block_number', 'block number of the previous era change')
time_elapsed_until_last_report = Gauge('time_elapsed_until_last_report',
'the time elapsed until the last report from the unix epoch in seconds')

total_stashes_free_balance = Gauge('total_stashes_free_balance', 'total free balance of all stash accounts')

tx_revert = Histogram('tx_revert', 'reverted transactions')
tx_success = Histogram('tx_success', 'successful transactions')

para_exceptions_count = Counter('para_exceptions_count', 'parachain exceptions count')
relay_exceptions_count = Counter('relay_exceptions_count', 'relay chain exceptions count')


metrics_exporter = MetricsExporter()
4 changes: 3 additions & 1 deletion oracleservice/service_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ class ServiceParameters:
abi: list = field(default_factory=list)
gas_limit: int = 10000000

era_duration: int = 30
era_duration_in_blocks: int = 30
era_duration_in_seconds: int = 180
initial_block_number: int = 1

max_num_of_failure_reqs: int = 10
timeout: int = 60
watchdog_delay: int = 5

stash_accounts: list = field(default_factory=list)

Expand Down
87 changes: 48 additions & 39 deletions oracleservice/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from functools import partial
from log import init_log
from oracle import Oracle
from prometheus_client import start_http_server
from service_parameters import ServiceParameters
from substrateinterface import SubstrateInterface
from substrateinterface.exceptions import BlockNotFound
from substrate_interface_utils import SubstrateInterfaceUtils
from utils import create_provider, get_abi, remove_invalid_urls
from utils import create_provider, get_abi, remove_invalid_urls, stop_signal_handler
from utils import check_abi, check_contract_address, check_log_level, perform_sanity_checks
from web3.exceptions import ABIFunctionNotFound, BadFunctionCallOutput, TimeExhausted, ValidationError
from websocket._exceptions import WebSocketConnectionClosedException
Expand All @@ -17,24 +17,17 @@
import signal
import sys


logger = logging.getLogger(__name__)

DEFAULT_ERA_DURATION_IN_BLOCKS = 30
DEFAULT_ERA_DURATION_IN_SECONDS = 180
DEFAULT_GAS_LIMIT = 10000000
DEFAULT_INITIAL_BLOCK_NUMBER = 1
DEFAULT_MAX_NUMBER_OF_FAILURE_REQUESTS = 10
DEFAULT_PROMETHEUS_METRICS_PORT = 8000
DEFAULT_TIMEOUT = 60
DEFAULT_ERA_DURATION = 30
DEFAULT_INITIAL_BLOCK_NUMBER = 1


def stop_signal_handler(sig: int, frame, substrate: SubstrateInterface = None):
"""Handle signal, close substrate interface websocket connection, if it is open, and terminate the process"""
logger.debug(f"Receiving signal: {sig}")
if substrate is not None:
logger.debug("Closing substrate interface websocket connection")
substrate.websocket.shutdown()
logger.debug("Connection closed")

sys.exit()
DEFAULT_WATCHDOG_DELAY = 5


def main():
Expand All @@ -43,6 +36,10 @@ def main():
check_log_level(log_level)
init_log(stdout_level=log_level)

prometheus_metrics_port = int(os.getenv('PROMETHEUS_METRICS_PORT', DEFAULT_PROMETHEUS_METRICS_PORT))
logger.info(f"Starting the prometheus server on port {prometheus_metrics_port}")
start_http_server(prometheus_metrics_port)

ws_url_relay = os.getenv('WS_URL_RELAY', 'ws://localhost:9951/').split(',')
ws_url_para = os.getenv('WS_URL_PARA', 'ws://localhost:10055/').split(',')
ss58_format = int(os.getenv('SS58_FORMAT', 2))
Expand All @@ -61,8 +58,10 @@ def main():
DEFAULT_MAX_NUMBER_OF_FAILURE_REQUESTS,
))
timeout = int(os.getenv('TIMEOUT', DEFAULT_TIMEOUT))
watchdog_delay = int(os.getenv('WATCHDOG_DELAY', DEFAULT_WATCHDOG_DELAY))

era_duration = int(os.getenv('ERA_DURATION', DEFAULT_ERA_DURATION))
era_duration_in_blocks = int(os.getenv('ERA_DURATION_IN_BLOCKS', DEFAULT_ERA_DURATION_IN_BLOCKS))
era_duration_in_seconds = int(os.getenv('ERA_DURATION_IN_SECONDS', DEFAULT_ERA_DURATION_IN_SECONDS))
initial_block_number = int(os.getenv('INITIAL_BLOCK_NUMBER', DEFAULT_INITIAL_BLOCK_NUMBER))

oracle_private_key = os.getenv('ORACLE_PRIVATE_KEY')
Expand All @@ -72,13 +71,15 @@ def main():
perform_sanity_checks(
abi_path=abi_path,
contract_address=contract_address,
era_duration=era_duration,
era_duration_in_blocks=era_duration_in_blocks,
era_duration_in_seconds=era_duration_in_seconds,
gas_limit=gas_limit,
initial_block_number=initial_block_number,
max_number_of_failure_requests=max_number_of_failure_requests,
para_id=para_id,
private_key=oracle_private_key,
timeout=timeout,
watchdog_delay=watchdog_delay,
ws_url_para=ws_url_para,
ws_url_relay=ws_url_relay,
)
Expand All @@ -90,40 +91,47 @@ def main():
w3 = create_provider(ws_url_para, timeout)
substrate = SubstrateInterfaceUtils.create_interface(ws_url_relay, ss58_format, type_registry_preset)

signal.signal(signal.SIGTERM, partial(stop_signal_handler, substrate=substrate))
signal.signal(signal.SIGINT, partial(stop_signal_handler, substrate=substrate))

check_contract_address(w3, contract_address)
check_abi(w3, contract_address, abi, w3.eth.account.from_key(oracle_private_key).address)
oracle = w3.eth.account.from_key(oracle_private_key)
check_abi(w3, contract_address, abi, oracle.address)

service_params = ServiceParameters(
abi=abi,
contract_address=contract_address,
era_duration_in_blocks=era_duration_in_blocks,
era_duration_in_seconds=era_duration_in_seconds,
gas_limit=gas_limit,
initial_block_number=initial_block_number,
max_num_of_failure_reqs=max_number_of_failure_requests,
para_id=para_id,
ss58_format=ss58_format,
substrate=substrate,
timeout=timeout,
type_registry_preset=type_registry_preset,
watchdog_delay=watchdog_delay,
ws_urls_relay=ws_url_relay,
ws_urls_para=ws_url_para,
w3=w3,
)

oracle = Oracle(account=oracle, service_params=service_params)

except (
ABIFunctionNotFound,
FileNotFoundError,
InvalidMessage,
IsADirectoryError,
OSError,
OverflowError,
ValueError,
) as exc:
sys.exit(exc)

except KeyboardInterrupt:
sys.exit()

service_params = ServiceParameters(
abi=abi,
contract_address=contract_address,
era_duration=era_duration,
gas_limit=gas_limit,
initial_block_number=initial_block_number,
max_num_of_failure_reqs=max_number_of_failure_requests,
para_id=para_id,
ss58_format=ss58_format,
substrate=substrate,
timeout=timeout,
type_registry_preset=type_registry_preset,
ws_urls_relay=ws_url_relay,
ws_urls_para=ws_url_para,
w3=w3,
)

oracle = Oracle(priv_key=oracle_private_key, service_params=service_params)
signal.signal(signal.SIGTERM, partial(stop_signal_handler, substrate=substrate, timer=oracle.watchdog))
signal.signal(signal.SIGINT, partial(stop_signal_handler, substrate=substrate, timer=oracle.watchdog))

while True:
try:
Expand All @@ -138,6 +146,7 @@ def main():
except (
BadFunctionCallOutput,
BlockNotFound,
BrokenPipeError,
ConnectionClosedError,
ConnectionRefusedError,
ConnectionResetError,
Expand Down
76 changes: 8 additions & 68 deletions oracleservice/substrate_interface_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from substrateinterface import Keypair, SubstrateInterface
from substrateinterface.utils.ss58 import is_valid_ss58_address
from websocket._exceptions import WebSocketAddressException
from websockets.exceptions import InvalidStatusCode

Expand Down Expand Up @@ -66,38 +65,6 @@ def create_interface(
logger.info(f"Timeout: {timeout} seconds")
time.sleep(timeout)

def get_parachain_balance(substrate: SubstrateInterface, para_id: int = 1000, block_hash: str = None) -> int:
"""Get parachain balance using parachain id"""
if not block_hash:
block_hash = substrate.get_chain_head()

para_addr = SubstrateInterfaceUtils.get_parachain_address(para_id, substrate.ss58_format)
result = substrate.query(
module='System',
storage_function='Account',
params=[para_addr.ss58_address],
)

if result is None:
logger.warning(f"{para_id} is gone")
return 0

return result.value['data']['free']

def get_active_era(substrate: SubstrateInterface, block_hash: str = None):
"""Get active era from specific block or head"""
if block_hash:
return substrate.query(
module='Staking',
storage_function='ActiveEra',
block_hash=block_hash,
)

return substrate.query(
module='Staking',
storage_function='ActiveEra',
)

def get_validators(substrate: SubstrateInterface, block_hash: str = None):
"""Get list of validators using 'Validators' storage function from 'Session' module"""
if block_hash is None:
Expand Down Expand Up @@ -126,47 +93,35 @@ def get_nominators(substrate: SubstrateInterface, block_hash: str = None):
storage_function='Nominators',
)

def get_account(substrate: SubstrateInterface, stash: str):
def get_account(substrate: SubstrateInterface, stash: Keypair):
"""Get account using 'Account' storage function from 'System' module"""
return substrate.query(
module='System',
storage_function='Account',
params=[stash],
params=[stash.ss58_address],
)

def get_ledger(substrate: SubstrateInterface, controller: str, block_hash: str = None):
def get_ledger(substrate: SubstrateInterface, controller: Keypair, block_hash: str = None):
"""Get ledger using 'Ledger' storage function from 'Staking' module"""
if block_hash is None:
return substrate.query(
module='Staking',
storage_function='Ledger',
params=[controller],
)

return substrate.query(
module='Staking',
storage_function='Ledger',
params=[controller],
params=[controller.ss58_address],
block_hash=block_hash,
)

def get_controller(substrate: SubstrateInterface, stash: str, block_hash: str = None):
def get_controller(substrate: SubstrateInterface, stash: Keypair, block_hash: str = None):
"""Get controller using 'Bonded' storage function from 'Staking' module"""
if block_hash is None:
return substrate.query(
module='Staking',
storage_function='Bonded',
params=[stash],
)

return substrate.query(
module='Staking',
storage_function='Bonded',
params=[stash],
params=[stash.ss58_address],
block_hash=block_hash,
)

def get_parachain_address(_para_id: int, ss58_format: int) -> Keypair:
def get_parachain_address(_para_id: int) -> str:
"""Get parachain address using parachain id with ss58 format provided"""
prefix = b'para'
para_addr = bytearray(prefix)
Expand All @@ -176,19 +131,4 @@ def get_parachain_address(_para_id: int, ss58_format: int) -> Keypair:
_para_id = _para_id >> 8
para_addr.append(_para_id & 0xFF)

return Keypair(public_key=para_addr.ljust(32, b'\0').hex(), ss58_format=ss58_format)

def remove_invalid_ss58_addresses(ss58_format, addresses: [str]) -> [str]:
"""Check if given value is a valid SS58 formatted address"""
checked_addresses = []

for addr in addresses:
if is_valid_ss58_address(addr, ss58_format):
checked_addresses.append(addr)
else:
logger.warning(f"Invalid address {addr} removed from the list")

if not len(checked_addresses):
raise ValueError("No valid ss58 addresses founded or ss58 format is invalid")

return checked_addresses
return para_addr.ljust(32, b'\0').hex()
Loading

0 comments on commit 365a163

Please sign in to comment.