RocketMQ消息处理流程

RocketMQ在接收到消息后,会执行下面的流程:

  1. 检查消息格式是否正确
  2. 进行消息空间检测(没有超过限制)
  3. 将消息存入内存队列
  4. 根据配置进行持久化
  5. 根据路由规则发送消息
  6. 触发消费者消费消息
  7. 记录消费情况等
图片[1]-RocketMQ消息处理流程-不念博客

模板方法

RocketMQ定义了一个模板方法用来处理消息:

public abstract class MessageTemplate {

  public final void processMessage(Message msg) {

    validate(msg);  // 对消息格式校验
    spaceCheck(msg);    
    putInQueue(msg);   
    persist(msg);   
    route(msg); 
    triggerConsumer(msg);   
    recordStats(msg);   
  }

  // 具体方法

  protected abstract void validate(Message msg);

  protected abstract void spaceCheck(Message msg);

  // ...
}

这里定义了消息处理的整个流程,但具体逻辑留给子类实现。

子类实现

有不同的实现类:

public class DefaultMessageTemplate extends MessageTemplate{

   protected void validate(Message msg) {
     // 默认实现  
   }

   protected void spaceCheck(msg) {
      // 检查空间大小    
   }

   // ...
}

public class RTMessageTemplate extends MessageTemplate {

   // 实现实时消息的校验和检查逻辑  
}

使用

MessageTemplate template = new DefaultMessageTemplate();

template.processMessage(msg);

这样就使用了默认处理消息的框架。

作用

RocketMQ通过模板方法模式定义消息处理模板:

  • 提供了一个算法的框架
  • 子类只需要实现具体步骤
  • 提供了相关的钩子方法
  • 算法的结构不变
  • 简化了子类的实现

符合模板方法模式原理:定义一个操作中的算法骨架,由子类实现具体步骤。

RocketMQ通过模板方法定义消息处理的抽象框架,可以方便地扩展具体实现。

© 版权声明
THE END
喜欢就支持一下吧
点赞113赞赏 分享
评论 抢沙发
头像
欢迎光临不念博客,留下您的想法和建议,祝您有愉快的一天~
提交
头像

昵称

取消
昵称

    暂无评论内容