pushConsumer拉取消息流程是怎样的
这篇文章主要介绍“pushConsumer拉取消息流程是怎样的”,在日常操作中,相信很多人在pushConsumer拉取消息流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”pushConsumer拉取消息流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
从事服务器托管雅安,服务器租用,云主机,网站空间,空间域名,CDN,网络代维等服务。
这是一段RocketMq经典的consumer异步获取broker消息的代码:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr(Constants.NameServerAddr); consumer.subscribe("topic01","*"); consumer.setMessageModel(MessageModel.BROADCASTING);//广播消息,所有相同组,定于topic的消费端都能收到消息 //consumer.setMessageModel(MessageModel.CLUSTERING);//集群消息--默认(相同组内的topic,集群消息只有一端会接收到) consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt:list){ System.out.println(new java.lang.String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }
consumer start()方法跟踪
1. this.defaultMQPushConsumerImpl.start();
2. 刚启动serviceState状态为 CREATE_JUST,进入这个状态的switch处理逻辑
3. 先用checkCoing()检查consumer的各个配置是否配置ok
4. 然后 copySubscription()用于根据subject构建本地的rebalance的conhurrentHashMapInner
5. 接着构建MqClientFactory的一个Instance
6. 构建PullWrapper,用于去Broker注册过滤消息
7. 再根据MessageMode是广播模式还是集群模式获取offset。(广播模式是从consumer本地的store获取,集群模式则是需要去broker去请求获取)
8. 根据监听消息的类型是OrderLy还是Concurrently去构建一个consumeMessageService对象
9.启动刚才创建的consumerMessageService对象,调用其start方法
10. 使用MqClientFactory Instance实例registerConsumer进行注册
11. 把当前的serviceState状态变为Running状态
12.然后就开始从broker获取消息,请看下面的pushConsumer拉取消息流程
pushConsumer拉取消息流程介绍
consumer --DefaultMqPushConsumerImpl 使用pullMessage(pullRequest)拉取消息,pullAPIWrapper.pullKernelImpl(传递pullReuest,回调callback等参数)根据是否同步pullMessageSync还是异步pullMessageAsync, 拉取回来的消息PullResult经过解析处理存放到ProcessQueue 队列里的TreeMap(offset,messageExt)
到此,关于“pushConsumer拉取消息流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
本文标题:pushConsumer拉取消息流程是怎样的
网站网址:http://scjbc.cn/article/iggiid.html