Skip to content

Commit

Permalink
Implement support for new quorum data format.
Browse files Browse the repository at this point in the history
  • Loading branch information
LINCKODE committed Sep 6, 2024
1 parent f78a43e commit 8898443
Show file tree
Hide file tree
Showing 11 changed files with 1,204 additions and 716 deletions.
35 changes: 22 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func run() error {
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
Store struct {
ResetEmptyTickKeys bool `conf:"default:false"`
ResetEmptyTickKeys bool `conf:"default:false"`
CompressionType string `conf:"default:Zstd"`
StoreFullVoteData bool `conf:"default:false"`
}
}

Expand Down Expand Up @@ -80,17 +82,10 @@ func run() error {
}
log.Printf("main: Config :\n%v\n", out)

/*db, err := pebble.Open(cfg.Qubic.StorageFolder, &pebble.Options{})
db, err := openDB(cfg.Qubic.StorageFolder, cfg.Store.CompressionType)
if err != nil {
log.Fatalf("err opening pebble: %s", err.Error())
return errors.Wrap(err, "opening db")
}
defer db.Close()*/

db, err := CreateDBWithZstdCompression(cfg.Qubic.StorageFolder)
if err != nil {
return errors.Wrap(err, "creating db")
}

defer db.Close()

ps := store.NewPebbleStore(db, nil)
Expand Down Expand Up @@ -130,7 +125,7 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout)
proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout, cfg.Store.StoreFullVoteData)
procErrors := make(chan error, 1)

// Start the service listening for requests.
Expand All @@ -148,7 +143,21 @@ func run() error {
}
}

func CreateDBWithZstdCompression(path string) (*pebble.DB, error) {
func openDB(path string, compressionType string) (*pebble.DB, error) {
switch compressionType {
case "Zstd":
return createDBWithZstdCompression(path)
case "Snappy":
db, err := pebble.Open(path, &pebble.Options{})
if err != nil {
return nil, errors.Wrap(err, "opening db with snappy compression")
}
return db, nil
}
return nil, errors.New("unknown compression type")
}

func createDBWithZstdCompression(path string) (*pebble.DB, error) {

levelOptions := pebble.LevelOptions{
BlockRestartInterval: 16,
Expand All @@ -167,7 +176,7 @@ func CreateDBWithZstdCompression(path string) (*pebble.DB, error) {

db, err := pebble.Open(path, &pebbleOptions)
if err != nil {
return nil, errors.Wrap(err, "opening db")
return nil, errors.Wrap(err, "opening db with zstd compression")
}

return db, nil
Expand Down
6 changes: 4 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ type Processor struct {
pool *qubic.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
storeFullVoteData bool
}

func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor {
func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration, storeFullVoteData bool) *Processor {
return &Processor{
pool: p,
ps: ps,
processTickTimeout: processTickTimeout,
storeFullVoteData: storeFullVoteData,
}
}

Expand Down Expand Up @@ -96,7 +98,7 @@ func (p *Processor) processOneByOne() error {
}

val := validator.New(client, p.ps)
err = val.ValidateTick(ctx, tickInfo.InitialTick, nextTick.TickNumber)
err = val.ValidateTick(ctx, tickInfo.InitialTick, nextTick.TickNumber, p.storeFullVoteData)
if err != nil {
return errors.Wrapf(err, "validating tick %d", nextTick.TickNumber)
}
Expand Down
Loading

0 comments on commit 8898443

Please sign in to comment.