Skip to content

Commit

Permalink
added skip ticks information to kv to be able to see which ticks were…
Browse files Browse the repository at this point in the history
… skipped at epoch change
  • Loading branch information
0xluk committed Mar 7, 2024
1 parent 49bb1a6 commit 8306cd2
Show file tree
Hide file tree
Showing 11 changed files with 828 additions and 282 deletions.
2 changes: 1 addition & 1 deletion dev.docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

services:
qubic-archiver:
image: ghcr.io/qubic/qubic-archiver:v0.1.17
image: ghcr.io/qubic/qubic-archiver:v0.1.18
container_name: qubic-archiver
labels:
- "traefik.enable=true"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'

services:
qubic-archiver:
image: ghcr.io/qubic/qubic-archiver:v0.1.17
image: ghcr.io/qubic/qubic-archiver:v0.1.18
container_name: qubic-archiver
ports:
- "127.0.0.1:8000:8000"
Expand Down
47 changes: 38 additions & 9 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processor
import (
"context"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator"
qubic "github.com/qubic/go-node-connector"
Expand Down Expand Up @@ -80,7 +81,9 @@ func (p *Processor) processOneByOne() error {
return errors.Wrap(err, "getting tick info")
}

nextTick, err := p.getNextProcessingTick(ctx, tickInfo)
lastTick, err := p.getLastProcessedTick(ctx, tickInfo)

nextTick, err := p.getNextProcessingTick(ctx, lastTick, tickInfo)
if err != nil {
return errors.Wrap(err, "getting next processing tick")
}
Expand All @@ -97,6 +100,11 @@ func (p *Processor) processOneByOne() error {
return errors.Wrapf(err, "validating tick %d", nextTick)
}

err = p.processSkippedTicks(ctx, lastTick, nextTick)
if err != nil {
return errors.Wrap(err, "processing skipped ticks")
}

err = p.ps.SetLastProcessedTick(ctx, nextTick, uint32(tickInfo.Epoch))
if err != nil {
return errors.Wrapf(err, "setting last processed tick %d", nextTick)
Expand All @@ -105,24 +113,45 @@ func (p *Processor) processOneByOne() error {
return nil
}

func (p *Processor) getNextProcessingTick(ctx context.Context, currentTickInfo types.TickInfo) (uint64, error) {
func (p *Processor) getNextProcessingTick(ctx context.Context, lastTick uint64, currentTickInfo types.TickInfo) (uint64, error) {
//handles the case where the initial tick of epoch returned by the node is greater than the last processed tick
// which means that we are in the next epoch and we should start from the initial tick of the current epoch
if uint64(currentTickInfo.InitialTick) > lastTick {
return uint64(currentTickInfo.InitialTick), nil
}

// otherwise we are in the same epoch and we should start from the last processed tick + 1
return lastTick + 1, nil
}

func (p *Processor) getLastProcessedTick(ctx context.Context, currentTickInfo types.TickInfo) (uint64, error) {
lastTick, err := p.ps.GetLastProcessedTick(ctx)
if err != nil {
//handles first run of the archiver where there is nothing in storage
// in this case we start from the initial tick of the current epoch
if errors.Is(err, store.ErrNotFound) {
return uint64(currentTickInfo.InitialTick), nil
return uint64(currentTickInfo.InitialTick - 1), nil
}

return 0, errors.Wrap(err, "getting last processed tick")
}

//handles the case where the initial tick of epoch returned by the node is greater than the last processed tick
// which means that we are in the next epoch and we should start from the initial tick of the current epoch
if uint64(currentTickInfo.InitialTick) > lastTick {
return uint64(currentTickInfo.InitialTick), nil
return lastTick, nil
}

func (p *Processor) processSkippedTicks(ctx context.Context, lastTick uint64, nextTick uint64) error {
// nothing to process, no skipped ticks
if nextTick-lastTick == 1 {
return nil
}

// otherwise we are in the same epoch and we should start from the last processed tick + 1
return lastTick + 1, nil
err := p.ps.SetSkippedTicksInterval(ctx, &protobuff.SkippedTicksInterval{
StartTick: uint32(lastTick + 1),
EndTick: uint32(nextTick - 1),
})
if err != nil {
return errors.Wrap(err, "setting skipped ticks interval")
}

return nil
}
Loading

0 comments on commit 8306cd2

Please sign in to comment.