rocketMQ系列 01 namesrv 篇

2021-05-04 07:37:35 Author: juejin.cn
觉得文章还不错?,点我收藏



整体架构

image.png 从架构设计图来看,namesrv 充当的角色为注册中心,broker 向 namesrv 注册。producer、consumer 分别与 namesrv 保持长连接, 并从 namesrv 中拉取 broker 信息,然后再与

namesrv 之间是不互相通信的,他们各自持有 broker 的注册信息。 在分布式领域中,正常集群部署,都会有主从、或者有 leader 节点。但是对于 namesrv,它每个节点都是独立的,每个 namesrv 节点都是互不通信。

rocketMQ 在注册中心这一块的设计还是挺特别的。

namesrv 原理

在说 namesrv 原理之前,需要先跟大家介绍一下,rocketMQ 是如何抽象实体的?都抽象出哪些实体?这个有点类似 在设计数据库时的 ER 图。

实体设计

这东西很重要。理解了是如何设计实体的。大致就能知道,namesrv 是如何存储数据的。 image.png

Namespace: namespace 个人理解就是用于隔离(跟 apollo namespace 设计用途一致,不过 apollo 的 namespace 功能更强)。 support namespace 比如:你有三个环境 /qa/pre/release。 然后这 3 个环境的 topic 都是一样的。 那么你可以在创建 producer、consumer 时,指定 namespace 即可。

RouteInfoManager:路由元信息管理

// 主题队列表,初始化时,topic 默认有 4 个队列
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker 信息。包含集群名称、broker 名称、主备 broker 地址
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// broker 集群信息。存储集群中所有的名称
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// broker 状态信息,namesrv 每次收到心跳包,会替换该信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker FilterService 列表,用于类模式消息过滤
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
复制代码

BrokerData:broker 信息封装

/**
 * broker.conf brokerClusterName 配置
 */
private String cluster;
/**
 * broker.conf brokerName 配置
 */
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
复制代码

cluster、brokerName、brokerId 之间的关系

一个 cluster 中,不会存在相同的 brokerName(这里指的是 master 节点)。 对于主从结构的 broker。 他们之间的 brokerName 是一致的,但是 brokerId 是不一样的。master broker 的 brokerId 默认为 0。slave broker 的 brokerId 为非 0 的数字。

BrokerLiveInfo:broker

/**
 * 最后一次更新时间, namesrv 收到 broker 心跳后,会更新该信息。
 */
private long lastUpdateTimestamp;
private DataVersion dataVersion;
// netty 中的 channel, 即 socket
private Channel channel;
// BrokerController#getHAServerAddr
// 格式如下:ip:port
private String haServerAddr;
复制代码

QueueData:队列数据

private String brokerName;
// 读队列数量
private int readQueueNums;
// 写队列梳理
private int writeQueueNums;
/**
 * 6:同时支持读写
 * 4:禁写
 * 2:禁读
 */
private int perm;
private int topicSynFlag;
复制代码

rocketMQ 中 readQueueNums, writeQueueNums 在 99% 情况下,这 2 个值都是一样。 如果有扩容,缩容的场景,可能会不大一致。 例如,当你的 topic 的队列,被设置为 128 队列时;此时如果想缩容,那么可以先将 writeQueueNums 修改为 64,然后等到 剩下的 64 个队列被消费完后,再将 readQueueNums 修改为 64.

mqadmin updateTopic -c cluster -n localhost:9876 -t topic_test -r 64 -w 128 
复制代码

namesrv 启动

NamesrvController#initialize

启动过程相对简单,无非就是加载配置。这里挑选一段相对重要的代码:定期扫描过期 broker

// 延迟 5 秒,间隔 10 秒,扫描过期 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
复制代码

CURD -- broker 注册|心跳

BrokerController#start()

  1. broker 启动后,延迟 10 秒,每隔 30 秒向 namesrv 发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// brokerConfig.getRegisterNameServerPeriod()  = 30000
复制代码
  1. broker 发送请求后,namesrv 会在该方法处理逻辑

DefaultRequestProcessor#registerBrokerWithFilterServer()

该方法主要功能是维护 RouteInfoManagerclusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable、filterServerTable 信息。然后为了保证操作的一致性,直接用了锁。

CURD -- namesrv 剔除失效 broker

  1. 每隔 10 秒,扫描过期的 broker
// 延迟 5 秒,间隔 10 秒,扫描过期 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
复制代码
  1. 剔除 120s 内,未发送心跳的 broker。
public class RouteInfoManager {
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            //todo 最后一次注册的时间
            long last = next.getValue().getLastUpdateTimestamp();
            //todo 超过 120s 未注册,直接将 broker 剔除
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                // 关闭与 broker 的 channel
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                // 从注册表中删除该 broker 的信息
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
}
复制代码

然后,你会发现,namesrv,仅仅只是将 broker 剔除了,namesrv 并没有告知 producer or consumer broker 已经被剔除了。并且,namesrv 至少得等 120s 才能得知这个 broker 可能出了问题。 那么这样的设计是否有问题呢?如果有,如何避免呢?

broker 断开与 namesrv 连接

正常断开连接: 如果是正常断开与 namesrv 连接,broker 在关闭前,会发请求通知 namesrv, namesrv 会通过 RouteInfoManager#unregisterBroker 将 broker 移出。

非正常断开连接:

当 broker 被强行断开与 namesrv 连接时。 namesrv 会触发事件监听,能够感知到 broker 已下线,会主动将 broker 剔除。其实,最后都是通过调用 RouteInfoManager#onChannelDestroy() 将 broker 剔除 NettyEventExecutor#run()

实际上是,只要是与 namesrv 的连接断开,都会调用 RouteInfoManager#onChannelDestroy() 方法。

得出结论:只要 broker 与 namesrv 之间的连接断开,那么 namesrv 是能够第一时间感知到的。

路由发现

RouteInfoManager#pickupTopicRouteData

topicQueueTable, brokerAddrTable, filterServerTable 中获取对应 topic 数据,然后封装为 TopicRouteData 并返回。

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
复制代码

namesrv 总结

  1. 集群中所有 Broker 每隔 30s 向集群中所有的 NameServer 发送心跳包,注册Topic 路由信息、Broker 自身信息。(注册|心跳)
  2. NameServer 启动一个定时任务 每10s 扫描 Broker存活状态表,如果 Nameserver 连续 120s 未收到 Broker 的心跳包,将判定该 Broker 已下线,从路由表中将该 Broker 移除。(剔除)
  3. 如果 NameserverBroker 端的长连接断开,NameServer 能立即感知 Broker 下线并从路由表中将该 Broker 移除。(因为是基于 tcp/ip 通信,连接断开能立马知道)(剔除)
  4. 可通过改变 topic 的读写队列实现,扩容缩容
  5. 主从结构的 broker。 brokerName 是一致的,但是 brokerId 是不一致的;其中 master 节点的 brokerId 必须为 0, slave 节点的 brokerId 必须非 0;对于同一个 cluster,绝对不允许出现相同的 brokerName(指的是 master 节点)
  6. Namesrv 提供的是 broker 的注册发现功能。

代码鉴赏

创建线程池的标准写法

 this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.asyncSenderThreadPoolQueue,
    new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
        }
    });
复制代码

一定要给线程设置名称,方便后续观看日志

rocketMQ 如何在程序关闭时,优雅关闭资源

NamesrvStartup#start

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));
复制代码

controller.shutdown() 这里面关闭的资源大概包含,连接、线程池。这里线程池的关闭跟我们在开发中息息相关,因此这边贴下代码,看别人是如何处理的

NamesrvController#shutdown

public void shutdown() {
    ...
    // 调用 shutdown(),而不是 shutdownNow()
    this.scheduledExecutorService.shutdown();
    ...
}
复制代码
public class ShutdownHookThread extends Thread {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    private final InternalLogger log;
    private final Callable callback;

    /**
     * Create the standard hook thread, with a call back, by using {@link Callable} interface.
     *
     * @param log The log instance is used in hook thread.
     * @param callback The call back function.
     */
    public ShutdownHookThread(InternalLogger log, Callable callback) {
        super("ShutdownHook");
        this.log = log;
        this.callback = callback;
    }

    /**
     * Thread run method.
     * Invoke when the jvm shutdown.
     * 1. count the invocation times.
     * 2. execute the {@link ShutdownHookThread#callback}, and time it.
     */
    @Override
    public void run() {
        synchronized (this) {
            log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet() + " times.");
            if (!this.hasShutdown) {
                this.hasShutdown = true;
                long beginTime = System.currentTimeMillis();
                try {
                    this.callback.call();
                } catch (Exception e) {
                    log.error("shutdown hook callback invoked failure.", e);
                }
                long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                log.info("shutdown hook done, consuming time total(ms): " + consumingTimeTotal);
            }
        }
    }
}
复制代码

如何处理集合的并发写

public class RouteInfoManager {
    // 读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {

        ...
            try {
                this.lock.writeLock().lockInterruptibly();
                ...
                ...
                ...
            } finally {
                this.lock.writeLock().unlock();
            }
        ... 
    }
}
复制代码
  1. 锁(资源)的释放,必须写在 finally 中,确保能够被释放
  2. 对于多个集合需要保证原子性的,rocketMQ 直接用 读写锁(读之间不冲突)保证一个操作的原子性。用读写锁的原因,当然是为了避免读之间的等待。

生产上可能导致的问题

避免将 namesrv 与 broker 部署到同一台物理机上

从年末生产故障解锁RocketMQ集群部署的最佳实践

生产环境中,broker autoCreateTopicEnable 不能设置为 true

RocketMQ实战:生产环境中,autoCreateTopicEnable为什么不能设置为true

书籍推荐




觉得文章还不错?,点我收藏



如果文章侵犯到您的版权,请联系我:buaq.net[#]pm.me