From cd44b5894e962892e991b2dcf8facddb09b56ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=B8=94?= Date: Mon, 29 Jan 2024 13:40:16 +0800 Subject: [PATCH] [Fix] [Connector] Rocketmq source startOffset greater than endOffset error (#6287) --- .../rocketmq/source/RocketMqSourceSplitEnumerator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java index ce841a4bf08..6630d495f98 100644 --- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java @@ -149,7 +149,10 @@ public void addSplitsBack(List splits, int subtaskId) { ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); splits.forEach( split -> { - split.setStartOffset(split.getEndOffset() + 1); + split.setStartOffset( + Math.min( + split.getEndOffset() + 1, + listOffsets.get(split.getMessageQueue()))); split.setEndOffset(listOffsets.get(split.getMessageQueue())); }); return splits.stream()