Fix: Data Integrity Issue (#122)
* wip

* add migration file and update event processing logic

* use update instead of upsert

* fix unit test

* test coverage

* test coverage

* test coverage

* increase test coverage

* fix migration file

* fix migration file

* go mod tidy

* Add launch.json for debugging

* apply feedback

* fix migration file

* Bump the Kurtosis CDK to v0.2.23

* Fix TestClient_GetStatus UT

* apply feedback

* fix typo

* Bump the Kurtosis CDK version

---------

Co-authored-by: Stefan Negovanović <stefan@ethernal.tech>
rachit77 and Stefan-Ethernal authored Dec 2, 2024
1 parent d97c040 commit 1f69650
Showing 20 changed files with 488 additions and 920 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/regression-tests.yml
@@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: "v0.2.22"
ref: v0.2.24
path: kurtosis-cdk

- name: Install Kurtosis CDK tools
6 changes: 0 additions & 6 deletions .mockery.yaml
@@ -36,9 +36,3 @@ packages:
SequencerTracker:
config:
filename: sequencer_tracker.generated.go
github.com/0xPolygon/cdk-data-availability/services/status:
config:
interfaces:
GapsDetector:
config:
filename: gaps_detector.generated.go
21 changes: 21 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,21 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run DAC",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/cmd",
"args": [
"run",
"-cfg",
"test/config/test.local.toml",
],
"cwd": ".",
},
]
}
10 changes: 5 additions & 5 deletions client/client_test.go
@@ -28,12 +28,12 @@ func TestClient_GetStatus(t *testing.T) {
}{
{
name: "successfully got status",
result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"backfill_progress":5}}`,
result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"last_synchronized_block":5}}`,
status: &types.DACStatus{
Uptime: "123",
Version: "v1.0.0",
KeyCount: 2,
BackfillProgress: 5,
Uptime: "123",
Version: "v1.0.0",
KeyCount: 2,
LastSynchronizedBlock: 5,
},
},
{
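For reference, the fixture above implies that types.DACStatus now reports a last_synchronized_block field in place of backfill_progress. A minimal sketch of the updated struct, inferred from the test (the JSON tags and field types are assumptions, not copied from the actual definition):

// Inferred shape of types.DACStatus after this change; tags and types are
// assumptions based on the test fixture above, not the real declaration.
type DACStatus struct {
	Version               string `json:"version"`
	Uptime                string `json:"uptime"`
	KeyCount              uint64 `json:"key_count"`
	LastSynchronizedBlock uint64 `json:"last_synchronized_block"`
}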
2 changes: 1 addition & 1 deletion cmd/main.go
@@ -154,7 +154,7 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: status.APISTATUS,
Service: status.NewEndpoints(storage, batchSynchronizer),
Service: status.NewEndpoints(storage),
},
{
Name: sync.APISYNC,
140 changes: 42 additions & 98 deletions db/db.go
@@ -15,52 +15,32 @@ import (
const (
// storeLastProcessedBlockSQL is a query that stores the last processed block for a given task
storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();`
UPDATE data_node.sync_tasks
SET block = $2, processed = NOW()
WHERE task = $1;`

// getLastProcessedBlockSQL is a query that returns the last processed block for a given task
getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;`

// getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database
getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;`
// getMissingBatchKeysSQL is a query that returns the missing batch keys from the database
getMissingBatchKeysSQL = `SELECT num, hash FROM data_node.missing_batches LIMIT $1;`

// getOffchainDataSQL is a query that returns the offchain data for a given key
getOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

// listOffchainDataSQL is a query that returns the offchain data for a given list of keys
listOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key IN (?);
`

// countOffchainDataSQL is a query that returns the count of rows in the offchain_data table
countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;"

// selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table
selectOffchainDataGapsSQL = `
WITH numbered_batches AS (
SELECT
batch_num,
ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number
FROM data_node.offchain_data
)
SELECT
nb1.batch_num AS current_batch_num,
nb2.batch_num AS next_batch_num
FROM
numbered_batches nb1
LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1
WHERE
nb1.batch_num IS NOT NULL
AND nb2.batch_num IS NOT NULL
AND nb1.batch_num + 1 <> nb2.batch_num;`
)

var (
@@ -73,15 +53,14 @@ type DB interface {
StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error
GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error
StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error
GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error)
DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error

GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error)
ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData) error
CountOffchainData(ctx context.Context) (uint64, error)
DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error)
}

// DB is the database layer of the data node
@@ -90,10 +69,9 @@ type pgDB struct {

storeLastProcessedBlockStmt *sqlx.Stmt
getLastProcessedBlockStmt *sqlx.Stmt
getUnresolvedBatchKeysStmt *sqlx.Stmt
getMissingBatchKeysStmt *sqlx.Stmt
getOffChainDataStmt *sqlx.Stmt
countOffChainDataStmt *sqlx.Stmt
detectOffChainDataGapsStmt *sqlx.Stmt
}

// New instantiates a DB
@@ -108,9 +86,9 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) {
return nil, fmt.Errorf("failed to prepare the get last processed block statement: %w", err)
}

getUnresolvedBatchKeysStmt, err := pg.PreparexContext(ctx, getUnresolvedBatchKeysSQL)
getMissingBatchKeysStmt, err := pg.PreparexContext(ctx, getMissingBatchKeysSQL)
if err != nil {
return nil, fmt.Errorf("failed to prepare the get unresolved batch keys statement: %w", err)
return nil, fmt.Errorf("failed to prepare the get missing batch keys statement: %w", err)
}

getOffChainDataStmt, err := pg.PreparexContext(ctx, getOffchainDataSQL)
@@ -123,19 +101,13 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) {
return nil, fmt.Errorf("failed to prepare the count offchain data statement: %w", err)
}

detectOffChainDataGapsStmt, err := pg.PreparexContext(ctx, selectOffchainDataGapsSQL)
if err != nil {
return nil, fmt.Errorf("failed to prepare the detect offchain data gaps statement: %w", err)
}

return &pgDB{
pg: pg,
storeLastProcessedBlockStmt: storeLastProcessedBlockStmt,
getLastProcessedBlockStmt: getLastProcessedBlockStmt,
getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt,
getMissingBatchKeysStmt: getMissingBatchKeysStmt,
getOffChainDataStmt: getOffChainDataStmt,
countOffChainDataStmt: countOffChainDataStmt,
detectOffChainDataGapsStmt: detectOffChainDataGapsStmt,
}, nil
}

@@ -156,24 +128,28 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64,
return lastBlock, nil
}

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
// StoreMissingBatchKeys stores missing batch keys in the database
func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error {
if len(bks) == 0 {
return nil
}

query, args := buildBatchKeysInsertQuery(bks)

if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to store unresolved batches: %w", err)
batchNumbers := make([]string, len(bks))
for i, bk := range bks {
batchNumbers[i] = fmt.Sprintf("%d", bk.Number)
}
return fmt.Errorf("failed to store missing batches (batch numbers: %s): %w", strings.Join(batchNumbers, ", "), err)
}

return nil
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
rows, err := db.getUnresolvedBatchKeysStmt.QueryxContext(ctx, limit)
// GetMissingBatchKeys returns the missing batch keys that are not yet present in the offchain_data table
func (db *pgDB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) {
rows, err := db.getMissingBatchKeysStmt.QueryxContext(ctx, limit)
if err != nil {
return nil, err
}
@@ -201,8 +177,8 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types
return bks, nil
}

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
// DeleteMissingBatchKeys deletes the missing batch keys from the missing_batches table in the database
func (db *pgDB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error {
if len(bks) == 0 {
return nil
}
@@ -218,11 +194,11 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch
}

query := fmt.Sprintf(`
DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (%s);
DELETE FROM data_node.missing_batches WHERE (num, hash) IN (%s);
`, strings.Join(values, ","))

if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to delete unresolved batches: %w", err)
return fmt.Errorf("failed to delete missing batches: %w", err)
}

return nil
@@ -245,9 +221,8 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData)
// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) {
data := struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}{}

if err := db.getOffChainDataStmt.QueryRowxContext(ctx, key.Hex()).StructScan(&data); err != nil {
@@ -259,9 +234,8 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.Of
}

return &types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
}, nil
}

@@ -292,9 +266,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
defer rows.Close()

type row struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}

list := make([]types.OffChainData, 0, len(keys))
@@ -305,9 +278,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
}

list = append(list, types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
})
}

@@ -324,34 +296,7 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) {
return count, nil
}

// DetectOffchainDataGaps returns the number of gaps in the offchain_data table
func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) {
rows, err := db.detectOffChainDataGapsStmt.QueryxContext(ctx)
if err != nil {
return nil, err
}

defer rows.Close()

type row struct {
CurrentBatchNum uint64 `db:"current_batch_num"`
NextBatchNum uint64 `db:"next_batch_num"`
}

gaps := make(map[uint64]uint64)
for rows.Next() {
var data row
if err = rows.StructScan(&data); err != nil {
return nil, err
}

gaps[data.CurrentBatchNum] = data.NextBatchNum
}

return gaps, nil
}

// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys
// buildBatchKeysInsertQuery builds the query to insert missing batch keys
func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
const columnsAffected = 2

@@ -364,32 +309,31 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
}

return fmt.Sprintf(`
INSERT INTO data_node.unresolved_batches (num, hash)
INSERT INTO data_node.missing_batches (num, hash)
VALUES %s
ON CONFLICT (num, hash) DO NOTHING;
`, strings.Join(values, ",")), args
}

// buildOffchainDataInsertQuery builds the query to insert offchain data
func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) {
const columnsAffected = 3
const columnsAffected = 2

// Remove duplicates from the given offchain data
ods = types.RemoveDuplicateOffChainData(ods)

args := make([]interface{}, len(ods)*columnsAffected)
values := make([]string, len(ods))
for i, od := range ods {
values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd
values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd
args[i*columnsAffected] = od.Key.Hex()
args[i*columnsAffected+1] = common.Bytes2Hex(od.Value)
args[i*columnsAffected+2] = od.BatchNum
}

return fmt.Sprintf(`
INSERT INTO data_node.offchain_data (key, value, batch_num)
INSERT INTO data_node.offchain_data (key, value)
VALUES %s
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
SET value = EXCLUDED.value;
`, strings.Join(values, ",")), args
}
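
To illustrate how the renamed interface fits together, here is a minimal usage sketch built only on the methods declared in the DB interface above. It is not code from this commit: the import paths and the resolve callback are assumptions, and the error wrapping is illustrative.

package example // editor's sketch, not part of the repository

import (
	"context"
	"fmt"

	"github.com/0xPolygon/cdk-data-availability/db"    // assumed import path
	"github.com/0xPolygon/cdk-data-availability/types" // assumed import path
)

// processMissingBatches drains up to limit keys from data_node.missing_batches,
// resolves their off-chain data via the supplied callback, stores it, and then
// removes the keys so they are no longer reported as missing.
func processMissingBatches(
	ctx context.Context,
	store db.DB,
	limit uint,
	resolve func(context.Context, []types.BatchKey) ([]types.OffChainData, error),
) error {
	keys, err := store.GetMissingBatchKeys(ctx, limit)
	if err != nil {
		return fmt.Errorf("get missing batch keys: %w", err)
	}
	if len(keys) == 0 {
		return nil
	}

	ods, err := resolve(ctx, keys)
	if err != nil {
		return fmt.Errorf("resolve missing batches: %w", err)
	}
	if err := store.StoreOffChainData(ctx, ods); err != nil {
		return fmt.Errorf("store offchain data: %w", err)
	}

	return store.DeleteMissingBatchKeys(ctx, keys)
}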
