侧边栏壁纸
  • 累计撰写 27 篇文章
  • 累计创建 42 个标签
  • 累计收到 34 条评论

目 录CONTENT

文章目录

SpringBoot 项目整合 RocketMQ

miykah
2023-11-15 / 0 评论 / 0 点赞 / 56 阅读 / 2954 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-12-11,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SpringBoot 项目整合 RocketMQ

之前做的项目中,涉及到判题业务。用户点击提交题目答案之后,后端数据库生成提交记录后,会将 id 传给判题服务进行判题。准备将这部分使用 RocketMQ 进行处理

原来的部分代码为:

CompletableFuture.runAsync(() -> {
	judgeService.doJudge(questionSubmitId);
});

开始整合!

引入依赖

首先引入 rocketmq-spring-boot-starter 的依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

修改配置文件

在配置文件application.yml 进行 RocketMQ 的相关配置

# RocketMQ 配置
rocketmq:
  name-server: 192.168.31.128:9876
  producer:
    group: ojsystem-backend

生产者

生产者直接在业务代码中改造

在业务代码中注入rocketMQTemplate ,使用其 API 来发送消息

@Resource
private RocketMQTemplate rocketMQTemplate;

将原来的代码部分:

CompletableFuture.runAsync(() -> {
	judgeService.doJudge(questionSubmitId);
});

改造为:

/*
    topic: ojsystem
    tag: questionSubmit
 */
rocketMQTemplate.convertAndSend("ojsystem:questionSubmit", questionSubmitId);

在 springboot 的 RocketMQ 生产者中,tag 写法如上所示,与 topic 间使用冒号分隔。

消费者

创建一个消费者,继承 RocketMQListener,同时需要指定消息的实体类型,如本文中的 questionSubmitId Long 类型。

同时需要注意消费者端的 tag 写法,与生产者不一样,是使用 selectorExpression = "questionSubmit" 来指定的。

@Component
@Slf4j
@RocketMQMessageListener(topic = "ojsystem", selectorExpression = "questionSubmit", consumerGroup = "judgeService")
public class JudgeServiceMQReceiver implements RocketMQListener<Long> {

    @Resource
    private JudgeService judgeService;

    @Override
    public void onMessage(Long questionSubmitId) {
        log.info("消费者收到消息:" + questionSubmitId);
        judgeService.doJudge(questionSubmitId);
    }

}

结果

改造后,当有题目答案提交后,可以收到控制台输出:

2023-11-15 21:05:39.206  INFO 17464 --- [_judgeService_3] c.m.oj.judge.JudgeServiceMQReceiver      : 消费者收到消息:1724775669979832321

同样我们也可以使用 RocketMQ Dashboard 可视化查看

0

评论区