diff --git a/main.go b/main.go index 4f37970..4896921 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,6 @@ func run() error { } Qubic struct { NodePort string `conf:"default:21841"` - FallbackTick uint64 `conf:"default:12789267"` StorageFolder string `conf:"default:store"` ProcessTickTimeout time.Duration `conf:"default:5s"` } @@ -105,7 +104,7 @@ func run() error { shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) - p := processor.NewProcessor(chPool, ps, cfg.Qubic.FallbackTick, cfg.Qubic.ProcessTickTimeout) + p := processor.NewProcessor(chPool, ps, cfg.Qubic.ProcessTickTimeout) archiveErrors := make(chan error, 1) // Start the service listening for requests. diff --git a/processor/processor.go b/processor/processor.go index dab9a52..1782d60 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -6,6 +6,7 @@ import ( "github.com/qubic/go-archiver/store" "github.com/qubic/go-archiver/validator" qubic "github.com/qubic/go-node-connector" + "github.com/qubic/go-node-connector/types" "github.com/silenceper/pool" "log" "time" @@ -25,18 +26,16 @@ func (e *TickInTheFutureError) Error() string { } type Processor struct { - pool pool.Pool - ps *store.PebbleStore - processTickTimeout time.Duration - fallbackNextProcessingTick uint64 + pool pool.Pool + ps *store.PebbleStore + processTickTimeout time.Duration } -func NewProcessor(pool pool.Pool, ps *store.PebbleStore, fallbackNextProcessingTick uint64, processTickTimeout time.Duration) *Processor { +func NewProcessor(pool pool.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor { return &Processor{ - pool: pool, - ps: ps, - fallbackNextProcessingTick: fallbackNextProcessingTick, - processTickTimeout: processTickTimeout, + pool: pool, + ps: ps, + processTickTimeout: processTickTimeout, } } @@ -76,15 +75,17 @@ func (p *Processor) processOneByOne() error { } }() - nextTick, err := p.getNextProcessingTick(ctx) - if err != nil { - return errors.Wrap(err, "getting next processing tick") - } - log.Printf("Next tick to process: %d\n", nextTick) tickInfo, err := client.GetTickInfo(ctx) if err != nil { return errors.Wrap(err, "getting tick info") } + + nextTick, err := p.getNextProcessingTick(ctx, tickInfo) + if err != nil { + return errors.Wrap(err, "getting next processing tick") + } + log.Printf("Next tick to process: %d\n", nextTick) + if uint64(tickInfo.Tick) < nextTick { err = newTickInTheFutureError(nextTick, uint64(tickInfo.Tick)) return err @@ -96,7 +97,7 @@ func (p *Processor) processOneByOne() error { return errors.Wrapf(err, "validating tick %d", nextTick) } - err = p.ps.SetLastProcessedTick(ctx, nextTick) + err = p.ps.SetLastProcessedTick(ctx, nextTick, uint32(tickInfo.Epoch)) if err != nil { return errors.Wrapf(err, "setting last processed tick %d", nextTick) } @@ -104,15 +105,19 @@ func (p *Processor) processOneByOne() error { return nil } -func (p *Processor) getNextProcessingTick(ctx context.Context) (uint64, error) { +func (p *Processor) getNextProcessingTick(ctx context.Context, currentTickInfo types.TickInfo) (uint64, error) { lastTick, err := p.ps.GetLastProcessedTick(ctx) - if err == nil { - return lastTick + 1, nil + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return uint64(currentTickInfo.InitialTick), nil + } + + return 0, errors.Wrap(err, "getting last processed tick") } - if errors.Cause(err) == store.ErrNotFound { - return p.fallbackNextProcessingTick, nil + if uint64(currentTickInfo.InitialTick) > lastTick { + return uint64(currentTickInfo.InitialTick), nil } - return 0, errors.Wrap(err, "getting last processed tick") + return lastTick + 1, nil } diff --git a/protobuff/archive.pb.go b/protobuff/archive.pb.go index 0adbc00..1e14467 100644 --- a/protobuff/archive.pb.go +++ b/protobuff/archive.pb.go @@ -1331,7 +1331,8 @@ type GetLastProcessedTickResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - LastProcessedTick uint32 `protobuf:"varint,1,opt,name=last_processed_tick,json=lastProcessedTick,proto3" json:"last_processed_tick,omitempty"` + LastProcessedTick uint32 `protobuf:"varint,1,opt,name=last_processed_tick,json=lastProcessedTick,proto3" json:"last_processed_tick,omitempty"` + LastProcessedTicksPerEpoch map[uint32]uint64 `protobuf:"bytes,2,rep,name=last_processed_ticks_per_epoch,json=lastProcessedTicksPerEpoch,proto3" json:"last_processed_ticks_per_epoch,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` } func (x *GetLastProcessedTickResponse) Reset() { @@ -1373,6 +1374,13 @@ func (x *GetLastProcessedTickResponse) GetLastProcessedTick() uint32 { return 0 } +func (x *GetLastProcessedTickResponse) GetLastProcessedTicksPerEpoch() map[uint32]uint64 { + if x != nil { + return x.LastProcessedTicksPerEpoch + } + return nil +} + var File_archive_proto protoreflect.FileDescriptor var file_archive_proto_rawDesc = []byte{ @@ -1588,71 +1596,85 @@ var file_archive_proto_rawDesc = []byte{ 0x62, 0x2e, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0c, 0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x1d, 0x0a, 0x1b, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, - 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x4e, 0x0a, 0x1c, 0x47, - 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, - 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x6c, - 0x61, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x5f, 0x74, 0x69, - 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x50, 0x72, - 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x32, 0xf1, 0x06, 0x0a, 0x0e, - 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6c, - 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, 0x2d, 0x2e, - 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, - 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, - 0x6b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x71, - 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, - 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, - 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x84, 0x01, 0x0a, - 0x13, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x35, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, - 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, - 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x36, 0x2e, 0x71, 0x75, + 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xbb, 0x02, 0x0a, 0x1c, + 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, + 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x13, + 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x5f, 0x74, + 0x69, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x50, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x12, 0x9b, 0x01, 0x0a, + 0x1e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x5f, + 0x74, 0x69, 0x63, 0x6b, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x65, 0x70, 0x6f, 0x63, 0x68, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x57, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, + 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, + 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, + 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x4c, + 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, + 0x73, 0x50, 0x65, 0x72, 0x45, 0x70, 0x6f, 0x63, 0x68, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x1a, + 0x6c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, + 0x6b, 0x73, 0x50, 0x65, 0x72, 0x45, 0x70, 0x6f, 0x63, 0x68, 0x1a, 0x4d, 0x0a, 0x1f, 0x4c, 0x61, + 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x73, + 0x50, 0x65, 0x72, 0x45, 0x70, 0x6f, 0x63, 0x68, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xf1, 0x06, 0x0a, 0x0e, 0x41, 0x72, + 0x63, 0x68, 0x69, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6c, 0x0a, 0x0b, + 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, 0x2d, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, - 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x75, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x30, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, + 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x44, + 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x71, 0x75, 0x62, + 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, + 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, + 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x84, 0x01, 0x0a, 0x13, 0x47, + 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x35, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, + 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x36, 0x2e, 0x71, 0x75, 0x62, 0x69, + 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x63, 0x6b, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x75, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x30, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, + 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, + 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x31, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x31, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, - 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, - 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, 0x0a, 0x11, 0x47, 0x65, - 0x74, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, - 0x33, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, - 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x51, - 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x51, + 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x12, 0x33, 0x2e, + 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, + 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x51, 0x75, 0x6f, + 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, + 0x65, 0x74, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, 0x74, 0x61, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x6f, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x43, + 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x12, 0x2e, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, + 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, + 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x6f, 0x72, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, + 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, + 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x6f, 0x72, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x78, 0x0a, 0x0f, 0x47, 0x65, 0x74, + 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x31, 0x2e, 0x71, + 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, + 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x32, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, + 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x49, + 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, + 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x12, 0x36, 0x2e, 0x71, + 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, + 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, + 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, - 0x2e, 0x47, 0x65, 0x74, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x54, 0x69, 0x63, 0x6b, 0x44, 0x61, - 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x6f, 0x0a, 0x0c, 0x47, 0x65, - 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x12, 0x2e, 0x2e, 0x71, 0x75, 0x62, - 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, - 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, - 0x6f, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, 0x71, 0x75, 0x62, - 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, - 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, - 0x6f, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x78, 0x0a, 0x0f, 0x47, - 0x65, 0x74, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x31, - 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, - 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x64, - 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x32, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, - 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, - 0x74, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, - 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x12, 0x36, - 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, - 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, - 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x71, 0x75, 0x62, 0x69, 0x63, 0x2e, 0x61, - 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x2e, - 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, - 0x73, 0x65, 0x64, 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, - 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x71, 0x75, - 0x62, 0x69, 0x63, 0x2f, 0x67, 0x6f, 0x2d, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x66, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, + 0x64, 0x54, 0x69, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x29, 0x5a, + 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x71, 0x75, 0x62, 0x69, + 0x63, 0x2f, 0x67, 0x6f, 0x2d, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x66, 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1667,7 +1689,7 @@ func file_archive_proto_rawDescGZIP() []byte { return file_archive_proto_rawDescData } -var file_archive_proto_msgTypes = make([]protoimpl.MessageInfo, 23) +var file_archive_proto_msgTypes = make([]protoimpl.MessageInfo, 24) var file_archive_proto_goTypes = []interface{}{ (*TickData)(nil), // 0: qubic.archiver.archive.pb.TickData (*GetTickDataRequest)(nil), // 1: qubic.archiver.archive.pb.GetTickDataRequest @@ -1692,6 +1714,7 @@ var file_archive_proto_goTypes = []interface{}{ (*GetLastProcessedTickRequest)(nil), // 20: qubic.archiver.archive.pb.GetLastProcessedTickRequest (*GetLastProcessedTickResponse)(nil), // 21: qubic.archiver.archive.pb.GetLastProcessedTickResponse nil, // 22: qubic.archiver.archive.pb.QuorumTickData.QuorumDiffPerComputorEntry + nil, // 23: qubic.archiver.archive.pb.GetLastProcessedTickResponse.LastProcessedTicksPerEpochEntry } var file_archive_proto_depIdxs = []int32{ 0, // 0: qubic.archiver.archive.pb.GetTickDataResponse.tick_data:type_name -> qubic.archiver.archive.pb.TickData @@ -1703,26 +1726,27 @@ var file_archive_proto_depIdxs = []int32{ 11, // 6: qubic.archiver.archive.pb.GetQuorumTickDataResponse.quorum_tick_data:type_name -> qubic.archiver.archive.pb.QuorumTickData 14, // 7: qubic.archiver.archive.pb.GetComputorsResponse.computors:type_name -> qubic.archiver.archive.pb.Computors 17, // 8: qubic.archiver.archive.pb.GetIdentityInfoResponse.identity_info:type_name -> qubic.archiver.archive.pb.IdentityInfo - 9, // 9: qubic.archiver.archive.pb.QuorumTickData.QuorumDiffPerComputorEntry.value:type_name -> qubic.archiver.archive.pb.QuorumDiff - 1, // 10: qubic.archiver.archive.pb.ArchiveService.GetTickData:input_type -> qubic.archiver.archive.pb.GetTickDataRequest - 7, // 11: qubic.archiver.archive.pb.ArchiveService.GetTickTransactions:input_type -> qubic.archiver.archive.pb.GetTickTransactionsRequest - 4, // 12: qubic.archiver.archive.pb.ArchiveService.GetTransaction:input_type -> qubic.archiver.archive.pb.GetTransactionRequest - 12, // 13: qubic.archiver.archive.pb.ArchiveService.GetQuorumTickData:input_type -> qubic.archiver.archive.pb.GetQuorumTickDataRequest - 15, // 14: qubic.archiver.archive.pb.ArchiveService.GetComputors:input_type -> qubic.archiver.archive.pb.GetComputorsRequest - 18, // 15: qubic.archiver.archive.pb.ArchiveService.GetIdentityInfo:input_type -> qubic.archiver.archive.pb.GetIdentityInfoRequest - 20, // 16: qubic.archiver.archive.pb.ArchiveService.GetLastProcessedTick:input_type -> qubic.archiver.archive.pb.GetLastProcessedTickRequest - 2, // 17: qubic.archiver.archive.pb.ArchiveService.GetTickData:output_type -> qubic.archiver.archive.pb.GetTickDataResponse - 8, // 18: qubic.archiver.archive.pb.ArchiveService.GetTickTransactions:output_type -> qubic.archiver.archive.pb.GetTickTransactionsResponse - 5, // 19: qubic.archiver.archive.pb.ArchiveService.GetTransaction:output_type -> qubic.archiver.archive.pb.GetTransactionResponse - 13, // 20: qubic.archiver.archive.pb.ArchiveService.GetQuorumTickData:output_type -> qubic.archiver.archive.pb.GetQuorumTickDataResponse - 16, // 21: qubic.archiver.archive.pb.ArchiveService.GetComputors:output_type -> qubic.archiver.archive.pb.GetComputorsResponse - 19, // 22: qubic.archiver.archive.pb.ArchiveService.GetIdentityInfo:output_type -> qubic.archiver.archive.pb.GetIdentityInfoResponse - 21, // 23: qubic.archiver.archive.pb.ArchiveService.GetLastProcessedTick:output_type -> qubic.archiver.archive.pb.GetLastProcessedTickResponse - 17, // [17:24] is the sub-list for method output_type - 10, // [10:17] is the sub-list for method input_type - 10, // [10:10] is the sub-list for extension type_name - 10, // [10:10] is the sub-list for extension extendee - 0, // [0:10] is the sub-list for field type_name + 23, // 9: qubic.archiver.archive.pb.GetLastProcessedTickResponse.last_processed_ticks_per_epoch:type_name -> qubic.archiver.archive.pb.GetLastProcessedTickResponse.LastProcessedTicksPerEpochEntry + 9, // 10: qubic.archiver.archive.pb.QuorumTickData.QuorumDiffPerComputorEntry.value:type_name -> qubic.archiver.archive.pb.QuorumDiff + 1, // 11: qubic.archiver.archive.pb.ArchiveService.GetTickData:input_type -> qubic.archiver.archive.pb.GetTickDataRequest + 7, // 12: qubic.archiver.archive.pb.ArchiveService.GetTickTransactions:input_type -> qubic.archiver.archive.pb.GetTickTransactionsRequest + 4, // 13: qubic.archiver.archive.pb.ArchiveService.GetTransaction:input_type -> qubic.archiver.archive.pb.GetTransactionRequest + 12, // 14: qubic.archiver.archive.pb.ArchiveService.GetQuorumTickData:input_type -> qubic.archiver.archive.pb.GetQuorumTickDataRequest + 15, // 15: qubic.archiver.archive.pb.ArchiveService.GetComputors:input_type -> qubic.archiver.archive.pb.GetComputorsRequest + 18, // 16: qubic.archiver.archive.pb.ArchiveService.GetIdentityInfo:input_type -> qubic.archiver.archive.pb.GetIdentityInfoRequest + 20, // 17: qubic.archiver.archive.pb.ArchiveService.GetLastProcessedTick:input_type -> qubic.archiver.archive.pb.GetLastProcessedTickRequest + 2, // 18: qubic.archiver.archive.pb.ArchiveService.GetTickData:output_type -> qubic.archiver.archive.pb.GetTickDataResponse + 8, // 19: qubic.archiver.archive.pb.ArchiveService.GetTickTransactions:output_type -> qubic.archiver.archive.pb.GetTickTransactionsResponse + 5, // 20: qubic.archiver.archive.pb.ArchiveService.GetTransaction:output_type -> qubic.archiver.archive.pb.GetTransactionResponse + 13, // 21: qubic.archiver.archive.pb.ArchiveService.GetQuorumTickData:output_type -> qubic.archiver.archive.pb.GetQuorumTickDataResponse + 16, // 22: qubic.archiver.archive.pb.ArchiveService.GetComputors:output_type -> qubic.archiver.archive.pb.GetComputorsResponse + 19, // 23: qubic.archiver.archive.pb.ArchiveService.GetIdentityInfo:output_type -> qubic.archiver.archive.pb.GetIdentityInfoResponse + 21, // 24: qubic.archiver.archive.pb.ArchiveService.GetLastProcessedTick:output_type -> qubic.archiver.archive.pb.GetLastProcessedTickResponse + 18, // [18:25] is the sub-list for method output_type + 11, // [11:18] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name } func init() { file_archive_proto_init() } @@ -2002,7 +2026,7 @@ func file_archive_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_archive_proto_rawDesc, NumEnums: 0, - NumMessages: 23, + NumMessages: 24, NumExtensions: 0, NumServices: 1, }, diff --git a/protobuff/archive.proto b/protobuff/archive.proto index 145814e..f01a155 100644 --- a/protobuff/archive.proto +++ b/protobuff/archive.proto @@ -128,6 +128,7 @@ message GetIdentityInfoResponse { message GetLastProcessedTickRequest {} message GetLastProcessedTickResponse{ uint32 last_processed_tick = 1; + map last_processed_ticks_per_epoch = 2; } service ArchiveService { diff --git a/rpc/rpc_server.go b/rpc/rpc_server.go index 7aad1be..a417753 100644 --- a/rpc/rpc_server.go +++ b/rpc/rpc_server.go @@ -83,7 +83,7 @@ func (s *Server) GetQuorumTickData(ctx context.Context, req *protobuff.GetQuorum return &protobuff.GetQuorumTickDataResponse{QuorumTickData: qtd}, nil } func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRequest) (*protobuff.GetComputorsResponse, error) { - computors, err := s.store.GetComputors(ctx, uint64(req.Epoch)) + computors, err := s.store.GetComputors(ctx, req.Epoch) if err != nil { if errors.Cause(err) == store.ErrNotFound { return nil, status.Errorf(codes.NotFound, "computors not found") @@ -139,7 +139,12 @@ func (s *Server) GetLastProcessedTick(ctx context.Context, req *protobuff.GetLas return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) } - return &protobuff.GetLastProcessedTickResponse{LastProcessedTick: uint32(tick)}, nil + lastProcessedTicksPerEpoch, err := s.store.GetLastProcessedTicksPerEpoch(ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "getting last processed tick: %v", err) + } + + return &protobuff.GetLastProcessedTickResponse{LastProcessedTick: uint32(tick), LastProcessedTicksPerEpoch: lastProcessedTicksPerEpoch}, nil } func (s *Server) Start() error { diff --git a/store/keys.go b/store/keys.go index 5cd920a..406a0d5 100644 --- a/store/keys.go +++ b/store/keys.go @@ -5,11 +5,12 @@ import ( ) const ( - TickData = 0x00 - QuorumData = 0x01 - ComputorList = 0x02 - Transaction = 0x03 - LastProcessedTick = 0x04 + TickData = 0x00 + QuorumData = 0x01 + ComputorList = 0x02 + Transaction = 0x03 + LastProcessedTick = 0x04 + LastProcessedTickPerEpoch = 0x05 ) func tickDataKey(tickNumber uint64) []byte { @@ -26,9 +27,9 @@ func quorumTickDataKey(tickNumber uint64) []byte { return key } -func computorsKey(epochNumber uint64) []byte { +func computorsKey(epochNumber uint32) []byte { key := []byte{ComputorList} - key = binary.BigEndian.AppendUint64(key, epochNumber) + key = binary.BigEndian.AppendUint32(key, epochNumber) return key } @@ -42,5 +43,11 @@ func tickTxKey(txID string) ([]byte, error) { func lastProcessedTickKey() []byte { return []byte{LastProcessedTick} +} + +func lastProcessedTickKeyPerEpoch(epochNumber uint32) []byte { + key := []byte{LastProcessedTickPerEpoch} + key = binary.BigEndian.AppendUint32(key, epochNumber) + return key } diff --git a/store/store.go b/store/store.go index 2611d5b..3506b44 100644 --- a/store/store.go +++ b/store/store.go @@ -9,6 +9,8 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" + "log" + "strconv" ) var ErrNotFound = errors.New("store resource not found") @@ -92,7 +94,7 @@ func (s *PebbleStore) SetQuorumTickData(ctx context.Context, tickNumber uint64, return nil } -func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint64) (*protobuff.Computors, error) { +func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint32) (*protobuff.Computors, error) { key := computorsKey(epoch) value, closer, err := s.db.Get(key) @@ -113,7 +115,7 @@ func (s *PebbleStore) GetComputors(ctx context.Context, epoch uint64) (*protobuf return &computors, nil } -func (s *PebbleStore) SetComputors(ctx context.Context, epoch uint64, computors *protobuff.Computors) error { +func (s *PebbleStore) SetComputors(ctx context.Context, epoch uint32, computors *protobuff.Computors) error { key := computorsKey(epoch) serialized, err := proto.Marshal(computors) @@ -208,16 +210,33 @@ func (s *PebbleStore) GetTransaction(ctx context.Context, txID string) (*protobu return &tx, nil } -func (s *PebbleStore) SetLastProcessedTick(ctx context.Context, tickNumber uint64) error { - key := lastProcessedTickKey() +func (s *PebbleStore) SetLastProcessedTick(ctx context.Context, tickNumber uint64, epochNumber uint32) error { + batch := s.db.NewBatch() + defer batch.Close() + + key := lastProcessedTickKeyPerEpoch(epochNumber) value := make([]byte, 8) binary.LittleEndian.PutUint64(value, tickNumber) - err := s.db.Set(key, value, &pebble.WriteOptions{Sync: true}) + err := batch.Set(key, value, pebble.Sync) + if err != nil { + return errors.Wrap(err, "setting last processed tick") + } + + key = lastProcessedTickKey() + value = make([]byte, 8) + binary.LittleEndian.PutUint64(value, tickNumber) + + err = batch.Set(key, value, pebble.Sync) if err != nil { return errors.Wrap(err, "setting last processed tick") } + err = batch.Commit(pebble.Sync) + if err != nil { + return errors.Wrap(err, "committing batch") + } + return nil } @@ -235,3 +254,33 @@ func (s *PebbleStore) GetLastProcessedTick(ctx context.Context) (uint64, error) return binary.LittleEndian.Uint64(value), nil } + +func (s *PebbleStore) GetLastProcessedTicksPerEpoch(ctx context.Context) (map[uint32]uint64, error) { + maxUint64 := ^uint64(0) + upperBound := append([]byte{LastProcessedTickPerEpoch}, []byte(strconv.FormatUint(maxUint64, 10))...) + iter, err := s.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte{LastProcessedTickPerEpoch}, + UpperBound: upperBound, + }) + if err != nil { + return nil, errors.Wrap(err, "creating iter") + } + defer iter.Close() + + ticksPerEpoch := make(map[uint32]uint64) + for iter.First(); iter.Valid(); iter.Next() { + key := iter.Key() + + value, err := iter.ValueAndErr() + if err != nil { + return nil, errors.Wrap(err, "getting value from iter") + } + + epochNumber := binary.BigEndian.Uint32(key[1:]) + tickNumber := binary.LittleEndian.Uint64(value) + ticksPerEpoch[epochNumber] = tickNumber + log.Printf("key: %s, value: %s\n", key, value) + } + + return ticksPerEpoch, nil +} diff --git a/store/store_test.go b/store/store_test.go index 58adc44..c629578 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -154,7 +154,7 @@ func TestPebbleStore_Computors(t *testing.T) { store := NewPebbleStore(db, logger) // Sample Computors for testing - epoch := uint64(1) // Convert epoch to uint64 as per method requirements + epoch := uint32(1) // Convert epoch to uint32 as per method requirements computors := &pb.Computors{ Epoch: 1, Identities: []string{"identity1", "identity2"}, @@ -316,3 +316,58 @@ func TestPebbleStore_GetTransaction(t *testing.T) { assert.Error(t, err) assert.Equal(t, ErrNotFound, err) } + +func TestSetAndGetLastProcessedTicksPerEpoch(t *testing.T) { + ctx := context.Background() + + // Setup test environment + dbDir, err := os.MkdirTemp("", "pebble_test") + assert.NoError(t, err) + defer os.RemoveAll(dbDir) + + db, err := pebble.Open(filepath.Join(dbDir, "testdb"), &pebble.Options{}) + assert.NoError(t, err) + defer db.Close() + + logger, _ := zap.NewDevelopment() + store := NewPebbleStore(db, logger) + + // Set last processed ticks per epoch + err = store.SetLastProcessedTick(ctx, 16, 1) + assert.NoError(t, err) + + // Get last processed tick per epoch + lastProcessedTick, err := store.GetLastProcessedTick(ctx) + assert.NoError(t, err) + assert.Equal(t, uint64(16), lastProcessedTick) + + lastProcessedTicksPerEpoch, err := store.GetLastProcessedTicksPerEpoch(ctx) + assert.NoError(t, err) + assert.Equal(t, map[uint32]uint64{1: 16}, lastProcessedTicksPerEpoch) + + // Set last processed ticks per epoch + err = store.SetLastProcessedTick(ctx, 17, 1) + assert.NoError(t, err) + + // Get last processed tick per epoch + lastProcessedTick, err = store.GetLastProcessedTick(ctx) + assert.NoError(t, err) + assert.Equal(t, uint64(17), lastProcessedTick) + + lastProcessedTicksPerEpoch, err = store.GetLastProcessedTicksPerEpoch(ctx) + assert.NoError(t, err) + assert.Equal(t, map[uint32]uint64{1: 17}, lastProcessedTicksPerEpoch) + + // Set last processed ticks per epoch + err = store.SetLastProcessedTick(ctx, 18, 2) + assert.NoError(t, err) + + // Get last processed tick per epoch + lastProcessedTick, err = store.GetLastProcessedTick(ctx) + assert.NoError(t, err) + assert.Equal(t, uint64(18), lastProcessedTick) + + lastProcessedTicksPerEpoch, err = store.GetLastProcessedTicksPerEpoch(ctx) + assert.NoError(t, err) + assert.Equal(t, map[uint32]uint64{1: 17, 2: 18}, lastProcessedTicksPerEpoch) +} diff --git a/types.go b/types.go deleted file mode 100644 index cc32ea8..0000000 --- a/types.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -const ( - SignatureSize = 64 -) - -type QuorumDiff struct { - SaltedResourceTestingDigest uint64 - SaltedSpectrumDigest [32]byte - SaltedUniverseDigest [32]byte - SaltedComputerDigest [32]byte - ExpectedNextTickTxDigest [32]byte - Signature [SignatureSize]byte -} diff --git a/validator/computors/validator.go b/validator/computors/validator.go index c920db3..2ad5389 100644 --- a/validator/computors/validator.go +++ b/validator/computors/validator.go @@ -7,7 +7,6 @@ import ( "github.com/pkg/errors" "github.com/qubic/go-archiver/utils" "github.com/qubic/go-node-connector/types" - ) func Validate(ctx context.Context, computors types.Computors) error { @@ -49,7 +48,7 @@ func getDigestFromComputors(data types.Computors) ([32]byte, error) { func Store(ctx context.Context, store *store.PebbleStore, computors types.Computors) error { protoModel := qubicToProto(computors) - err := store.SetComputors(ctx, uint64(protoModel.Epoch), protoModel) + err := store.SetComputors(ctx, protoModel.Epoch, protoModel) if err != nil { return errors.Wrap(err, "set computors") } @@ -57,7 +56,7 @@ func Store(ctx context.Context, store *store.PebbleStore, computors types.Comput return nil } -func Get(ctx context.Context, store *store.PebbleStore, epoch uint64) (types.Computors, error) { +func Get(ctx context.Context, store *store.PebbleStore, epoch uint32) (types.Computors, error) { protoModel, err := store.GetComputors(ctx, epoch) if err != nil { return types.Computors{}, errors.Wrap(err, "get computors") diff --git a/validator/validator.go b/validator/validator.go index e10531c..b508652 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -35,7 +35,7 @@ func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error { //getting computors from storage, otherwise get it from a node epoch := quorumVotes[0].Epoch var comps types.Computors - comps, err = computors.Get(ctx, v.store, uint64(epoch)) + comps, err = computors.Get(ctx, v.store, uint32(epoch)) if err != nil { if errors.Cause(err) != store.ErrNotFound { return errors.Wrap(err, "getting computors from store")