Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage optimized sync tracking #28

Merged
merged 5 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/0xPolygon/cdk-data-availability/offchaindata"
"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
Expand Down Expand Up @@ -76,7 +75,7 @@ func (db *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx)

// Exists checks if a key exists in offchain data table
func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1"
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1;"
var (
count uint
)
Expand All @@ -87,42 +86,28 @@ func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
return count > 0
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer
func (db *DB) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT max(block) FROM data_node.sync_info;"
// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"
var (
lastBlock uint64
)

if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL).Scan(&lastBlock); err != nil {
if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
}
return lastBlock, nil
}

// ResetLastProcessedBlock removes all sync_info for blocks greater than `block`
func (db *DB) ResetLastProcessedBlock(ctx context.Context, block uint64) (uint64, error) {
const resetLastProcessedBlock = "DELETE FROM data_node.sync_info WHERE block > $1"
var (
ct pgconn.CommandTag
err error
)
if ct, err = db.pg.Exec(ctx, resetLastProcessedBlock, block); err != nil {
return 0, err
}
return uint64(ct.RowsAffected()), nil
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer
func (db *DB) StoreLastProcessedBlock(ctx context.Context, block uint64, dbTx pgx.Tx) error {
// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *DB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx pgx.Tx) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_info (block)
VALUES ($1)
ON CONFLICT (block) DO UPDATE
SET processed = NOW();
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, block); err != nil {
if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}
return nil
Expand Down
16 changes: 16 additions & 0 deletions db/migrations/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +migrate Down
DROP TABLE IF EXISTS data_node.sync_tasks CASCADE;

-- +migrate Up
-- One row per named synchronizer task, tracking the last block it processed.
CREATE TABLE data_node.sync_tasks
(
    task      VARCHAR PRIMARY KEY,
    block     BIGINT NOT NULL,
    processed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Transfer data from the old table to the new one. The HAVING clause skips
-- the insert when sync_info is empty: MAX(block) over zero rows is NULL,
-- which would otherwise violate the NOT NULL constraint on block and fail
-- the migration on a fresh database.
INSERT INTO data_node.sync_tasks (task, block)
SELECT 'L1', MAX(block) FROM data_node.sync_info
HAVING MAX(block) IS NOT NULL;

DROP TABLE IF EXISTS data_node.sync_info CASCADE;
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hermeznetwork/tracerr v0.3.2
github.com/invopop/jsonschema v0.7.0
github.com/jackc/pgconn v1.14.1
github.com/jackc/pgx/v4 v4.18.1
github.com/miguelmota/go-solidity-sha3 v0.1.1
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -43,6 +42,7 @@ require (
github.com/holiman/uint256 v1.2.3 // indirect
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (bs *BatchSynchronizer) handleReorgs() {
// only reset start block if necessary
continue
}
err = rewindStartBlock(bs.db, r.Number)
err = setStartBlock(bs.db, r.Number)
if err != nil {
log.Errorf("failed to store new start block to %d: %v", r.Number, err)
}
Expand Down
20 changes: 4 additions & 16 deletions synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (

const dbTimeout = 2 * time.Second

const l1SyncTask = "L1"

func getStartBlock(db *db.DB) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

start, err := db.GetLastProcessedBlock(ctx)
start, err := db.GetLastProcessedBlock(ctx, l1SyncTask)
if err != nil {
log.Errorf("error retrieving last processed block, starting from 0: %v", err)
}
Expand All @@ -37,7 +39,7 @@ func setStartBlock(db *db.DB, block uint64) error {
if dbTx, err = db.BeginStateTransaction(ctx); err != nil {
return err
}
err = db.StoreLastProcessedBlock(ctx, block, dbTx)
err = db.StoreLastProcessedBlock(ctx, l1SyncTask, block, dbTx)
if err != nil {
return err
}
Expand All @@ -47,20 +49,6 @@ func setStartBlock(db *db.DB, block uint64) error {
return nil
}

// rewindStartBlock removes sync progress above the last common ancestor
// block `lca` (via db.ResetLastProcessedBlock), so the synchronizer restarts
// from that point after a chain reorg. Uses a short dbTimeout-bounded context.
func rewindStartBlock(db *db.DB, lca uint64) error {
	ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
	defer cancel()

	// rewind is the number of block records deleted; log only when work was done.
	rewind, err := db.ResetLastProcessedBlock(ctx, lca)
	if err != nil {
		return err
	}
	if rewind > 0 {
		log.Infof("rewound %d blocks", rewind)
	}
	return nil
}

func exists(db *db.DB, key common.Hash) bool {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()
Expand Down
21 changes: 4 additions & 17 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ run: stop ## Runs a full node for e2e
.PHONY: stop
stop: ## Stop a full data node
make stop-dacs
make stop-dac-dbs
$(STOP)

.PHONY: stop-dacs
stop-dacs:
@./stop-dacs > /dev/null 2>&1

.PHONY: test-e2e
test-e2e: run ## Runs the E2E tests
trap '$(STOP)' EXIT; MallocNanoZone=0 go test -count=1 -race -v -p 1 -timeout 600s ./e2e/...
Expand Down Expand Up @@ -54,19 +57,3 @@ run-network: ## Runs the l1 network
stop-network: ## Stops the l1 network
docker compose stop l1 && docker compose rm -f l1


### manual utility to stop dac nodes that were started by e2e, but not stopped by e2e ###
.PHONY: stop-dac-nodes
stop-dac-nodes: ## Kill and remove the five numbered e2e DAC node containers
	for i in 0 1 2 3 4 ; do \
		(docker kill cdk-data-availability-$$i || true) && (docker rm cdk-data-availability-$$i || true) ; \
	done

.PHONY: stop-dac-dbs
stop-dac-dbs: ## Kill and remove the five numbered e2e DAC database containers
	for i in 0 1 2 3 4 ; do \
		(docker kill cdk-validium-data-node-db-$$i || true) && (docker rm cdk-validium-data-node-db-$$i || true); \
	done

.PHONY: stop-dacs
stop-dacs: stop-dac-nodes stop-dac-dbs ## Stop both node and db containers
9 changes: 9 additions & 0 deletions test/stop-dacs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/sh

# Kill and remove the five numbered containers for each DAC component,
# ignoring containers that are already stopped or absent.
for prefix in cdk-data-availability cdk-validium-data-node-db; do
	echo $prefix
	for idx in 0 1 2 3 4; do
		docker kill $prefix-$idx || true
		docker rm $prefix-$idx || true
	done
done