Skip to content

Commit

Permalink
New topic case + handle window-related errorcodes (#88)
Browse files Browse the repository at this point in the history
## Purpose of Changes and their Description

* Better handling of the new-topic case
* Handles out-of-window reputer and worker ABCI code with exp retry

## Are these changes tested and documented?

- [x] If tested, please describe how. If not, why tests are not needed.
-- tested locally against v0.7.0 rc chain
- [x] If documented, please describe where. If not, describe why docs
are not needed. -- no need, just improved internal mechanism
- [x] Added to `Unreleased` section of `CHANGELOG.md`?
  • Loading branch information
kpeluso authored Nov 28, 2024
2 parents 71c42fd + f33366e commit 34c0c5f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#82](https://github.com/allora-network/allora-offchain-node/pull/82) Adjust adapter log levels
* [#83](https://github.com/allora-network/allora-offchain-node/pull/83) Added missing params to .env example
* [#88](https://github.com/allora-network/allora-offchain-node/pull/88) New topic case + handle window-related errorcodes

### Security

Expand Down
19 changes: 18 additions & 1 deletion lib/repo_tx_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (
"github.com/rs/zerolog/log"

errorsmod "cosmossdk.io/errors"
emissions "github.com/allora-network/allora-chain/x/emissions/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
)

const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:"
const ERROR_MESSAGE_DATA_ALREADY_SUBMITTED = "already submitted"
const ERROR_MESSAGE_CANNOT_UPDATE_EMA = "cannot update EMA"
const ERROR_MESSAGE_WAITING_FOR_NEXT_BLOCK = "waiting for next block" // This means tx is accepted in mempool but not yet included in a block
const ERROR_MESSAGE_ACCOUNT_SEQUENCE_MISMATCH = "account sequence mismatch"
const ERROR_MESSAGE_TIMEOUT_HEIGHT = "timeout height"
const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:"
const ERROR_MESSAGE_NOT_PERMITTED_TO_SUBMIT_PAYLOAD = "not permitted to submit payload"
const ERROR_MESSAGE_NOT_PERMITTED_TO_ADD_STAKE = "not permitted to add stake"

Expand Down Expand Up @@ -84,6 +85,22 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig)
return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "invalid chain-id")
case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()):
return ERROR_PROCESSING_FAILURE, errorsmod.Wrapf(err, "tx timeout height")
case int(emissions.ErrWorkerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Worker window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount)
time.Sleep(delay)
return ERROR_PROCESSING_CONTINUE, nil
case int(emissions.ErrReputerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Reputer window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount)
time.Sleep(delay)
return ERROR_PROCESSING_CONTINUE, nil
default:
log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
}
Expand Down
55 changes: 54 additions & 1 deletion usecase/spawn_actor_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS int64 = 2
// Waiting times under nearness circumstances are adjusted by this factor
const NEARNESS_CORRECTION_FACTOR float64 = 1.0

// Correction factor used when calculating time distances for new topics
const NEW_TOPIC_CORRECTION_FACTOR float64 = 0.5

// Minimum wait time between status checks
const WAIT_TIME_STATUS_CHECKS int64 = 2

Expand Down Expand Up @@ -318,9 +321,47 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
Int64("EpochLength", epochLength).
Msg("Info from topic")

// Special case: new topic
if topicInfo.EpochLastEnded == 0 {
log.Debug().Msg("New topic, processing payload")
// timeoutHeight is one epoch length away
timeoutHeight := currentBlockHeight + epochLength

latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor, uint64(timeoutHeight))
if err != nil {
log.Error().
Err(err).
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Msg("Error processing payload - could not complete transaction")
}
// Wait for an epochLength with a correction factor, it will self-adjust from there
waitingTimeInSeconds, err := calculateTimeDistanceInSeconds(
epochLength,
suite.Node.Wallet.BlockDurationEstimated,
NEW_TOPIC_CORRECTION_FACTOR,
)
if err != nil {
log.Error().
Err(err).
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("waitingTimeInSeconds", waitingTimeInSeconds).
Msg("Error calculating time distance to next epoch after sending tx - wait epochLength")
return
}
suite.Wait(waitingTimeInSeconds)
continue
}

epochLastEnded := topicInfo.EpochLastEnded
epochEnd := epochLastEnded + epochLength
timeoutHeight := epochLastEnded + params.SubmissionWindowLength
log.Trace().
Int64("epochLastEnded", epochLastEnded).
Int64("epochEnd", epochEnd).
Int64("timeoutHeight", timeoutHeight).
Msg("Epoch info")

// Check if block is within the submission window
if currentBlockHeight-epochLastEnded <= params.SubmissionWindowLength {
Expand All @@ -335,6 +376,16 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
}

distanceUntilNextEpoch := epochEnd - currentBlockHeight
if distanceUntilNextEpoch < 0 {
log.Warn().
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("distanceUntilNextEpoch", distanceUntilNextEpoch).
Int64("submissionWindowLength", params.SubmissionWindowLength).
Msg("Distance until next epoch is less than 0, setting to submissionWindowLength")
distanceUntilNextEpoch = params.SubmissionWindowLength
}

waitingTimeInSeconds, err := calculateTimeDistanceInSeconds(
distanceUntilNextEpoch,
suite.Node.Wallet.BlockDurationEstimated,
Expand Down Expand Up @@ -376,7 +427,9 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("waitingTimeInSeconds", waitingTimeInSeconds).
Msg("Current block height is greater than next epoch length, inactive topic? Waiting seconds...")
Int64("currentBlockHeight", currentBlockHeight).
Int64("epochEnd", epochEnd).
Msg("Current block height is greater than next epoch length, is topic inactive? Waiting seconds...")
suite.Wait(waitingTimeInSeconds)
} else {
distanceUntilNextEpoch := epochEnd - currentBlockHeight
Expand Down

0 comments on commit 34c0c5f

Please sign in to comment.