前提介绍
在RocketMQ中一般有两种获取消息的方式,一个是拉(pull,消费者主动去broker拉取),一个是推(push,主动推送给消费者),在上一章节中已经介绍到了相关的Push操作,接下来的章节会介绍Pull操作方式的消费机制体系。
DefaultMQPullConsumer
DefaultMQPullConsumer与DefaultMQPushConsumer相比最大的区别是,消费哪些队列的消息,从哪个位移开始消费,以及何时提交消费位移都是由程序自己的控制的。下面来介绍一下DefaultMQPullConsumer的内部原理。
总体流程执行
DefaultMQPullConsumer使用例子
public class MQPullConsumer {
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");
consumer.start();
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for(MessageQueue mq:mqs){
try {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
}
- 消费者启动:consumer.start();
- 获取主题下所有的消息队列:这里是根据topic从nameserver获取的这里我们可以修改为从其他位置获取队列信息
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicTest");
//遍历队列
for(MessageQueue mq:mqs){
try {
//获取当前队列的消费位移,第二个参数表示位移是从本地内存获取,还是从broker获取,true表示从broker获取
long offset = consumer.fetchConsumeOffset(mq,true);
while(true){
//第二个参数表示可以消费哪些tag的消息
//第三个参数表示从哪个位移开始消费消息
//第四个参数表示一次最大拉多少个消息
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
}
DefaultMQPullConsumer的总体流程
启动DefaultMQPullConsumer是通过调用start()方法完成的
DefaultMQPullConsumer拉取源码分析
分析下DefaultMQPullConsumer拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic")
从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
核心源码分析
fetchSubscribeMessageQueues()
-
通过调用fetchSubscribeMessageQueues()方法可以获取指定topic(GET_ROUTEINTO_BY_TOPIC)的读队列信息。它通过向nameserver发送GetRouteInfoRequest请求,请求内容为GET_ROUTEINTO_BY_TOPIC,nameserver将主题下的读队列个数发送给消费者,然后消费者使用如下代码创建出与读队列个数相同的MessageQueue对象。
-
每个MessageQueue对象里面记录了topic、broker名和读队列号。最后fetchSubscribeMessageQueues()将MessageQueue对象集合返回给调用者。
-
向NameServer发送请求获取topic参数对应的Broker信息和topic配置信息,即TopicRouteData对象。
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
if (topicRouteData != null) {
// 2、遍历topicRouteData
Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
if (!mqList.isEmpty()) {
return mqList;
} else {
throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
}
}
} catch (Exception e) {
throw new MQClientException(
"Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
遍历过程TopicRouteData
遍历TopicRouteData对象的QueueData列表中每个QueueData对象,首先判断该QueueData对象是否具有读权限,
若有则根据该QueueData对象的readQueueNums值,创建readQueueNums个MessageQueue对象,并构成MessageQueue集合;
最后返回给MessageQueue集合
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
consumer.fetchConsumeOffset
通过该方法获取该MessageQueue队列下面从offset位置开始的消息内容,其中maxNums=32即表示获取的最大消息个数,offset为该MessageQueue对象的开始消费位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
fetchConsumeOffset()有两个入参,第一个参数表示队列,第二个参数表示是否从broker获取该队列的消费位移,true表示从broker获取,false表示从本地记录获取,如果本地获取不到再从broker获取。
这里说的从本地获取是指从RemoteBrokerOffsetStore.offsetTable属性中获取,该属性记录了每个队列的消费位移。当从broker获取位移后会更新offsetTable。
pullBlockIfNotFound拉取信息
rocketmq提供了多个拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。两者的区别是如果队列中没有消息,两个方法的超时时间是不同的,pullBlockIfNotFound会等待30s返回一个空结果,pull是等待10s返回空结果。
不过pull方法的入参可以调整超时时间,而pullBlockIfNotFound则需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis参数。不过两个方法调用的底层逻辑都是一样的,都是调用DefaultMQPullConsumerImpl.pullSyncImpl()方法获取消息。下面分析一下pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
获取该MessageQueue队列的消费进度来设定参数offset值该方法最终调用pullSyncImpl,可以获取相关的结果数据。
-
参数1:消息队列(通过调用消费者的fetchSubscibeMessageQueue(topic)可以得到相应topic的所需要消息队列) ;
-
参数2:需要过滤用的表达式 ;
-
参数3:偏移量即消费队列的进度 ;
-
参数4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的实现过程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.isRunning();
//检查入参是否合法
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
throw new MQClientException("offset < 0", null);
}
if (maxNums <= 0) {
throw new MQClientException("maxNums <= 0", null);
}
//更新再平衡服务的数据,因为再平衡服务不起作用,所以更新数据没有效果
this.subscriptionAutomatically(mq.getTopic());
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
//计算超时时间,如果调用的是pullBlockIfNotFound方法,block参数就是true,否则就是false
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
//调用PullAPIWrapper从broker拉取消息,
//pullKernelImpl方法里面构建PullMessageRequest请求对象
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,//队列
subscriptionData.getSubString(),//消息的过滤规则
subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset,//拉取消息的位移
maxNums,//建议broker一次性返回最大消息个数,默认是32个
sysFlag,
0,//设置的提交位移,可以看到永远都是0,所以broker无法记录有效位移,需要程序自己记录控制提交位移
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,//超时时间
CommunicationMode.SYNC,
null//回调逻辑为null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
//If namespace is not null , reset Topic without namespace.
this.resetTopic(pullResult.getMsgFoundList());
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
检查MessageQueue对象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap<String,SubscriptionData>变量中,若不在则以consumerGroup、topic、subExpression为参数调用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法构造SubscriptionData对象保存到RebalanceImpl.subscriptionInner变量中,其中 subExpression="*"
this.subscriptionAutomatically(mq.getTopic());
// 构建标志位,逻辑或运算|=
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
//以请求参数subExpression以及consumerGroup、topic为参数调用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法构造SubscriptionData对象并返回
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
// 从broker中拉取消息
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
offset,
maxNums,
sysFlag,
0,
this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
);
// 对拉取到的消息进行解码,过滤并执行回调,并把解析的message列表放到MsgFoundList中
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
Push和Pull的操作对比
- push-优点:及时性、服务端统一处理实现方便
- push-缺点:容易造成堆积、负载性能不可控
- pull-优点:获得消息状态方便、负载均衡性能可控
- pull-缺点:及时性差
使用DefaultMQPullConsumer拉取消息,发送到broker的提交位移永远都是0,所以broker无法记录有效位移,需要程序自己记录和控制提交位移。
资料参考
-
https://blog.csdn.net/weixin_38308374/article/details/111058530
-
https://blog.csdn.net/weixin_38308374/article/details/111057238