From 138c61c9726c1f76251b49da117fb53a7c9f8469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Tue, 3 Sep 2024 17:21:31 +0000 Subject: [PATCH] fix slow lookup of previous ATX for (ID, epoch) (#6314) ## Motivation Speed up `atxs.PrevIdByNodeId` --- activation/activation_test.go | 5 +- activation/handler_v1.go | 2 +- activation/handler_v2.go | 2 +- activation/handler_v2_test.go | 8 +-- api/grpcserver/admin_service_test.go | 2 +- checkpoint/runner_test.go | 7 ++- common/types/poet.go | 2 +- sql/atxs/atxs.go | 21 +++++-- sql/atxs/atxs_test.go | 58 ++++++++++--------- .../migrations/state_0021_migration.go | 20 ++++++- .../migrations/state_0021_migration_test.go | 2 +- .../0023_posts_has_publish_epoch.sql | 12 ++++ sql/statesql/schema/schema.sql | 5 +- 13 files changed, 98 insertions(+), 48 deletions(-) create mode 100644 sql/statesql/schema/migrations/0023_posts_has_publish_epoch.sql diff --git a/activation/activation_test.go b/activation/activation_test.go index 0305add4c3..4f3550d7dd 100644 --- a/activation/activation_test.go +++ b/activation/activation_test.go @@ -138,7 +138,10 @@ func publishAtxV1( return codec.Decode(got, &watx) }) require.NoError(tb, atxs.Add(tab.db, toAtx(tb, &watx), watx.Blob())) - require.NoError(tb, atxs.SetPost(tab.db, watx.ID(), watx.PrevATXID, 0, watx.SmesherID, watx.NumUnits)) + require.NoError( + tb, + atxs.SetPost(tab.db, watx.ID(), watx.PrevATXID, 0, watx.SmesherID, watx.NumUnits, watx.PublishEpoch), + ) tab.atxsdata.AddFromAtx(toAtx(tb, &watx), false) return &watx } diff --git a/activation/handler_v1.go b/activation/handler_v1.go index a6bb961ec3..273edb0a07 100644 --- a/activation/handler_v1.go +++ b/activation/handler_v1.go @@ -483,7 +483,7 @@ func (h *HandlerV1) storeAtx( if err != nil && !errors.Is(err, sql.ErrObjectExists) { return fmt.Errorf("add atx to db: %w", err) } - err = atxs.SetPost(tx, atx.ID(), watx.PrevATXID, 0, atx.SmesherID, watx.NumUnits) + err = atxs.SetPost(tx, atx.ID(), watx.PrevATXID, 0, atx.SmesherID, watx.NumUnits, watx.PublishEpoch) if err != nil && !errors.Is(err, sql.ErrObjectExists) { return fmt.Errorf("set atx units: %w", err) } diff --git a/activation/handler_v2.go b/activation/handler_v2.go index 880d9ca6cb..f675eb2c7e 100644 --- a/activation/handler_v2.go +++ b/activation/handler_v2.go @@ -871,7 +871,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx return fmt.Errorf("add atx to db: %w", err) } for id, post := range watx.ids { - err = atxs.SetPost(tx, atx.ID(), post.previous, post.previousIndex, id, post.units) + err = atxs.SetPost(tx, atx.ID(), post.previous, post.previousIndex, id, post.units, atx.PublishEpoch) if err != nil && !errors.Is(err, sql.ErrObjectExists) { return fmt.Errorf("setting atx units for ID %s: %w", id, err) } diff --git a/activation/handler_v2_test.go b/activation/handler_v2_test.go index 85080e276d..f8a7502494 100644 --- a/activation/handler_v2_test.go +++ b/activation/handler_v2_test.go @@ -1436,7 +1436,7 @@ func Test_ValidatePreviousATX(t *testing.T) { t.Parallel() prev := &types.ActivationTx{} prev.SetID(types.RandomATXID()) - require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 13)) + require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 13, 0)) _, err := atxHandler.validatePreviousAtx(types.RandomNodeID(), &wire.SubPostV2{}, []*types.ActivationTx{prev}) require.Error(t, err) @@ -1447,8 +1447,8 @@ func Test_ValidatePreviousATX(t *testing.T) { other := types.RandomNodeID() prev := &types.ActivationTx{} prev.SetID(types.RandomATXID()) - require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, id, 7)) - require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13)) + require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, id, 7, 0)) + require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13, 0)) units, err := atxHandler.validatePreviousAtx(id, &wire.SubPostV2{NumUnits: 100}, []*types.ActivationTx{prev}) require.NoError(t, err) @@ -1468,7 +1468,7 @@ func Test_ValidatePreviousATX(t *testing.T) { other := types.RandomNodeID() prev := &types.ActivationTx{} prev.SetID(types.RandomATXID()) - require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13)) + require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13, 0)) _, err := atxHandler.validatePreviousAtx(id, &wire.SubPostV2{NumUnits: 100}, []*types.ActivationTx{prev}) require.Error(t, err) diff --git a/api/grpcserver/admin_service_test.go b/api/grpcserver/admin_service_test.go index 02bb7f805c..1aa256e3ba 100644 --- a/api/grpcserver/admin_service_test.go +++ b/api/grpcserver/admin_service_test.go @@ -39,7 +39,7 @@ func newAtx(tb testing.TB, db sql.StateDatabase) { atx.SmesherID = types.BytesToNodeID(types.RandomBytes(20)) atx.SetReceived(time.Now().Local()) require.NoError(tb, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(tb, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits)) + require.NoError(tb, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch)) } func createMesh(tb testing.TB, db sql.StateDatabase) { diff --git a/checkpoint/runner_test.go b/checkpoint/runner_test.go index 727da2843f..e1bf80cac9 100644 --- a/checkpoint/runner_test.go +++ b/checkpoint/runner_test.go @@ -268,7 +268,10 @@ func createMesh(t testing.TB, db sql.StateDatabase, miners []miner, accts []*typ for _, miner := range miners { for _, atx := range miner.atxs { require.NoError(t, atxs.Add(db, atx.ActivationTx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), atx.previous, 0, atx.SmesherID, atx.NumUnits)) + require.NoError( + t, + atxs.SetPost(db, atx.ID(), atx.previous, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch), + ) } if proof := miner.malfeasanceProof; len(proof) > 0 { require.NoError(t, identities.SetMalicious(db, miner.atxs[0].SmesherID, proof, time.Now())) @@ -400,7 +403,7 @@ func TestRunner_Generate_PreservesMarriageATX(t *testing.T) { } atx.SetID(types.RandomATXID()) require.NoError(t, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits)) + require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch)) fs := afero.NewMemMapFs() dir, err := afero.TempDir(fs, "", "Generate") diff --git a/common/types/poet.go b/common/types/poet.go index 89a62113ea..c8c8b95a51 100644 --- a/common/types/poet.go +++ b/common/types/poet.go @@ -16,7 +16,7 @@ import ( type PoetServer struct { Address string `mapstructure:"address" json:"address"` - Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"` + Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"` } type PoetProofRef Hash32 diff --git a/sql/atxs/atxs.go b/sql/atxs/atxs.go index 9d749a035b..a94f3afa73 100644 --- a/sql/atxs/atxs.go +++ b/sql/atxs/atxs.go @@ -258,9 +258,9 @@ func PrevIDByNodeID(db sql.Executor, nodeID types.NodeID, pubEpoch types.EpochID } if rows, err := db.Exec(` - SELECT posts.atxid FROM posts JOIN atxs ON posts.atxid = atxs.id - WHERE posts.pubkey = ?1 AND atxs.epoch < ?2 - ORDER BY atxs.epoch DESC + SELECT atxid FROM posts + WHERE pubkey = ?1 AND publish_epoch < ?2 + ORDER BY publish_epoch DESC LIMIT 1;`, enc, dec); err != nil { return types.EmptyATXID, fmt.Errorf("exec nodeID %v, epoch %d: %w", nodeID, pubEpoch, err) } else if rows == 0 { @@ -678,7 +678,7 @@ func AddCheckpointed(db sql.Executor, catx *CheckpointAtx) error { for id, units := range catx.Units { // FIXME: should a checkpointed ATX reference its real previous ATX? - if err := SetPost(db, catx.ID, types.EmptyATXID, 0, id, units); err != nil { + if err := SetPost(db, catx.ID, types.EmptyATXID, 0, id, units, catx.Epoch); err != nil { return fmt.Errorf("insert checkpoint ATX units %v: %w", catx.ID, err) } } @@ -958,9 +958,17 @@ func AllUnits(db sql.Executor, id types.ATXID) (map[types.NodeID]uint32, error) return units, nil } -func SetPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.NodeID, units uint32) error { +func SetPost( + db sql.Executor, + atxID, prev types.ATXID, + prevIndex int, + id types.NodeID, + units uint32, + publish types.EpochID, +) error { _, err := db.Exec( - `INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units) VALUES (?1, ?2, ?3, ?4, ?5);`, + `INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units, publish_epoch) + VALUES (?1, ?2, ?3, ?4, ?5, ?6);`, func(stmt *sql.Statement) { stmt.BindBytes(1, atxID.Bytes()) stmt.BindBytes(2, id.Bytes()) @@ -969,6 +977,7 @@ func SetPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.N } stmt.BindInt64(4, int64(prevIndex)) stmt.BindInt64(5, int64(units)) + stmt.BindInt64(6, int64(publish)) }, nil, ) diff --git a/sql/atxs/atxs_test.go b/sql/atxs/atxs_test.go index e07f04db51..91707aeb52 100644 --- a/sql/atxs/atxs_test.go +++ b/sql/atxs/atxs_test.go @@ -161,7 +161,10 @@ func TestLatestN(t *testing.T) { for _, atx := range []*types.ActivationTx{atx1, atx2, atx3, atx4, atx5, atx6} { require.NoError(t, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits)) + require.NoError( + t, + atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch), + ) } for _, tc := range []struct { @@ -1041,9 +1044,9 @@ func Test_PrevATXCollision(t *testing.T) { atx2 := newAtx(t, sig, withPublishEpoch(2)) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), prevATXID, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), prevATXID, 0, sig.NodeID(), 10, atx1.PublishEpoch)) require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, sig.NodeID(), 10, atx2.PublishEpoch)) // verify that the ATXs were added got1, err := atxs.Get(db, atx1.ID()) @@ -1063,7 +1066,7 @@ func Test_PrevATXCollision(t *testing.T) { atx2 := newAtx(t, otherSig, withPublishEpoch(types.EpochID(i+1))) require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, atx2.SmesherID, 10)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, atx2.SmesherID, 10, atx2.PublishEpoch)) } collision1, collision2, err := atxs.PrevATXCollision(db, prevATXID, sig.NodeID()) @@ -1125,7 +1128,7 @@ func TestUnits(t *testing.T) { t.Parallel() db := statesql.InMemory() atxID := types.RandomATXID() - require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, types.RandomNodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, types.RandomNodeID(), 10, 0)) _, err := atxs.Units(db, atxID, types.RandomNodeID()) require.ErrorIs(t, err, sql.ErrNotFound) }) @@ -1138,7 +1141,7 @@ func TestUnits(t *testing.T) { {4, 5, 6}: 20, } for id, units := range units { - require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, id, units)) + require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, id, units, 0)) } nodeID := types.NodeID{1, 2, 3} @@ -1170,7 +1173,7 @@ func Test_AtxWithPrevious(t *testing.T) { prev := types.RandomATXID() atx := newAtx(t, sig) require.NoError(t, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10, atx.PublishEpoch)) id, err := atxs.AtxWithPrevious(db, prev, sig.NodeID()) require.NoError(t, err) @@ -1181,7 +1184,7 @@ func Test_AtxWithPrevious(t *testing.T) { atx := newAtx(t, sig) require.NoError(t, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx.PublishEpoch)) id, err := atxs.AtxWithPrevious(db, types.EmptyATXID, sig.NodeID()) require.NoError(t, err) @@ -1196,11 +1199,11 @@ func Test_AtxWithPrevious(t *testing.T) { atx := newAtx(t, sig) require.NoError(t, atxs.Add(db, atx, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10, atx.PublishEpoch)) atx2 := newAtx(t, sig2) require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx2.ID(), prev, 0, sig2.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), prev, 0, sig2.NodeID(), 10, atx2.PublishEpoch)) id, err := atxs.AtxWithPrevious(db, prev, sig.NodeID()) require.NoError(t, err) @@ -1230,7 +1233,7 @@ func Test_FindDoublePublish(t *testing.T) { // one atx atx0 := newAtx(t, sig, withPublishEpoch(1)) require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10)) + require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch)) _, err = atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch) require.ErrorIs(t, err, sql.ErrNotFound) @@ -1238,7 +1241,7 @@ func Test_FindDoublePublish(t *testing.T) { // two atxs in different epochs atx1 := newAtx(t, sig, withPublishEpoch(atx0.PublishEpoch+1)) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx1.PublishEpoch)) _, err = atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch) require.ErrorIs(t, err, sql.ErrNotFound) @@ -1249,11 +1252,11 @@ func Test_FindDoublePublish(t *testing.T) { atx0 := newAtx(t, sig) require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10)) + require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch)) atx1 := newAtx(t, sig) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx1.PublishEpoch)) atxIDs, err := atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch) require.NoError(t, err) @@ -1272,16 +1275,16 @@ func Test_FindDoublePublish(t *testing.T) { atx0 := newAtx(t, atx0Signer) require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10)) - require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch)) + require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx0.PublishEpoch)) atx1Signer, err := signing.NewEdSigner() require.NoError(t, err) atx1 := newAtx(t, atx1Signer) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx1.SmesherID, 10)) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx1.SmesherID, 10, atx1.PublishEpoch)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx1.PublishEpoch)) atxIDs, err := atxs.FindDoublePublish(db, sig.NodeID(), atx0.PublishEpoch) require.NoError(t, err) @@ -1360,7 +1363,7 @@ func Test_Previous(t *testing.T) { for range 50 { prev := previousAtxs[rand.Intn(len(previousAtxs))] index := slices.Index(previousAtxs, prev) - require.NoError(t, atxs.SetPost(db, atx, prev, index, types.RandomNodeID(), 10)) + require.NoError(t, atxs.SetPost(db, atx, prev, index, types.RandomNodeID(), 10, 0)) } got, err := atxs.Previous(db, atx) @@ -1382,11 +1385,11 @@ func TestPrevIDByNodeID(t *testing.T) { atx1 := newAtx(t, sig, withPublishEpoch(1)) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx1.PublishEpoch)) atx2 := newAtx(t, sig, withPublishEpoch(2)) require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx2.ID(), types.EmptyATXID, 0, sig.NodeID(), 4)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx2.PublishEpoch)) _, err = atxs.PrevIDByNodeID(db, sig.NodeID(), 1) require.ErrorIs(t, err, sql.ErrNotFound) @@ -1407,14 +1410,17 @@ func TestPrevIDByNodeID(t *testing.T) { atx1 := newAtx(t, sig, withPublishEpoch(1)) require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4)) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, id, 8)) - require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 12)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx1.PublishEpoch)) + require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, id, 8, atx1.PublishEpoch)) + require.NoError( + t, + atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 12, atx1.PublishEpoch), + ) atx2 := newAtx(t, sig, withPublishEpoch(2)) require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{})) - require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, sig.NodeID(), 4)) - require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, types.RandomNodeID(), 12)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, sig.NodeID(), 4, atx2.PublishEpoch)) + require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, types.RandomNodeID(), 12, atx2.PublishEpoch)) prevID, err := atxs.PrevIDByNodeID(db, id, 3) require.NoError(t, err) diff --git a/sql/statesql/migrations/state_0021_migration.go b/sql/statesql/migrations/state_0021_migration.go index f1e5263798..b88471fbeb 100644 --- a/sql/statesql/migrations/state_0021_migration.go +++ b/sql/statesql/migrations/state_0021_migration.go @@ -10,7 +10,6 @@ import ( "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/sql" - "github.com/spacemeshos/go-spacemesh/sql/atxs" ) type migration0021 struct { @@ -160,7 +159,7 @@ func (m *migration0021) processBatch(db sql.Executor, offset, size int) (int, er func (m *migration0021) applyPendingUpdates(db sql.Executor, updates map[types.ATXID]*update) error { for atxID, upd := range updates { - if err := atxs.SetPost(db, atxID, upd.prev, 0, upd.id, upd.units); err != nil { + if err := setPost(db, atxID, upd.prev, 0, upd.id, upd.units); err != nil { return err } } @@ -183,3 +182,20 @@ func processATX(blob types.AtxBlob) (*update, error) { return nil, fmt.Errorf("unsupported ATX version: %d", blob.Version) } } + +func setPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.NodeID, units uint32) error { + _, err := db.Exec( + `INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units) VALUES (?1, ?2, ?3, ?4, ?5);`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, atxID.Bytes()) + stmt.BindBytes(2, id.Bytes()) + if prev != types.EmptyATXID { + stmt.BindBytes(3, prev.Bytes()) + } + stmt.BindInt64(4, int64(prevIndex)) + stmt.BindInt64(5, int64(units)) + }, + nil, + ) + return err +} diff --git a/sql/statesql/migrations/state_0021_migration_test.go b/sql/statesql/migrations/state_0021_migration_test.go index 8b1c354b96..8eed5a9844 100644 --- a/sql/statesql/migrations/state_0021_migration_test.go +++ b/sql/statesql/migrations/state_0021_migration_test.go @@ -20,7 +20,7 @@ func Test0021Migration(t *testing.T) { schema, err := statesql.Schema() require.NoError(t, err) schema.Migrations = slices.DeleteFunc(schema.Migrations, func(m sql.Migration) bool { - return m.Order() == 21 + return m.Order() >= 21 }) db := sql.InMemory( diff --git a/sql/statesql/schema/migrations/0023_posts_has_publish_epoch.sql b/sql/statesql/schema/migrations/0023_posts_has_publish_epoch.sql new file mode 100644 index 0000000000..bffbb1ca20 --- /dev/null +++ b/sql/statesql/schema/migrations/0023_posts_has_publish_epoch.sql @@ -0,0 +1,12 @@ +ALTER TABLE posts ADD COLUMN publish_epoch UNSIGNED INT; + +-- migrate data by iterating over all atxids in posts and inserting epoch for matching atx from atxs table by id column +UPDATE posts +SET publish_epoch = ( + SELECT epoch + FROM atxs + WHERE atxs.id = posts.atxid +); + + +CREATE INDEX posts_by_atxid_by_pubkey_epoch ON posts (pubkey, publish_epoch); diff --git a/sql/statesql/schema/schema.sql b/sql/statesql/schema/schema.sql index 4e115bd24a..cf71312a53 100755 --- a/sql/statesql/schema/schema.sql +++ b/sql/statesql/schema/schema.sql @@ -1,4 +1,4 @@ -PRAGMA user_version = 22; +PRAGMA user_version = 23; CREATE TABLE accounts ( address CHAR(24), @@ -110,8 +110,9 @@ CREATE TABLE posts ( prev_atxid CHAR(32), prev_atx_index INT, units INT NOT NULL -); +, publish_epoch UNSIGNED INT); CREATE UNIQUE INDEX posts_by_atxid_by_pubkey ON posts (atxid, pubkey); +CREATE INDEX posts_by_atxid_by_pubkey_epoch ON posts (pubkey, publish_epoch); CREATE INDEX posts_by_atxid_by_pubkey_prev_atxid ON posts (atxid, pubkey, prev_atxid); CREATE TABLE proposal_transactions (