Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
[etl_optimisations] Removed component, changed abstrations, & redid b…
Browse files Browse the repository at this point in the history
…lock syncing
  • Loading branch information
Ethen Pociask committed Nov 2, 2023
1 parent 3dcb2a1 commit 46842a7
Show file tree
Hide file tree
Showing 42 changed files with 536 additions and 1,155 deletions.
2 changes: 1 addition & 1 deletion config.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
L1_RPC_ENDPOINT=
L2_RPC_ENDPOINT=

# Oracle Geth Block Poll Intervals (ms)
# Chain
L1_POLL_INTERVAL=5000
L2_POLL_INTERVAL=5000

Expand Down
25 changes: 12 additions & 13 deletions docs/architecture/etl.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ A component refers to a graph node within the ETL system. Every component perfor
Currently, there are three total component types:

1. `Pipe` - Used to perform local arbitrary computations _(e.g. Extracting L1Withdrawal transactions from a block)_
2. `Oracle` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_
2. `Reader` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_
3. `Aggregator` - Used to synchronize events between asynchronous data sources _(e.g. Synchronizing L1/L2 blocks to understand real-time changes in bridging TVL)_

### Inter-Connectivity
Expand Down Expand Up @@ -97,20 +97,19 @@ Once input data processing has been completed, the output data is then submitted
* Generating opcode traces for some EVM transaction
* Parsing emitted events from a transaction

### Oracle
### Reader

Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, oracle's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary oracle logic.
Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, reader's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary reader logic.
The following key interface functions are supported/enforced:

* `ReadRoutine` - Routine used for reading/polling real-time data for some arbitrarily configured data source
* `BackTestRoutine` - _Optional_ routine used for sequentially backtesting from some starting to ending block heights.

Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Oracle` event loop. This is visualized below:
Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Reader` event loop. This is visualized below:

{% raw %}
<div class="mermaid">
graph LR;
subgraph A[Oracle]
subgraph A[Reader]
B[eventLoop]-->|channel|ODefRoutine;
B[eventLoop]-->|context|ODefRoutine;
B-->B;
Expand Down Expand Up @@ -185,7 +184,7 @@ A registry submodule is used to store all ETL data register definitions that pro

## Addressing

Some component's require knowledge of a specific address to properly function. For example, an oracle that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.
Some component's require knowledge of a specific address to properly function. For example, an reader that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.

Shown below is how the ETL and Risk Engine interact with the shared state store using a `BalanceOracle` component as an example:

Expand All @@ -210,22 +209,22 @@ graph LR;
GETH --> |"{4} []balance"|BO

BO("Balance
Oracle") --> |"{1} Get(PUUID)"|state
Reader") --> |"{1} Get(PUUID)"|state
BO -."eventLoop()".-> BO

state --> |"{2} []address"|BO
end
</div>
{% endraw %}

### Geth Block Oracle Register
### Geth Block Reader Register

A `GethBlock` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract block data from a go-ethereum node in real-time.
A `BlockHeader` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract block data from a go-ethereum node in real-time.

### Geth Account Balance Oracle Register
### Geth Account Balance Reader Register

An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time.
Unlike, the `GethBlock` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.
An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time.
Unlike, the `BlockHeader` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.

## Managed ETL

Expand Down
2 changes: 1 addition & 1 deletion docs/heuristics.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ curl --location --request POST 'http://localhost:8080/v0/heuristic' \

**NOTE:** This heuristic requires an active RPC connection to both L1 and L2 networks. Furthermore, the Pessimism implementation of fault-detector assumes that a submitted L2 output on L1 will correspond to a canonical block on L2.

The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Oracle contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts.
The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Reader contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts.

### Parameters

Expand Down
36 changes: 34 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/logging"
indexer_client "github.com/ethereum-optimism/optimism/indexer/client"
ix_client "github.com/ethereum-optimism/optimism/indexer/client"
ix_node "github.com/ethereum-optimism/optimism/indexer/node"
"go.uber.org/zap"
)

Expand All @@ -15,13 +16,15 @@ type Config struct {
L1RpcEndpoint string
L2RpcEndpoint string

IndexerCfg *indexer_client.Config
IndexerCfg *ix_client.Config
}

// Bundle ... Used to store all client object references
type Bundle struct {
IndexerClient IndexerClient
L1Client EthClient
L1Node ix_node.EthClient
L2Node ix_node.EthClient
L2Client EthClient
L2Geth GethClient
}
Expand All @@ -36,12 +39,24 @@ func NewBundle(ctx context.Context, cfg *Config) (*Bundle, error) {
return nil, err
}

l1NodeClient, err := NewNodeClient(ctx, cfg.L1RpcEndpoint)
if err != nil {
logger.Fatal("Error creating L1 node client", zap.Error(err))
return nil, err
}

l2Client, err := NewEthClient(ctx, cfg.L2RpcEndpoint)
if err != nil {
logger.Fatal("Error creating L1 client", zap.Error(err))
return nil, err
}

l2NodeClient, err := NewNodeClient(ctx, cfg.L2RpcEndpoint)
if err != nil {
logger.Fatal("Error creating L2 node client", zap.Error(err))
return nil, err
}

l2Geth, err := NewGethClient(cfg.L2RpcEndpoint)
if err != nil {
logger.Fatal("Error creating L2 GETH client", zap.Error(err))
Expand All @@ -56,7 +71,9 @@ func NewBundle(ctx context.Context, cfg *Config) (*Bundle, error) {
return &Bundle{
IndexerClient: indexerClient,
L1Client: l1Client,
L1Node: l1NodeClient,
L2Client: l2Client,
L2Node: l2NodeClient,
L2Geth: l2Geth,
}, nil
}
Expand All @@ -71,6 +88,21 @@ func FromContext(ctx context.Context) (*Bundle, error) {
return b, nil
}

// NodeClient ...
func (b *Bundle) NodeClient(n core.Network) (ix_node.EthClient, error) {
switch n {
case core.Layer1:
return b.L1Node, nil

case core.Layer2:
return b.L2Node, nil

default:
return nil, fmt.Errorf("invalid network supplied")
}

Check failure on line 103 in internal/client/client.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}

// FromNetwork ... Retrieves an eth client from the context
func FromNetwork(ctx context.Context, n core.Network) (EthClient, error) {
bundle, err := FromContext(ctx)
Expand Down
19 changes: 8 additions & 11 deletions internal/client/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,18 @@

package client

/*
NOTE
eth client docs: https://pkg.go.dev/github.com/ethereum/go-ethereum/ethclient
eth api docs: https://geth.ethereum.org/docs/rpc/server
*/

import (
"context"
"math/big"

"github.com/base-org/pessimism/internal/metrics"
ix_node "github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

// TODO (#20) : Introduce optional Retry-able EthClient

// EthClient ... Provides interface wrapper for ethClient functions
// Useful for mocking go-ethereum json rpc client logic
type EthClient interface {
CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
Expand All @@ -40,3 +31,9 @@ type EthClient interface {
func NewEthClient(ctx context.Context, rawURL string) (EthClient, error) {
return ethclient.DialContext(ctx, rawURL)
}

func NewNodeClient(ctx context.Context, rpcURL string) (ix_node.EthClient, error) {
stats := metrics.WithContext(ctx)

return ix_node.DialEthClient(rpcURL, stats)
}
9 changes: 2 additions & 7 deletions internal/core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"
)

// ClientConfig ... Configuration passed through to an oracle component constructor
// ClientConfig ... Configuration passed through to an reader component constructor
type ClientConfig struct {
Network Network
PollInterval time.Duration
Expand All @@ -31,12 +31,7 @@ type PipelineConfig struct {
ClientConfig *ClientConfig
}

// Backfill ... Returns true if the oracle is configured to backfill
// Backfill ... Returns true if the reader is configured to backfill
func (oc *ClientConfig) Backfill() bool {
return oc.StartHeight != nil
}

// Backtest ... Returns true if the oracle is configured to backtest
func (oc *ClientConfig) Backtest() bool {
return oc.EndHeight != nil
}
2 changes: 1 addition & 1 deletion internal/core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
Clients
)

// Network ... Represents the network for which a pipeline's oracle
// Network ... Represents the network for which a pipeline's reader
// is subscribed to.
type Network uint8

Expand Down
4 changes: 2 additions & 2 deletions internal/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func Test_TransitData(t *testing.T) {
// Verify construction
td := core.NewTransitData(
core.GethBlock,
core.BlockHeader,
nil,
)

Expand All @@ -33,7 +33,7 @@ func Test_EngineRelay(t *testing.T) {
outChan := make(chan core.HeuristicInput)

eir := core.NewEngineRelay(core.NilPUUID(), outChan)
dummyTD := core.NewTransitData(core.AccountBalance, nil)
dummyTD := core.NewTransitData(core.BlockHeader, nil)

// Verify relay and wrapping

Expand Down
15 changes: 6 additions & 9 deletions internal/core/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,19 @@ package core
type ComponentType uint8

const (
Oracle ComponentType = iota + 1
Pipe
Aggregator
Reader ComponentType = iota + 1
Transformer
)

// String ... Converts the component type to a string
func (ct ComponentType) String() string {
switch ct {
case Oracle:
return "oracle"
case Reader:
return "reader"

case Pipe:
return "pipe"
case Transformer:
return "transformer"

Check failure on line 19 in internal/core/etl.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
case Aggregator:
return "aggregator"
}

return UnknownType
Expand Down
4 changes: 2 additions & 2 deletions internal/core/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Test_Component_ID(t *testing.T) {

assert.Equal(t, expectedPID, actualID.PID)

expectedStr := "layer1:backtest:oracle:account_balance"
expectedStr := "layer1:backtest:reader:account_balance"
actualStr := actualID.PID.String()

assert.Equal(t, expectedStr, actualStr)
Expand All @@ -28,7 +28,7 @@ func Test_Pipeline_ID(t *testing.T) {

assert.Equal(t, expectedID, actualID.PID)

expectedStr := "backtest::layer1:backtest:oracle:account_balance::layer1:backtest:oracle:account_balance"
expectedStr := "backtest::layer1:backtest:reader:account_balance::layer1:backtest:reader:account_balance"
actualStr := actualID.PID.String()

assert.Equal(t, expectedStr, actualStr)
Expand Down
17 changes: 7 additions & 10 deletions internal/core/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@ package core
type RegisterType uint8

const (
AccountBalance RegisterType = iota + 1
GethBlock
BlockHeader RegisterType = iota + 1
EventLog
)

// String ... Returns string representation of a
// register enum
func (rt RegisterType) String() string {
switch rt {

Check failure on line 14 in internal/core/register.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)
case AccountBalance:
return "account_balance"

case GethBlock:
return "geth_block"
case BlockHeader:
return "block_header"

case EventLog:
return "event_log"
Expand All @@ -32,10 +29,10 @@ type DataRegister struct {
Addressing bool
Sk *StateKey

DataType RegisterType
ComponentType ComponentType
ComponentConstructor interface{}
Dependencies []RegisterType
DataType RegisterType
ComponentType ComponentType
Constructor interface{}
Dependencies []RegisterType
}

// StateKey ... Returns a cloned state key for a data register
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_EventLoop(t *testing.T) {
hi := core.HeuristicInput{
PUUID: testPUUID,
Input: core.TransitData{
Type: core.AccountBalance,
Type: core.BlockHeader,
Address: common.HexToAddress("0x69"),
Value: float64(666),
},
Expand Down
Loading

0 comments on commit 46842a7

Please sign in to comment.