Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backup): add a batch feature to the RPC client #45

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 102 additions & 45 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Service struct {
writer writer.Writer
logger log.Logger

batchSize uint
watchInterval time.Duration // interval for the watch routine
}

Expand All @@ -37,6 +38,11 @@ func NewService(client client.Client, writer writer.Writer, opts ...Option) *Ser
opt(s)
}

// Batch size needs to be at least 1
if s.batchSize == 0 {
s.batchSize = 1
}

return s
}

Expand All @@ -53,61 +59,116 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
return fmt.Errorf("unable to determine right bound, %w", boundErr)
}

// Keep track of total txs backed up
totalTxs := uint64(0)
// Log info about what will be backed up
s.logger.Info(
"Existing blocks to backup",
"from block", cfg.FromBlock,
"to block", toBlock,
"total", toBlock-cfg.FromBlock+1,
)

// Keep track of what has been backed up
var results struct {
blocksFetched uint64
blocksWithTxs uint64
txsBackedUp uint64
}

fetchAndWrite := func(height uint64) error {
block, txErr := s.client.GetBlock(height)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
}
// Log results on exit
defer func() {
s.logger.Info(
"Total data backed up",
"blocks fetched", results.blocksFetched,
"blocks with transactions", results.blocksWithTxs,
"transactions written", results.txsBackedUp,
)
}()

// Internal function that fetches and writes a range of blocks
fetchAndWrite := func(fromBlock, toBlock uint64) error {
// Fetch by batches
for batchStart := fromBlock; batchStart <= toBlock; {
// Determine batch stop block
batchStop := batchStart + uint64(s.batchSize) - 1
if batchStop > toBlock {
batchStop = toBlock
}

// Skip empty blocks
if len(block.Txs) == 0 {
return nil
}
batchSize := batchStop - batchStart + 1

// Save the block transaction data, if any
for _, tx := range block.Txs {
data := &types.TxData{
Tx: tx,
BlockNum: block.Height,
Timestamp: block.Timestamp,
}
// Verbose log for blocks to be fetched
s.logger.Debug(
"Fetching batch of blocks",
"from", batchStart,
"to", batchStop,
"size", batchSize,
)

// Write the tx data to the file
if writeErr := s.writer.WriteTxData(data); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
// Fetch current batch
blocks, txErr := s.client.GetBlocks(ctx, batchStart, batchStop)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
}

totalTxs++
// Keep track of the number of fetched blocks & those containing transactions
results.blocksFetched += batchSize
results.blocksWithTxs += uint64(len(blocks))

// Log the progress
s.logger.Info(
"Transaction backed up",
"total", totalTxs,
// Verbose log for blocks containing transactions
s.logger.Debug(
"Batch fetched successfully",
"blocks with transactions", fmt.Sprintf("%d/%d", len(blocks), batchSize),
)
}

return nil
}
// Iterate over the list of blocks containing transactions
for _, block := range blocks {
for i, tx := range block.Txs {
// Write the tx data to the file
txData := &types.TxData{
Tx: tx,
BlockNum: block.Height,
Timestamp: block.Timestamp,
}

// Gather the chain data from the node
for block := cfg.FromBlock; block <= toBlock; block++ {
select {
case <-ctx.Done():
s.logger.Info("backup procedure stopped")
if writeErr := s.writer.WriteTxData(txData); writeErr != nil {
return fmt.Errorf("unable to write tx data, %w", writeErr)
}

// Keep track of the number of backed up transactions
results.txsBackedUp++

return nil
default:
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
// Verbose log for each transaction written
s.logger.Debug(
"Transaction backed up",
"blockNum", block.Height,
"tx count (block)", i+1,
"tx count (total)", results.txsBackedUp,
)
}
}

batchStart = batchStop + 1
}

return nil
}

// Backup the existing transactions
if fetchErr := fetchAndWrite(cfg.FromBlock, toBlock); fetchErr != nil {
return fetchErr
}

// Check if there needs to be a watcher setup
if cfg.Watch {
s.logger.Info(
"Existing blocks backup complete",
"blocks fetched", results.blocksFetched,
"blocks with transactions", results.blocksWithTxs,
"transactions written", results.txsBackedUp,
)

s.logger.Info("Watch for new blocks to backup")

ticker := time.NewTicker(s.watchInterval)
defer ticker.Stop()

Expand All @@ -116,7 +177,7 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
for {
select {
case <-ctx.Done():
s.logger.Info("export procedure stopped")
s.logger.Info("Stop watching for new blocks to backup")

return nil
case <-ticker.C:
Expand All @@ -132,10 +193,8 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
}

// Catch up to the latest block
for block := lastBlock + 1; block <= latest; block++ {
if fetchErr := fetchAndWrite(block); fetchErr != nil {
return fetchErr
}
if fetchErr := fetchAndWrite(lastBlock+1, latest); fetchErr != nil {
return fetchErr
}

// Update the last exported block
Expand All @@ -144,8 +203,6 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
}
}

s.logger.Info("Backup complete")

return nil
}

Expand Down
Loading
Loading