RocketMQ使用之Producer(二)

接上篇文章,研究下Producer这个类代码中一些关键点。https://guozh.net/?p=77

结论

结论先行,然后结合源码一步步反推出来,这是我比较喜欢的方式。

  • Producer Group。一个用来发消息应用,Producer Group包含多个Producer实例,这些多个实例,可以是多台机器,也可以是一台机器多个进程。但是它本身的实例对象只能是一个,即MQClientInstance
  • ProducerName Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

以下是我个人对结论的认识,如果有问题,欢迎大家指正。

1

想想平时我们怎样使用MQ?

将构造Producer对象封装成一个方法,然后发送消息封装成一个方法,假设项目中只有一个模块使用到MQ。

现在开始走流程,首先消息数据在用户,经过接口传输到Server层,这时候调用初始化Producer实例的方法获取一个Producer,然后调用发送消息方法。假设不做负载,不管多少用户,都是使用这一个Producer实例,只不过每个用户的数据标识不一样而已,这是Consumer解析的事情,不管Producer的事。

至于其中一个实例多个机器,或者是一个机器多个进程,这是负载相关内容,和其他负载没区别,都是一个理解。

2

发送消息的消息代码是这样的。

  Message message = new Message("topic","tag","Message Content".getBytes());

我们整个消息数据,最后传到Broker,那为什么还要和NameServer建立连接定时获取Topic路由信息呢?这是因为Broker将路由信息注册到了NameSer,如果NameSer挂了也不影响,Porducer发送消息,Consumer消费消息,只是topic路由信息不会更新,或者不存在获取不到对应topic的路由信息。

二、源码

        DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message("topic","tag","Message : Malwewejakwwge".getBytes());
        producer.send(message);

发送消息,一共就这几行代码。先看上面三行。

DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A");

前面我们说了,一个程序只会存在Producer Group的实例,即MQClientInstance,怎样保证只会存在一个实例呢,其实就是保证它是单例。这要在producer.start();里面看到。现在上面这行代码,其实就是构造了个producer,然后它放在my-group-name-A这个Producer Group中。

再看下面.start()方法。

1

首先进入这里

         if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

这里单例对象有个默认pid,只是比较下我们命名,my-group-name-A和它默认是否一样,不一样,将pid改成我们的。

2

然后如下

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

从这里获取到单例的MQClientInstance,从这里进入方法

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

就是个工厂模式+单例模式,有点区别的是这行代码

MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);

这里与DefaultMQProducer有关。

3

再接着如下

      boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

进入方法

   public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }

        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }

        return true;
    }

这里将Producer注册到Producer Group中,注册的方式是使用ProducerTable维护。

     MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

看看这个producerTable是啥

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

好吧,是个线程安全的Map,只是keyGroup Producer Name。貌似这样看来key不会变了,所有的Producer以链表的形式存储。

4

在到下一行这里

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

终于到了存储路由信息了。topicPublishInfoTable是用来存储路由信息的对象。而TopicPublishInfo是路由信息,这里的this.defaultMQProducer.getCreateTopicKey()获取到的就是message中的topic,可能这里有人会觉得奇怪,这里还没运行到message代码,并没有这个key。当然 这里设置了个默认的key

    /**
     * Just for testing or demo program
     */
    private String createTopicKey = MixAll.DEFAULT_TOPIC;

并没有什么特殊的作用,这代码自己也注释说just for testing or demo program

但是这个对象很重要,和上面的维护producer一样,也是个线程安全的map对象

    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();

上面所有的都可以用这张图来总结,其实下面的也可以。

 

5

终于到了start了,本来是从start进入,结果一直没做start的事情….

    mQClientFactory.start();

进入如下

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

好吧,果然没让我们失望,一堆的 start。

每个方法上都有注释,很容易看出来。

this.mQClientAPIImpl.fetchNameServerAddr();

首先找到NameServer地址,连接上。

      this.mQClientAPIImpl.start();

Start request-response channel ,看看这解释,开始一个请求与返回的通道,很明显与通信有关。点进去看到这个类NettyRemotingClient

好吧 Netty,果然是的,Netty的强大我就不说了,只说一句。

我们项目的架构就是使用Netty,据说并发上线是10万+。我们也没办法测试,因为现在我们项目并发最多 1.5万上下,好无奈啊 好像要 10万+ 的并发额 :)

强大岁强大,但是比较复杂,项目架构看了几天还没怎么看懂….

     this.startScheduledTask();

开始各种定时任务,进去很容看明白。贴两个出来

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

updateTopicRouteInfoFromNameServer sendHeartbeatToAllBrokerWithLock

看名字也很容易看出来,前面结论说了。这里定时做任务的实现方式可以学习下,以前没见过

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });

ok!差不多结束了

三、参考

十分钟入门RocketMQ

消息的产生

怎么说呢?不管有用没,还是想加上这句
老郭种树原创,转载请加上RocketMQ使用之Producer(二)
点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注