Skip to content

Commit

Permalink
single row per sync task table
Browse files Browse the repository at this point in the history
  • Loading branch information
christophercampbell committed Oct 20, 2023
1 parent 57e2e31 commit c7e08d6
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 44 deletions.
39 changes: 12 additions & 27 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/0xPolygon/cdk-data-availability/offchaindata"
"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
)
Expand Down Expand Up @@ -76,7 +75,7 @@ func (db *DB) GetOffChainData(ctx context.Context, key common.Hash, dbTx pgx.Tx)

// Exists checks if a key exists in offchain data table
func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1"
var keyExists = "SELECT COUNT(*) FROM data_node.offchain_data WHERE key = $1;"
var (
count uint
)
Expand All @@ -87,42 +86,28 @@ func (db *DB) Exists(ctx context.Context, key common.Hash) bool {
return count > 0
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer
func (db *DB) GetLastProcessedBlock(ctx context.Context) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT max(block) FROM data_node.sync_info;"
// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"
var (
lastBlock uint64
)

if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL).Scan(&lastBlock); err != nil {
if err := db.pg.QueryRow(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
}
return lastBlock, nil
}

// ResetLastProcessedBlock removes all sync_info for blocks greater than `block`
func (db *DB) ResetLastProcessedBlock(ctx context.Context, block uint64) (uint64, error) {
const resetLastProcessedBlock = "DELETE FROM data_node.sync_info WHERE block > $1"
var (
ct pgconn.CommandTag
err error
)
if ct, err = db.pg.Exec(ctx, resetLastProcessedBlock, block); err != nil {
return 0, err
}
return uint64(ct.RowsAffected()), nil
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer
func (db *DB) StoreLastProcessedBlock(ctx context.Context, block uint64, dbTx pgx.Tx) error {
// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *DB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx pgx.Tx) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_info (block)
VALUES ($1)
ON CONFLICT (block) DO UPDATE
SET processed = NOW();
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, block); err != nil {
if _, err := dbTx.Exec(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}
return nil
Expand Down
16 changes: 16 additions & 0 deletions db/migrations/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +migrate Down
DROP TABLE IF EXISTS data_node.sync_tasks CASCADE;

-- +migrate Up
CREATE TABLE data_node.sync_tasks
(
task VARCHAR PRIMARY KEY,
block BIGINT NOT NULL,
processed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- transfer data from old table to new
INSERT INTO data_node.sync_tasks (task, block)
SELECT 'L1', MAX(block) FROM data_node.sync_info;

DROP TABLE IF EXISTS data_node.sync_info CASCADE;
2 changes: 1 addition & 1 deletion synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (bs *BatchSynchronizer) handleReorgs() {
// only reset start block if necessary
continue
}
err = rewindStartBlock(bs.db, r.Number)
err = setStartBlock(bs.db, r.Number)
if err != nil {
log.Errorf("failed to store new start block to %d: %v", r.Number, err)
}
Expand Down
20 changes: 4 additions & 16 deletions synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (

const dbTimeout = 2 * time.Second

const L1SyncTask = "L1"

Check failure on line 16 in synchronizer/store.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported const L1SyncTask should have comment or be unexported (revive)

func getStartBlock(db *db.DB) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

start, err := db.GetLastProcessedBlock(ctx)
start, err := db.GetLastProcessedBlock(ctx, L1SyncTask)
if err != nil {
log.Errorf("error retrieving last processed block, starting from 0: %v", err)
}
Expand All @@ -37,7 +39,7 @@ func setStartBlock(db *db.DB, block uint64) error {
if dbTx, err = db.BeginStateTransaction(ctx); err != nil {
return err
}
err = db.StoreLastProcessedBlock(ctx, block, dbTx)
err = db.StoreLastProcessedBlock(ctx, L1SyncTask, block, dbTx)
if err != nil {
return err
}
Expand All @@ -47,20 +49,6 @@ func setStartBlock(db *db.DB, block uint64) error {
return nil
}

func rewindStartBlock(db *db.DB, lca uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()

rewind, err := db.ResetLastProcessedBlock(ctx, lca)
if err != nil {
return err
}
if rewind > 0 {
log.Infof("rewound %d blocks", rewind)
}
return nil
}

func exists(db *db.DB, key common.Hash) bool {
ctx, cancel := context.WithTimeout(context.Background(), dbTimeout)
defer cancel()
Expand Down

0 comments on commit c7e08d6

Please sign in to comment.