Skip to content

Commit

Permalink
Merge pull request #29 from makerdao/TECH-3107-rpc-failover
Browse files Browse the repository at this point in the history
Tech 3107 rpc failover
  • Loading branch information
jeannettemcd authored May 28, 2024
2 parents 5e5bb11 + 80675ef commit fd217f0
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 153 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,29 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set environment variable
run: echo "RUN_TESTS=false" >> $GITHUB_ENV # Adjust this to 'true' or 'false'

- name: Checkout
if: env.RUN_TESTS != 'false'
uses: actions/checkout@v3
with:
submodules: recursive

- name: setup python
if: env.RUN_TESTS != 'false'
uses: actions/setup-python@v4
with:
python-version: '3.7'
python-version: '3.9'

- name: install packages
if: env.RUN_TESTS != 'false'
run: |
sudo apt-get update
sudo apt-get -y install python3-pip jshon jq virtualenv pkg-config openssl libssl-dev autoconf libtool libsecp256k1-dev
pip3 install -r requirements.txt
pip3 install -r requirements-dev.txt
- name: Run tests
if: env.RUN_TESTS != 'false'
run: ./test.sh
225 changes: 120 additions & 105 deletions chief_keeper/chief_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import argparse
import logging
import sys
import os
import requests
import time
import types

from web3 import Web3, HTTPProvider
from web3.exceptions import TimeExhausted

from urllib.parse import urlparse

from chief_keeper.database import SimpleDatabase
from chief_keeper.spell import DSSSpell
Expand All @@ -35,6 +39,27 @@
from pymaker.deployment import DssDeployment

HEALTHCHECK_FILE_PATH = "/tmp/health.log"
BACKOFF_MAX_TIME = 120

class ExitOnCritical(logging.StreamHandler):
"""Custom class to terminate script execution once
log records with severity level ERROR or higher occurred"""

def emit(self, record):
super().emit(record)
if record.levelno > logging.ERROR:
sys.exit(1)


logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S%z",
force=True,
handlers=[ExitOnCritical()],
)
logger = logging.getLogger()
log_level = logging.getLevelName(os.environ.get("LOG_LEVEL") or "INFO")
logger.setLevel(log_level)


def healthy(func):
Expand All @@ -51,110 +76,43 @@ def wrapper(*args, **kwargs):
class ChiefKeeper:
"""Keeper that lifts the hat and streamlines executive actions"""

logger = logging.getLogger("chief-keeper")


def __init__(self, args: list, **kwargs):
"""Pass in arguements assign necessary variables/objects and instantiate other Classes"""

parser = argparse.ArgumentParser("chief-keeper")

parser.add_argument(
"--rpc-host",
type=str,
required=True,
help="JSON-RPC host url",
)

parser.add_argument(
"--rpc-timeout",
type=int,
default=60,
help="JSON-RPC timeout (in seconds, default: 60)",
)

parser.add_argument(
"--network",
type=str,
required=True,
help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')",
)

parser.add_argument(
"--eth-from",
type=str,
required=True,
help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')",
)

parser.add_argument(
"--eth-key",
type=str,
nargs="*",
help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')",
)

parser.add_argument(
"--dss-deployment-file",
type=str,
required=False,
help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)",
)

parser.add_argument(
"--chief-deployment-block",
type=int,
required=False,
default=0,
help=" Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668",
)

parser.add_argument(
"--max-errors",
type=int,
default=100,
help="Maximum number of allowed errors before the keeper terminates (default: 100)",
)

parser.add_argument(
"--debug", dest="debug", action="store_true", help="Enable debug output"
)

parser.add_argument(
"--blocknative-api-key",
type=str,
default=None,
help="Blocknative API key",
)

parser.add_argument(
"--gas-initial-multiplier",
type=str,
default=1.0,
help="gas multiplier",
)
parser.add_argument(
"--gas-reactive-multiplier",
type=str,
default=2.25,
help="gas strategy tuning",
)
parser.add_argument(
"--gas-maximum", type=str, default=5000, help="gas strategy tuning"
)
parser.add_argument("--rpc-primary-url", type=str, required=True, help="Primary JSON-RPC host URL")
parser.add_argument("--rpc-primary-timeout", type=int, default=1200, help="Primary JSON-RPC timeout (in seconds, default: 1200)")
parser.add_argument("--rpc-backup-url", type=str, required=True, help="Backup JSON-RPC host URL")
parser.add_argument("--rpc-backup-timeout", type=int, default=1200, help="Backup JSON-RPC timeout (in seconds, default: 1200)")
parser.add_argument("--network", type=str, required=True, help="Network that you're running the Keeper on (options, 'mainnet', 'kovan', 'testnet')")
parser.add_argument("--eth-from", type=str, required=True, help="Ethereum address from which to send transactions; checksummed (e.g. '0x12AebC')")
parser.add_argument("--eth-key", type=str, nargs="*", help="Ethereum private key(s) to use (e.g. 'key_file=/path/to/keystore.json,pass_file=/path/to/passphrase.txt')")
parser.add_argument("--dss-deployment-file", type=str, required=False, help="Json description of all the system addresses (e.g. /Full/Path/To/configFile.json)")
parser.add_argument("--chief-deployment-block", type=int, required=False, default=0, help="Block that the Chief from dss-deployment-file was deployed at (e.g. 8836668")
parser.add_argument("--max-errors", type=int, default=100, help="Maximum number of allowed errors before the keeper terminates (default: 100)")
parser.add_argument("--debug", dest="debug", action="store_true", help="Enable debug output")
parser.add_argument("--blocknative-api-key", type=str, default=None, help="Blocknative API key")
parser.add_argument("--gas-initial-multiplier", type=float, default=1.0, help="gas multiplier")
parser.add_argument("--gas-reactive-multiplier", type=float, default=2.25, help="gas strategy tuning")
parser.add_argument("--gas-maximum", type=int, default=5000, help="gas strategy tuning")

parser.set_defaults(cageFacilitated=False)
self.arguments = parser.parse_args(args)

self.web3 = kwargs['web3'] if 'web3' in kwargs else Web3(HTTPProvider(endpoint_uri=self.arguments.rpc_host,
request_kwargs={"timeout": self.arguments.rpc_timeout}))
# Initialize logger before any method that uses it
self.logger = logger

self.web3.eth.defaultAccount = self.arguments.eth_from
register_keys(self.web3, self.arguments.eth_key)
self.our_address = Address(self.arguments.eth_from)
self.print_arguments()

self.web3 = None
self.node_type = None
self._initialize_blockchain_connection()

isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConntected is: {isConnected}')
# Set the Ethereum address and register keys
# self.web3.eth.defaultAccount = self.arguments.eth_from
# register_keys(self.web3, self.arguments.eth_key)
self.our_address = Address(self.arguments.eth_from)

if self.arguments.dss_deployment_file:
self.dss = DssDeployment.from_json(
Expand All @@ -173,11 +131,58 @@ def __init__(self, args: list, **kwargs):

self.confirmations = 0

logging.basicConfig(
format="%(asctime)-15s %(levelname)-8s %(message)s",
level=(logging.DEBUG if self.arguments.debug else logging.INFO),
def print_arguments(self):
"""Print all the arguments passed to the script."""
for arg in vars(self.arguments):
self.logger.info(f"{arg}: {getattr(self.arguments, arg)}")

def _initialize_blockchain_connection(self):
"""Initialize connection with Ethereum node."""
if not self._connect_to_primary_node():
self.logger.info("Switching to backup node.")
if not self._connect_to_backup_node():
self.logger.critical(
"Error: Couldn't connect to the primary and backup Ethereum nodes."
)

def _connect_to_primary_node(self):
"""Connect to the primary Ethereum node"""
return self._connect_to_node(
self.arguments.rpc_primary_url, self.arguments.rpc_primary_timeout, "primary"
)

def _connect_to_backup_node(self):
"""Connect to the backup Ethereum node"""
return self._connect_to_node(
self.arguments.rpc_backup_url, self.arguments.rpc_backup_timeout, "backup"
)

def _connect_to_node(self, rpc_url, rpc_timeout, node_type):
"""Connect to an Ethereum node"""
try:
_web3 = Web3(HTTPProvider(rpc_url, {"timeout": rpc_timeout}))
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error connecting to Ethereum node: {e}")
return False
else:
if _web3.isConnected():
self.web3 = _web3
self.node_type = node_type
return self._configure_web3()
return False

def _configure_web3(self):
"""Configure Web3 connection with private key"""
try:
self.web3.eth.defaultAccount = self.arguments.eth_from
register_keys(self.web3, self.arguments.eth_key)
except Exception as e:
self.logger.error(f"Error configuring Web3: {e}")
return False
else:
node_hostname = urlparse(self.web3.provider.endpoint_uri).hostname
self.logger.info(f"Connected to Ethereum node at {node_hostname}")
return True

def main(self):
"""Initialize the lifecycle and enter into the Keeper Lifecycle controller.
Expand Down Expand Up @@ -244,14 +249,18 @@ def process_block(self):
"""Callback called on each new block. If too many errors, terminate the keeper.
This is the entrypoint to the Keeper's monitoring logic
"""
isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConntected is: {isConnected}')

if self.errors >= self.max_errors:
self.lifecycle.terminate()
else:
self.check_hat()
self.check_eta()
try:
isConnected = self.web3.isConnected()
self.logger.info(f'web3 isConnected: {isConnected}')

if self.errors >= self.max_errors:
self.lifecycle.terminate()
else:
self.check_hat()
self.check_eta()
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error processing block: {e}")
self.errors += 1

def check_hat(self):
"""Ensures the Hat is on the proposal (spell, EOA, multisig, etc) with the most approval.
Expand All @@ -265,7 +274,13 @@ def check_hat(self):
blockNumber = self.web3.eth.blockNumber
self.logger.info(f"Checking Hat on block {blockNumber}")

self.database.update_db_yays(blockNumber)
try:
self.database.update_db_yays(blockNumber)
except (TimeExhausted, Exception) as e:
self.logger.error(f"Error updating database yays: {e}")
self.errors += 1
return

yays = self.database.db.get(doc_id=2)["yays"]

hat = self.dss.ds_chief.get_hat().address
Expand Down
1 change: 1 addition & 0 deletions chief_keeper/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from tinydb import TinyDB, Query
from web3 import Web3
from web3.exceptions import TimeExhausted

from chief_keeper.spell import DSSSpell

Expand Down
Loading

0 comments on commit fd217f0

Please sign in to comment.