Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: performance optimization in "op-batcher" #13584

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 78 additions & 7 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type BatchSubmitter struct {
channelMgrMutex sync.Mutex // guards channelMgr and prevCurrentL1
channelMgr *channelManager
prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus

// Tracking attempts to send transactions
pendingTxs map[string]*txTracker
txTrackerMux sync.RWMutex
}

// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
Expand All @@ -126,8 +130,9 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
state.SetChannelOutFactory(setup.ChannelOutFactory)
}
return &BatchSubmitter{
DriverSetup: setup,
channelMgr: state,
DriverSetup: setup,
channelMgr: state,
pendingTxs: make(map[string]*txTracker),
}
}

Expand Down Expand Up @@ -413,8 +418,8 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
if syncActions.clearState != nil {
l.channelMgr.Clear(*syncActions.clearState)
} else {
l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune)
l.channelMgr.PruneChannels(syncActions.channelsToPrune)
l.channelMgr.pruneSafeBlocks(syncActions.blocksToPrune)
l.channelMgr.pruneChannels(syncActions.channelsToPrune)
}
return syncActions.blocksToLoad
}
Expand Down Expand Up @@ -448,14 +453,19 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()

cleanupTicker := time.NewTicker(30 * time.Second)
defer cleanupTicker.Stop()

for {
select {
case <-ticker.C:

if !l.checkTxpool(queue, receiptsCh) {
continue
}

// Adding periodic checking for stuck transactions
l.checkAndCleanStuckTransactions(ctx, queue, receiptsCh)

syncStatus, err := l.getSyncStatus(l.shutdownCtx)
if err != nil {
l.Log.Warn("could not get sync status", "err", err)
Expand All @@ -475,6 +485,17 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR

l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)

case <-cleanupTicker.C:
// Periodic cleanup of old records
l.txTrackerMux.Lock()
now := time.Now()
for txID, tracker := range l.pendingTxs {
if now.Sub(tracker.lastAttempt) > txTimeout*2 {
delete(l.pendingTxs, txID)
}
}
l.txTrackerMux.Unlock()

case <-ctx.Done():
if err := queue.Wait(); err != nil {
l.Log.Error("error waiting for transactions to complete", "err", err)
Expand Down Expand Up @@ -855,6 +876,22 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
candidate.GasLimit = intrinsicGas
}

txID := txdata.ID().String()

l.txTrackerMux.Lock()
if tracker, exists := l.pendingTxs[txID]; exists {
tracker.attempts++
tracker.lastAttempt = time.Now()
} else {
l.pendingTxs[txID] = &txTracker{
attempts: 1,
firstTry: time.Now(),
lastAttempt: time.Now(),
isBlob: txdata.asBlob,
}
}
l.txTrackerMux.Unlock()

queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
}

Expand Down Expand Up @@ -892,8 +929,6 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
}

func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
l.channelMgrMutex.Lock()
defer l.channelMgrMutex.Unlock()
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
}
Expand Down Expand Up @@ -959,3 +994,39 @@ func logFields(xs ...any) (fs []any) {
}
return fs
}

// txTracker records submission metadata for a single in-flight batcher
// transaction, keyed by its txID in BatchSubmitter.pendingTxs, so that
// checkAndCleanStuckTransactions can detect submissions that appear stuck.
type txTracker struct {
	attempts    uint32    // number of times sendTx has submitted this tx
	firstTry    time.Time // time of the first submission attempt
	lastAttempt time.Time // time of the most recent submission attempt
	isBlob      bool      // mirrors txdata.asBlob at submission time
}

// Thresholds used by checkAndCleanStuckTransactions (and the periodic
// tracker-cleanup ticker in mainLoop) to decide when a pending
// transaction is considered stuck.
const (
	maxTxAttempts = 3               // send attempts before a tx is treated as stuck
	txTimeout     = 5 * time.Minute // max age since first attempt before a tx is treated as stuck
)

// checkAndCleanStuckTransactions scans pendingTxs for transactions that have
// exceeded maxTxAttempts or whose first attempt is older than txTimeout,
// removes them from the tracker, and issues a cancelling transaction for each.
//
// NOTE(review): cancelBlockingTx must NOT be called while holding
// txTrackerMux — sendTx acquires the same (non-reentrant) mutex to record the
// attempt, so cancelling under the lock would deadlock if cancelBlockingTx
// reaches sendTx. Stuck entries are therefore snapshotted and deleted under
// the lock, and cancellations are sent only after it is released.
func (l *BatchSubmitter) checkAndCleanStuckTransactions(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
	// Snapshot of a stuck entry, taken while holding txTrackerMux.
	type stuckTx struct {
		id      string
		tracker txTracker
	}

	now := time.Now()

	l.txTrackerMux.Lock()
	var stuck []stuckTx
	for txID, tracker := range l.pendingTxs {
		// A tx is stuck if it was retried too many times or has been
		// pending since its first attempt for longer than txTimeout.
		if tracker.attempts >= maxTxAttempts || now.Sub(tracker.firstTry) > txTimeout {
			stuck = append(stuck, stuckTx{id: txID, tracker: *tracker})
			delete(l.pendingTxs, txID)
		}
	}
	l.txTrackerMux.Unlock()

	// Cancel outside the lock (see NOTE above).
	for _, s := range stuck {
		l.Log.Warn("Transaction appears stuck, attempting cleanup",
			"tx_id", s.id,
			"attempts", s.tracker.attempts,
			"time_since_first_try", now.Sub(s.tracker.firstTry),
			"is_blob", s.tracker.isBlob)
		l.cancelBlockingTx(queue, receiptsCh, s.tracker.isBlob)
	}
}