Skip to content

Commit

Permalink
Implement suggested fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
LINCKODE committed Sep 9, 2024
1 parent 61a5472 commit 7615d71
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 271 deletions.
25 changes: 0 additions & 25 deletions .github/workflows/push-compression.yaml

This file was deleted.

6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ The archive system consists of two services:
## IMPORTANT

> [!WARNING]
> This version of archiver is **INCOMPATIBLE** by default with versions **v0.x.x**, as it features database compression and a different format for quorum data.
> If you wish to enable backwards compatibility, please set `STORE_COMPRESSION_TYPE` to `Snappy` and `STORE_SAVE_FULL_QUORUM_DATA` to `true`.
> This version of archiver is **INCOMPATIBLE** with versions **v0.x.x**, as it features database compression and a different format for quorum data.
> Archiver **DOES NOT** migrate the database to the new format by itself, and **MAY BREAK** your existing information, if not migrated correctly.
> For a migration tool, please see the [Archiver DB Migrator](https://github.com/qubic/archiver-db-migrator), and make sure to back up your data!
> See [this](db-migration.md) for how to migrate the database to the new format.
Before starting the system, open the `docker-compose.yml` file and make sure that you have a reliable peer list setup
for the `qubic-nodes` service.
Expand All @@ -41,8 +41,6 @@ This can be configured using the `QUBIC_NODES_QUBIC_PEER_LIST` environment varia
$QUBIC_ARCHIVER_QUBIC_NODE_PORT <string> (default: 21841)
$QUBIC_ARCHIVER_QUBIC_STORAGE_FOLDER <string> (default: store)
$QUBIC_ARCHIVER_QUBIC_PROCESS_TICK_TIMEOUT <duration> (default: 5s)
$QUBIC_ARCHIVER_STORE_COMPRESSION_TYPE <string> (default: Zstd)
$QUBIC_ARCHIVER_STORE_SAVE_FULL_VOTE_DATA <bool> (default: false)
```

## Run with docker-compose:
Expand Down
7 changes: 7 additions & 0 deletions db-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Process of migrating an Archiver instance:

1. Stop archiver.
2. Run the [archiver-db-migrator](https://github.com/qubic/archiver-db-migrator) tool, specifying the path of the old database, and the path of where to store the new database directory. Wait for the process to stop.
3. Rename the old database directory to something else, and the new directory to the old name, thus we don't need to modify the archiver configuration.
4. Update the archiver version in the docker compose file.
5. Start archiver.
64 changes: 19 additions & 45 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ func run() error {
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
Store struct {
ResetEmptyTickKeys bool `conf:"default:false"`
CompressionType string `conf:"default:Zstd"`
SaveFullQuorumData bool `conf:"default:false"`
ResetEmptyTickKeys bool `conf:"default:false"`
}
}

Expand Down Expand Up @@ -82,9 +80,24 @@ func run() error {
}
log.Printf("main: Config :\n%v\n", out)

db, err := openDB(cfg.Qubic.StorageFolder, cfg.Store.CompressionType)
levelOptions := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.ZstdCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: 2097152,
}

pebbleOptions := pebble.Options{
Levels: []pebble.LevelOptions{levelOptions},
}

db, err := pebble.Open(cfg.Qubic.StorageFolder, &pebbleOptions)
if err != nil {
return errors.Wrap(err, "opening db")
return errors.Wrap(err, "opening db with zstd compression")
}
defer db.Close()

Expand Down Expand Up @@ -125,7 +138,7 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout, cfg.Store.SaveFullQuorumData)
proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout)
procErrors := make(chan error, 1)

// Start the service listening for requests.
Expand All @@ -142,42 +155,3 @@ func run() error {
}
}
}

func openDB(path string, compressionType string) (*pebble.DB, error) {
switch compressionType {
case "Zstd":
return createDBWithZstdCompression(path)
case "Snappy":
db, err := pebble.Open(path, &pebble.Options{})
if err != nil {
return nil, errors.Wrap(err, "opening db with snappy compression")
}
return db, nil
}
return nil, errors.New("unknown compression type")
}

func createDBWithZstdCompression(path string) (*pebble.DB, error) {

levelOptions := pebble.LevelOptions{
BlockRestartInterval: 16,
BlockSize: 4096,
BlockSizeThreshold: 90,
Compression: pebble.ZstdCompression,
FilterPolicy: nil,
FilterType: pebble.TableFilter,
IndexBlockSize: 4096,
TargetFileSize: 2097152,
}

pebbleOptions := pebble.Options{
Levels: []pebble.LevelOptions{levelOptions},
}

db, err := pebble.Open(path, &pebbleOptions)
if err != nil {
return nil, errors.Wrap(err, "opening db with zstd compression")
}

return db, nil
}
6 changes: 2 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ type Processor struct {
pool *qubic.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
storeFullVoteData bool
}

func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration, storeFullVoteData bool) *Processor {
func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor {
return &Processor{
pool: p,
ps: ps,
processTickTimeout: processTickTimeout,
storeFullVoteData: storeFullVoteData,
}
}

Expand Down Expand Up @@ -98,7 +96,7 @@ func (p *Processor) processOneByOne() error {
}

val := validator.New(client, p.ps)
err = val.ValidateTick(ctx, tickInfo.InitialTick, nextTick.TickNumber, p.storeFullVoteData)
err = val.ValidateTick(ctx, tickInfo.InitialTick, nextTick.TickNumber)
if err != nil {
return errors.Wrapf(err, "validating tick %d", nextTick.TickNumber)
}
Expand Down
63 changes: 59 additions & 4 deletions rpc/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator/quorum"
qubic "github.com/qubic/go-node-connector"
"github.com/qubic/go-node-connector/types"
"google.golang.org/grpc"
Expand Down Expand Up @@ -242,15 +243,69 @@ func (s *Server) GetQuorumTickData(ctx context.Context, req *protobuff.GetQuorum
return nil, st.Err()
}

qtd, err := s.store.GetQuorumTickData(ctx, req.TickNumber)
if req.TickNumber == lastProcessedTick.TickNumber {
tickData, err := s.store.GetQuorumTickDataV2(ctx, req.TickNumber)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "quorum tick data not found")
}
return nil, status.Errorf(codes.Internal, "getting quorum tick data: %v", err)
}

res := protobuff.GetQuorumTickDataResponse{
QuorumTickData: &protobuff.QuorumTickData{
QuorumTickStructure: tickData.QuorumTickStructure,
QuorumDiffPerComputor: make(map[uint32]*protobuff.QuorumDiff),
},
}

for id, diff := range tickData.QuorumDiffPerComputor {
res.QuorumTickData.QuorumDiffPerComputor[id] = &protobuff.QuorumDiff{
ExpectedNextTickTxDigestHex: diff.ExpectedNextTickTxDigestHex,
SignatureHex: diff.SignatureHex,
}
}

return &res, nil
}

nextTick := req.TickNumber + 1

//Get quorum data for next tick
nextTickQuorumData, err := s.store.GetQuorumTickDataV2(ctx, nextTick)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, status.Errorf(codes.NotFound, "quorum tick data not found")
return nil, status.Errorf(codes.Internal, "quorum data for next tick was not found")
}
return nil, status.Errorf(codes.Internal, "getting quorum tick data: %v", err)
return nil, status.Errorf(codes.Internal, "getting tick data: %v", err)
}

//Get quorum data for current tick
currentTickQuorumData, err := s.store.GetQuorumTickDataV2(ctx, req.TickNumber)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
return nil, status.Errorf(codes.Internal, "quorum data for tick was not found")
}
return nil, status.Errorf(codes.Internal, "getting tick data: %v", err)
}

//Get computors
computors, err := s.store.GetComputors(ctx, currentTickQuorumData.QuorumTickStructure.Epoch)
if err != nil {
return nil, status.Errorf(codes.Internal, "getting computor list")
}

reconstructedQuorumData, err := quorum.ReconstructQuorumData(currentTickQuorumData, nextTickQuorumData, computors)
if err != nil {
return nil, status.Errorf(codes.Internal, "reconstructing quorum data: %v", err)
}

//Response object
res := protobuff.GetQuorumTickDataResponse{
QuorumTickData: reconstructedQuorumData,
}

return &protobuff.GetQuorumTickDataResponse{QuorumTickData: qtd}, nil
return &res, nil
}
func (s *Server) GetComputors(ctx context.Context, req *protobuff.GetComputorsRequest) (*protobuff.GetComputorsResponse, error) {
computors, err := s.store.GetComputors(ctx, req.Epoch)
Expand Down
Loading

0 comments on commit 7615d71

Please sign in to comment.