从NameServer章节分析得知,路由信息存储在NameServer,生产端和消费端定时向NameServer获取topic相关的路由信息;
从生产者启动流程得知:
路由信息的动态更新源码在MQClientInstance#startScheduledTask定时任务里面
具体方法:
updateTopicRouteInfoFromNameServer下图为路由更新流程
添加图片注释,不超过 140 字(可选)
接下来我们着重解析此段源码:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//从nameServer更新路由信息 -定时任务:30s一次
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
public void updateTopicRouteInfoFromNameServer() {
Set
分析如下:
1.1 getPublishTopicListgetPublishTopicList 方法分析:
public Set
备注:
细心的你可能发现从启动流程中得知:
topicPublishInfoTable(ConcurrentHashMap)只会默认注册topic=TBW102的信息,那正常业务发送的topic是如何注册进去的呢,建议直接观看理解以下代码,在发送流程中会体现出如何注册到topicPublishInfoTable中;
topicPublishInfoTable数据的初始化(value:第一次默认都是new TopicPublishInfo())
//查找主题的路由信息的方法
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //从NameServer更新topic路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //从NameServer更新topic路由信息
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
分析
1.2 updateTopicRouteInfoFromNameServer从NameServer更新topic路由信息
在分析之前,可先简单分析MQClientInstance核心属性:
public class MQClientInstance {
...省略...
//key:group, value: 生产者
private final ConcurrentMap
备注:
此处列出的属性仅跟生产端相关,其他的属性和方法大都我们会在消费端分析
接下来着重分析:updateTopicRouteInfoFromNameServer
/**
* 向-NameServer查询该 topic 的路由信息
* @param topic 主题
* @param isDefault 是否默认主题
* @param defaultMQProducer 默认MQProducer
* @return
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { //获取锁:3s
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) { //默认主题-'TBW102',从NameServer查询-topicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) { //读写队列取最小值,getDefaultTopicQueueNums=4,getReadQueueNums=16
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {//非默认主题,从NameServer查询-topicRouteData
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);//1> 判断:TopicRouteData 是否改变
if (!changed) { //未改变,
changed = this.isNeedUpdateTopicRouteInfo(topic);//2>继续判断是否需要更新:topic-路由信息
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) { // 需要更新
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());//维护brokerAddrTable地址信息
}
// Update Pub info
{ //topicRouteData 转换 TopicPublishInfo(isWriteable)-生产需要的数据
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); // 3>数据转换
publishInfo.setHaveTopicRouterInfo(true);
Iterator<>
备注:
分析如下:
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { // 此处可忽略,针对顺序消息
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
List
4.更新-路由发布信息:updateTopicPublishInfo(topic, publishInfo);调用的代码为:DefaultMQProducerImpl#updateTopicPublishInfo,本质就是维护Map-topicPublishInfoTable
路由更新虽然相对简单,但对于生产者来说至关重要,生产端需要知道路由信息才能进行计算选择将消息发送到哪台broker;但从源码分析中,可以看出更新路由信息以topic为维度,组装更新数据,本质还是维护Map(topicRouteTable、brokerAddrTable、topicPublishInfoTable)等,但是要注意是:ConcurrentHashMap。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例.
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,作者是 《 消息中间件 RocketMQ 技术内幕》一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
……