forked from Surfline/kinesis-producer
-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Shard map #1
Open
mjneil
wants to merge
21
commits into
master
Choose a base branch
from
shard-map
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR introduces 2 major changes.
First is the
UserRecord
interface. This allows more control over error handling. See README changes for more information. This addresses a8m#11Second is the
ShardMap
. This is a map of Kinesis shards used for aggregation. Records put into the producer are assigned a shard using md5 hash of the partition key or a provided explicit hash key. See README changes for more information. This should address a8m#17 and a8m#27 and a8m#1I tried to make these changes backwards compatible, but to support the
UserRecord
interface, I introduced breaking changes with theFailureRecord
struct. I also felt that error handling throughout the producer was not ideal, so I've changed the failure chan to be a genericchan error
and created custom error types for specific errors (such asFailureRecord
andDrainError
) any error that was previously just logged and not returned to users now gets sent over the failure channel. Users of the library can have more control over these errors, using a type switch to detect the different errors. I've also changed the structure ofFailureRecord
from per user record, to one perkinesis.PutRecordsRequestEntry
and theFailureRecord
will contain a slice of all the aggregatedUserRecord
s that were part of the failed request entry.@a8m and @jawang35 (since you did recent work on the Surfline fork) I'm pinging you both for reviews but making the pull request to my fork. If there is interest in getting this work upstream we can do that.
I did the following to run benchmarks on my machine and these were the results. The
b.N
count should roughly be the records/s throughputEDIT be6e967
commit be6e967 introduced a large refactor of the main producer loop. I was struggling with concurrency issues when trying to use the shard refresh interval to poll kinesis and update the shard map after autoscaling. It became too complicated for me to try and fit the logic into the existing design, so I started fresh. The biggest challenge I was facing was that after the shard map was updated, records were not being re-aggregated and sent to new shards. Since records are aggregated and drained before being added to the backlog, with a large enough backlog those records would not be re-aggregated. Another issue was that the retry loop in flush would loop until all records were sent. The whole system would get blocked up waiting for these requests to not throttle anymore, but if they had been stopped and re-aggregated after a shard update, the throttling end.
My solution was to create a
WorkerPool
to manageMaxConnections
. After making aPutRecords
request, instead of retrying, it adds the failed requests to the front of the queue and releases the semaphore. This allows the pool to supportPause
andResume
where the queue is drained and blocked while re-aggregation happens.Performance did take a hit with these changes as you can see from the benchmarks below.