use error for failure chan with custom error types
mjneil committed Jun 11, 2020
1 parent 6688fb4 commit 59f5701
Showing 7 changed files with 85 additions and 51 deletions.
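
In short: NotifyFailures() now returns <-chan error instead of <-chan *FailureRecord, so the failure channel can carry any of the custom error types introduced in errors.go. A minimal consumer sketch of the new API (the producer value pr, the import alias producer, and the standard log package are assumptions, not part of this commit):

    go func() {
        for err := range pr.NotifyFailures() {
            switch e := err.(type) {
            case *producer.FailureRecord:
                // A PutRecords entry failed; the affected user records ride along.
                log.Printf("put failure: %v (pk=%q, %d user records)", e.Err, e.PartitionKey, len(e.UserRecords))
            case *producer.DrainError:
                // The aggregator failed to marshal; its buffered user records ride along.
                log.Printf("drain failure: %v (%d user records)", e.Err, len(e.UserRecords))
            default:
                log.Printf("put failure: %v", err)
            }
        }
    }()
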
6 changes: 3 additions & 3 deletions README.md
@@ -37,7 +37,7 @@ func main() {
     // Handle failures
     go func() {
         for r := range failures {
-            log.Error(r.Err)
+            log.Error(r)
         }
     }()

@@ -95,7 +95,7 @@ func main() {
     // Handle failures
     go func() {
         for r := range failures {
-            log.Error(r.Err)
+            log.Error(r)
         }
     }()

@@ -179,7 +179,7 @@ func main() {
     // Handle failures
     go func() {
         for r := range failures {
-            log.Error(r.Err)
+            log.Error(r)
         }
    }()

9 changes: 8 additions & 1 deletion aggregator.go
@@ -91,7 +91,14 @@ func (a *Aggregator) Drain() (*AggregatedRecordRequest, error) {
         Records: a.aggregateUserRecords(),
     })
     if err != nil {
-        return nil, err
+        drainErr := &DrainError{
+            Err:         err,
+            UserRecords: a.buf,
+        }
+        // Q: Should we clear the aggregator on drain error? Otherwise I would expect Marshal
+        // to fail indefinitely until the buffer is cleared
+        a.clear()
+        return nil, drainErr
     }

     h := md5.New()
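
Because DrainError carries the UserRecords that were buffered when Marshal failed, a failure-channel consumer can recover them instead of losing them silently, for example by re-submitting (a hypothetical sketch; err is the value received from the failure channel, pr and the retry policy are assumptions, and PutUserRecord is the method shown in producer.go below):

    var de *producer.DrainError
    if errors.As(err, &de) {
        for _, ur := range de.UserRecords {
            // Best-effort resubmit; a real consumer would bound retries.
            if perr := pr.PutUserRecord(ur); perr != nil {
                log.Printf("resubmit failed: %v", perr)
            }
        }
    }
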
50 changes: 50 additions & 0 deletions errors.go
@@ -0,0 +1,50 @@
+package producer
+
+import (
+    "errors"
+    "fmt"
+)
+
+// Errors
+var (
+    ErrStoppedProducer     = errors.New("Unable to Put record. Producer is already stopped")
+    ErrIllegalPartitionKey = errors.New("Invalid partition key. Length must be at least 1 and at most 256")
+    ErrRecordSizeExceeded  = errors.New("Data must be less than or equal to 1MB in size")
+)
+
+// Failure record type for failures from Kinesis PutRecords request
+type FailureRecord struct {
+    Err error
+    // The PartitionKey that was used in the kinesis.PutRecordsRequestEntry
+    PartitionKey string
+    // The ExplicitHashKey that was used in the kinesis.PutRecordsRequestEntry. Will be the
+    // empty string if nil
+    ExplicitHashKey string
+    // UserRecords that were contained in the failed aggregated record request
+    UserRecords []UserRecord
+}
+
+func (e *FailureRecord) Error() string {
+    return e.Err.Error()
+}
+
+type DrainError struct {
+    Err error
+    // UserRecords in the buffer when drain attempt was made
+    UserRecords []UserRecord
+}
+
+func (e *DrainError) Error() string {
+    return e.Err.Error()
+}
+
+type ShardBucketError struct {
+    UserRecord
+}
+
+func (s *ShardBucketError) Error() string {
+    if hk := s.ExplicitHashKey(); hk != nil {
+        return fmt.Sprintf("ExplicitHashKey outside shard key range: %s", hk.String())
+    }
+    return fmt.Sprintf("PartitionKey outside shard key range: %s", s.PartitionKey())
+}
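
The three sentinel errors moved here from producer.go are unchanged errors.New values, so existing comparisons keep working, including through errors.Is (a usage sketch; the Put call shape is an assumption based on the README examples):

    if err := pr.Put(data, partitionKey); errors.Is(err, producer.ErrRecordSizeExceeded) {
        // Payload exceeds 1MB: split it and put the chunks individually.
    }
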
6 changes: 3 additions & 3 deletions example_test.go
@@ -30,7 +30,7 @@ func ExampleSimple() {
     // Handle failures
     go func() {
         for r := range failures {
-            logger.Error("detected put failure", r.Err)
+            logger.Error("detected put failure", r)
         }
     }()

@@ -66,7 +66,7 @@ func ExampleShardMap() {
     // Handle failures
     go func() {
         for r := range failures {
-            logger.Error("detected put failure", r.Err)
+            logger.Error("detected put failure", r)
         }
     }()

@@ -131,7 +131,7 @@ func ExampleUserRecord() {
     // Handle failures
     go func() {
         for r := range failures {
-            logger.Error("detected put failure", r.Err)
+            logger.Error("detected put failure", r)
         }
     }()

49 changes: 19 additions & 30 deletions producer.go
@@ -7,7 +7,6 @@
 package producer

 import (
-    "errors"
     "fmt"
     "sync"
     "time"
@@ -16,21 +15,14 @@ import (
     "github.com/jpillora/backoff"
 )

-// Errors
-var (
-    ErrStoppedProducer     = errors.New("Unable to Put record. Producer is already stopped")
-    ErrIllegalPartitionKey = errors.New("Invalid partition key. Length must be at least 1 and at most 256")
-    ErrRecordSizeExceeded  = errors.New("Data must be less than or equal to 1MB in size")
-)
-
 // Producer batches records.
 type Producer struct {
     sync.RWMutex
     *Config
     shardMap  *ShardMap
     semaphore semaphore
     records   chan *AggregatedRecordRequest
-    failure   chan *FailureRecord
+    failure   chan error
     done      chan struct{}

     // Current state of the Producer
@@ -96,12 +88,8 @@ func (p *Producer) PutUserRecord(userRecord UserRecord) error {
         p.records <- NewAggregatedRecordRequest(userRecord.Data(), &partitionKey, nil, []UserRecord{userRecord})
     } else {
         drained, err := p.shardMap.Put(userRecord)
-        switch err.(type) {
-        case nil:
-        case *ShardBucketError:
+        if err != nil {
             return err
-        default:
-            p.Logger.Error("drain aggregator", err)
         }
         if drained != nil {
             p.records <- drained
@@ -110,27 +98,15 @@ func (p *Producer) PutUserRecord(userRecord UserRecord) error {
     return nil
 }

-// Failure record type
-type FailureRecord struct {
-    Err error
-    // The PartitionKey that was used in the kinesis.PutRecordsRequestEntry
-    PartitionKey string
-    // The ExplicitHashKey that was used in the kinesis.PutRecordsRequestEntry. Will be the
-    // empty string if nil
-    ExplicitHashKey string
-    // UserRecords that were contained in the failed aggregated record request
-    UserRecords []UserRecord
-}
-
 // NotifyFailures registers and returns a listener to handle undeliverable messages.
 // The incoming struct has a copy of the Data and the PartitionKey along with some
 // error information about why the publishing failed.
-func (p *Producer) NotifyFailures() <-chan *FailureRecord {
+func (p *Producer) NotifyFailures() <-chan error {
     p.Lock()
     defer p.Unlock()
     if !p.notify {
         p.notify = true
-        p.failure = make(chan *FailureRecord, p.BacklogCount)
+        p.failure = make(chan error, p.BacklogCount)
     }
     return p.failure
 }
@@ -237,6 +213,12 @@ func (p *Producer) loop() {
             records, err := p.shardMap.UpdateShards(p.GetShards)
             if err != nil {
                 p.Logger.Error("UpdateShards error", err)
+                p.RLock()
+                notify := p.notify
+                p.RUnlock()
+                if notify {
+                    p.failure <- err
+                }
                 continue
             }
             for _, record := range records {
@@ -253,8 +235,15 @@ func (p *Producer) drainIfNeed() []*AggregatedRecordRequest {
         return nil
     }
     records, errs := p.shardMap.Drain()
-    for _, err := range errs {
-        p.Logger.Error("drain aggregator", err)
+    if len(errs) > 0 {
+        p.RLock()
+        notify := p.notify
+        p.RUnlock()
+        if notify {
+            for _, err := range errs {
+                p.failure <- err
+            }
+        }
     }
     return records
 }
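
Note the behavior change in PutUserRecord above: the old code returned only *ShardBucketError and merely logged other drain errors, while the new code returns every error from shardMap.Put to the caller, who can tell the cases apart (a sketch; pr and the record variable are assumptions):

    if err := pr.PutUserRecord(record); err != nil {
        var sbe *producer.ShardBucketError
        if errors.As(err, &sbe) {
            // The record's key fell outside the target shard's hash key range.
            log.Printf("shard bucket mismatch for pk=%q", sbe.PartitionKey())
        } else {
            log.Printf("put failed: %v", err)
        }
    }
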
4 changes: 2 additions & 2 deletions producer_test.go
@@ -210,7 +210,7 @@ func TestNotify(t *testing.T) {
                 close(done)
                 return
             }
-            failed += len(failure.UserRecords)
+            failed += len(failure.(*FailureRecord).UserRecords)
         case <-timeout:
             return
         }
@@ -341,7 +341,7 @@ func BenchmarkProducer(b *testing.B) {
     go func() {
         defer close(failuresDone)
         for f := range failures {
-            b.Fatal(f.Err)
+            b.Fatal(f.Error())
         }
     }()

12 changes: 0 additions & 12 deletions shard_map.go
@@ -2,7 +2,6 @@ package producer

 import (
     "crypto/md5"
-    "fmt"
     "math/big"
     "sort"
     "sync"
@@ -14,17 +13,6 @@ import (
 // Hash key ranges are 0 indexed, so true max is 2^128 - 1
 const maxHashKeyRange = "340282366920938463463374607431768211455"

-type ShardBucketError struct {
-    UserRecord
-}
-
-func (s *ShardBucketError) Error() string {
-    if hk := s.ExplicitHashKey(); hk != nil {
-        return fmt.Sprintf("ExplicitHashKey outside shard key range: %s", hk.String())
-    }
-    return fmt.Sprintf("PartitionKey outside shard key range: %s", s.PartitionKey())
-}
-
 // ShardLister is the interface that wraps the KinesisAPI.ListShards method.
 type ShardLister interface {
     ListShards(input *k.ListShardsInput) (*k.ListShardsOutput, error)