RabbitMQ基础

布鸽不鸽 Lv4

前言

本文是RabbitMQ基础学习笔记

原文地址:https://xuedongyun.cn/post/33842/

MQ相关概念

什么是MQ

  • 本质是个队列,先进先出(FIFO)
  • 跨进程的通信机制,用于上下游传递消息
  • “逻辑解耦+物理解耦”的消息通信服务

为什么使用MQ

  • 流量消峰

比如下单,使用消息队列做缓冲。高峰期,排队进行,让短时间的请求分散到一定时间来处理。

可能导致用户下单很久,才能收到下单成功的消息。但也比无法下单好。

  • 应用解耦

比如订单系统,通过消息队列调用物流系统。即使物流系统宕机10分钟,恢复后,也能继续处理订单。

  • 异步处理

比如A调用B,B需要处理很长时间。可以用消息队列,A调用B的服务后,监听B处理完的消息。B完成后,会发送消息给MQ,MQ会将消息转发给A服务。

MQ的分类

  • ActiveMQ:很老的MQ
  • Kafka:为大数据而生
    • 大型公司建议使用。有日志采集需求首选Kafka
  • RocketMQ:出自阿里巴巴,消息可以做到0丢失
    • 为金融互联网而生
  • RabbitMQ:最主流的消息中间件之
    • 性能好,时效性微秒级,社区活动度高,管理界面方便。如果数据量没有那么大,优先选用。

RabbitMQ

RabbitMQ是一个消息中间件,负责接收,存储,转发消息数据

四大核心

  • 生产者:产生数据的人
  • 交换机:决定将消息推送到哪一个队列
  • 队列:大的消息缓冲区
  • 消费者:从队列中接收数据进行处理

名词解释

image-20230731171245490
  • Broker: 接收和分发消息的应用,Rabbit Server就是Message Broker
  • Virtual host: 当多个用户使用同一个Rabbit MQ时,可以划分多个vhost。每个用户在自己的vhost中创建exchange,queue等
  • Connection: publisher/consumer与broker之间的TCP连接
  • Channel: 在Connection内部建立的逻辑连接,通常使用channel id帮助客户端和brocker识别channel
  • Exchange: 交换机
  • Queue: 队列
  • Binding: Exchange和Queue之间的虚拟连接。Binding可以包含routing key,被保存到exchange中的查询表中。用作message的分发依据

安装教程

使用系统:ubuntu22

官方文档:https://www.rabbitmq.com/download.html

  • 安装RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Community mirror of Cloudsmith: modern Erlang repository
curl -1sLf https://ppa1.novemberain.com/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Community mirror of Cloudsmith: RabbitMQ repository
curl -1sLf https://ppa1.novemberain.com/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing
  • 查看状态
1
systemctl status rabbitmq-server
  • 启用web界面插件
1
sudo rabbitmq-plugins enable rabbitmq_management

http://localhost:15672

默认账号和密码:guest guest,但只有localhost才能登录

  • 添加用户并设置权限
1
2
3
4
5
6
7
8
9
10
# 创建账号
rabbitmqctl add_user admin 123456

# 设置角色:超级管理员
rabbitmqctl set_user_tags admin administrator

# 设置权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 用户将拥有"/"这个virtual host中所有资源的配置、写、读权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  • 查看当前用户和角色
1
rabbitmqctl list_users
  • 重置命令
1
2
3
4
5
6
7
8
# 关闭应用
rabbitmqctl stop_app

# 清除命令
rabbitmqctl reset

# 启动命令
rabbitmqctl start_app

简单队列模式

image-20230801201330758

Java依赖

1
2
3
4
5
# rabbitmq依赖客户端
amqp-client

# 操作文件的依赖
commons-io

创建连接

创建:连接工厂 -> 连接 -> 信道 -> 队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("123456");

// 创建连接
Connection connection = factory.newConnection();

// 获取信道
Channel channel = connection.createChannel();

/*
声明一个队列
参数1:队列名称
参数2:消息是否持久化。消息默认存在内存中,可持久化到硬盘
参数3:消息是否共享。一个消息可由多个消费者消费
参数4:是否自动删除。最后一个消费者断开连接,队列是否删除
参数5:其他配置
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

生产者

1
2
3
4
5
6
7
8
9
10
String message = "hello world";

/*
发送消息
参数1:发送到哪个交换机
参数2:路由的key值
参数3:其他配置
参数4:消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
System.out.println("消费已成功:message = " + new String(message.getBody()));
};

CancelCallback cancelCallback = (String consumerTag) -> {
System.out.println("消费被取消:consumerTag = " + consumerTag);
};

/*
消费消息
参数1:消费的队列
参数2:消费成功后是否自动应答
参数3:消费成功回调
参数4:消费取消回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

Work Queues

image-20230801201354696

工作队列模式:生产者发送大量消息,多个工作线程(消费者)接受并处理消息。

轮询分发消息

多个消费者是竞争的关系,每条消息只会被处理一次

抽取工具类

RabbitMQ创建信道的工具类

1
2
3
4
5
6
7
8
9
10
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
return connection.createChannel();
}
}

消费者

这里使用idea手动运行多个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Work01 {

public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费已成功:message = " + new String(message.getBody()));
};

CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费被取消:consumerTag = " + consumerTag);
};

System.out.println("c1等待接受消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Task01 {
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
}

消息应答

为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制。消费者处理完消息后,告诉RabbitMQ:可以把消息删除了

自动应答

以接受到消息为准。自动应答相对不是很靠谱

手动应答

API作用
void basicAck(long deliveryTag, boolean multiple)用于肯定确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue)用于否认
void basicReject(long deliveryTag, boolean requeue)用于否认,少一个参数(不批量应答)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Work01 {

public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费已成功:message = " + new String(message.getBody()));
// 手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费被取消:consumerTag = " + consumerTag);
};

// autoAck设为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}

Multiple解释

手动应答可以批量应答,减少网络拥堵

true代表批量应答channel上未应答的消息(tag比当前小的都会被应答)

  • 假如channel上传递消息,tag为:8,7,6,5。若当前tag为8,应答后5-8中未应答的消息都会被确认

false代表只应答当前消息

  • 假如channel上传递消息,tag为:8,7,6,5。若当前tag为8,应答后只有8会被确认

一般情况下使用false

消息重新入队

假如消费者由于某些原因失去连接(通道关闭,连接关闭或TCP连接丢失),导致未发送ACK确认,RabbitMQ将对其重新排队。

当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时某个消费者会立马又接收到这条消息进行处理。

持久化

为了确保消息在RabbitMQ中不会丢失,我们需要将队列消息都标记为持久化

队列持久化

1
2
3
4
// 创建队列时,声明队列需要持久化
// 记得先将之前的同名队列删除
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

此时Features下方会有一个”D”,现在即使重启RabbitMQ队列也不会丢失

image-20230801222521233

消息持久化

1
2
3
4
5
6
// 发布消息时
// 以前
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

// 现在
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在:当刚准备存储在磁盘时,突然宕机的情况。无法保证百分百数据不丢失。但是对我们的简单队列来说,绰绰有余了。

更强力的持久化策略,参考后面的发布确认章节

不公平分发

正常情况下使用轮询分发。假如有两个消费者,消费者A处理非常快,消费者B处理非常慢。这会导致处理的快的A有大量的空闲时间,处理的慢的消费者B一直在干活。

为避免这种情况,我们可以设置参数channel.basicQos(int prefetchCount)。能者多劳

1
2
3
4
// 由消费方设置
// 设为1就是不公平分发,设为0就是轮询分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

image-20230803160747978

可以在Channels里看到一个Queue,Prefetch count(分配数量)为1

预取值概念

预取值(prefetch):一个channel中,未确认的消息缓冲区大小。(消费者的手动确认本质上也是异步的)

image-20230803161552926

正常情况下依然是能者多干。如果同时有7条数据,那么会指定,按照2条,5条分配。

发布确认

即使队列和消息都设置持久化,仍然存在消息丢失的可能(比如还未存到磁盘上就宕机了)

生产者将信道设置成confirm模式。所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。

开启发布确认

在生产者部分设置

1
2
// 开启发布确认功能
channel.confirmSelect();

单个确认发布

它是一种同步确认发布方式。发一条,确认一次。上一条消息确认了,下一条消息才能发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void pubMsgOne() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("发送成功");
}
}
}

批量确认发布

依然是同步的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void pubMsgBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
channel.confirmSelect();
// 批量确认消息大小
int batchSize = 100;
// 待确认消息个数
int outstandingMsg = 0;
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
outstandingMsg++;
if (outstandingMsg == batchSize) {
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("本批次消息发送成功");
}
outstandingMsg = 0;
}
}
}

异步确认发布

通过回调函数的方式来确保投递成功

简单来说:ConcurrentSkipListMapTreeMap的并发实现,但是为什么没有称之为ConcurrentTreeMap呢?这和其自身的实现有关,该类是SkipLists的变种实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void pubMsgAsync() throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.confirmSelect();

// 线程安全的有序哈希表,将序号与消息关联。支持通过序列号批量删除
ConcurrentSkipListMap<Long, String> outstandingConfirm = new ConcurrentSkipListMap<>();

ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
// 批量确认(不过一般很少用)
if (multiple) {
// 返回小于等于当前序列号的未确认消息,是一个map
ConcurrentNavigableMap<Long, String> confirm = outstandingConfirm.headMap(deliveryTag, true);
confirm.clear();
} else {
// 只清除当前序号
outstandingConfirm.remove(deliveryTag);
}
};

// 传参:ackCallback和nackCallback
channel.addConfirmListener(ackCallback, null);
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
outstandingConfirm.put(channel.getNextPublishSeqNo(), msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
}

性能对比

  • 发布1000个单独确认消息:50,278 ms
  • 发布1000个批量确认消息:635 ms
  • 发布1000个异步确认消息:92 ms

交换机

我们之前一直假设,每个任务都恰好交付给一个消费者。现在,我们希望将一个消息传达给多个消费者。这种模式就是Pub/Sub模式

例子:日志系统,第一个程序将日志发送到消息队列中;然后启动两个消费者,一个消费者把日志存储到磁盘,另一个消费者把日志打印在屏幕上

Exchanges

事实上,在RabbitMQ中消息不会直接发送给队列,而是会发送给交换机。交换机负责接受消息并推入队列。

交换机的类型

直接(direct),主题(topic),标题(headers),扇出(fanout)

无名交换机

当我们用空字符串,表示无名交换机时:目标队列实际上是由routeKey绑定的key指定的

1
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

临时队列

我们可以创建一个随机名称的新队列。一旦消费者全断开连接,队列将被删除

1
String queueName = channel.queueDeclare().getQueue();

绑定(binding)

就是交换机和队列之间的映射关系

image-20230805114503667

Fanout交换机

Fanout这种类型的Exchange,会将接受到的消息广播到它知道的所有队列中

  • 消费者(可以运行多个)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ReceiveLog1 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

// 声明交换机,类型为Fanout
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

// 声明一个临时队列,消费者断开连接队列就没了
String queueName = channel.queueDeclare().getQueue();

// 绑定交换机和队列
channel.queueBind(queueName, "logs", "");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者1已接收到消息:" + new String(message.getBody()));
};

channel.basicConsume(queueName, true, deliverCallback, (CancelCallback) null);
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class EmitLog {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机,类型为Fanout
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish("logs", "", null, msg.getBytes());
System.out.println("msg = " + msg);
}
}
}

此时,Fanout类型的交换机会把接受到的消息广播到所有绑定的队列上去

Direct交换机

image-20230805163515227

  • 我们可以声明Direct类型的交换机,发送到交换机的消息,会根据Routing key分配到不同的队列中

image-20230805160323352

  • 消费者,声明完Direct类型的交换机后,还根据routingKey为交换机绑定了队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReceiveLogDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 声明一个队列
channel.queueDeclare("console", false, false, false, null);

// 绑定交换机和队列,routingKey为:info或warning的消息,都会被分配给该队列
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者1已接收到消息:" + new String(message.getBody()));
};

channel.basicConsume("console", true, deliverCallback, (CancelCallback) null);
}
}

现在发送给direct_logs交换机的消息,只要routingKey为info或者warning,都会被发送到console队列

Topic交换机

对于Direct交换机来说,一个routingKey只能路由到唯一的队列。

Topic的要求

发送到Topic类型交换机的消息的routingKey不能随意写,必须满足一定的要求。它必须是一个单词列表,以点分割。比如”quick.orange.rabbit“。单词列表最多不能超过255个字节。

有两个替换符需要注意:

  • *可以代替一个单词,比如:

    • *.orange.*

    • *.*.rabbit

  • #可以代替零个或多个单词,比如:

    • quick.#

Topic匹配案例

image-20230805164243582

routingKey接收队列
quick.orange.rabbitQ1,Q2
lazy.orange.elephantQ1,Q2
quick.orange.foxQ1
lazy.brown.foxQ2
lazy.pink.rabbitQ2(只会接收一次)
quick.brown.fox不匹配任何绑定,被丢弃
quick.orange.male.rabbit不匹配任何绑定,被丢弃
lazy.orange.male.rabbitQ2

两种特殊队列绑定:

  • #:这个队列将接收交换机的所有消息,类似Fanout
  • 没有#和*:这个队列的绑定类型就是Direct了

实战代码

image-20230805165031366

  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 消费者1,消费者2同理
public class ReceiveLogTopic1 {
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

// 声明Topic类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 声明队列
channel.queueDeclare("Q1", false, false, false, null);

// 绑定队列,指定routingKey
channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者1已接收到消息:" + new String(message.getBody()) +
"绑定键为:" + message.getEnvelope().getRoutingKey());
};

channel.basicConsume("Q1", true, deliverCallback, (CancelCallback) null);
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EmitLog {
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

Map<String, String> map = new HashMap<>();

map.put("quick.orange.rabbit", "Q1,Q2");
map.put("lazy.orange.elephant", "Q1,Q2");
map.put("quick.orange.fox", "Q1");
map.put("lazy.brown.fox", "Q2");
map.put("lazy.pink.rabbit", "Q2(只会接收一次)");
map.put("quick.brown.fox", "不匹配任何绑定,被丢弃");
map.put("quick.orange.male.rabbit", "不匹配任何绑定,被丢弃");
map.put("lazy.orange.male.rabbit", "Q2");

for (Map.Entry<String, String> entry : map.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes());
}
}
}

死信队列

死信,就是无法被消费的消息。消费者取出消息后,可能由于某些原因导致消息无法被消费,这样消息没有后续处理,就成为了死信。

为保证订单消息数据不丢失,需要用到死信队列机制。

应用场景:

  • 当消息消费异常时,将消息投入死信队列(等后续恢复了再消费)
  • 用户下单后,指定时间未付款,订单自动取消

死信的来源

  • 消息TTL过期
  • 队列达到最大长度
  • 消息被拒绝(basicReject,且requeue为false)(basicNack)

死信实战

image-20230805171705551
  • 消息生产者,设置TTL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

// 死信消息,设置TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();

for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes());
}
}
}
  • 普通消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 死信队列
public class Consumer01 {

// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

// 声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

HashMap<String, Object> arguments = new HashMap<>();
// arguments.put("x-message-ttl", 10000); // 消息过期时间,更推荐发送方来设置
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 死信交换机
arguments.put("x-dead-letter-routing-key", "dead"); // 死信routing-key
arguments.put("x-max-length", 6); // 消息最大长度
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

// 死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

// 绑定普通队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");

// 绑定死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");


DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
// 模拟消息被拒
if (msg.equals("info5")) {
System.out.println("message = " + msg + "此消息被拒绝");
// 拒绝该消息,该消息会发送给死信交换机
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("message = " + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}

};

channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (CancelCallback) null);
}
}
  • 死信消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Consumer02 {

public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("message = " + new String(message.getBody()));
};

channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (CancelCallback) null);
}
}

总结:声明队列时的参数

1
2
3
4
arguments.put("x-message-ttl", 10000); 						// 更推荐发送方来设置
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 声明死信息交换机
arguments.put("x-dead-letter-routing-key", "dead"); // 设置发送到死信交换机的routing-key
arguments.put("x-max-length", 6); // 设置队列的最大长度,超出发给死信交换机

延迟队列

延迟队列其实就是死信队列的一种。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  • 订单十分钟内未支付,则自动取消
  • 新创建的店铺,十天都没有上传过商品,则自动提醒
  • 用户注册成功后,三天都没有登录,则短信提醒
  • 用户发起退款,三天没人处理,则通知相关运营人员
  • 预定会议后,开始前十分钟提醒各人员参会

具体实现

可以直接看下一章:《整合SpringBoot》

整合SpringBoot

依赖

1
2
3
4
5
6
7
8
9
10
11
12
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--RabbitMQ 测试依赖 可不加-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

修改配置文件

1
2
3
4
5
6
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

队列TTL

image-20230812213639446

配置类

创建:交换机,队列,绑定

1
2
3
4
5
6
7
8
9
10
public class MqConstant {
public static final String NORMAL_EXCHANGE = "X";
public static final String TTL_10_QUEUE_KEY = "XA";
public static final String TTL_40_QUEUE_KEY = "XB";
public static final String TTL_10_QUEUE = "QA";
public static final String TTL_40_QUEUE = "QB";
public static final String DEAD_EXCHANGE = "Y";
public static final String DEAD_QUEUE_KEY = "YD";
public static final String DEAD_QUEUE = "QD";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class TtlQueueConfig {

@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}

@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}

@Bean
public Queue queueTtl10() {
return QueueBuilder.nonDurable(TTL_10_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_QUEUE_KEY)
.ttl(10000)
.build();
}

@Bean
public Queue queueTtl40() {
return QueueBuilder.nonDurable(TTL_40_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_QUEUE_KEY)
.ttl(40000)
.build();
}

@Bean
public Queue queueDead() {
return QueueBuilder.nonDurable(DEAD_QUEUE).build();
}

@Bean
public Binding queueABindingX(Queue queueTtl10, DirectExchange normalExchange) {
return BindingBuilder.bind(queueTtl10).to(normalExchange).with(TTL_10_QUEUE_KEY);
}

@Bean
public Binding queueBBindingX(Queue queueTtl40, DirectExchange normalExchange) {
return BindingBuilder.bind(queueTtl40).to(normalExchange).with(TTL_40_QUEUE_KEY);
}

@Bean
public Binding queueDBindingY(Queue queueDead, DirectExchange deadExchange) {
return BindingBuilder.bind(queueDead).to(deadExchange).with(DEAD_QUEUE_KEY);
}
}

生产者

发送消息:http://localhost:8080/ttl/sendMsg/hello

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable String msg) {
log.info("当前时间:{},发送消息给队列:{}", new Date(), msg);
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, TTL_10_QUEUE_KEY, "来自TTL为10s的队列" + msg);
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, TTL_40_QUEUE_KEY, "来自TTL为40s的队列" + msg);
return "发送成功";
}
}

消费者

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class DeadQueueConsumer {

@RabbitListener(queues = DEAD_QUEUE)
public void receiveDead(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列消息:{}", new Date(), msg);
}
}

延迟队列优化

QC不设置过期时间,由生产者指定过期时间

image-20230812213704665

  • 添加队列
1
2
3
4
public class MqConstant {
public static final String Queue_C = "QC";
public static final String QUEUE_C_KEY = "XC";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class TtlQueueConfig {

// 不设置TTL
@Bean
public Queue queueC() {
return QueueBuilder.nonDurable(Queue_C)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_QUEUE_KEY)
.build();
}

@Bean
public Binding queueCBingX(Queue queueC, DirectExchange normalExchange) {
return BindingBuilder.bind(queueC).to(normalExchange).with(QUEUE_C_KEY);
}
}
  • 发送消息时,设置TTL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, QUEUE_C_KEY, "来自queueC的消息" + msg,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");
return message;
}
});

// 使用lambda表达式
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, QUEUE_C_KEY, "来自queueC的消息" + msg,
message -> {
message.getMessageProperties().setExpiration("10000");
return message;
});

但是这么做是有问题的

RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列。

如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

RabbitMQ插件实现延迟队列

安装插件

为了解决上述提到的问题,使用了插件

插件官网:https://www.rabbitmq.com/community-plugins.html

下载插件:rabbitmq_delayed_message_exchange

将插件放在/usr/lib/rabbitmq/lib/rabbitmq_server-3.12.2/plugins目录中,执行命令

1
sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装成功后,添加交换机时可以看到,出现了新的Type:x-delayed-message

image-20230812220627538

此时整体原理将简单很多。消息将在交换机处延迟,时间到了之后发给死信队列。

image-20230812220816709

配置类

1
2
3
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> map = new HashMap<>();
map.put("x-delayed-type", "direct");
// 自定义交换机的类型
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, map);
}

@Bean
public Binding bindingDelayedQueue(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}

生产者

1
2
3
4
5
// 注意这里是setDelay
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, message -> {
message.getMessageProperties().setDelay(ttl);
return message;
});

消费者

1
2
3
4
5
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDead(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列消息:{}", new Date(), msg);
}

总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列。

来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis 的zset,利用 Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景

发布确认(高级)

在生产环境中由于一些不明原因,导致RabbitMQ重启。

在RabbitMQ重启期间,生产者消息投递失败,导致消息丢失。

发布确认SpringBoot版本

image-20230814214747879

配置文件

  • NONE:禁用发布确认模式,默认值
  • CORRELATED:发布消息到交换机成功后,触发回调函数
  • SIMPLE:有两种效果
    • 其一,和CORRELATED一样触发回调方法;
    • 其二,在发布消息成功后使用rabbitTemplate调用 waitForConfirmswaitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑(同步确认消息)。要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
1
spring.rabbitmq.publisher-confirm-type=correlated

基础代码

  • 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class ConfirmConfig {

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
public static final String CONFIRM_ROUTING_KEY = "key1";

@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

@Bean
public Queue confirmQueue() {
return QueueBuilder.nonDurable(CONFIRM_QUEUE_NAME).build();
}

@Bean
public Binding queueBindingExchange(Queue confirmQueue, DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

@Resource
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public void sendMsg(@PathVariable String msg) {
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, msg);
log.info("发送消息:{}", msg);
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class Consumer {

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message) {
String msg = new String(message.getBody());
log.info("接收到的消息:{}", msg);
}
}

自定义回调方法

  • 自定义回调方法,注入到rabbitTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Resource
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
log.info("交换机已收到Id为:{}的消息", id);
} else {
log.info("交换机未收到Id为:{}的消息,原因是:{}", id, cause);
}
}
}

比如发消息时故意将exchangeName写错,会打印原因:找不到交换机

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息。

但假如队列没收到消息(比如routingkey写错了),我们是无法得知的。消息会被直接丢弃。

通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

配置文件

1
2
# 打开消息回退
spring.rabbitmq.publisher-returns=true

自定义回调方法

  • 继续实现RabbitTemplate.ReturnsCallback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Resource
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
log.info("交换机已收到Id为:{}的消息", id);
} else {
log.info("交换机未收到Id为:{}的消息,原因是:{}", id, cause);
}
}

@Override
public void returnedMessage(ReturnedMessage returned) {
String msg = new String(returned.getMessage().getBody());
String exchange = returned.getExchange();
String replyText = returned.getReplyText();
String routingKey = returned.getRoutingKey();
log.info("消息{},被交换机{}退回,原因:{},路由key:{}", msg, exchange, replyText, routingKey);
}
}

备份交换机

  • 有了mandatory参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。

  • 还有一种解决方案,我们可以使用备份交换机。

  • 当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中。

image-20230814223244376

代码构建

  • 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 备份交换机(扇出)
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";

@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}

@Bean
public Queue backupQueue() {
return QueueBuilder.nonDurable(BACKUP_QUEUE_NAME).build();
}

@Bean
public Queue warningQueue() {
return QueueBuilder.nonDurable(WARNING_QUEUE_NAME).build();
}

@Bean
public Binding backupQueue2BackupExchange(Queue backupQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}

@Bean
public Binding warningQueue2BackupExchange(Queue warningQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
  • 确认交换机,设置将无法投递的消息发送给备份交换机
1
2
3
4
5
6
7
8
@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
// .alternate(BACKUP_EXCHANGE_NAME)
.build();
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
public class WarningConsumer {

@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarning(Message message) {
String s = new String(message.getBody());
log.error("发现不可路由消息:{}", s);
}
}

结果分析

当mandatory参数与备份交换机同时开启,经测试,备份交换机优先级高

RabbitMQ其他知识点

幂等性

幂等性的概念

同一操作发起一次请求或者多次请求,结果是一致的,不会因为多次点击而产生了副作用。

  • 比如:
    • 用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常
    • 用户再次点击按钮,此时会进行第二次扣款,返回结果成功
    • 用户查询余额发现多扣钱了,流水记录也变成了两条

重复消费

  • 场景
    • MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断
    • 该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者
    • 造成消费者消费了重复的消息

解决思路

  • 使用全局ID,或者写个唯一标识,比如:时间戳、UUID、按自己的规则生成一个全局唯一 id…
  • 每次消费消息时用该 id 先判断该消息是否已消费过

消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性。

消息永远不会被消费多次,即使我们收到了一样的消息。

  • 主流的幂等性有两种操作:

    • 唯一ID+指纹码机制,利用数据库主键去重
    • 利用redis的原子性去实现
  1. 唯一ID+指纹码

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性。利用查询语句进行判断这个id是否存在数据库中。

优势:实现简单,拼接然后查询判断是否重复

劣势:高并发时,如果是单个数据库就会有写入性能瓶颈。当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

  1. 利用redis的原子性

利用redis执行setnx命令,天然具有幂等性。从而实现不重复消费

优先级队列

优先级越高的消息,越先被消费

如何添加

  • 使用Web UI
image-20230818220616550
  • 使用代码:如下所示

实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class priorityConfig {

@Bean
public Queue priQueue() {
return QueueBuilder.nonDurable("pri_queue").maxPriority(10).build();
}

@Bean
public DirectExchange priExchange() {
return ExchangeBuilder.directExchange("pri_exchange").durable(false).build();
}

@Bean
public Binding priBinding(Queue priQueue, DirectExchange priExchange) {
return BindingBuilder.bind(priQueue).to(priExchange).with("pri_key");
}
}
1
2
3
String msg = "aaa";
Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(5).build();
rabbitTemplate.send("pri_exchange", "pri_key", message);

堕性队列

正常情况:消息保存在内存中

堕性队列:消息保存在磁盘上

当消费者由于各种原因(比如下线,宕机,或者由于维护而关闭等),致使长时间不能消费,造成消息堆积时,堕性队列就很有必要了。

1
2
3
4
5
6
QueueBuilder.nonDurable("lazy.queue").lazy().build();

// 或者
Map<String, Object> args = new HashMap();
args.put("x-queue-mode", "lazy");
new Queue("lazy.queue",true,false,false,args);

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB

RabbitMQ集群

生产者大量发送消息,单机版的RabbitMQ性能捉襟见肘。

  • 如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?
  • 单台RabbitMQ服务器可以满足每秒1000条消息的吞吐量,如果应用需要满足每秒 10 万条消息的吞吐量,该怎么办?

clustering

  1. 修改三台主机名称
1
vim /etc/hostname
  1. 配置各台主机的host文件,使其能相互识别
1
2
3
4
5
vim /etc/hosts

10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
  1. 确保各个节点使用的cookie是同一个值(搭建的集群要求erlang底层的cookie是同一个值)
1
2
3
# 在node1执行,将第一台机器的cookie复制给的第二台和第三台
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
  1. 重启RabbitMQ
1
2
3
4
# 每个节点都执行
# 重启RabbitMQ,以及底层的Erlang虚拟机

rabbitmq-server -detached
1
2
3
4
5
6
7
# 第二台节点执行
# rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
1
2
3
4
5
6
# 第三台节点执行

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app

后续操作可以在任意一台机器上运行

  1. 查看集群状态
1
rabbitmqctl cluster_status
  1. 为集群创建账户
1
2
3
4
5
rabbitmqctl add_user admin 123
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  1. 解除任意一台机器
1
2
3
4
5
# 在待解除的机器上执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
1
2
# 在node1机器上执行
rabbitmqctl forget_cluster_node rabbit@node2

镜像队列

使用镜像队列的原因

使用集群的时候,在哪个机器创建的队列,队列实际上就存在哪台机器上。如果那一台机器宕机了,相应队列也会消失。

从Web UI看,某台机器宕机了,该节点成为红色。在队列选项中查看,队列还在,但是状态为Down。

image-20230819112144388 image-20230819112221387

使用镜像队列,可以将队列镜像到集群中的其他节点上。如果某节点宕机了,队列能自动切换到另一个节点上,保证服务的可用性。

搭建步骤

  1. 启动三台集群节点
  2. 随便找一台节点,添加policy
    • ha-mode: exactly (备机模式:指定,也即指定备份的个数)
    • ha-params: 2 (指定备份个数:两份)
    • ha-sync-mode: automatic (同步的模式:自动)
image-20230819120610257
  1. 在某节点上创建队列(比如节点1),自动备份到另一个节点
image-20230819121131426
  1. 假如停掉node1,node2成为镜像队列
1
rabbitmqctl stop_app
image-20230819121307722
  1. 就算集群只剩下一台机器了,依然能消费队列中的内容

具体到Java选择连接MQ的部分,需要使用负载均衡软件(比如Haproxy + Keepalive),这里不在继续讨论

Federation Exchange

不重要,略

  • 标题: RabbitMQ基础
  • 作者: 布鸽不鸽
  • 创建于 : 2023-08-28 18:51:40
  • 更新于 : 2024-01-14 02:12:48
  • 链接: https://xuedongyun.cn//post/33842/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论