Skip to content

Commit

Permalink
fix: wrap errors using %w to preserve context (#1321)
Browse files Browse the repository at this point in the history
* fix: wrap errors using %w to preserve context

* move the consumer state check
  • Loading branch information
reugn authored Dec 19, 2024
1 parent 31ba011 commit 024e230
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 55 deletions.
2 changes: 1 addition & 1 deletion oauth2/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
p := jwt.Parser{}
claims := jwt.MapClaims{}
if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
return "", fmt.Errorf("unable to decode the access token: %v", err)
return "", fmt.Errorf("unable to decode the access token: %w", err)
}
username, ok := claims[ClaimNameUserName]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions oauth2/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// load from the store and use the access token if it isn't expired
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return nil, fmt.Errorf("LoadGrant: %v", err)
return nil, fmt.Errorf("LoadGrant: %w", err)
}
t.token = grant.Token
if t.token != nil && t.validateAccessToken(*t.token) {
Expand All @@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// obtain and cache a fresh access token
grant, err = t.refresher.Refresh(grant)
if err != nil {
return nil, fmt.Errorf("RefreshGrant: %v", err)
return nil, fmt.Errorf("RefreshGrant: %w", err)
}
t.token = grant.Token
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return nil, fmt.Errorf("SaveGrant: %v", err)
return nil, fmt.Errorf("SaveGrant: %w", err)
}

return t.token, nil
Expand All @@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
}
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
return fmt.Errorf("LoadGrant: %v", err)
return fmt.Errorf("LoadGrant: %w", err)
}
if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
return fmt.Errorf("SaveGrant: %v", err)
return fmt.Errorf("SaveGrant: %w", err)
}
}
return nil
Expand Down
13 changes: 7 additions & 6 deletions oauth2/store/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package store
import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, e

item, err := f.getItem(audience)
if err != nil {
if err == keyring.ErrKeyNotFound {
if errors.Is(err, keyring.ErrKeyNotFound) {
return nil, ErrNoAuthenticationData
}
return nil, err
Expand All @@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) {
key := hashKeyringKey(audience)
authItem, err := f.kr.Get(key)
if err != nil {
if err == keyring.ErrKeyNotFound {
if errors.Is(err, keyring.ErrKeyNotFound) {
return "", ErrNoAuthenticationData
}
return "", fmt.Errorf("unable to get information from the keyring: %v", err)
return "", fmt.Errorf("unable to get information from the keyring: %w", err)
}
return authItem.Label, nil
}
Expand All @@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
var err error
keys, err := f.kr.Keys()
if err != nil {
return fmt.Errorf("unable to get information from the keyring: %v", err)
return fmt.Errorf("unable to get information from the keyring: %w", err)
}
for _, key := range keys {
err = f.kr.Remove(key)
}
if err != nil {
return fmt.Errorf("unable to update the keyring: %v", err)
return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
}
err = f.kr.Set(i)
if err != nil {
return fmt.Errorf("unable to update the keyring: %v", err)
return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion oauth2/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// ErrNoAuthenticationData indicates that stored authentication data is not available
var ErrNoAuthenticationData = errors.New("authentication data is not available")

// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
// ErrUnsupportedAuthData indicates that stored authentication data is unusable
var ErrUnsupportedAuthData = errors.New("authentication data is not usable")

// Store is responsible for persisting authorization grants
Expand Down
20 changes: 10 additions & 10 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
// error decrypting the payload
if err != nil {
// default crypto failure action
crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
if pc.options.decryption != nil {
crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
}

switch crypToFailureAction {
switch cryptoFailureAction {
case crypto.ConsumerCryptoFailureActionFail:
pc.log.Errorf("consuming message failed due to decryption err :%v", err)
pc.log.Errorf("consuming message failed due to decryption err: %v", err)
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
return err
case crypto.ConsumerCryptoFailureActionDiscard:
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
return fmt.Errorf("discarding message on decryption error :%v", err)
return fmt.Errorf("discarding message on decryption error: %w", err)
case crypto.ConsumerCryptoFailureActionConsume:
pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
messages := []*message{
{
publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
Expand Down Expand Up @@ -1767,16 +1767,16 @@ func (pc *partitionConsumer) runEventsLoop() {
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
state := pc.getConsumerState()
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}

if state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if state != consumerReady {
// this might be redundant but to ensure nack tracker is closed
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
Expand Down
14 changes: 7 additions & 7 deletions pulsar/crypto/default_message_crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyR
d.cipherLock.Lock()
defer d.cipherLock.Unlock()
if keyName == "" || keyReader == nil {
return fmt.Errorf("keyname or keyreader is null")
return fmt.Errorf("keyname or keyreader is nil")
}

// read the public key and its info using keyReader
Expand Down Expand Up @@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
payload []byte,
keyReader KeyReader) ([]byte, error) {
// if data key is present, attempt to derypt using the existing key
// if data key is present, attempt to decrypt using the existing key
if d.dataKey != nil {
decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
if err != nil {
Expand Down Expand Up @@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey,

// read the public key into RSA key
func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
var publickKey gocrypto.PublicKey
var publicKey gocrypto.PublicKey

pubPem, _ := pem.Decode(key)
if pubPem == nil {
return publickKey, fmt.Errorf("failed to decode public key")
return publicKey, fmt.Errorf("failed to decode public key")
}

genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
if err != nil {
return publickKey, err
return publicKey, err
}
publickKey = genericPublicKey
publicKey = genericPublicKey

return publickKey, nil
return publicKey, nil
}

func generateDataKey() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
if err != nil {
// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
}

cmdSize := uint32(proto.Size(cmdSend))
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
resourceID := commandTopicMigrated.GetResourceId()
migratedBrokerServiceURL := c.getMigratedBrokerServiceURL(commandTopicMigrated)
if migratedBrokerServiceURL == "" {
c.log.Warnf("Failed to find the migrated broker url for resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
c.log.Warnf("Failed to find the migrated broker url for resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
resourceID,
commandTopicMigrated.GetBrokerServiceUrl(),
commandTopicMigrated.GetBrokerServiceUrlTls())
Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/crypto/producer_encryptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
crypto.NewMessageMetadataSupplier(msgMetadata),
payload)

// error encryping the payload
// error encrypting the payload
if err != nil {
// error occurred in encrypting the payload
// crypto ProducerCryptoFailureAction is set to send
// send unencrypted message
// unencrypted message
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
e.logger.
WithError(err).
Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
return payload, nil
}

return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload: %w", err)
}
return encryptedPayload, nil
}
4 changes: 2 additions & 2 deletions pulsar/primitiveSerDe.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64,

func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
if len(buf) < 8 {
return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
return 0, fmt.Errorf("cannot decode binary double: %w", io.ErrShortBuffer)
}
return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
}

func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
if len(buf) < 4 {
return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
return 0, fmt.Errorf("cannot decode binary float: %w", io.ErrShortBuffer)
}
return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
true,
client.log.SubLogger(log.Fields{"topic": p.topic}))
if err != nil {
return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
}
p.options.Encryption.MessageCrypto = messageCrypto
}
Expand Down
34 changes: 17 additions & 17 deletions pulsar/table_view_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type TableViewImpl struct {
dataMu sync.Mutex
data map[string]interface{}

readersMu sync.Mutex
cancelRaders map[string]cancelReader
readersMu sync.Mutex
cancelReaders map[string]cancelReader

listenersMu sync.Mutex
listeners []func(string, interface{}) error
Expand Down Expand Up @@ -73,12 +73,12 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
}

tv := TableViewImpl{
client: client,
options: options,
data: make(map[string]interface{}),
cancelRaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
client: client,
options: options,
data: make(map[string]interface{}),
cancelReaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
}

// Do an initial round of partition update check to make sure we can populate the partition readers
Expand All @@ -104,16 +104,16 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
tv.readersMu.Lock()
defer tv.readersMu.Unlock()

for partition, cancelReader := range tv.cancelRaders {
for partition, cancelReader := range tv.cancelReaders {
if _, ok := partitions[partition]; !ok {
cancelReader.cancelFunc()
cancelReader.reader.Close()
delete(tv.cancelRaders, partition)
delete(tv.cancelReaders, partition)
}
}

for partition := range partitions {
if _, ok := tv.cancelRaders[partition]; !ok {
if _, ok := tv.cancelReaders[partition]; !ok {
reader, err := newReader(tv.client, ReaderOptions{
Topic: partition,
StartMessageID: EarliestMessageID(),
Expand All @@ -127,14 +127,14 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", partition, err)
tv.logger.Errorf("read next message failed for %s: %v", partition, err)
}
if msg != nil {
tv.handleMessage(msg)
}
}
ctx, cancelFunc := context.WithCancel(context.Background())
tv.cancelRaders[partition] = cancelReader{
tv.cancelReaders[partition] = cancelReader{
reader: reader,
cancelFunc: cancelFunc,
}
Expand All @@ -148,7 +148,7 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
for {
if err := tv.partitionUpdateCheck(); err != nil {
tv.logger.Errorf("failed to check for changes in number of partitions: %w", err)
tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
}
select {
case <-tv.closedCh:
Expand Down Expand Up @@ -236,7 +236,7 @@ func (tv *TableViewImpl) Close() {

if !tv.closed {
tv.closed = true
for _, cancelReader := range tv.cancelRaders {
for _, cancelReader := range tv.cancelReaders {
cancelReader.reader.Close()
}
close(tv.closedCh)
Expand All @@ -259,7 +259,7 @@ func (tv *TableViewImpl) handleMessage(msg Message) {

for _, listener := range tv.listeners {
if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
}
}
}
Expand All @@ -268,7 +268,7 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader R
for {
msg, err := reader.Next(ctx)
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err)
tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
}
var e *Error
if (errors.As(err, &e) && e.Result() == ConsumerClosed) || errors.Is(err, context.Canceled) {
Expand Down

0 comments on commit 024e230

Please sign in to comment.