diff --git a/CHANGELOG.md b/CHANGELOG.md index b395556..66a55e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * [#82](https://github.com/allora-network/allora-offchain-node/pull/82) Adjust adapter log levels * [#83](https://github.com/allora-network/allora-offchain-node/pull/83) Added missing params to .env example +* [#88](https://github.com/allora-network/allora-offchain-node/pull/88) New topic case + handle window-related errorcodes ### Security diff --git a/lib/repo_tx_utils.go b/lib/repo_tx_utils.go index ecb44e6..114f36d 100644 --- a/lib/repo_tx_utils.go +++ b/lib/repo_tx_utils.go @@ -13,17 +13,18 @@ import ( "github.com/rs/zerolog/log" errorsmod "cosmossdk.io/errors" + emissions "github.com/allora-network/allora-chain/x/emissions/types" sdktypes "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/ignite/cli/v28/ignite/pkg/cosmosclient" ) +const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:" const ERROR_MESSAGE_DATA_ALREADY_SUBMITTED = "already submitted" const ERROR_MESSAGE_CANNOT_UPDATE_EMA = "cannot update EMA" const ERROR_MESSAGE_WAITING_FOR_NEXT_BLOCK = "waiting for next block" // This means tx is accepted in mempool but not yet included in a block const ERROR_MESSAGE_ACCOUNT_SEQUENCE_MISMATCH = "account sequence mismatch" const ERROR_MESSAGE_TIMEOUT_HEIGHT = "timeout height" -const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:" const ERROR_MESSAGE_NOT_PERMITTED_TO_SUBMIT_PAYLOAD = "not permitted to submit payload" const ERROR_MESSAGE_NOT_PERMITTED_TO_ADD_STAKE = "not permitted to add stake" @@ -84,6 +85,22 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig) return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "invalid chain-id") case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()): return ERROR_PROCESSING_FAILURE, errorsmod.Wrapf(err, "tx timeout height") + case int(emissions.ErrWorkerNonceWindowNotAvailable.ABCICode()): + log.Warn(). + Err(err). + Str("msg", infoMsg). + Msg("Worker window not available, retrying with exponential backoff") + delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount) + time.Sleep(delay) + return ERROR_PROCESSING_CONTINUE, nil + case int(emissions.ErrReputerNonceWindowNotAvailable.ABCICode()): + log.Warn(). + Err(err). + Str("msg", infoMsg). + Msg("Reputer window not available, retrying with exponential backoff") + delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount) + time.Sleep(delay) + return ERROR_PROCESSING_CONTINUE, nil default: log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry") } diff --git a/usecase/spawn_actor_processes.go b/usecase/spawn_actor_processes.go index 85dad90..68231d1 100644 --- a/usecase/spawn_actor_processes.go +++ b/usecase/spawn_actor_processes.go @@ -22,6 +22,9 @@ const NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS int64 = 2 // Waiting times under nearness circumstances are adjusted by this factor const NEARNESS_CORRECTION_FACTOR float64 = 1.0 +// Correction factor used when calculating time distances for new topics +const NEW_TOPIC_CORRECTION_FACTOR float64 = 0.5 + // Minimum wait time between status checks const WAIT_TIME_STATUS_CHECKS int64 = 2 @@ -318,9 +321,47 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Int64("EpochLength", epochLength). Msg("Info from topic") + // Special case: new topic + if topicInfo.EpochLastEnded == 0 { + log.Debug().Msg("New topic, processing payload") + // timeoutHeight is one epoch length away + timeoutHeight := currentBlockHeight + epochLength + + latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor, uint64(timeoutHeight)) + if err != nil { + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Msg("Error processing payload - could not complete transaction") + } + // Wait for an epochLength with a correction factor, it will self-adjust from there + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( + epochLength, + suite.Node.Wallet.BlockDurationEstimated, + NEW_TOPIC_CORRECTION_FACTOR, + ) + if err != nil { + log.Error(). + Err(err). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Int64("waitingTimeInSeconds", waitingTimeInSeconds). + Msg("Error calculating time distance to next epoch after sending tx - wait epochLength") + return + } + suite.Wait(waitingTimeInSeconds) + continue + } + epochLastEnded := topicInfo.EpochLastEnded epochEnd := epochLastEnded + epochLength timeoutHeight := epochLastEnded + params.SubmissionWindowLength + log.Trace(). + Int64("epochLastEnded", epochLastEnded). + Int64("epochEnd", epochEnd). + Int64("timeoutHeight", timeoutHeight). + Msg("Epoch info") // Check if block is within the submission window if currentBlockHeight-epochLastEnded <= params.SubmissionWindowLength { @@ -335,6 +376,16 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP } distanceUntilNextEpoch := epochEnd - currentBlockHeight + if distanceUntilNextEpoch < 0 { + log.Warn(). + Uint64("topicId", uint64(params.Config.GetTopicId())). + Str("actorType", params.ActorType). + Int64("distanceUntilNextEpoch", distanceUntilNextEpoch). + Int64("submissionWindowLength", params.SubmissionWindowLength). + Msg("Distance until next epoch is less than 0, setting to submissionWindowLength") + distanceUntilNextEpoch = params.SubmissionWindowLength + } + waitingTimeInSeconds, err := calculateTimeDistanceInSeconds( distanceUntilNextEpoch, suite.Node.Wallet.BlockDurationEstimated, @@ -376,7 +427,9 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP Uint64("topicId", uint64(params.Config.GetTopicId())). Str("actorType", params.ActorType). Int64("waitingTimeInSeconds", waitingTimeInSeconds). - Msg("Current block height is greater than next epoch length, inactive topic? Waiting seconds...") + Int64("currentBlockHeight", currentBlockHeight). + Int64("epochEnd", epochEnd). + Msg("Current block height is greater than next epoch length, is topic inactive? Waiting seconds...") suite.Wait(waitingTimeInSeconds) } else { distanceUntilNextEpoch := epochEnd - currentBlockHeight