Skip to content

Commit

Permalink
added draft implementation for epoch transition
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Mar 6, 2024
1 parent f18e6de commit dfca681
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 140 deletions.
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func run() error {
}
Qubic struct {
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12789267"`
StorageFolder string `conf:"default:store"`
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
Expand Down Expand Up @@ -105,7 +104,7 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

p := processor.NewProcessor(chPool, ps, cfg.Qubic.FallbackTick, cfg.Qubic.ProcessTickTimeout)
p := processor.NewProcessor(chPool, ps, cfg.Qubic.ProcessTickTimeout)
archiveErrors := make(chan error, 1)

// Start the service listening for requests.
Expand Down
47 changes: 26 additions & 21 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator"
qubic "github.com/qubic/go-node-connector"
"github.com/qubic/go-node-connector/types"
"github.com/silenceper/pool"
"log"
"time"
Expand All @@ -25,18 +26,16 @@ func (e *TickInTheFutureError) Error() string {
}

type Processor struct {
pool pool.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
fallbackNextProcessingTick uint64
pool pool.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
}

func NewProcessor(pool pool.Pool, ps *store.PebbleStore, fallbackNextProcessingTick uint64, processTickTimeout time.Duration) *Processor {
func NewProcessor(pool pool.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor {
return &Processor{
pool: pool,
ps: ps,
fallbackNextProcessingTick: fallbackNextProcessingTick,
processTickTimeout: processTickTimeout,
pool: pool,
ps: ps,
processTickTimeout: processTickTimeout,
}
}

Expand Down Expand Up @@ -76,15 +75,17 @@ func (p *Processor) processOneByOne() error {
}
}()

nextTick, err := p.getNextProcessingTick(ctx)
if err != nil {
return errors.Wrap(err, "getting next processing tick")
}
log.Printf("Next tick to process: %d\n", nextTick)
tickInfo, err := client.GetTickInfo(ctx)
if err != nil {
return errors.Wrap(err, "getting tick info")
}

nextTick, err := p.getNextProcessingTick(ctx, tickInfo)
if err != nil {
return errors.Wrap(err, "getting next processing tick")
}
log.Printf("Next tick to process: %d\n", nextTick)

if uint64(tickInfo.Tick) < nextTick {
err = newTickInTheFutureError(nextTick, uint64(tickInfo.Tick))
return err
Expand All @@ -96,23 +97,27 @@ func (p *Processor) processOneByOne() error {
return errors.Wrapf(err, "validating tick %d", nextTick)
}

err = p.ps.SetLastProcessedTick(ctx, nextTick)
err = p.ps.SetLastProcessedTick(ctx, nextTick, uint32(tickInfo.Epoch))
if err != nil {
return errors.Wrapf(err, "setting last processed tick %d", nextTick)
}

return nil
}

func (p *Processor) getNextProcessingTick(ctx context.Context) (uint64, error) {
func (p *Processor) getNextProcessingTick(ctx context.Context, currentTickInfo types.TickInfo) (uint64, error) {
lastTick, err := p.ps.GetLastProcessedTick(ctx)
if err == nil {
return lastTick + 1, nil
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return uint64(currentTickInfo.InitialTick), nil
}

return 0, errors.Wrap(err, "getting last processed tick")
}

if errors.Cause(err) == store.ErrNotFound {
return p.fallbackNextProcessingTick, nil
if uint64(currentTickInfo.InitialTick) > lastTick {
return uint64(currentTickInfo.InitialTick), nil
}

return 0, errors.Wrap(err, "getting last processed tick")
return lastTick + 1, nil
}
192 changes: 108 additions & 84 deletions protobuff/archive.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions protobuff/archive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message GetIdentityInfoResponse {
message GetLastProcessedTickRequest {}
message GetLastProcessedTickResponse{
uint32 last_processed_tick = 1;
map<uint32, uint64> last_processed_ticks_per_epoch = 2;
}

service ArchiveService {
Expand Down
9 changes: 7 additions & 2 deletions rpc/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Server) GetQuorumTickData(ctx context.Context, req *protobuff.GetQuorum
return &protobuff.GetQuorumTickDataResponse{QuorumTickData: qtd}, nil
}
func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRequest) (*protobuff.GetComputorsResponse, error) {
computors, err := s.store.GetComputors(ctx, uint64(req.Epoch))
computors, err := s.store.GetComputors(ctx, req.Epoch)
if err != nil {
if errors.Cause(err) == store.ErrNotFound {
return nil, status.Errorf(codes.NotFound, "computors not found")
Expand Down Expand Up @@ -139,7 +139,12 @@ func (s *Server) GetLastProcessedTick(ctx context.Context, req *protobuff.GetLas
return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err)
}

return &protobuff.GetLastProcessedTickResponse{LastProcessedTick: uint32(tick)}, nil
lastProcessedTicksPerEpoch, err := s.store.GetLastProcessedTicksPerEpoch(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err)
}

return &protobuff.GetLastProcessedTickResponse{LastProcessedTick: uint32(tick), LastProcessedTicksPerEpoch: lastProcessedTicksPerEpoch}, nil
}

func (s *Server) Start() error {
Expand Down
21 changes: 14 additions & 7 deletions store/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
)

const (
TickData = 0x00
QuorumData = 0x01
ComputorList = 0x02
Transaction = 0x03
LastProcessedTick = 0x04
TickData = 0x00
QuorumData = 0x01
ComputorList = 0x02
Transaction = 0x03
LastProcessedTick = 0x04
LastProcessedTickPerEpoch = 0x05
)

func tickDataKey(tickNumber uint64) []byte {
Expand All @@ -26,9 +27,9 @@ func quorumTickDataKey(tickNumber uint64) []byte {
return key
}

func computorsKey(epochNumber uint64) []byte {
func computorsKey(epochNumber uint32) []byte {
key := []byte{ComputorList}
key = binary.BigEndian.AppendUint64(key, epochNumber)
key = binary.BigEndian.AppendUint32(key, epochNumber)

return key
}
Expand All @@ -42,5 +43,11 @@ func tickTxKey(txID string) ([]byte, error) {

func lastProcessedTickKey() []byte {
return []byte{LastProcessedTick}
}

func lastProcessedTickKeyPerEpoch(epochNumber uint32) []byte {
key := []byte{LastProcessedTickPerEpoch}
key = binary.BigEndian.AppendUint32(key, epochNumber)

return key
}
59 changes: 54 additions & 5 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"log"
"strconv"
)

var ErrNotFound = errors.New("store resource not found")
Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *PebbleStore) SetQuorumTickData(ctx context.Context, tickNumber uint64,
return nil
}

func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint64) (*protobuff.Computors, error) {
func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint32) (*protobuff.Computors, error) {
key := computorsKey(epoch)

value, closer, err := s.db.Get(key)
Expand All @@ -113,7 +115,7 @@ func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint64) (*protobuf
return &computors, nil
}

func (s *PebbleStore) SetComputors(ctx context.Context, epoch uint64, computors *protobuff.Computors) error {
func (s *PebbleStore) SetComputors(ctx context.Context, epoch uint32, computors *protobuff.Computors) error {
key := computorsKey(epoch)

serialized, err := proto.Marshal(computors)
Expand Down Expand Up @@ -208,16 +210,33 @@ func (s *PebbleStore) GetTransaction(ctx context.Context, txID string) (*protobu
return &tx, nil
}

func (s *PebbleStore) SetLastProcessedTick(ctx context.Context, tickNumber uint64) error {
key := lastProcessedTickKey()
func (s *PebbleStore) SetLastProcessedTick(ctx context.Context, tickNumber uint64, epochNumber uint32) error {
batch := s.db.NewBatch()
defer batch.Close()

key := lastProcessedTickKeyPerEpoch(epochNumber)
value := make([]byte, 8)
binary.LittleEndian.PutUint64(value, tickNumber)

err := s.db.Set(key, value, &pebble.WriteOptions{Sync: true})
err := batch.Set(key, value, pebble.Sync)
if err != nil {
return errors.Wrap(err, "setting last processed tick")
}

key = lastProcessedTickKey()
value = make([]byte, 8)
binary.LittleEndian.PutUint64(value, tickNumber)

err = batch.Set(key, value, pebble.Sync)
if err != nil {
return errors.Wrap(err, "setting last processed tick")
}

err = batch.Commit(pebble.Sync)
if err != nil {
return errors.Wrap(err, "committing batch")
}

return nil
}

Expand All @@ -235,3 +254,33 @@ func (s *PebbleStore) GetLastProcessedTick(ctx context.Context) (uint64, error)

return binary.LittleEndian.Uint64(value), nil
}

func (s *PebbleStore) GetLastProcessedTicksPerEpoch(ctx context.Context) (map[uint32]uint64, error) {
maxUint64 := ^uint64(0)
upperBound := append([]byte{LastProcessedTickPerEpoch}, []byte(strconv.FormatUint(maxUint64, 10))...)
iter, err := s.db.NewIter(&pebble.IterOptions{
LowerBound: []byte{LastProcessedTickPerEpoch},
UpperBound: upperBound,
})
if err != nil {
return nil, errors.Wrap(err, "creating iter")
}
defer iter.Close()

ticksPerEpoch := make(map[uint32]uint64)
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()

value, err := iter.ValueAndErr()
if err != nil {
return nil, errors.Wrap(err, "getting value from iter")
}

epochNumber := binary.BigEndian.Uint32(key[1:])
tickNumber := binary.LittleEndian.Uint64(value)
ticksPerEpoch[epochNumber] = tickNumber
log.Printf("key: %s, value: %s\n", key, value)
}

return ticksPerEpoch, nil
}
57 changes: 56 additions & 1 deletion store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestPebbleStore_Computors(t *testing.T) {
store := NewPebbleStore(db, logger)

// Sample Computors for testing
epoch := uint64(1) // Convert epoch to uint64 as per method requirements
epoch := uint32(1) // Convert epoch to uint32 as per method requirements
computors := &pb.Computors{
Epoch: 1,
Identities: []string{"identity1", "identity2"},
Expand Down Expand Up @@ -316,3 +316,58 @@ func TestPebbleStore_GetTransaction(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, ErrNotFound, err)
}

func TestSetAndGetLastProcessedTicksPerEpoch(t *testing.T) {
ctx := context.Background()

// Setup test environment
dbDir, err := os.MkdirTemp("", "pebble_test")
assert.NoError(t, err)
defer os.RemoveAll(dbDir)

db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{})
assert.NoError(t, err)
defer db.Close()

logger, _ := zap.NewDevelopment()
store := NewPebbleStore(db, logger)

// Set last processed ticks per epoch
err = store.SetLastProcessedTick(ctx, 16, 1)
assert.NoError(t, err)

// Get last processed tick per epoch
lastProcessedTick, err := store.GetLastProcessedTick(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(16), lastProcessedTick)

lastProcessedTicksPerEpoch, err := store.GetLastProcessedTicksPerEpoch(ctx)
assert.NoError(t, err)
assert.Equal(t, map[uint32]uint64{1: 16}, lastProcessedTicksPerEpoch)

// Set last processed ticks per epoch
err = store.SetLastProcessedTick(ctx, 17, 1)
assert.NoError(t, err)

// Get last processed tick per epoch
lastProcessedTick, err = store.GetLastProcessedTick(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(17), lastProcessedTick)

lastProcessedTicksPerEpoch, err = store.GetLastProcessedTicksPerEpoch(ctx)
assert.NoError(t, err)
assert.Equal(t, map[uint32]uint64{1: 17}, lastProcessedTicksPerEpoch)

// Set last processed ticks per epoch
err = store.SetLastProcessedTick(ctx, 18, 2)
assert.NoError(t, err)

// Get last processed tick per epoch
lastProcessedTick, err = store.GetLastProcessedTick(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(18), lastProcessedTick)

lastProcessedTicksPerEpoch, err = store.GetLastProcessedTicksPerEpoch(ctx)
assert.NoError(t, err)
assert.Equal(t, map[uint32]uint64{1: 17, 2: 18}, lastProcessedTicksPerEpoch)
}
14 changes: 0 additions & 14 deletions types.go

This file was deleted.

5 changes: 2 additions & 3 deletions validator/computors/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
"github.com/qubic/go-archiver/utils"
"github.com/qubic/go-node-connector/types"

)

func Validate(ctx context.Context, computors types.Computors) error {
Expand Down Expand Up @@ -49,15 +48,15 @@ func getDigestFromComputors(data types.Computors) ([32]byte, error) {
func Store(ctx context.Context, store *store.PebbleStore, computors types.Computors) error {
protoModel := qubicToProto(computors)

err := store.SetComputors(ctx, uint64(protoModel.Epoch), protoModel)
err := store.SetComputors(ctx, protoModel.Epoch, protoModel)
if err != nil {
return errors.Wrap(err, "set computors")
}

return nil
}

func Get(ctx context.Context, store *store.PebbleStore, epoch uint64) (types.Computors, error) {
func Get(ctx context.Context, store *store.PebbleStore, epoch uint32) (types.Computors, error) {
protoModel, err := store.GetComputors(ctx, epoch)
if err != nil {
return types.Computors{}, errors.Wrap(err, "get computors")
Expand Down
2 changes: 1 addition & 1 deletion validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error {
//getting computors from storage, otherwise get it from a node
epoch := quorumVotes[0].Epoch
var comps types.Computors
comps, err = computors.Get(ctx, v.store, uint64(epoch))
comps, err = computors.Get(ctx, v.store, uint32(epoch))
if err != nil {
if errors.Cause(err) != store.ErrNotFound {
return errors.Wrap(err, "getting computors from store")
Expand Down

0 comments on commit dfca681

Please sign in to comment.