Skip to content

Commit

Permalink
Track duplicated offsets in consumer (#1131)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Aug 14, 2024
1 parent 78ac9ba commit 00a9a70
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.4"
version = "0.6.5"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub struct IggyConsumer {
last_polled_at: Arc<AtomicU64>,
current_partition_id: Arc<AtomicU32>,
retry_interval: IggyDuration,
ignore_duplicates: bool,
}

impl IggyConsumer {
Expand Down Expand Up @@ -158,6 +159,7 @@ impl IggyConsumer {
last_polled_at: Arc::new(AtomicU64::new(0)),
current_partition_id: Arc::new(AtomicU32::new(0)),
retry_interval,
ignore_duplicates: polling_strategy.kind == PollingKind::Next,
}
}

Expand Down Expand Up @@ -422,6 +424,7 @@ impl IggyConsumer {
let retry_interval = self.retry_interval;
let last_stored_offset = self.last_stored_offsets.clone();
let last_consumed_offset = self.last_consumed_offsets.clone();
let ignore_duplicates = self.ignore_duplicates;

async move {
if interval > 0 {
Expand Down Expand Up @@ -466,6 +469,17 @@ impl IggyConsumer {
last_consumed_offset.insert(partition_id, AtomicU64::new(0));
}

if has_consumed_offset
&& ignore_duplicates
&& consumed_offset >= polled_messages.messages[0].offset
{
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
current_offset: polled_messages.current_offset,
partition_id,
});
}

let stored_offset;
if let Some(stored_offset_entry) = last_stored_offset.get(&partition_id) {
if auto_commit_after_polling {
Expand Down

0 comments on commit 00a9a70

Please sign in to comment.