Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexers: update indexer error types. #2770

Merged
merged 1 commit into from
Dec 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions blockchain/indexers/addrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package indexers

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -77,11 +76,6 @@ var (
// addrIndexKey is the key of the address index and the db bucket used
// to house it.
addrIndexKey = []byte("txbyaddridx")

// errUnsupportedAddressType is an error that is used to signal an
// unsupported address type has been used.
errUnsupportedAddressType = errors.New("address type is not supported " +
"by the address index")
)

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -564,8 +558,8 @@ func addrToKey(addr stdaddr.Address) ([addrKeySize]byte, error) {
copy(result[1:], addr.Hash160()[:])
return result, nil
}

return [addrKeySize]byte{}, errUnsupportedAddressType
return [addrKeySize]byte{}, indexerError(ErrUnsupportedAddressType,
"address type is not supported by the address index")
}

// AddrIndex implements a transaction by address index. That is to say, it
Expand Down Expand Up @@ -630,7 +624,7 @@ func (idx *AddrIndex) NeedsInputs() bool {
// This is part of the Indexer interface.
func (idx *AddrIndex) Init(ctx context.Context, chainParams *chaincfg.Params) error {
if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

// Finish any drops that were previously interrupted.
Expand Down Expand Up @@ -1150,7 +1144,8 @@ func NewAddrIndex(subscriber *IndexSubscriber, db database.DB, chain ChainQuerye

consumer, ok := sc.(*SpendConsumer)
if !ok {
return nil, errors.New("consumer not of type SpendConsumer")
return nil, indexerError(ErrInvalidSpendConsumerType,
"consumer not of type SpendConsumer")
}

idx.consumer = consumer
Expand Down Expand Up @@ -1193,7 +1188,9 @@ func (idx *AddrIndex) ProcessNotification(dbTx database.Tx, ntfn *IndexNtfn) err
err := idx.connectBlock(dbTx, ntfn.Block, ntfn.Parent,
ntfn.PrevScripts, ntfn.IsTreasuryEnabled)
if err != nil {
return fmt.Errorf("%s: unable to connect block: %v", idx.Name(), err)
msg := fmt.Sprintf("%s: unable to connect block: %v",
idx.Name(), err)
return indexerError(ErrConnectBlock, msg)
}

idx.consumer.UpdateTip(ntfn.Block.Hash())
Expand All @@ -1202,23 +1199,28 @@ func (idx *AddrIndex) ProcessNotification(dbTx database.Tx, ntfn *IndexNtfn) err
err := idx.disconnectBlock(dbTx, ntfn.Block, ntfn.Parent,
ntfn.PrevScripts, ntfn.IsTreasuryEnabled)
if err != nil {
log.Errorf("%s: unable to disconnect block: %v", idx.Name(), err)
msg := fmt.Sprintf("%s: unable to disconnect block: %v",
idx.Name(), err)
return indexerError(ErrDisconnectBlock, msg)
}

// Remove the associated spend consumer dependency for the disconnected
// block.
err = idx.Queryer().RemoveSpendConsumerDependency(dbTx, ntfn.Block.Hash(),
idx.consumer.id)
if err != nil {
log.Errorf("%s: unable to remove spend consumer dependency "+
"for block %s: %v", idx.Name(), ntfn.Block.Hash(), err)
msg := fmt.Sprintf("%s: unable to remove spend consumer "+
"dependency for block %s: %v", idx.Name(),
ntfn.Block.Hash(), err)
return indexerError(ErrDisconnectBlock, msg)
}
Comment on lines 1201 to 1216
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if intended, but this changes the behavior to return rather than just log errors and the update tip below is no longer always called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's intended, if disconnecting a block fails for the index it'd be inaccurate to update its tip. Also to keep things synchronized, if removing a spend consumer dependency generates an error for an index it should return.


idx.consumer.UpdateTip(ntfn.Parent.Hash())

default:
return fmt.Errorf("%s: unknown notification type provided: %d",
msg := fmt.Sprintf("%s: unknown notification type provided: %d",
idx.Name(), ntfn.NtfnType)
return indexerError(ErrInvalidNotificationType, msg)
}

return nil
Expand Down
36 changes: 20 additions & 16 deletions blockchain/indexers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ var (
// fields for storage in the database.
byteOrder = binary.LittleEndian

// errInterruptRequested indicates that an operation was cancelled due
// to a user-requested interrupt.
errInterruptRequested = errors.New("interrupt requested")

// indexTipsBucketName is the name of the db bucket used to house the
// current tip of each index.
indexTipsBucketName = []byte("idxtips")

// interruptMsg is the error message for interrupt requested errors.
interruptMsg = "interrupt requested"
)

// NeedsInputser provides a generic interface for an indexer to specify the it
Expand Down Expand Up @@ -310,7 +309,7 @@ func incrementalFlatDrop(ctx context.Context, db database.DB, idxKey []byte, idx
}

if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}
}
return nil
Expand Down Expand Up @@ -489,7 +488,7 @@ func recover(ctx context.Context, idx Indexer) error {
var cachedBlock *dcrutil.Block
for !queryer.MainChainHasBlock(hash) {
if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

// Get the block, unless it's already cached.
Expand All @@ -513,7 +512,7 @@ func recover(ctx context.Context, idx Indexer) error {
var prevScripts PrevScripter
err = idx.DB().Update(func(dbTx database.Tx) error {
if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

// Fetch the associated script information for previous outputs
Expand Down Expand Up @@ -597,7 +596,7 @@ func finishDrop(ctx context.Context, indexer Indexer) error {
}

if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

log.Infof("Resuming %s drop", indexer.Name())
Expand Down Expand Up @@ -679,7 +678,7 @@ func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
}

if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

bestHeight, bestHash := indexer.Queryer().Best()
Expand All @@ -703,13 +702,14 @@ func maybeNotifySubscribers(ctx context.Context, indexer Indexer) error {
// the provided index if there is one set.
func notifyDependent(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
if interruptRequested(ctx) {
return errInterruptRequested
return indexerError(ErrInterruptRequested, interruptMsg)
}

sub := indexer.IndexSubscription()
if sub == nil {
return fmt.Errorf("%s: no index update subscription found",
msg := fmt.Sprintf("%s: no index update subscription found",
indexer.Name())
return indexerError(ErrFetchSubscription, msg)
}

// Notify the dependent subscription if set.
Expand All @@ -730,8 +730,9 @@ func notifyDependent(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) erro
func updateIndex(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
tip, _, err := indexer.Tip()
if err != nil {
return fmt.Errorf("%s: unable to fetch index tip: %v",
msg := fmt.Sprintf("%s: unable to fetch index tip: %v",
indexer.Name(), err)
return indexerError(ErrFetchTip, msg)
}

var expectedHeight int64
Expand All @@ -741,8 +742,9 @@ func updateIndex(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
case DisconnectNtfn:
expectedHeight = tip
default:
return fmt.Errorf("%s: unknown notification type received: %v",
msg := fmt.Sprintf("%s: unknown notification type received: %v",
indexer.Name(), ntfn.NtfnType)
return indexerError(ErrInvalidNotificationType, msg)
}

switch {
Expand All @@ -757,9 +759,10 @@ func updateIndex(ctx context.Context, indexer Indexer, ntfn *IndexNtfn) error {
case ntfn.Block.Height() > expectedHeight:
// Receiving a notification with a height higher than the expected
// implies a missed index update.
return fmt.Errorf("%s: missing index notification, expected "+
msg := fmt.Sprintf("%s: missing index notification, expected "+
"notification for height %d, got %d", indexer.Name(),
expectedHeight, ntfn.Block.Height())
return indexerError(ErrMissingNotification, msg)

default:
err = indexer.DB().Update(func(dbTx database.Tx) error {
Expand Down Expand Up @@ -791,8 +794,9 @@ func AddIndexSpendConsumers(db database.DB, chain ChainQueryer) error {
if err != nil {
if !errors.Is(err, database.ErrValueNotFound) &&
!errors.Is(err, database.ErrBucketNotFound) {
return fmt.Errorf("unable to fetch index tip for "+
"address index %w", err)
msg := fmt.Sprintf("unable to fetch index tip for "+
"address index %s", err)
return indexerError(ErrFetchTip, msg)
}
}

Expand Down
76 changes: 76 additions & 0 deletions blockchain/indexers/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package indexers

// ErrorKind identifies a kind of error. It has full support for errors.Is and
// errors.As, so the caller can directly check against an error kind when
// determining the reason for an error.
type ErrorKind string

// These constants are used to identify a specific IndexerError.
const (
// ErrUnsupportedAddressType indicates an unsupported address type.
ErrUnsupportedAddressType = ErrorKind("ErrUnsupportedAddressType")

// ErrInvalidSpendConsumerType indicates an invalid spend consumer type.
ErrInvalidSpendConsumerType = ErrorKind("ErrInvalidSpendConsumerType")

// ErrConnectBlock indicates an error indexing a connected block.
ErrConnectBlock = ErrorKind("ErrConnectBlock")

// ErrDisconnectBlock indicates an error indexing a disconnected block.
ErrDisconnectBlock = ErrorKind("ErrDisconnectBlock")

// ErrRemoveSpendDependency indicates a spend dependency removal error.
ErrRemoveSpendDependency = ErrorKind("ErrRemoveSpendDependency")

// ErrInvalidNotificationType indicates an invalid indexer notification type.
ErrInvalidNotificationType = ErrorKind("ErrInvalidNotificationType")

// ErrInterruptRequested indicates an operation was cancelled due to
// a user-requested interrupt.
ErrInterruptRequested = ErrorKind("ErrInterruptRequested")

// ErrFetchSubscription indicates an error fetching an index subscription.
ErrFetchSubscription = ErrorKind("ErrFetchSubscription")

// ErrFetchTip indicates an error fetching an index tip.
ErrFetchTip = ErrorKind("ErrFetchTip")

// ErrMissingNotification indicates a missing index notification.
ErrMissingNotification = ErrorKind("ErrMissingNotification")

// ErrBlockNotOnMainChain indicates the provided block is not on the
// main chain.
ErrBlockNotOnMainChain = ErrorKind("ErrBlockNotOnMainChain")
)

// Error satisfies the error interface and prints human-readable errors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also check with var _ error = ErrorKind("") but I guess it will never change with just the one method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deferring this for later if we decide it is better to be explicit about implementing the error interface throughout the codebase.

func (e ErrorKind) Error() string {
return string(e)
}

// IndexerError identifies an indexer error. It has full support for
// errors.Is and errors.As, so the caller can ascertain the specific reason
// for the error by checking the underlying error.
type IndexerError struct {
Description string
Err error
}

// Error satisfies the error interface and prints human-readable errors.
func (e IndexerError) Error() string {
return e.Description
}

// Unwrap returns the underlying wrapped error.
func (e IndexerError) Unwrap() error {
return e.Err
}

// indexerError creates a IndexerError given a set of arguments.
func indexerError(kind ErrorKind, desc string) IndexerError {
return IndexerError{Err: kind, Description: desc}
}
Loading