diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go
index 53e987d30820..e7cd2856bfb5 100644
--- a/op-batcher/batcher/driver.go
+++ b/op-batcher/batcher/driver.go
@@ -117,6 +117,10 @@ type BatchSubmitter struct {
 	channelMgrMutex sync.Mutex // guards channelMgr and prevCurrentL1
 	channelMgr      *channelManager
 	prevCurrentL1   eth.L1BlockRef // cached CurrentL1 from the last syncStatus
+
+	// Tracks attempts to send transactions, keyed by tx ID
+	pendingTxs   map[string]*txTracker
+	txTrackerMux sync.RWMutex // guards pendingTxs
 }
 
 // NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
@@ -126,8 +130,9 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
 		state.SetChannelOutFactory(setup.ChannelOutFactory)
 	}
 	return &BatchSubmitter{
-		DriverSetup: setup,
-		channelMgr:  state,
+		DriverSetup: setup,
+		channelMgr:  state,
+		pendingTxs:  make(map[string]*txTracker),
 	}
 }
 
@@ -413,8 +418,8 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
 	if syncActions.clearState != nil {
 		l.channelMgr.Clear(*syncActions.clearState)
 	} else {
-		l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune)
-		l.channelMgr.PruneChannels(syncActions.channelsToPrune)
+		l.channelMgr.pruneSafeBlocks(syncActions.blocksToPrune)
+		l.channelMgr.pruneChannels(syncActions.channelsToPrune)
 	}
 	return syncActions.blocksToLoad
 }
@@ -448,14 +453,19 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
 	ticker := time.NewTicker(l.Config.PollInterval)
 	defer ticker.Stop()
 
+	cleanupTicker := time.NewTicker(30 * time.Second)
+	defer cleanupTicker.Stop()
+
 	for {
 		select {
		case <-ticker.C:
-
 			if !l.checkTxpool(queue, receiptsCh) {
 				continue
 			}
 
+			// Periodically check for and clean up stuck transactions
+			l.checkAndCleanStuckTransactions(ctx, queue, receiptsCh)
+
 			syncStatus, err := l.getSyncStatus(l.shutdownCtx)
 			if err != nil {
 				l.Log.Warn("could not get sync status", "err", err)
@@ -475,6 +485,17 @@ func (l *BatchSubmitter) mainLoop(ctx context.Context, receiptsCh chan txmgr.TxR
 			l.publishStateToL1(queue, receiptsCh, daGroup, l.Config.PollInterval)
 
+		case <-cleanupTicker.C:
+			// Drop stale tracking records that have not been retried recently
+			l.txTrackerMux.Lock()
+			now := time.Now()
+			for txID, tracker := range l.pendingTxs {
+				if now.Sub(tracker.lastAttempt) > txTimeout*2 {
+					delete(l.pendingTxs, txID)
+				}
+			}
+			l.txTrackerMux.Unlock()
+
 		case <-ctx.Done():
 			if err := queue.Wait(); err != nil {
 				l.Log.Error("error waiting for transactions to complete", "err", err)
 			}
@@ -855,6 +876,22 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
 		candidate.GasLimit = intrinsicGas
 	}
 
+	txID := txdata.ID().String()
+
+	l.txTrackerMux.Lock()
+	if tracker, exists := l.pendingTxs[txID]; exists {
+		tracker.attempts++
+		tracker.lastAttempt = time.Now()
+	} else {
+		l.pendingTxs[txID] = &txTracker{
+			attempts:    1,
+			firstTry:    time.Now(),
+			lastAttempt: time.Now(),
+			isBlob:      txdata.asBlob,
+		}
+	}
+	l.txTrackerMux.Unlock()
+
 	queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
 }
 
@@ -892,8 +929,6 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
 }
 
 func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
-	l.channelMgrMutex.Lock()
-	defer l.channelMgrMutex.Unlock()
 	if err != nil {
 		l.Log.Warn("DA request failed", logFields(id, err)...)
 	}
@@ -959,3 +994,47 @@ func logFields(xs ...any) (fs []any) {
 	}
 	return fs
 }
+
+// txTracker records the submission attempts for a single batcher transaction.
+type txTracker struct {
+	attempts    uint32
+	firstTry    time.Time
+	lastAttempt time.Time
+	isBlob      bool
+}
+
+// Thresholds after which a pending transaction is considered stuck.
+const (
+	maxTxAttempts = 3
+	txTimeout     = 5 * time.Minute
+)
+
+// checkAndCleanStuckTransactions cancels and stops tracking transactions that
+// have exceeded the attempt or time thresholds.
+func (l *BatchSubmitter) checkAndCleanStuckTransactions(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
+	// Collect stuck transactions under the lock and cancel them only after
+	// releasing it: cancelBlockingTx goes back through sendTx, which also
+	// takes txTrackerMux, so cancelling while holding the lock would deadlock.
+	var stuckBlobs []bool // isBlob flag of each stuck transaction
+
+	l.txTrackerMux.Lock()
+	now := time.Now()
+	for txID, tracker := range l.pendingTxs {
+		// A transaction counts as stuck after too many attempts or too long a wait
+		if tracker.attempts >= maxTxAttempts || now.Sub(tracker.firstTry) > txTimeout {
+			l.Log.Warn("Transaction appears stuck, attempting cleanup",
+				"tx_id", txID,
+				"attempts", tracker.attempts,
+				"time_since_first_try", now.Sub(tracker.firstTry),
+				"is_blob", tracker.isBlob)
+			stuckBlobs = append(stuckBlobs, tracker.isBlob)
+			delete(l.pendingTxs, txID)
+		}
+	}
+	l.txTrackerMux.Unlock()
+
+	// Send a cancellation transaction for each stuck entry
+	for _, isBlob := range stuckBlobs {
+		l.cancelBlockingTx(queue, receiptsCh, isBlob)
+	}
+}
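
For reference, a minimal self-contained sketch of the stuck-transaction predicate applied by checkAndCleanStuckTransactions above. The txTracker fields and the maxTxAttempts/txTimeout constants are copied from the patch so the snippet runs on its own; the isStuck helper and the sample values are illustrative only and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

// Copied from the patch: thresholds and per-transaction tracking state.
const (
	maxTxAttempts = 3
	txTimeout     = 5 * time.Minute
)

type txTracker struct {
	attempts    uint32
	firstTry    time.Time
	lastAttempt time.Time
	isBlob      bool
}

// isStuck is an illustrative helper mirroring the cleanup condition:
// too many attempts, or pending for longer than txTimeout.
func isStuck(t *txTracker, now time.Time) bool {
	return t.attempts >= maxTxAttempts || now.Sub(t.firstTry) > txTimeout
}

func main() {
	now := time.Now()
	fresh := &txTracker{attempts: 1, firstTry: now.Add(-time.Minute)}
	retried := &txTracker{attempts: 3, firstTry: now.Add(-time.Minute)}
	stale := &txTracker{attempts: 1, firstTry: now.Add(-10 * time.Minute)}

	fmt.Println(isStuck(fresh, now))   // false: few attempts, submitted recently
	fmt.Println(isStuck(retried, now)) // true: reached maxTxAttempts
	fmt.Println(isStuck(stale, now))   // true: pending longer than txTimeout
}

Either condition alone triggers a cancellation, so a transaction that is retried quickly but repeatedly is treated the same as one that sits unconfirmed past the timeout.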