RocketMQ使用之发送消息分析(四)

前言

这篇文章还有一个疑问没解决,查了很多资料,源码也翻了好几遍还是没找出

Topic路由信息如果本地没获取到从NameSer获取缓存

为什么NameSer会有,它的路由信息是从哪里来的?为什么NameSer一定能获取到,因为所有的相关博客中都没考虑获取不到的情况。有点点猜想。我们在初始化Producer时和NameSer建立长连接,这其中在发送消息时是不是发生了什么事?

所以这里有点坑,但是不影响本篇博客。

源码

结论

  Message message = new Message("What-fucking","my-tag","hello Oliver Guo 2018 08   07".getBytes());
        producer.send(message);

代码很简单,就是如上一行。但是其中做的事情非常多。简单总结如下:

  • 首先将消息中的Topic作为Key查询本地的路由发布信息
  • 如果没找到,通过NettyNameSer获取
  • 获取后缓存在本地

分析

这里先将两个实体类提出来弄明白他们的概念和之间关系。

TopicPublishInfo主题发布信息(直译),只存在本地

TopicRouteData路由信息,来自NameSer

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

好巧,TopicRouteDataTopicPublishInfo的一个属性。

ok。上面完了下面代码理解就容易多了。发送代码主要如下

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //检验消息数据,不符合的抛出异常
     Validators.checkMessage(msg, this.defaultMQProducer);
    //先从本地获取 topicPublishInfo ,如果没有从 NameSer 获取并缓存
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());--1
    *************省略几行*******************
    //从 TopicPublishInfo 获取消息队列
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);--2
  	//发送消息
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);--3

整个方法重点代码如上。一行行、一层层进去

–1 获取 topicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //首先通过 topic 从本地获取。topicPublishInfoTable是一个线程安全 Map 前一篇博客 start 时看到过 
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //有点奇怪,这里为什么要使用 putIfAbsent 而不是 put
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
       	//从 NameSer 更新 路由数据到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); -- 1
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

—1—

if (isDefault && defaultMQProducer != null) {
    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
        1000 * 3);
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
} else {
    //false 进入这里
    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
	//创建远程请求体
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
	//Netty 请求
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

–2–

消息队列实体类中包含 brokeName

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;
    }
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //发消息的类,省略看重点
    
    //根据 BrokerName 获取 地址。不明白的看上一篇监控平台。里面全都有
          String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    //构建请求头
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    
    switch (communicationMode) {
            //异步
        case ASYNC: 
            // sendMessage 里面也是通过 Netty
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                msg,
                requestHeader,
                timeout,
                communicationMode,
                sendCallback,
                topicPublishInfo,
                this.mQClientFactory,
                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                context,
                this);
            break;
        case ONEWAY:
            //同步
        case SYNC:
            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                brokerAddr,
                mq.getBrokerName(),
                msg,
                requestHeader,
                timeout,
                communicationMode,
                context,
                this);
            break;
        default:
            assert false;
            break;
    }
    }

前面还有个地方就是两个实体类之间的转换。TopicRouteData转成TopicPublishInfo

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);

OK!差不多完了

本文由老郭种树原创,转载请注明:https://guozh.net/rocketmq%e4%bd%bf%e7%94%a8%e4%b9%8b%e5%8f%91%e9%80%81%e6%b6%88%e6%81%af%e5%88%86%e6%9e%90%e5%9b%9b/

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注