Skip to content

Commit

Permalink
[Fix] [Connector] Rocketmq source startOffset greater than endOffset …
Browse files Browse the repository at this point in the history
…error (apache#6287)
  • Loading branch information
cqutwangyu authored Jan 29, 2024
1 parent 064dc29 commit cd44b58
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
splits.forEach(
split -> {
split.setStartOffset(split.getEndOffset() + 1);
split.setStartOffset(
Math.min(
split.getEndOffset() + 1,
listOffsets.get(split.getMessageQueue())));
split.setEndOffset(listOffsets.get(split.getMessageQueue()));
});
return splits.stream()
Expand Down

0 comments on commit cd44b58

Please sign in to comment.