Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New topic case + handle window-related errorcodes #88

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#82](https://github.com/allora-network/allora-offchain-node/pull/82) Adjust adapter log levels
* [#83](https://github.com/allora-network/allora-offchain-node/pull/83) Added missing params to .env example
* [#88](https://github.com/allora-network/allora-offchain-node/pull/88) New topic case + handle window-related errorcodes

### Security

Expand Down
19 changes: 18 additions & 1 deletion lib/repo_tx_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (
"github.com/rs/zerolog/log"

errorsmod "cosmossdk.io/errors"
emissions "github.com/allora-network/allora-chain/x/emissions/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/ignite/cli/v28/ignite/pkg/cosmosclient"
)

const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:"
const ERROR_MESSAGE_DATA_ALREADY_SUBMITTED = "already submitted"
const ERROR_MESSAGE_CANNOT_UPDATE_EMA = "cannot update EMA"
const ERROR_MESSAGE_WAITING_FOR_NEXT_BLOCK = "waiting for next block" // This means tx is accepted in mempool but not yet included in a block
const ERROR_MESSAGE_ACCOUNT_SEQUENCE_MISMATCH = "account sequence mismatch"
const ERROR_MESSAGE_TIMEOUT_HEIGHT = "timeout height"
const ERROR_MESSAGE_ABCI_ERROR_CODE_MARKER = "error code:"
const ERROR_MESSAGE_NOT_PERMITTED_TO_SUBMIT_PAYLOAD = "not permitted to submit payload"
const ERROR_MESSAGE_NOT_PERMITTED_TO_ADD_STAKE = "not permitted to add stake"

Expand Down Expand Up @@ -84,6 +85,22 @@ func processError(err error, infoMsg string, retryCount int64, node *NodeConfig)
return ERROR_PROCESSING_ERROR, errorsmod.Wrapf(err, "invalid chain-id")
case int(sdkerrors.ErrTxTimeoutHeight.ABCICode()):
return ERROR_PROCESSING_FAILURE, errorsmod.Wrapf(err, "tx timeout height")
case int(emissions.ErrWorkerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Worker window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount)
time.Sleep(delay)
return ERROR_PROCESSING_CONTINUE, nil
case int(emissions.ErrReputerNonceWindowNotAvailable.ABCICode()):
log.Warn().
Err(err).
Str("msg", infoMsg).
Msg("Reputer window not available, retrying with exponential backoff")
delay := calculateExponentialBackoffDelay(node.Wallet.RetryDelay, retryCount)
time.Sleep(delay)
return ERROR_PROCESSING_CONTINUE, nil
default:
log.Info().Int("errorCode", errorCode).Str("msg", infoMsg).Msg("ABCI error, but not special case - regular retry")
}
Expand Down
55 changes: 54 additions & 1 deletion usecase/spawn_actor_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const NUM_SUBMISSION_WINDOWS_FOR_SUBMISSION_NEARNESS int64 = 2
// Waiting times under nearness circumstances are adjusted by this factor
const NEARNESS_CORRECTION_FACTOR float64 = 1.0

// Correction factor used when calculating time distances for new topics
const NEW_TOPIC_CORRECTION_FACTOR float64 = 0.5

// Minimum wait time between status checks
const WAIT_TIME_STATUS_CHECKS int64 = 2

Expand Down Expand Up @@ -318,9 +321,47 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
Int64("EpochLength", epochLength).
Msg("Info from topic")

// Special case: new topic
if topicInfo.EpochLastEnded == 0 {
log.Debug().Msg("New topic, processing payload")
// timeoutHeight is one epoch length away
timeoutHeight := currentBlockHeight + epochLength
kpeluso marked this conversation as resolved.
Show resolved Hide resolved

latestNonceHeightSentTxFor, err = params.ProcessPayload(params.Config, latestNonceHeightSentTxFor, uint64(timeoutHeight))
if err != nil {
log.Error().
Err(err).
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Msg("Error processing payload - could not complete transaction")
}
// Wait for an epochLength with a correction factor, it will self-adjust from there
waitingTimeInSeconds, err := calculateTimeDistanceInSeconds(
epochLength,
suite.Node.Wallet.BlockDurationEstimated,
NEW_TOPIC_CORRECTION_FACTOR,
)
if err != nil {
log.Error().
Err(err).
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("waitingTimeInSeconds", waitingTimeInSeconds).
Msg("Error calculating time distance to next epoch after sending tx - wait epochLength")
return
}
suite.Wait(waitingTimeInSeconds)
continue
}

epochLastEnded := topicInfo.EpochLastEnded
epochEnd := epochLastEnded + epochLength
timeoutHeight := epochLastEnded + params.SubmissionWindowLength
log.Trace().
Int64("epochLastEnded", epochLastEnded).
Int64("epochEnd", epochEnd).
Int64("timeoutHeight", timeoutHeight).
Msg("Epoch info")

// Check if block is within the submission window
if currentBlockHeight-epochLastEnded <= params.SubmissionWindowLength {
Expand All @@ -335,6 +376,16 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
}

distanceUntilNextEpoch := epochEnd - currentBlockHeight
if distanceUntilNextEpoch < 0 {
log.Warn().
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("distanceUntilNextEpoch", distanceUntilNextEpoch).
Int64("submissionWindowLength", params.SubmissionWindowLength).
Msg("Distance until next epoch is less than 0, setting to submissionWindowLength")
distanceUntilNextEpoch = params.SubmissionWindowLength
}

waitingTimeInSeconds, err := calculateTimeDistanceInSeconds(
distanceUntilNextEpoch,
suite.Node.Wallet.BlockDurationEstimated,
Expand Down Expand Up @@ -376,7 +427,9 @@ func runActorProcess[T lib.TopicActor](suite *UseCaseSuite, params ActorProcessP
Uint64("topicId", uint64(params.Config.GetTopicId())).
Str("actorType", params.ActorType).
Int64("waitingTimeInSeconds", waitingTimeInSeconds).
Msg("Current block height is greater than next epoch length, inactive topic? Waiting seconds...")
Int64("currentBlockHeight", currentBlockHeight).
Int64("epochEnd", epochEnd).
Msg("Current block height is greater than next epoch length, is topic inactive? Waiting seconds...")
suite.Wait(waitingTimeInSeconds)
} else {
distanceUntilNextEpoch := epochEnd - currentBlockHeight
Expand Down