Shard map #1

Open · wants to merge 21 commits into master
Conversation

mjneil (Owner) commented Jun 10, 2020

This PR introduces 2 major changes.

The first is the UserRecord interface, which gives callers more control over error handling. See the README changes for more information. This addresses a8m#11
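For illustration, here is a minimal sketch of what a UserRecord-style interface could look like; the exact method set and names belong to this PR's README and diff, so treat everything below as an assumption:

```go
package kplsketch

import "math/big"

// Hypothetical shape of a UserRecord-style interface; the real method set in
// this PR may differ -- see the README changes and the diff.
type UserRecord interface {
	// PartitionKey returns the Kinesis partition key for the record.
	PartitionKey() string
	// ExplicitHashKey optionally overrides the hash derived from the
	// partition key; nil means "use the partition key's MD5 hash".
	ExplicitHashKey() *big.Int
	// Data returns the payload to be aggregated into a PutRecords entry.
	Data() []byte
}
```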

The second is the ShardMap, a map of the stream's Kinesis shards used for aggregation. Records put into the producer are assigned a shard using the MD5 hash of the partition key, or a provided explicit hash key. See the README changes for more information. This should address a8m#17, a8m#27, and a8m#1
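The routing itself mirrors how Kinesis maps a record to a shard's hash key range. A hedged sketch of that lookup (the type and helper names here are invented for illustration, not the PR's actual code):

```go
package kplsketch

import (
	"crypto/md5"
	"math/big"
)

// shardRange models the hash key range Kinesis reports for a shard
// (StartingHashKey / EndingHashKey come back as decimal strings).
type shardRange struct {
	shardID string
	start   *big.Int
	end     *big.Int
}

// hashKey returns the 128-bit value used to pick a shard: the explicit hash
// key if one was provided, otherwise the MD5 of the partition key read as a
// big-endian integer.
func hashKey(partitionKey string, explicit *big.Int) *big.Int {
	if explicit != nil {
		return explicit
	}
	sum := md5.Sum([]byte(partitionKey))
	return new(big.Int).SetBytes(sum[:])
}

// lookupShard finds the shard whose hash key range contains hk.
func lookupShard(shards []shardRange, hk *big.Int) (string, bool) {
	for _, s := range shards {
		if hk.Cmp(s.start) >= 0 && hk.Cmp(s.end) <= 0 {
			return s.shardID, true
		}
	}
	return "", false
}
```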

I tried to make these changes backwards compatible, but to support the UserRecord interface I introduced breaking changes to the FailureRecord struct. I also felt that error handling throughout the producer was not ideal, so I've changed the failure chan to a generic chan error and created custom error types for specific failures (such as FailureRecord and DrainError). Any error that was previously just logged and never returned to users now gets sent over the failure channel, and users of the library can use a type switch to distinguish the different error types. I've also changed the structure of FailureRecord from one per user record to one per kinesis.PutRecordsRequestEntry; the FailureRecord now contains a slice of all the aggregated UserRecords that were part of the failed request entry.
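Consuming the failure channel might then look roughly like this. The stand-in error types below only mirror the description above so the snippet compiles on its own; the real definitions in this PR carry more detail (e.g. the failed request entry and its aggregated UserRecords):

```go
package kplsketch

import "log"

// Stand-in definitions mirroring the custom error types described above; the
// real ones live in this PR and differ in detail.
type FailureRecord struct{ Err error }

func (f *FailureRecord) Error() string { return "put failure: " + f.Err.Error() }

type DrainError struct{ Err error }

func (d *DrainError) Error() string { return "drain failure: " + d.Err.Error() }

// consumeFailures shows the intended pattern: a single chan error plus a type
// switch to react to specific failure modes.
func consumeFailures(failures <-chan error) {
	for err := range failures {
		switch e := err.(type) {
		case *FailureRecord:
			log.Printf("failed request entry: %v", e)
		case *DrainError:
			log.Printf("record dropped while draining: %v", e)
		default:
			log.Printf("producer error: %v", e)
		}
	}
}
```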

@a8m and @jawang35 (since you did recent work on the Surfline fork): I'm pinging you both for review, but I'm making the pull request against my fork. If there is interest in getting this work upstream, we can do that.

I ran the benchmarks on my machine as follows, with the results below. The b.N count roughly corresponds to the records/s throughput.

$ go test -v ./... -cpu 1,2,4,8,16 -benchmem -bench=. -run=^$$
goos: darwin
goarch: amd64
pkg: github.com/a8m/kinesis-producer
BenchmarkProducer/default_producer            	  355250	      3038 ns/op	    2622 B/op	       6 allocs/op
BenchmarkProducer/default_producer-2          	  537939	      2522 ns/op	    2622 B/op	       6 allocs/op
BenchmarkProducer/default_producer-4          	  538022	      2169 ns/op	    2622 B/op	       6 allocs/op
BenchmarkProducer/default_producer-8          	  537499	      2156 ns/op	    2622 B/op	       6 allocs/op
BenchmarkProducer/default_producer-16         	  529176	      2175 ns/op	    2622 B/op	       6 allocs/op
BenchmarkProducer/10_shard_count              	  193441	      5693 ns/op	    4151 B/op	      52 allocs/op
BenchmarkProducer/10_shard_count-2            	  335112	      3608 ns/op	    4151 B/op	      52 allocs/op
BenchmarkProducer/10_shard_count-4            	  408216	      2666 ns/op	    4151 B/op	      52 allocs/op
BenchmarkProducer/10_shard_count-8            	  457116	      2595 ns/op	    4151 B/op	      52 allocs/op
BenchmarkProducer/10_shard_count-16           	  485910	      2616 ns/op	    4151 B/op	      52 allocs/op
BenchmarkProducer/500_shard_count             	  125145	      9222 ns/op	    4662 B/op	      68 allocs/op
BenchmarkProducer/500_shard_count-2           	  207092	      5773 ns/op	    4625 B/op	      68 allocs/op
BenchmarkProducer/500_shard_count-4           	  272650	      4208 ns/op	    4661 B/op	      68 allocs/op
BenchmarkProducer/500_shard_count-8           	  304765	      3919 ns/op	    4639 B/op	      68 allocs/op
BenchmarkProducer/500_shard_count-16          	  310267	      4013 ns/op	    4635 B/op	      68 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key            	  256620	      4330 ns/op	    2938 B/op	      16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-2          	  511671	      2457 ns/op	    2938 B/op	      16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-4          	  786858	      1847 ns/op	    2939 B/op	      16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-8          	  939548	      1622 ns/op	    2939 B/op	      16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-16         	  919674	      1435 ns/op	    2938 B/op	      16 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key           	  154376	      7756 ns/op	    3446 B/op	      33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-2         	  252448	      4915 ns/op	    3421 B/op	      33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-4         	  358032	      3565 ns/op	    3393 B/op	      33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-8         	  370375	      3329 ns/op	    3382 B/op	      33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-16        	  374530	      3323 ns/op	    3380 B/op	      33 allocs/op

EDIT be6e967
Commit be6e967 introduced a large refactor of the main producer loop. I was struggling with concurrency issues when trying to use the shard refresh interval to poll Kinesis and update the shard map after autoscaling. It became too complicated to fit the logic into the existing design, so I started fresh. The biggest challenge I was facing was that after the shard map was updated, records were not being re-aggregated and sent to new shards. Since records are aggregated and drained before being added to the backlog, with a large enough backlog those records would not be re-aggregated. Another issue was that the retry loop in flush would loop until all records were sent. The whole system would get blocked up waiting for these requests to stop throttling, even though, had they been stopped and re-aggregated after a shard update, the throttling would have ended.

My solution was to create a WorkerPool to manage MaxConnections. After making a PutRecords request, instead of retrying in place, the pool adds any failed requests to the front of the queue and releases the semaphore. This allows the pool to support Pause and Resume, where the queue is drained and blocked while re-aggregation happens.
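A rough sketch of that idea (not the PR's actual implementation; the names are invented and shutdown handling is omitted):

```go
package kplsketch

import "sync"

// batch stands in for one aggregated PutRecords request worth of entries.
type batch struct{}

// pool is a simplified sketch of the WorkerPool idea: a buffered channel acts
// as a counting semaphore sized to MaxConnections, failed batches go back to
// the front of the queue instead of being retried in place, and Pause/Resume
// gate dequeuing so the queue can be drained and re-aggregated.
type pool struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []batch       // index 0 is the front of the queue
	sem    chan struct{} // caps concurrent PutRecords calls
	paused bool
}

func newPool(maxConnections int) *pool {
	p := &pool{sem: make(chan struct{}, maxConnections)}
	p.cond = sync.NewCond(&p.mu)
	return p
}

func (p *pool) Pause()  { p.mu.Lock(); p.paused = true; p.mu.Unlock() }
func (p *pool) Resume() { p.mu.Lock(); p.paused = false; p.mu.Unlock(); p.cond.Broadcast() }

// push enqueues a batch; failed batches go to the front so they retry first.
func (p *pool) push(b batch, front bool) {
	p.mu.Lock()
	if front {
		p.queue = append([]batch{b}, p.queue...)
	} else {
		p.queue = append(p.queue, b)
	}
	p.mu.Unlock()
	p.cond.Broadcast()
}

// run dequeues batches and sends them while holding a semaphore slot; on
// failure the remaining entries are requeued at the front and the slot is
// released, rather than retrying in place.
func (p *pool) run(send func(batch) (failed *batch)) {
	for {
		p.mu.Lock()
		for p.paused || len(p.queue) == 0 {
			p.cond.Wait()
		}
		b := p.queue[0]
		p.queue = p.queue[1:]
		p.mu.Unlock()

		p.sem <- struct{}{} // acquire a connection slot
		if f := send(b); f != nil {
			p.push(*f, true)
		}
		<-p.sem // release the slot
	}
}
```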

Performance did take a hit with these changes as you can see from the benchmarks below.

goos: darwin
goarch: amd64
pkg: github.com/a8m/kinesis-producer
BenchmarkProducer/default_producer                178435        6236 ns/op      2622 B/op        6 allocs/op
BenchmarkProducer/default_producer-2              482074        2865 ns/op      2622 B/op        6 allocs/op
BenchmarkProducer/default_producer-4              515551        2310 ns/op      2622 B/op        6 allocs/op
BenchmarkProducer/default_producer-8              502381        2343 ns/op      2622 B/op        6 allocs/op
BenchmarkProducer/default_producer-16             508364        2355 ns/op      2622 B/op        6 allocs/op
BenchmarkProducer/10_shard_count                   94549       12053 ns/op      4151 B/op       52 allocs/op
BenchmarkProducer/10_shard_count-2                216038        5795 ns/op      4151 B/op       52 allocs/op
BenchmarkProducer/10_shard_count-4                370910        3196 ns/op      4151 B/op       52 allocs/op
BenchmarkProducer/10_shard_count-8                385885        3026 ns/op      4151 B/op       52 allocs/op
BenchmarkProducer/10_shard_count-16               383342        3012 ns/op      4151 B/op       52 allocs/op
BenchmarkProducer/500_shard_count                    100    20986290 ns/op      5115 B/op       79 allocs/op
BenchmarkProducer/500_shard_count-2               168798        7251 ns/op      4631 B/op       68 allocs/op
BenchmarkProducer/500_shard_count-4               252313        4844 ns/op      4644 B/op       68 allocs/op
BenchmarkProducer/500_shard_count-8               224570        4605 ns/op      4642 B/op       68 allocs/op
BenchmarkProducer/500_shard_count-16              216969        4661 ns/op      4638 B/op       68 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key                119632        9261 ns/op      2938 B/op       16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-2              247773        4261 ns/op      2939 B/op       16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-4              378006        3048 ns/op      2939 B/op       16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-8              729954        1883 ns/op      2939 B/op       16 allocs/op
BenchmarkProducer/10_shard_count_using_explicit_hash_key-16             679153        1573 ns/op      2939 B/op       16 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key                  100    22306249 ns/op      3981 B/op       45 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-2             196442        6098 ns/op      3397 B/op       33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-4             243746        4379 ns/op      3408 B/op       33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-8             301854        3888 ns/op      3418 B/op       33 allocs/op
BenchmarkProducer/500_shard_count_using_explicit_hash_key-16            258960        3974 ns/op      3461 B/op       33 allocs/op
