前言
这篇文章还有一个疑问没解决,查了很多资料,源码也翻了好几遍还是没找出
Topic路由信息如果本地没获取到从
NameSer
获取缓存
为什么NameSer
会有,它的路由信息是从哪里来的?为什么NameSer
一定能获取到,因为所有的相关博客中都没考虑获取不到的情况。有点点猜想。我们在初始化Producer
时和NameSer
建立长连接,这其中在发送消息时是不是发生了什么事?
所以这里有点坑,但是不影响本篇博客。
源码
结论
Message message = new Message("What-fucking","my-tag","hello Oliver Guo 2018 08 07".getBytes());
producer.send(message);
代码很简单,就是如上一行。但是其中做的事情非常多。简单总结如下:
- 首先将消息中的
Topic
作为Key
查询本地的路由发布信息 - 如果没找到,通过
Netty
向NameSer
获取 - 获取后缓存在本地
分析
这里先将两个实体类提出来弄明白他们的概念和之间关系。
TopicPublishInfo
主题发布信息(直译),只存在本地
TopicRouteData
路由信息,来自NameSer
public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
好巧,TopicRouteData
是TopicPublishInfo
的一个属性。
ok。上面完了下面代码理解就容易多了。发送代码主要如下
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//检验消息数据,不符合的抛出异常
Validators.checkMessage(msg, this.defaultMQProducer);
//先从本地获取 topicPublishInfo ,如果没有从 NameSer 获取并缓存
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());--1
*************省略几行*******************
//从 TopicPublishInfo 获取消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);--2
//发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);--3
整个方法重点代码如上。一行行、一层层进去
–1 获取 topicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//首先通过 topic 从本地获取。topicPublishInfoTable是一个线程安全 Map 前一篇博客 start 时看到过
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//有点奇怪,这里为什么要使用 putIfAbsent 而不是 put
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//从 NameSer 更新 路由数据到本地
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); -- 1
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
—1—
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//false 进入这里
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
//创建远程请求体
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
//Netty 请求
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
break;
}
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
–2–
消息队列实体类中包含 brokeName
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
}
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//发消息的类,省略看重点
//根据 BrokerName 获取 地址。不明白的看上一篇监控平台。里面全都有
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//构建请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
switch (communicationMode) {
//异步
case ASYNC:
// sendMessage 里面也是通过 Netty
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
//同步
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
}
前面还有个地方就是两个实体类之间的转换。TopicRouteData
转成TopicPublishInfo
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
OK!差不多完了
本文由老郭种树原创,转载请注明:https://guozh.net/rocketmq%e4%bd%bf%e7%94%a8%e4%b9%8b%e5%8f%91%e9%80%81%e6%b6%88%e6%81%af%e5%88%86%e6%9e%90%e5%9b%9b/