Skip to content

Commit

Permalink
Merge pull request #44 from qubic/empty-ticks
Browse files Browse the repository at this point in the history
Implement logic for calculating saving and exposing empty tick counts…
  • Loading branch information
LINCKODE authored Jul 29, 2024
2 parents a0c50ed + 2834535 commit 84e0dba
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 66 deletions.
34 changes: 33 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package main

import (
"context"
"fmt"
"github.com/ardanlabs/conf"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/processor"
"github.com/qubic/go-archiver/rpc"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator/tick"
qubic "github.com/qubic/go-node-connector"
"log"
"os"
Expand Down Expand Up @@ -48,6 +50,9 @@ func run() error {
StorageFolder string `conf:"default:store"`
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
EmptyTicks struct {
CalculateAll bool `conf:"default:false"`
}
}

if err := conf.Parse(os.Args[1:], prefix, &cfg); err != nil {
Expand Down Expand Up @@ -84,6 +89,30 @@ func run() error {

ps := store.NewPebbleStore(db, nil)

if cfg.EmptyTicks.CalculateAll == true {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
epochs, err := ps.GetLastProcessedTicksPerEpoch(ctx)
if err != nil {
return errors.Wrap(err, "getting epoch list from db")
}

for epoch, _ := range epochs {
emptyTicksPerEpoch, err := tick.CalculateEmptyTicksForEpoch(ctx, ps, epoch)
if err != nil {
return errors.Wrapf(err, "calculating empty ticks for epoch %d", epoch)
}

err = ps.SetEmptyTicksPerEpoch(epoch, emptyTicksPerEpoch)
if err != nil {
return errors.Wrap(err, "saving emptyTickCount to database")
}
}

return nil
}

p, err := qubic.NewPoolConnection(qubic.PoolConfig{
InitialCap: cfg.Pool.InitialCap,
MaxCap: cfg.Pool.MaxCap,
Expand All @@ -98,7 +127,10 @@ func run() error {
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, cfg.Server.NodeSyncThreshold, cfg.Server.ChainTickFetchUrl, ps, p)
rpcServer.Start()
err = rpcServer.Start()
if err != nil {
return errors.Wrap(err, "starting rpc server")
}

shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
Expand Down
152 changes: 87 additions & 65 deletions protobuff/archive.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions protobuff/archive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ message GetStatusResponse {
map<uint32, uint32> last_processed_ticks_per_epoch = 2;
repeated SkippedTicksInterval skipped_ticks = 3;
repeated ProcessedTickIntervalsPerEpoch processed_tick_intervals_per_epoch = 4;
map<uint32, uint32> empty_ticks_per_epoch = 5;
}

message GetHealthCheckResponse {
Expand Down
11 changes: 11 additions & 0 deletions rpc/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,22 @@ func (s *Server) GetStatus(ctx context.Context, _ *emptypb.Empty) (*protobuff.Ge
return nil, status.Errorf(codes.Internal, "getting processed tick intervals")
}

var epochs []uint32
for epoch, _ := range lastProcessedTicksPerEpoch {
epochs = append(epochs, epoch)
}

emptyTicksForAllEpochs, err := s.store.GetEmptyTicksForEpochs(epochs)
if err != nil {
return nil, status.Errorf(codes.Internal, "getting empty ticks for all epochs: %v", err)
}

return &protobuff.GetStatusResponse{
LastProcessedTick: tick,
LastProcessedTicksPerEpoch: lastProcessedTicksPerEpoch,
SkippedTicks: skippedTicks.SkippedTicks,
ProcessedTickIntervalsPerEpoch: ptie,
EmptyTicksPerEpoch: emptyTicksForAllEpochs,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions store/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ const (
TickTransactionsStatus = 0x10
TransactionStatus = 0x11
StoreDigest = 0x12
EmptyTicksPerEpoch = 0x13
)

func emptyTicksPerEpochKey(epoch uint32) []byte {
key := []byte{EmptyTicksPerEpoch}
key = binary.BigEndian.AppendUint64(key, uint64(epoch))
return key
}

func tickDataKey(tickNumber uint32) []byte {
key := []byte{TickData}
key = binary.BigEndian.AppendUint64(key, uint64(tickNumber))
Expand Down
47 changes: 47 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,50 @@ func (s *PebbleStore) GetProcessedTickIntervals(ctx context.Context) ([]*protobu

return processedTickIntervals, nil
}

func (s *PebbleStore) SetEmptyTicksPerEpoch(epoch uint32, emptyTicksCount uint32) error {
key := emptyTicksPerEpochKey(epoch)

value := make([]byte, 4)
binary.LittleEndian.PutUint32(value, emptyTicksCount)

err := s.db.Set(key, value, pebble.Sync)
if err != nil {
return errors.Wrapf(err, "saving emptyTickCount for epoch %d", epoch)
}
return nil
}

func (s *PebbleStore) GetEmptyTicksPerEpoch(epoch uint32) (uint32, error) {
key := emptyTicksPerEpochKey(epoch)

value, closer, err := s.db.Get(key)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return 0, nil
}

return 0, errors.Wrapf(err, "getting emptyTickCount for epoch %d", epoch)
}
defer closer.Close()

emptyTicksCount := binary.LittleEndian.Uint32(value)

return emptyTicksCount, nil
}

func (s *PebbleStore) GetEmptyTicksForEpochs(epochs []uint32) (map[uint32]uint32, error) {

emptyTickMap := make(map[uint32]uint32, len(epochs))

for _, epoch := range epochs {
emptyTicks, err := s.GetEmptyTicksPerEpoch(epoch)
if err != nil {
return nil, errors.Wrapf(err, "getting empty ticks for epoch %d", epoch)
}
emptyTickMap[epoch] = emptyTicks
}

return emptyTickMap, nil

}
60 changes: 60 additions & 0 deletions validator/tick/empty_tick.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tick

import (
"context"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-node-connector/types"
)

func CalculateEmptyTicksForEpoch(ctx context.Context, ps *store.PebbleStore, epoch uint32) (uint32, error) {

epochs, err := ps.GetProcessedTickIntervals(ctx)
if err != nil {
return 0, errors.Wrap(err, "getting stored epochs")
}

for _, e := range epochs {
if e.Epoch != epoch {
continue
}

var emptyTicks uint32

for _, interval := range e.Intervals {
for tickOffset := range interval.LastProcessedTick - interval.InitialProcessedTick + 1 {
tickNumber := tickOffset + interval.InitialProcessedTick

tickData, err := ps.GetTickData(ctx, tickNumber)
if err != nil {
return 0, errors.Wrapf(err, "getting tick data for tick %d", tickNumber)
}

if CheckIfTickIsEmptyProto(tickData) {
emptyTicks += 1
continue
}
}
}
return emptyTicks, err
}
return 0, nil
}

func CheckIfTickIsEmptyProto(tickData *protobuff.TickData) bool {
if tickData == nil {
return true
}

return false
}

func CheckIfTickIsEmpty(tickData types.TickData) (bool, error) {
data, err := qubicToProto(tickData)
if err != nil {
return false, errors.Wrap(err, "converting tick data to protobuf format")
}

return CheckIfTickIsEmptyProto(data), nil
}
24 changes: 24 additions & 0 deletions validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
"github.com/qubic/go-archiver/store"
Expand Down Expand Up @@ -156,6 +157,29 @@ func (v *Validator) ValidateTick(ctx context.Context, initialEpochTick, tickNumb
return errors.Wrap(err, "computing and saving store digest")
}

isEmpty, err := tick.CheckIfTickIsEmpty(tickData)
if err != nil {
return errors.Wrap(err, "checking if tick is empty")
}

if isEmpty {
emptyTicks, err := v.store.GetEmptyTicksPerEpoch(uint32(epoch))
if err != nil {
return errors.Wrap(err, "getting empty ticks for current epoch")
}

if emptyTicks == 0 {
fmt.Printf("Initializing empty ticks for epoch: %d\n", epoch)
}

emptyTicks += 1

err = v.store.SetEmptyTicksPerEpoch(uint32(epoch), emptyTicks)
if err != nil {
return errors.Wrap(err, "setting current ticks for current epoch")
}
fmt.Printf("Empty ticks for epoch %d: %d\n", epoch, emptyTicks)
}
return nil
}

Expand Down

0 comments on commit 84e0dba

Please sign in to comment.