Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ethrpc/ethmonitor: raw blocks and logs #105

Merged
merged 15 commits into from
Sep 14, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

strategy:
matrix:
go-version: [1.18.x, 1.19.x, 1.20.x]
go-version: [1.19.x, 1.20.x, 1.21.x]
os: [ubuntu-latest, macos-latest]

runs-on: ${{ matrix.os }}
Expand Down
13 changes: 9 additions & 4 deletions cmd/chain-watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/0xsequence/ethkit/util"
"github.com/goware/cachestore"
"github.com/goware/cachestore/redis"
"github.com/goware/logger"
"github.com/goware/pp"
)

Expand Down Expand Up @@ -54,19 +55,25 @@ func main() {
cachestore.MaxKeyLength = 180
monitorOptions := ethmonitor.DefaultOptions
monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond)
monitorOptions.DebugLogging = true
monitorOptions.WithLogs = true
monitorOptions.BlockRetentionLimit = 64
monitorOptions.StartBlockNumber = nil // track the head
monitorOptions.Bootstrap = true
// monitorOptions.StartBlockNumber = big.NewInt(47496451)
// monitorOptions.Bootstrap = true

monitorOptions.Logger = logger.NewLogger(logger.LogLevel_DEBUG)
monitorOptions.DebugLogging = true

// monitorOptions.TrailNumBlocksBehindHead = 4
// monitorOptions.UnsubscribeOnStop = true

if os.Getenv("REDIS_ENABLED") == "1" {
monitorOptions.CacheBackend = redis.Backend(&redis.Config{
Enabled: true,
Host: "localhost",
Port: 6379,
})
monitorOptions.RetainPayloads = true
}

chain, feed, err := chainWatch(provider, monitorOptions)
Expand Down Expand Up @@ -105,8 +112,6 @@ func chainWatch(provider *ethrpc.Provider, monitorOptions ethmonitor.Options) (*
}
snapshotFile := filepath.Join(cwd, "snapshot.json")

// monitorOptions.UnsubscribeOnStop = true

monitor, err := ethmonitor.NewMonitor(provider, monitorOptions)
if err != nil {
log.Fatal(err)
Expand Down
37 changes: 33 additions & 4 deletions ethmonitor/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ func (c *Chain) Blocks() Blocks {
return blocks
}

func (c *Chain) ReadyHead() *Block {
c.mu.Lock()
defer c.mu.Unlock()
for i := len(c.blocks) - 1; i >= 0; i-- {
if c.blocks[i].OK {
return c.blocks[i]
}
}
return nil
}

func (c *Chain) GetBlock(hash common.Hash) *Block {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -210,6 +221,11 @@ type Block struct {

// OK flag which represents the block is ready for broadcasting
OK bool

// Raw byte payloads for block and logs responses from the nodes.
// The values are only set if RetainPayloads is set to true on monitor.
BlockPayload []byte
LogsPayload []byte
}

type Blocks []*Block
Expand Down Expand Up @@ -287,11 +303,24 @@ func (blocks Blocks) Copy() Blocks {
if b.Logs != nil {
copy(logs, b.Logs)
}

var blockPayload []byte
if b.BlockPayload != nil {
copy(blockPayload, b.BlockPayload)
}

var logsPayload []byte
if b.LogsPayload != nil {
copy(logsPayload, b.LogsPayload)
}

nb[i] = &Block{
Block: b.Block,
Event: b.Event,
Logs: logs,
OK: b.OK,
Block: b.Block,
Event: b.Event,
Logs: logs,
OK: b.OK,
BlockPayload: blockPayload,
LogsPayload: logsPayload,
}
}

Expand Down
Loading
Loading