SpringBoot 项目整合 RocketMQ
之前做的项目中,涉及到判题业务。用户点击提交题目答案之后,后端数据库生成提交记录后,会将 id 传给判题服务进行判题。准备将这部分使用 RocketMQ 进行处理
原来的部分代码为:
CompletableFuture.runAsync(() -> {
judgeService.doJudge(questionSubmitId);
});
开始整合!
引入依赖
首先引入 rocketmq-spring-boot-starter 的依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
修改配置文件
在配置文件application.yml
进行 RocketMQ 的相关配置
# RocketMQ 配置
rocketmq:
name-server: 192.168.31.128:9876
producer:
group: ojsystem-backend
生产者
生产者直接在业务代码中改造
在业务代码中注入rocketMQTemplate
,使用其 API 来发送消息
@Resource
private RocketMQTemplate rocketMQTemplate;
将原来的代码部分:
CompletableFuture.runAsync(() -> {
judgeService.doJudge(questionSubmitId);
});
改造为:
/*
topic: ojsystem
tag: questionSubmit
*/
rocketMQTemplate.convertAndSend("ojsystem:questionSubmit", questionSubmitId);
在 springboot 的 RocketMQ 生产者中,tag 写法如上所示,与 topic 间使用冒号分隔。
消费者
创建一个消费者,继承 RocketMQListener
,同时需要指定消息的实体类型,如本文中的 questionSubmitId
为 Long
类型。
同时需要注意消费者端的 tag 写法,与生产者不一样,是使用 selectorExpression = "questionSubmit"
来指定的。
@Component
@Slf4j
@RocketMQMessageListener(topic = "ojsystem", selectorExpression = "questionSubmit", consumerGroup = "judgeService")
public class JudgeServiceMQReceiver implements RocketMQListener<Long> {
@Resource
private JudgeService judgeService;
@Override
public void onMessage(Long questionSubmitId) {
log.info("消费者收到消息:" + questionSubmitId);
judgeService.doJudge(questionSubmitId);
}
}
结果
改造后,当有题目答案提交后,可以收到控制台输出:
2023-11-15 21:05:39.206 INFO 17464 --- [_judgeService_3] c.m.oj.judge.JudgeServiceMQReceiver : 消费者收到消息:1724775669979832321
同样我们也可以使用 RocketMQ Dashboard 可视化查看
评论区