Skip to content

Commit

Permalink
fix slow lookup of previous ATX for (ID, epoch) (#6314)
Browse files Browse the repository at this point in the history
## Motivation

Speed up `atxs.PrevIdByNodeId`
  • Loading branch information
poszu committed Sep 3, 2024
1 parent f5f96b9 commit 138c61c
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 48 deletions.
5 changes: 4 additions & 1 deletion activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ func publishAtxV1(
return codec.Decode(got, &watx)
})
require.NoError(tb, atxs.Add(tab.db, toAtx(tb, &watx), watx.Blob()))
require.NoError(tb, atxs.SetPost(tab.db, watx.ID(), watx.PrevATXID, 0, watx.SmesherID, watx.NumUnits))
require.NoError(
tb,
atxs.SetPost(tab.db, watx.ID(), watx.PrevATXID, 0, watx.SmesherID, watx.NumUnits, watx.PublishEpoch),
)
tab.atxsdata.AddFromAtx(toAtx(tb, &watx), false)
return &watx
}
Expand Down
2 changes: 1 addition & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (h *HandlerV1) storeAtx(
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("add atx to db: %w", err)
}
err = atxs.SetPost(tx, atx.ID(), watx.PrevATXID, 0, atx.SmesherID, watx.NumUnits)
err = atxs.SetPost(tx, atx.ID(), watx.PrevATXID, 0, atx.SmesherID, watx.NumUnits, watx.PublishEpoch)
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("set atx units: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
return fmt.Errorf("add atx to db: %w", err)
}
for id, post := range watx.ids {
err = atxs.SetPost(tx, atx.ID(), post.previous, post.previousIndex, id, post.units)
err = atxs.SetPost(tx, atx.ID(), post.previous, post.previousIndex, id, post.units, atx.PublishEpoch)
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return fmt.Errorf("setting atx units for ID %s: %w", id, err)
}
Expand Down
8 changes: 4 additions & 4 deletions activation/handler_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ func Test_ValidatePreviousATX(t *testing.T) {
t.Parallel()
prev := &types.ActivationTx{}
prev.SetID(types.RandomATXID())
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 13))
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 13, 0))

_, err := atxHandler.validatePreviousAtx(types.RandomNodeID(), &wire.SubPostV2{}, []*types.ActivationTx{prev})
require.Error(t, err)
Expand All @@ -1447,8 +1447,8 @@ func Test_ValidatePreviousATX(t *testing.T) {
other := types.RandomNodeID()
prev := &types.ActivationTx{}
prev.SetID(types.RandomATXID())
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, id, 7))
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13))
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, id, 7, 0))
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13, 0))

units, err := atxHandler.validatePreviousAtx(id, &wire.SubPostV2{NumUnits: 100}, []*types.ActivationTx{prev})
require.NoError(t, err)
Expand All @@ -1468,7 +1468,7 @@ func Test_ValidatePreviousATX(t *testing.T) {
other := types.RandomNodeID()
prev := &types.ActivationTx{}
prev.SetID(types.RandomATXID())
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13))
require.NoError(t, atxs.SetPost(atxHandler.cdb, prev.ID(), types.EmptyATXID, 0, other, 13, 0))

_, err := atxHandler.validatePreviousAtx(id, &wire.SubPostV2{NumUnits: 100}, []*types.ActivationTx{prev})
require.Error(t, err)
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/admin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newAtx(tb testing.TB, db sql.StateDatabase) {
atx.SmesherID = types.BytesToNodeID(types.RandomBytes(20))
atx.SetReceived(time.Now().Local())
require.NoError(tb, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(tb, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits))
require.NoError(tb, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch))
}

func createMesh(tb testing.TB, db sql.StateDatabase) {
Expand Down
7 changes: 5 additions & 2 deletions checkpoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ func createMesh(t testing.TB, db sql.StateDatabase, miners []miner, accts []*typ
for _, miner := range miners {
for _, atx := range miner.atxs {
require.NoError(t, atxs.Add(db, atx.ActivationTx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), atx.previous, 0, atx.SmesherID, atx.NumUnits))
require.NoError(
t,
atxs.SetPost(db, atx.ID(), atx.previous, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch),
)
}
if proof := miner.malfeasanceProof; len(proof) > 0 {
require.NoError(t, identities.SetMalicious(db, miner.atxs[0].SmesherID, proof, time.Now()))
Expand Down Expand Up @@ -400,7 +403,7 @@ func TestRunner_Generate_PreservesMarriageATX(t *testing.T) {
}
atx.SetID(types.RandomATXID())
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits))
require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch))

fs := afero.NewMemMapFs()
dir, err := afero.TempDir(fs, "", "Generate")
Expand Down
2 changes: 1 addition & 1 deletion common/types/poet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

type PoetServer struct {
Address string `mapstructure:"address" json:"address"`
Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"`
Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"`
}

type PoetProofRef Hash32
Expand Down
21 changes: 15 additions & 6 deletions sql/atxs/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ func PrevIDByNodeID(db sql.Executor, nodeID types.NodeID, pubEpoch types.EpochID
}

if rows, err := db.Exec(`
SELECT posts.atxid FROM posts JOIN atxs ON posts.atxid = atxs.id
WHERE posts.pubkey = ?1 AND atxs.epoch < ?2
ORDER BY atxs.epoch DESC
SELECT atxid FROM posts
WHERE pubkey = ?1 AND publish_epoch < ?2
ORDER BY publish_epoch DESC
LIMIT 1;`, enc, dec); err != nil {
return types.EmptyATXID, fmt.Errorf("exec nodeID %v, epoch %d: %w", nodeID, pubEpoch, err)
} else if rows == 0 {
Expand Down Expand Up @@ -678,7 +678,7 @@ func AddCheckpointed(db sql.Executor, catx *CheckpointAtx) error {

for id, units := range catx.Units {
// FIXME: should a checkpointed ATX reference its real previous ATX?
if err := SetPost(db, catx.ID, types.EmptyATXID, 0, id, units); err != nil {
if err := SetPost(db, catx.ID, types.EmptyATXID, 0, id, units, catx.Epoch); err != nil {
return fmt.Errorf("insert checkpoint ATX units %v: %w", catx.ID, err)
}
}
Expand Down Expand Up @@ -958,9 +958,17 @@ func AllUnits(db sql.Executor, id types.ATXID) (map[types.NodeID]uint32, error)
return units, nil
}

func SetPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.NodeID, units uint32) error {
func SetPost(
db sql.Executor,
atxID, prev types.ATXID,
prevIndex int,
id types.NodeID,
units uint32,
publish types.EpochID,
) error {
_, err := db.Exec(
`INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units) VALUES (?1, ?2, ?3, ?4, ?5);`,
`INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units, publish_epoch)
VALUES (?1, ?2, ?3, ?4, ?5, ?6);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, atxID.Bytes())
stmt.BindBytes(2, id.Bytes())
Expand All @@ -969,6 +977,7 @@ func SetPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.N
}
stmt.BindInt64(4, int64(prevIndex))
stmt.BindInt64(5, int64(units))
stmt.BindInt64(6, int64(publish))
},
nil,
)
Expand Down
58 changes: 32 additions & 26 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ func TestLatestN(t *testing.T) {

for _, atx := range []*types.ActivationTx{atx1, atx2, atx3, atx4, atx5, atx6} {
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits))
require.NoError(
t,
atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, atx.SmesherID, atx.NumUnits, atx.PublishEpoch),
)
}

for _, tc := range []struct {
Expand Down Expand Up @@ -1041,9 +1044,9 @@ func Test_PrevATXCollision(t *testing.T) {
atx2 := newAtx(t, sig, withPublishEpoch(2))

require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), prevATXID, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx1.ID(), prevATXID, 0, sig.NodeID(), 10, atx1.PublishEpoch))
require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, sig.NodeID(), 10, atx2.PublishEpoch))

// verify that the ATXs were added
got1, err := atxs.Get(db, atx1.ID())
Expand All @@ -1063,7 +1066,7 @@ func Test_PrevATXCollision(t *testing.T) {

atx2 := newAtx(t, otherSig, withPublishEpoch(types.EpochID(i+1)))
require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, atx2.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prevATXID, 0, atx2.SmesherID, 10, atx2.PublishEpoch))
}

collision1, collision2, err := atxs.PrevATXCollision(db, prevATXID, sig.NodeID())
Expand Down Expand Up @@ -1125,7 +1128,7 @@ func TestUnits(t *testing.T) {
t.Parallel()
db := statesql.InMemory()
atxID := types.RandomATXID()
require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, types.RandomNodeID(), 10))
require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, types.RandomNodeID(), 10, 0))
_, err := atxs.Units(db, atxID, types.RandomNodeID())
require.ErrorIs(t, err, sql.ErrNotFound)
})
Expand All @@ -1138,7 +1141,7 @@ func TestUnits(t *testing.T) {
{4, 5, 6}: 20,
}
for id, units := range units {
require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, id, units))
require.NoError(t, atxs.SetPost(db, atxID, types.EmptyATXID, 0, id, units, 0))
}

nodeID := types.NodeID{1, 2, 3}
Expand Down Expand Up @@ -1170,7 +1173,7 @@ func Test_AtxWithPrevious(t *testing.T) {
prev := types.RandomATXID()
atx := newAtx(t, sig)
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10, atx.PublishEpoch))

id, err := atxs.AtxWithPrevious(db, prev, sig.NodeID())
require.NoError(t, err)
Expand All @@ -1181,7 +1184,7 @@ func Test_AtxWithPrevious(t *testing.T) {

atx := newAtx(t, sig)
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx.PublishEpoch))

id, err := atxs.AtxWithPrevious(db, types.EmptyATXID, sig.NodeID())
require.NoError(t, err)
Expand All @@ -1196,11 +1199,11 @@ func Test_AtxWithPrevious(t *testing.T) {

atx := newAtx(t, sig)
require.NoError(t, atxs.Add(db, atx, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx.ID(), prev, 0, sig.NodeID(), 10, atx.PublishEpoch))

atx2 := newAtx(t, sig2)
require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prev, 0, sig2.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx2.ID(), prev, 0, sig2.NodeID(), 10, atx2.PublishEpoch))

id, err := atxs.AtxWithPrevious(db, prev, sig.NodeID())
require.NoError(t, err)
Expand Down Expand Up @@ -1230,15 +1233,15 @@ func Test_FindDoublePublish(t *testing.T) {
// one atx
atx0 := newAtx(t, sig, withPublishEpoch(1))
require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch))

_, err = atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch)
require.ErrorIs(t, err, sql.ErrNotFound)

// two atxs in different epochs
atx1 := newAtx(t, sig, withPublishEpoch(atx0.PublishEpoch+1))
require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx1.PublishEpoch))

_, err = atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch)
require.ErrorIs(t, err, sql.ErrNotFound)
Expand All @@ -1249,11 +1252,11 @@ func Test_FindDoublePublish(t *testing.T) {

atx0 := newAtx(t, sig)
require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch))

atx1 := newAtx(t, sig)
require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx1.PublishEpoch))

atxIDs, err := atxs.FindDoublePublish(db, atx0.SmesherID, atx0.PublishEpoch)
require.NoError(t, err)
Expand All @@ -1272,16 +1275,16 @@ func Test_FindDoublePublish(t *testing.T) {

atx0 := newAtx(t, atx0Signer)
require.NoError(t, atxs.Add(db, atx0, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, atx0.SmesherID, 10, atx0.PublishEpoch))
require.NoError(t, atxs.SetPost(db, atx0.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx0.PublishEpoch))

atx1Signer, err := signing.NewEdSigner()
require.NoError(t, err)

atx1 := newAtx(t, atx1Signer)
require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx1.SmesherID, 10))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, atx1.SmesherID, 10, atx1.PublishEpoch))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 10, atx1.PublishEpoch))

atxIDs, err := atxs.FindDoublePublish(db, sig.NodeID(), atx0.PublishEpoch)
require.NoError(t, err)
Expand Down Expand Up @@ -1360,7 +1363,7 @@ func Test_Previous(t *testing.T) {
for range 50 {
prev := previousAtxs[rand.Intn(len(previousAtxs))]
index := slices.Index(previousAtxs, prev)
require.NoError(t, atxs.SetPost(db, atx, prev, index, types.RandomNodeID(), 10))
require.NoError(t, atxs.SetPost(db, atx, prev, index, types.RandomNodeID(), 10, 0))
}

got, err := atxs.Previous(db, atx)
Expand All @@ -1382,11 +1385,11 @@ func TestPrevIDByNodeID(t *testing.T) {

atx1 := newAtx(t, sig, withPublishEpoch(1))
require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx1.PublishEpoch))

atx2 := newAtx(t, sig, withPublishEpoch(2))
require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx2.ID(), types.EmptyATXID, 0, sig.NodeID(), 4))
require.NoError(t, atxs.SetPost(db, atx2.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx2.PublishEpoch))

_, err = atxs.PrevIDByNodeID(db, sig.NodeID(), 1)
require.ErrorIs(t, err, sql.ErrNotFound)
Expand All @@ -1407,14 +1410,17 @@ func TestPrevIDByNodeID(t *testing.T) {

atx1 := newAtx(t, sig, withPublishEpoch(1))
require.NoError(t, atxs.Add(db, atx1, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, id, 8))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 12))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, sig.NodeID(), 4, atx1.PublishEpoch))
require.NoError(t, atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, id, 8, atx1.PublishEpoch))
require.NoError(
t,
atxs.SetPost(db, atx1.ID(), types.EmptyATXID, 0, types.RandomNodeID(), 12, atx1.PublishEpoch),
)

atx2 := newAtx(t, sig, withPublishEpoch(2))
require.NoError(t, atxs.Add(db, atx2, types.AtxBlob{}))
require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, sig.NodeID(), 4))
require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, types.RandomNodeID(), 12))
require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, sig.NodeID(), 4, atx2.PublishEpoch))
require.NoError(t, atxs.SetPost(db, atx2.ID(), atx1.ID(), 0, types.RandomNodeID(), 12, atx2.PublishEpoch))

prevID, err := atxs.PrevIDByNodeID(db, id, 3)
require.NoError(t, err)
Expand Down
20 changes: 18 additions & 2 deletions sql/statesql/migrations/state_0021_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

type migration0021 struct {
Expand Down Expand Up @@ -160,7 +159,7 @@ func (m *migration0021) processBatch(db sql.Executor, offset, size int) (int, er

func (m *migration0021) applyPendingUpdates(db sql.Executor, updates map[types.ATXID]*update) error {
for atxID, upd := range updates {
if err := atxs.SetPost(db, atxID, upd.prev, 0, upd.id, upd.units); err != nil {
if err := setPost(db, atxID, upd.prev, 0, upd.id, upd.units); err != nil {
return err
}
}
Expand All @@ -183,3 +182,20 @@ func processATX(blob types.AtxBlob) (*update, error) {
return nil, fmt.Errorf("unsupported ATX version: %d", blob.Version)
}
}

func setPost(db sql.Executor, atxID, prev types.ATXID, prevIndex int, id types.NodeID, units uint32) error {
_, err := db.Exec(
`INSERT INTO posts (atxid, pubkey, prev_atxid, prev_atx_index, units) VALUES (?1, ?2, ?3, ?4, ?5);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, atxID.Bytes())
stmt.BindBytes(2, id.Bytes())
if prev != types.EmptyATXID {
stmt.BindBytes(3, prev.Bytes())
}
stmt.BindInt64(4, int64(prevIndex))
stmt.BindInt64(5, int64(units))
},
nil,
)
return err
}
2 changes: 1 addition & 1 deletion sql/statesql/migrations/state_0021_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test0021Migration(t *testing.T) {
schema, err := statesql.Schema()
require.NoError(t, err)
schema.Migrations = slices.DeleteFunc(schema.Migrations, func(m sql.Migration) bool {
return m.Order() == 21
return m.Order() >= 21
})

db := sql.InMemory(
Expand Down
12 changes: 12 additions & 0 deletions sql/statesql/schema/migrations/0023_posts_has_publish_epoch.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE posts ADD COLUMN publish_epoch UNSIGNED INT;

-- migrate data by iterating over all atxids in posts and inserting epoch for matching atx from atxs table by id column
UPDATE posts
SET publish_epoch = (
SELECT epoch
FROM atxs
WHERE atxs.id = posts.atxid
);


CREATE INDEX posts_by_atxid_by_pubkey_epoch ON posts (pubkey, publish_epoch);
Loading

0 comments on commit 138c61c

Please sign in to comment.