PushConsumerExample.java 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package com.sf.newhelloworld;
  2. import java.io.IOException;
  3. import java.util.Collections;
  4. import org.apache.rocketmq.client.apis.ClientConfiguration;
  5. import org.apache.rocketmq.client.apis.ClientException;
  6. import org.apache.rocketmq.client.apis.ClientServiceProvider;
  7. import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
  8. import org.apache.rocketmq.client.apis.consumer.FilterExpression;
  9. import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
  10. import org.apache.rocketmq.client.apis.consumer.PushConsumer;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. public class PushConsumerExample {
  14. private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
  15. private PushConsumerExample() {
  16. }
  17. public static void main(String[] args) throws ClientException, IOException, InterruptedException {
  18. final ClientServiceProvider provider = ClientServiceProvider.loadService();
  19. // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
  20. String endpoints = "localhost:8081";
  21. ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
  22. .setEndpoints(endpoints)
  23. .build();
  24. // 订阅消息的过滤规则,表示订阅所有Tag的消息。
  25. String tag = "*";
  26. FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
  27. // 为消费者指定所属的消费者分组,Group需要提前创建。
  28. String consumerGroup = "cg";
  29. // 指定需要订阅哪个目标Topic,Topic需要提前创建。
  30. String topic = "TestTopic";
  31. // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
  32. PushConsumer pushConsumer = provider.newPushConsumerBuilder()
  33. .setClientConfiguration(clientConfiguration)
  34. // 设置消费者分组。
  35. .setConsumerGroup(consumerGroup)
  36. // 设置预绑定的订阅关系。
  37. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
  38. // 设置消费监听器。
  39. .setMessageListener(messageView -> {
  40. // 处理消息并返回消费结果。
  41. logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
  42. return ConsumeResult.SUCCESS;
  43. })
  44. .build();
  45. Thread.sleep(Long.MAX_VALUE);
  46. // 如果不需要再使用 PushConsumer,可关闭该实例。
  47. // pushConsumer.close();
  48. }
  49. }