博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ源码解析:定时消息与消息重试
阅读量:6387 次
发布时间:2019-06-23

本文共 17569 字,大约阅读时间需要 58 分钟。

???关注微信公众号:【芋艿的后端小屋】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右

1. 概述

建议前置阅读内容:

? 为什么把定时消息消息重试放在一起?你猜。

? 你猜我猜不猜。

2. 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

下图是定时消息的处理逻辑图:

定时消息逻辑图.png

2.1 延迟级别

RocketMQ 目前只支持固定精度的定时消息。官方说法如下:

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

  • 延迟级别:
延迟级别 时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h
  • 核心源码如下:

    1: // ⬇️⬇️⬇️【MessageStoreConfig.java】    2: /**    3:  * 消息延迟级别字符串配置    4:  */    5: private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";    6:     7: // ⬇️⬇️⬇️【ScheduleMessageService.java】    8: /**    9:  * 解析延迟级别   10:  *   11:  * @return 是否解析成功   12:  */   13: public boolean parseDelayLevel() {   14:     HashMap
    timeUnitTable = new HashMap<>(); 15: timeUnitTable.put("s", 1000L); 16: timeUnitTable.put("m", 1000L * 60); 17: timeUnitTable.put("h", 1000L * 60 * 60); 18: timeUnitTable.put("d", 1000L * 60 * 60 * 24); 19: 20: String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); 21: try { 22: String[] levelArray = levelString.split(" "); 23: for (int i = 0; i < levelArray.length; i++) { 24: String value = levelArray[i]; 25: String ch = value.substring(value.length() - 1); 26: Long tu = timeUnitTable.get(ch); 27: 28: int level = i + 1; 29: if (level > this.maxDelayLevel) { 30: this.maxDelayLevel = level; 31: } 32: long num = Long.parseLong(value.substring(0, value.length() - 1)); 33: long delayTimeMillis = tu * num; 34: this.delayLevelTable.put(level, delayTimeMillis); 35: } 36: } catch (Exception e) { 37: log.error("parseDelayLevel exception", e); 38: log.info("levelString String = {}", levelString); 39: return false; 40: } 41: 42: return true; 43: }复制代码

2.2 Producer 发送定时消息

  • ?发送时,设置消息的延迟级别
Message msg = new Message(...);msg.setDelayTimeLevel(level);复制代码

2.3 Broker 存储定时消息

  • ? 存储消息时,延迟消息进入 TopicSCHEDULE_TOPIC_XXXX
  • ? 延迟级别 与 消息队列编号 做固定映射:QueueId = DelayLevel - 1

核心代码如下:

1: // ⬇️⬇️⬇️【CommitLog.java】  2: /**  3:  * 添加消息,返回消息结果  4:  *  5:  * @param msg 消息  6:  * @return 结果  7:  */  8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {  9:     // ....(省略代码)  10:  11:     // 定时消息处理 12:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 13:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 14:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 15:         // Delay Delivery 16:         if (msg.getDelayTimeLevel() > 0) { 17:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 18:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 19:             } 20:  21:             // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。 22:             topic = ScheduleMessageService.SCHEDULE_TOPIC; 23:  24:             // 延迟级别 与 消息队列编号 做固定映射 25:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 26:  27:             // Backup real topic, queueId 28:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 30:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 31:  32:             msg.setTopic(topic); 33:             msg.setQueueId(queueId); 34:         } 35:     } 36:  37:     // ....(省略代码)  38: } 39:  40: // ⬇️⬇️⬇️【ScheduleMessageService.java】 41: /** 42:  * 根据 延迟级别 计算 消息队列编号 43:  * QueueId = DelayLevel - 1 44:  * 45:  * @param delayLevel 延迟级别 46:  * @return 消息队列编号 47:  */ 48: public static int delayLevel2QueueId(final int delayLevel) { 49:     return delayLevel - 1; 50: }复制代码

  • ? 生成 ConsumeQueue 时,每条消息的 tagsCode 使用【消息计划消费时间】。这样,ScheduleMessageService 在轮询 ConsumeQueue 时,可以使用 tagsCode 进行过滤。

核心代码如下:

1: // ⬇️⬇️⬇️【CommitLog.java】  2: /**  3:  * check the message and returns the message size  4:  *  5:  * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure  6:  */  7: public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {  8:     try {  9:         // // ....(省略代码) 10:  11:         // 17 properties 12:         short propertiesLength = byteBuffer.getShort(); 13:         if (propertiesLength > 0) { 14:             // ....(省略代码) 15:             String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS); 16:             if (tags != null && tags.length() > 0) { 17:                 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); 18:             } 19:  20:             // Timing message processing 21:             { 22:                 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); 23:                 if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) { 24:                     int delayLevel = Integer.parseInt(t); 25:  26:                     if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 27:                         delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); 28:                     } 29:  30:                     if (delayLevel > 0) { 31:                         tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, 32:                             storeTimestamp); 33:                     } 34:                 } 35:             } 36:         } 37:  38:         // ....(省略代码) 39:  40:         return new DispatchRequest(// 41:             topic, // 1 42:             queueId, // 2 43:             physicOffset, // 3 44:             totalSize, // 4 45:             tagsCode, // 5 46:             storeTimestamp, // 6 47:             queueOffset, // 7 48:             keys, // 8 49:             uniqKey, //9 50:             sysFlag, // 9 51:             preparedTransactionOffset// 10 52:         ); 53:     } catch (Exception e) { 54:     } 55:  56:     return new DispatchRequest(-1, false /* success */); 57: } 58:  59: // ⬇️⬇️⬇️【ScheduleMessageService.java】 60: /** 61:  * 计算 投递时间【计划消费时间】 62:  * 63:  * @param delayLevel 延迟级别 64:  * @param storeTimestamp 存储时间 65:  * @return 投递时间【计划消费时间】 66:  */ 67: public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) { 68:     Long time = this.delayLevelTable.get(delayLevel); 69:     if (time != null) { 70:         return time + storeTimestamp; 71:     } 72:  73:     return storeTimestamp + 1000; 74: }复制代码

2.4 Broker 发送定时消息

  • ? 对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。

下图是发送定时消息的处理逻辑图:

定时消息定时逻辑

实现代码如下:

1: /**  2:  * ⬇️⬇️⬇️ 发送(投递)延迟消息定时任务  3:  */  4: class DeliverDelayedMessageTimerTask extends TimerTask {  5:     /**  6:      * 延迟级别  7:      */  8:     private final int delayLevel;  9:     /** 10:      * 位置 11:      */ 12:     private final long offset; 13:  14:     public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { 15:         this.delayLevel = delayLevel; 16:         this.offset = offset; 17:     } 18:  19:     @Override 20:     public void run() { 21:         try { 22:             this.executeOnTimeup(); 23:         } catch (Exception e) { 24:             // XXX: warn and notify me 25:             log.error("ScheduleMessageService, executeOnTimeup exception", e); 26:             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( 27:                 this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); 28:         } 29:     } 30:  31:     /** 32:      * 纠正可投递时间。 33:      * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。 34:      * 35:      * @param now 当前时间 36:      * @param deliverTimestamp 投递时间 37:      * @return 纠正结果 38:      */ 39:     private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { 40:         long result = deliverTimestamp; 41:  42:         long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); 43:         if (deliverTimestamp > maxTimestamp) { 44:             result = now; 45:         } 46:  47:         return result; 48:     } 49:  50:     public void executeOnTimeup() { 51:         ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,  delayLevel2QueueId(delayLevel)); 52:  53:         long failScheduleOffset = offset; 54:  55:         if (cq != null) { 56:             SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); 57:             if (bufferCQ != null) { 58:                 try { 59:                     long nextOffset = offset; 60:                     int i = 0; 61:                     for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 62:                         long offsetPy = bufferCQ.getByteBuffer().getLong(); 63:                         int sizePy = bufferCQ.getByteBuffer().getInt(); 64:                         long tagsCode = bufferCQ.getByteBuffer().getLong(); 65:  66:                         long now = System.currentTimeMillis(); 67:                         long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); 68:  69:                         nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 70:  71:                         long countdown = deliverTimestamp - now; 72:  73:                         if (countdown <= 0) { // 消息到达可发送时间 74:                             MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); 75:                             if (msgExt != null) { 76:                                 try { 77:                                     // 发送消息 78:                                     MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); 79:                                     PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner); 80:                                     if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功 81:                                         continue; 82:                                     } else { // 发送失败 83:                                         // XXX: warn and notify me 84:                                         log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); 85:  86:                                         // 安排下一次任务 87:                                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); 88:  89:                                         // 更新进度 90:                                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); 91:                                         return; 92:                                     } 93:                                 } catch (Exception e) { 94:                                     // XXX: warn and notify me 95:                                     log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" 96:                                             + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); 97:                                 } 98:                             } 99:                         } else {100:                             // 安排下一次任务101:                             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);102: 103:                             // 更新进度104:                             ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);105:                             return;106:                         }107:                     } // end of for108: 109:                     nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);110: 111:                     // 安排下一次任务112:                     ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);113: 114:                     // 更新进度115:                     ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);116:                     return;117:                 } finally {118:                     bufferCQ.release();119:                 }120:             } // end of if (bufferCQ != null)121:             else { // 消费队列已经被删除部分,跳转到最小的消费进度122:                 long cqMinOffset = cq.getMinOffsetInQueue();123:                 if (offset < cqMinOffset) {124:                     failScheduleOffset = cqMinOffset;125:                     log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="126:                         + cqMinOffset + ", queueId=" + cq.getQueueId());127:                 }128:             }129:         } // end of if (cq != null)130: 131:         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);132:     }133: 134:     /**135:      * 设置消息内容136:      *137:      * @param msgExt 消息138:      * @return 消息139:      */140:     private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {141:         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();142:         msgInner.setBody(msgExt.getBody());143:         msgInner.setFlag(msgExt.getFlag());144:         MessageAccessor.setProperties(msgInner, msgExt.getProperties());145: 146:         TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());147:         long tagsCodeValue =148:             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());149:         msgInner.setTagsCode(tagsCodeValue);150:         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));151: 152:         msgInner.setSysFlag(msgExt.getSysFlag());153:         msgInner.setBornTimestamp(msgExt.getBornTimestamp());154:         msgInner.setBornHost(msgExt.getBornHost());155:         msgInner.setStoreHost(msgExt.getStoreHost());156:         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());157: 158:         msgInner.setWaitStoreMsgOK(false);159:         MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);160: 161:         msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));162: 163:         String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);164:         int queueId = Integer.parseInt(queueIdStr);165:         msgInner.setQueueId(queueId);166: 167:         return msgInner;168:     }169: }复制代码

2.5 Broker 持久化定时发送进度

  • ? 定时消息发送进度存储在文件(../config/delayOffset.json)里
  • ? 每 10s 定时持久化发送进度。

核心代码如下:

1: // ⬇️⬇️⬇️【ScheduleMessageService.java】  2: /**  3: public void start() {  4:     // 定时发送消息  5:     for (Map.Entry
entry : this.delayLevelTable.entrySet()) { 6: Integer level = entry.getKey(); 7: Long timeDelay = entry.getValue(); 8: Long offset = this.offsetTable.get(level); 9: if (null == offset) { 10: offset = 0L; 11: } 12: 13: if (timeDelay != null) { 14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 15: } 16: } 17: 18: // 定时持久化发送进度 19: this.timer.scheduleAtFixedRate(new TimerTask() { 20: 21: @Override 22: public void run() { 23: try { 24: ScheduleMessageService.this.persist(); 25: } catch (Exception e) { 26: log.error("scheduleAtFixedRate flush exception", e); 27: } 28: } 29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30: }复制代码

3. 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。

  • ? Consumer 将消费失败的消息发回 Broker,进入延迟消息队列。即,消费失败的消息,不会立即消费。

核心代码如下:

1: // ⬇️⬇️⬇️【SendMessageProcessor.java】  2: /**  3:  * 消费者发回消息  4:  *  5:  * @param ctx ctx  6:  * @param request 请求  7:  * @return 响应  8:  * @throws RemotingCommandException 当远程调用异常  9:  */ 10: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) 11:     throws RemotingCommandException { 12:     // ....(省略代码) 13:     // 处理 delayLevel(独有)。 14:     int delayLevel = requestHeader.getDelayLevel(); 15:     int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); 16:     if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { 17:         maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); 18:     } 19:     if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// 20:     // ....(省略代码) 21:     } else { 22:         if (0 == delayLevel) { 23:             delayLevel = 3 + msgExt.getReconsumeTimes(); 24:         } 25:         msgExt.setDelayTimeLevel(delayLevel); 26:     } 27:  28:     // ....(省略代码) 29:     return response; 30: }复制代码

转载地址:http://xsdha.baihongyu.com/

你可能感兴趣的文章
raid5分析结果(临时)
查看>>
CCNA系列课程(1) 网络基础
查看>>
Enterprise Library 2.0 Hands On Lab 翻译(11):缓存应用程序块(三)
查看>>
【REACT NATIVE 系列教程之四】刷新组件RENDER(重新渲染)的三种方式详解
查看>>
认识配置设置文件(INI与XML)
查看>>
foreman架构的引入4-安装Foreman1.6.3架构(foreman与puppetmaster分离)
查看>>
【Xamarin】使用WebSocket开发实时通信应用程序
查看>>
配置Docker多台宿主机间的容器互联
查看>>
Android系统Surface机制的SurfaceFlinger服务的线程模型分析
查看>>
Webpack中publicPath设置
查看>>
event.srcElement的用法
查看>>
“页面制作人员”?“页面工程师”?“页面架构师”?滚一边去!
查看>>
C++:派生类的构造函数和析构函数
查看>>
2012 Autodesk开发者日开始注册啦
查看>>
System.Net.Http for Silverlight
查看>>
ASP.NET 2.0 验证控件新的功能
查看>>
asp.net获取客户端的MAC地址
查看>>
CentOS下Zabbix安装部署及汉化
查看>>
判断两矩形是否相交
查看>>
第7章 "敏捷+"项目管理
查看>>