Skip to content

Commit

Permalink
Fix single message offset commit in new SDK (#1120)
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex authored Aug 10, 2024
1 parent 42fe472 commit 3e551ee
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.1"
version = "0.6.2"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
9 changes: 2 additions & 7 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl IggyConsumer {
last_stored_offset.insert(partition_id, AtomicU64::new(0));
}

if offset <= stored_offset {
if offset <= stored_offset && offset >= 1 {
trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset}. Skipping storing the offset.");
continue;
}
Expand Down Expand Up @@ -254,19 +254,14 @@ impl IggyConsumer {
for entry in last_consumed_offset.iter() {
let partition_id = *entry.key();
let consumed_offset = entry.value().load(ORDERING);
let has_stored_offset = last_stored_offset.contains_key(&partition_id);
let stored_offset = last_stored_offset
.get(&partition_id)
.map_or(0, |offset| offset.load(ORDERING));
trace!(
"Trying to store the offset: {consumed_offset}, last stored offset: {stored_offset} for partition ID: {partition_id}"
);
if (has_stored_offset && stored_offset == 0) && consumed_offset == 0 {
trace!("Last offset and next offset are 0 for partition ID: {partition_id}. Skipping storing the offset in the background.");
continue;
}

if consumed_offset <= stored_offset {
if consumed_offset <= stored_offset && consumed_offset >= 1 {
trace!("Offset: {consumed_offset} is less than or equal to the last stored offset: {stored_offset} for partition ID: {partition_id}. Skipping storing the offset in the background.");
continue;
}
Expand Down

0 comments on commit 3e551ee

Please sign in to comment.