Spring Boot & RabbitMq 搭建(docker)及整合.md

Spring Boot & RabbitMq 搭建(docker)及整合.md

原文链接:慧言博客

[toc]


写在前面:

  • 源码:https://github.com/baoqihui/code-demo-persion.git
  • 两年前写过一版RabbitMq的整合,但局限于当时技术水平,代码风格凌乱
  • 此次更新
    • 搭建RabbitMq
    • 安装延时插件
    • spring boot整合RabbitMq
    • 封装工具,并测试消息队列、延时队列、定时队列
    • 消息重试机制配置及测试

一、docker搭建rabbitmq

1. 拉取并启动rabbitmq

1
docker run -d --hostname rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management

2. 开启验收队列插件(有延时队列需求时安装)

1
2
3
4
5
#下载并复制延迟队列插件到容器
wget -O /root/rabbitmq_delayed_message_exchange-3.8.0.ez https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez && docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbit:/plugins && rm -f rabbitmq_delayed_message_exchange-3.8.0.ez

#开启插件
docker exec rabbit sh -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange"

image-20220929151320531

  1. 登录控制台http://127.0.0.1:15672/ ,默认账号/密码guest/guest

    此处顺便可以查看一下我们的延时队列插件已经生效

image-20220929152219621


二、整合spring boot

1. pom

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. yml

1
2
3
4
5
6
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username: guest #用户名
password: guest #密码

3. 配置一些常量,主要是交换器的名称,要发送的队列名称,此处简历一个内部枚举类是为了方便遍历创建队列;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @Author: huibq
* @Date: 2022/9/29 15:26
* @Description: 默认队列常量
*/
public class RabbitMqDefaultConstants {
/**
* 交换器
*/
public static final String DIRECT_DEFAULT = "direct.default";

/**
* 队列的key
*/
public static final String DEFAULT_ROUTING_KEY_1 = "default.no1";
public static final String DEFAULT_ROUTING_KEY_2 = "default.no2";

/**
* 队列key枚举,用于创建队列;
* 将上方的队列key加入这里,队列才会被创建
*/
public enum QueueEnum {
DELAY1(DEFAULT_ROUTING_KEY_1, "默认消息1"),
DELAY2(DEFAULT_ROUTING_KEY_2, "默认消息2");
public final String code;
private final String name;
QueueEnum(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @Author: huibq
* @Date: 2022/9/29 15:26
* @Description: 延时队列常量
*/
public class RabbitMqDelayConstants {

/**
* 交换器
*/
public static final String DIRECT_DELAYED = "direct.delayed";
/**
* 队列key
*/
public static final String DELAY_ROUTING_KEY_1 = "delay.no1";
public static final String DELAY_ROUTING_KEY_2 = "delay.no2";

/**
* 队列key枚举,用于创建队列;
* 将上方的队列key加入这里,队列才会被创建
*/
public enum QueueEnum {
DELAY1(DELAY_ROUTING_KEY_1, "延时消息1"),
DELAY2(DELAY_ROUTING_KEY_2, "延时消息2");
public final String code;
private final String name;
QueueEnum(String code, String name) {
this.code = code;
this.name = name;
}
public String getCode() {
return code;
}
public String getName() {
return name;
}
}
}

4. 配置RabbitMq,使用rabbit admin对象,创建交换机并绑定队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* @Author: huibq
* @Date: 2022/9/29 15:33
* @Description: 配置rabbitmq
*/
@Configuration
public class RabbitMqConfig {
@Autowired
RabbitAdmin rabbitAdmin;

/**
* 创建初始化RabbitAdmin对象
*
* @param connectionFactory connectionFactory
* @return rabbitAdmin
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}

/**
* 默认交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange defaultExchange() {
// 参数意义: name: 名称、durable: true、autoDelete: 自动删除
return new DirectExchange(RabbitMqDefaultConstants.DIRECT_DEFAULT, true, false);
}

/**
* 延时交换机
*
* @return CustomExchange
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitMqDelayConstants.DIRECT_DELAYED, "x-delayed-message", true, false, args);
}

/**
* 读取创建交换机和对列
*/
@PostConstruct
public void declareQueueByConfig() {
//延时
Arrays.stream(RabbitMqDelayConstants.QueueEnum.values()).forEach(
queueEnum -> {
Queue queue = new Queue(queueEnum.getCode(), true);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(
BindingBuilder.bind(queue)
.to(delayedExchange())
.with(queueEnum.getCode())
.noargs()
);
}
);
//默认
Arrays.stream(RabbitMqDefaultConstants.QueueEnum.values()).forEach(
rabbitMqQueueEnum -> {
Queue queue = new Queue(rabbitMqQueueEnum.getCode(), true);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(
BindingBuilder.bind(queue)
.to(defaultExchange())
.with(rabbitMqQueueEnum.getCode())
);
}
);
}
}

5. 制作RabbitMq工具类,此处包含发送普通消息、延时消息、定时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
@Service
@AllArgsConstructor
public class RabbitMqUtil {
private RabbitTemplate rabbitTemplate;

/**
* 向默认队列推送消息
*
* @param queueCode 队列编码
* @param value 消息内容
*/
public <T> void sendDefaultMessage(String queueCode, T value) {
log.info("(添加默认队列成功) 队列键:{},队列值:{}", queueCode, value);
try {
rabbitTemplate.convertAndSend(RabbitMqDefaultConstants.DIRECT_DEFAULT, queueCode, value);
} catch (Exception e) {
log.error("(添加默认队列失败)", e);
}
}

/**
* 发送延时消息到默认交换机
*
* @param queueCode 队列编码
* @param value 消息内容
* @param delayTime 延时时间
* @param timeUnit 时间单位
*/
public <T> void addDelayQueue(String queueCode, T value, Long delayTime, TimeUnit timeUnit) {
log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}秒", queueCode, value, timeUnit.toSeconds(delayTime));
try {
rabbitTemplate.convertAndSend(DIRECT_DELAYED, queueCode, value,
message -> {
message.getMessageProperties().setHeader("x-delay", timeUnit.toMillis(delayTime));
return message;
});
} catch (Exception e) {
log.error("(添加延时队列失败) ", e);
}
}

/**
* 定时推送消息
* @param queueCode 队列编码
* @param value 消息内容
* @param sendTime 发送时间
*/
public <T> void addDelayQueue(String queueCode, T value, Date sendTime) {
log.info("(添加定时推送队列成功) 队列键:{},队列值:{},延迟时间:{}秒,实际发送时间为:{}", queueCode, value, DateUtil.between(new Date(), sendTime, DateUnit.SECOND), sendTime);
addDelayQueue(queueCode, value, DateUtil.between(new Date(), sendTime, DateUnit.MS, false), TimeUnit.MILLISECONDS);
}
}

6. 使用controller发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@RestController
@RequestMapping("test/rabbit")
@AllArgsConstructor
public class RabbitMqController {
private RabbitMqUtil rabbitMqUtil;

@ApiOperation(value = "发送信息到默认队列1")
@GetMapping(value = "toQueue1")
public Result toQueue1(String message) {
rabbitMqUtil.sendDefaultMessage(RabbitMqDefaultConstants.DEFAULT_ROUTING_KEY_1,message);
return Result.succeed("发送成功");
}

@ApiOperation(value = "发送信息到默认队列2")
@GetMapping(value = "toQueue2")
public Result toQueue2(String message) {
rabbitMqUtil.sendDefaultMessage(RabbitMqDefaultConstants.DEFAULT_ROUTING_KEY_2,message);
return Result.succeed("发送成功");
}

@ApiOperation(value = "延时发送:延时10s发送信息到延时队列1")
@GetMapping(value = "toDelayQueue1")
public Result toDelayQueue1(String message) {
rabbitMqUtil.addDelayQueue(RabbitMqDelayConstants.DELAY_ROUTING_KEY_1, message, 10L, TimeUnit.SECONDS);
return Result.succeed("发送成功");
}

@ApiOperation(value = "定时发送:20s后推送消息到延时队列2")
@GetMapping(value = "toDelayQueue2")
public Result toDelayQueue2(String message) {
rabbitMqUtil.addDelayQueue(RabbitMqDelayConstants.DELAY_ROUTING_KEY_2, message, DateUtil.offsetSecond(DateUtil.date(),20));
return Result.succeed("发送成功");
}
}

7. 设置监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@Component
public class ListenerService {
@RabbitListener(queues = RabbitMqDefaultConstants.DEFAULT_ROUTING_KEY_1)
public void defaultListener1(String message) {
log.info("默认消费者1接收到的消息为:{}", message);
}

@RabbitListener(queues = RabbitMqDefaultConstants.DEFAULT_ROUTING_KEY_2)
public void defaultListener2(String message) {
log.info("默认消费者2接收到的消息为:{}", message);
}

@RabbitListener(queues = RabbitMqDelayConstants.DELAY_ROUTING_KEY_1)
public void delayListener1(String message) {
log.info("延时消费者1接收到的消息为:{}", message);
}

@RabbitListener(queues = RabbitMqDelayConstants.DELAY_ROUTING_KEY_2)
public void delayListener2(String message) {
log.info("延时消费者2接收到的消息为:{}", message);
}
}

8. 测试结果,注意看日志打印时间,呈现延时效果

image-20220929161523274

三、重试机制的开启和测试

1.yml,此处设置重试5次,首次重试时间2s,间隔倍数2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username: guest #用户名
password: guest #密码
listener:
simple:
acknowledge-mode: auto # 消息确认方式,其有三种配置方式,分别是none、manual(手动ack) 和auto(自动ack) 默认auto
retry: #设置重试机制
enabled: true #重试开启
max-attempts: 5 #最大重试次数
initial-interval: 2000 # 重试时间间隔(秒)
max-interval: 20000 #最大时间间隔(秒)
multiplier: 2 #重试间隔倍数

2.修改延时监听者1,使其跑出异常并统计重试次数

1
2
3
4
5
6
7
int count = 1;
@RabbitListener(queues = RabbitMqDelayConstants.DELAY_ROUTING_KEY_1)
public void delayListener1(String message) {
log.error("重试次数:{}",count++);
int i=1/0;
log.info("延时消费者1接收到的消息为:{}", message);
}

3.测试结果

  • 可以看到,依次在间隔2,4,8,16秒时进行了重试;共计重试到第五次,抛出异常,符合预期结果

image-20220929164011077