forked from apache/rocketmq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
06f2208
commit 8bd32de
Showing
1 changed file
with
36 additions
and
0 deletions.
There are no files selected for viewing
36 changes: 36 additions & 0 deletions
36
...nt/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerUnitTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package org.apache.rocketmq.client.consumer; | ||
|
||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; | ||
import org.apache.rocketmq.common.message.MessageExt; | ||
import org.junit.Test; | ||
|
||
import java.util.List; | ||
|
||
public class DefaultLitePullConsumerUnitTest { | ||
|
||
@Test | ||
public void testAutoCommit() throws Exception { | ||
boolean running = true; | ||
// 定义消费者组 mygroup | ||
DefaultLitePullConsumer litePullConsumer = new | ||
DefaultLitePullConsumer("mygroup"); | ||
// 设置名字服务地址 | ||
litePullConsumer.setNamesrvAddr("127.0.0.1:9876"); | ||
// 从最新的进度偏移量 | ||
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); | ||
// 订阅主题 TopicTest | ||
litePullConsumer.subscribe("TopicTest", "*"); | ||
// 自动提交消费偏移量的选项设置为 true | ||
litePullConsumer.setAutoCommit(true); | ||
litePullConsumer.start(); | ||
try { | ||
while (running) { | ||
List<MessageExt> messageExts = litePullConsumer.poll(); | ||
System.out.printf("%s%n", messageExts); | ||
} | ||
} finally { | ||
litePullConsumer.shutdown(); | ||
} | ||
} | ||
|
||
} |