接上篇文章,研究下
Producer
这个类代码中一些关键点。https://guozh.net/?p=77
结论
结论先行,然后结合源码一步步反推出来,这是我比较喜欢的方式。
Producer Group
。一个用来发消息应用,Producer Group
包含多个Producer
实例,这些多个实例,可以是多台机器,也可以是一台机器多个进程。但是它本身的实例对象只能是一个,即MQClientInstance
Producer
与Name Server
集群中的其中一个节点(随机选择)建立长连接,定期从Name Server
取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer
完全无状态,可集群部署。
以下是我个人对结论的认识,如果有问题,欢迎大家指正。
1
想想平时我们怎样使用MQ?
将构造Producer
对象封装成一个方法,然后发送消息封装成一个方法,假设项目中只有一个模块使用到MQ。
现在开始走流程,首先消息数据在用户,经过接口传输到Server
层,这时候调用初始化Producer
实例的方法获取一个Producer
,然后调用发送消息方法。假设不做负载,不管多少用户,都是使用这一个Producer
实例,只不过每个用户的数据标识不一样而已,这是Consumer
解析的事情,不管Producer
的事。
至于其中一个实例多个机器,或者是一个机器多个进程,这是负载相关内容,和其他负载没区别,都是一个理解。
2
发送消息的消息代码是这样的。
Message message = new Message("topic","tag","Message Content".getBytes());
我们整个消息数据,最后传到Broker
,那为什么还要和NameServer
建立连接定时获取Topic路由信息呢?这是因为Broker
将路由信息注册到了NameSer
,如果NameSer
挂了也不影响,Porducer
发送消息,Consumer
消费消息,只是topic路由信息不会更新,或者不存在获取不到对应topic的路由信息。
二、源码
DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("topic","tag","Message : Malwewejakwwge".getBytes());
producer.send(message);
发送消息,一共就这几行代码。先看上面三行。
DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A");
前面我们说了,一个程序只会存在Producer Group
的实例,即MQClientInstance
,怎样保证只会存在一个实例呢,其实就是保证它是单例。这要在producer.start();
里面看到。现在上面这行代码,其实就是构造了个producer
,然后它放在my-group-name-A
这个Producer Group
中。
再看下面.start()
方法。
1
首先进入这里
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
这里单例对象有个默认pid,只是比较下我们命名,my-group-name-A
和它默认是否一样,不一样,将pid改成我们的。
2
然后如下
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
从这里获取到单例的MQClientInstance
,从这里进入方法
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
就是个工厂模式+单例模式,有点区别的是这行代码
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
这里与DefaultMQProducer
有关。
3
再接着如下
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
进入方法
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
这里将Producer
注册到Producer Group
中,注册的方式是使用ProducerTable
维护。
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
看看这个producerTable
是啥
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
好吧,是个线程安全的Map
,只是key
是Group Producer Name
。貌似这样看来key
不会变了,所有的Producer
以链表的形式存储。
4
在到下一行这里
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
终于到了存储路由信息了。topicPublishInfoTable
是用来存储路由信息的对象。而TopicPublishInfo
是路由信息,这里的this.defaultMQProducer.getCreateTopicKey()
获取到的就是message
中的topic
,可能这里有人会觉得奇怪,这里还没运行到message
代码,并没有这个key。当然 这里设置了个默认的key
/**
* Just for testing or demo program
*/
private String createTopicKey = MixAll.DEFAULT_TOPIC;
并没有什么特殊的作用,这代码自己也注释说just for testing or demo program
但是这个对象很重要,和上面的维护producer
一样,也是个线程安全的map对象
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
上面所有的都可以用这张图来总结,其实下面的也可以。
5
终于到了start了,本来是从start进入,结果一直没做start的事情….
mQClientFactory.start();
进入如下
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
好吧,果然没让我们失望,一堆的 start。
每个方法上都有注释,很容易看出来。
this.mQClientAPIImpl.fetchNameServerAddr();
首先找到NameServer
地址,连接上。
this.mQClientAPIImpl.start();
Start request-response channel ,看看这解释,开始一个请求与返回的通道,很明显与通信有关。点进去看到这个类NettyRemotingClient
。
好吧 Netty
,果然是的,Netty
的强大我就不说了,只说一句。
我们项目的架构就是使用
Netty
,据说并发上线是10万+。我们也没办法测试,因为现在我们项目并发最多 1.5万上下,好无奈啊 好像要 10万+ 的并发额 🙂
强大岁强大,但是比较复杂,项目架构看了几天还没怎么看懂….
this.startScheduledTask();
开始各种定时任务,进去很容看明白。贴两个出来
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
updateTopicRouteInfoFromNameServer
sendHeartbeatToAllBrokerWithLock
看名字也很容易看出来,前面结论说了。这里定时做任务的实现方式可以学习下,以前没见过
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MQClientFactoryScheduledThread");
}
});
ok!差不多结束了
三、参考
本文由老郭种树原创,转载请注明:https://guozh.net/rocketmqshiyongzhiproducerer/