前言 本文是RabbitMQ基础学习笔记
原文地址:https://xuedongyun.cn/post/33842/
MQ相关概念 什么是MQ 本质是个队列,先进先出(FIFO) 跨进程的通信机制,用于上下游传递消息 “逻辑解耦+物理解耦”的消息通信服务 为什么使用MQ 比如下单,使用消息队列做缓冲。高峰期,排队进行,让短时间的请求分散到一定时间来处理。
可能导致用户下单很久,才能收到下单成功的消息。但也比无法下单好。
比如订单系统,通过消息队列调用物流系统。即使物流系统宕机10分钟,恢复后,也能继续处理订单。
比如A调用B,B需要处理很长时间。可以用消息队列,A调用B的服务后,监听B处理完的消息。B完成后,会发送消息给MQ,MQ会将消息转发给A服务。
MQ的分类 ActiveMQ
:很老的MQKafka
:为大数据而生RocketMQ
:出自阿里巴巴,消息可以做到0丢失RabbitMQ
:最主流的消息中间件之性能好,时效性微秒级,社区活动度高,管理界面方便。如果数据量没有那么大,优先选用。 RabbitMQ RabbitMQ是一个消息中间件,负责接收,存储,转发消息数据
四大核心 生产者:产生数据的人 交换机:决定将消息推送到哪一个队列 队列:大的消息缓冲区 消费者:从队列中接收数据进行处理 名词解释 Broker : 接收和分发消息的应用,Rabbit Server就是Message BrokerVirtual host : 当多个用户使用同一个Rabbit MQ时,可以划分多个vhost。每个用户在自己的vhost中创建exchange,queue等Connection : publisher/consumer与broker之间的TCP连接Channel : 在Connection内部建立的逻辑连接,通常使用channel id帮助客户端和brocker识别channelExchange : 交换机Queue : 队列Binding : Exchange和Queue之间的虚拟连接。Binding可以包含routing key,被保存到exchange中的查询表中。用作message的分发依据安装教程 使用系统:ubuntu22
官方文档:https://www.rabbitmq.com/download.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #!/bin/sh sudo apt-get install curl gnupg apt-transport-https -y curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null curl -1sLf https://ppa1.novemberain.com/gpg.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null curl -1sLf https://ppa1.novemberain.com/gpg.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF ## Provides modern Erlang/OTP releases ## deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main ## Provides RabbitMQ ## deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main EOF sudo apt-get update -y sudo apt-get install -y erlang-base \ erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \ erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \ erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl sudo apt-get install rabbitmq-server -y --fix-missing
1 systemctl status rabbitmq-server
1 sudo rabbitmq-plugins enable rabbitmq_management
http://localhost:15672
默认账号和密码:guest guest,但只有localhost才能登录
1 2 3 4 5 6 7 8 9 10 rabbitmqctl add_user admin 123456 rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
1 2 3 4 5 6 7 8 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app
简单队列模式
Java依赖
1 2 3 4 5 amqp-client commons-io
创建连接 创建:连接工厂 -> 连接 -> 信道 -> 队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ConnectionFactory factory = new ConnectionFactory ();factory.setHost("localhost" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false , false , false , null );
生产者 1 2 3 4 5 6 7 8 9 10 String message = "hello world" ;channel.basicPublish("" , QUEUE_NAME, null , message.getBytes());
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println("消费已成功:message = " + new String (message.getBody())); }; CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消费被取消:consumerTag = " + consumerTag); }; channel.basicConsume(QUEUE_NAME, true , deliverCallback, cancelCallback);
Work Queues
工作队列模式:生产者发送大量消息,多个工作线程(消费者)接受并处理消息。
轮询分发消息 多个消费者是竞争的关系,每条消息只会被处理一次
抽取工具类 RabbitMQ创建信道的工具类
1 2 3 4 5 6 7 8 9 10 public class RabbitMqUtils { public static Channel getChannel () throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("localhost" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); return connection.createChannel(); } }
消费者 这里使用idea手动运行多个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Work01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费已成功:message = " + new String (message.getBody())); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消费被取消:consumerTag = " + consumerTag); }; System.out.println("c1等待接受消息" ); channel.basicConsume(QUEUE_NAME, true , deliverCallback, cancelCallback); } }
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Task01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); } } }
消息应答 为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制。消费者处理完消息后,告诉RabbitMQ:可以把消息删除了
自动应答 以接受到消息为准。自动应答相对不是很靠谱
手动应答 API 作用 void basicAck(long deliveryTag, boolean multiple)
用于肯定确认 void basicNack(long deliveryTag, boolean multiple, boolean requeue)
用于否认 void basicReject(long deliveryTag, boolean requeue)
用于否认,少一个参数(不批量应答)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Work01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费已成功:message = " + new String (message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消费被取消:consumerTag = " + consumerTag); }; channel.basicConsume(QUEUE_NAME, false , deliverCallback, cancelCallback); } }
Multiple解释 手动应答可以批量应答,减少网络拥堵
true代表批量应答channel上未应答的消息(tag比当前小的都会被应答)
假如channel上传递消息,tag为:8,7,6,5。若当前tag为8,应答后5-8中未应答的消息都会被确认 false代表只应答当前消息
假如channel上传递消息,tag为:8,7,6,5。若当前tag为8,应答后只有8会被确认 一般情况下使用false
消息重新入队 假如消费者由于某些原因失去连接(通道关闭,连接关闭或TCP连接丢失),导致未发送ACK确认,RabbitMQ将对其重新排队。
当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时某个消费者会立马又接收到这条消息进行处理。
持久化 为了确保消息在RabbitMQ中不会丢失,我们需要将队列 和消息 都标记为持久化
队列持久化 1 2 3 4 boolean durable = true ;channel.queueDeclare(QUEUE_NAME, durable, false , false , null );
此时Features下方会有一个”D”,现在即使重启RabbitMQ队列也不会丢失
消息持久化 1 2 3 4 5 6 channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); channel.basicPublish("" , QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在:当刚准备存储在磁盘时,突然宕机的情况。无法保证百分百数据不丢失。但是对我们的简单队列来说,绰绰有余了。
更强力的持久化策略,参考后面的发布确认章节
不公平分发 正常情况下使用轮询分发。假如有两个消费者,消费者A处理非常快,消费者B处理非常慢。这会导致处理的快的A有大量的空闲时间,处理的慢的消费者B一直在干活。
为避免这种情况,我们可以设置参数channel.basicQos(int prefetchCount)
。能者多劳
1 2 3 4 int prefetchCount = 1 ;channel.basicQos(prefetchCount);
可以在Channels里看到一个Queue,Prefetch count(分配数量)为1
预取值概念 预取值(prefetch):一个channel中,未确认的消息缓冲区大小。(消费者的手动确认本质上也是异步的)
正常情况下依然是能者多干。如果同时有7条数据,那么会指定,按照2条,5条分配。
发布确认 即使队列和消息都设置持久化,仍然存在消息丢失的可能(比如还未存到磁盘上就宕机了)
生产者将信道设置成confirm模式。所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。
开启发布确认 在生产者部分设置
1 2 channel.confirmSelect();
单个确认发布 它是一种同步确认发布 方式。发一条,确认一次。上一条消息确认了,下一条消息才能发布。
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void pubMsgOne () throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 1000 ; i++) { String msg = String.valueOf(i); channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("发送成功" ); } } }
批量确认发布 依然是同步的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void pubMsgBatch () throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); channel.confirmSelect(); int batchSize = 100 ; int outstandingMsg = 0 ; channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 1000 ; i++) { String msg = String.valueOf(i); channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); outstandingMsg++; if (outstandingMsg == batchSize) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("本批次消息发送成功" ); } outstandingMsg = 0 ; } } }
异步确认发布 通过回调函数的方式来确保投递成功
简单来说:ConcurrentSkipListMap
是TreeMap
的并发实现,但是为什么没有称之为ConcurrentTreeMap
呢?这和其自身的实现有关,该类是SkipLists
的变种实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void pubMsgAsync () throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.confirmSelect(); ConcurrentSkipListMap<Long, String> outstandingConfirm = new ConcurrentSkipListMap <>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirm = outstandingConfirm.headMap(deliveryTag, true ); confirm.clear(); } else { outstandingConfirm.remove(deliveryTag); } }; channel.addConfirmListener(ackCallback, null ); for (int i = 0 ; i < 1000 ; i++) { String msg = String.valueOf(i); outstandingConfirm.put(channel.getNextPublishSeqNo(), msg); channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); } }
性能对比 发布1000个单独确认消息:50,278 ms 发布1000个批量确认消息:635 ms 发布1000个异步确认消息:92 ms 交换机 我们之前一直假设,每个任务都恰好交付给一个消费者。现在,我们希望将一个消息传达给多个消费者。这种模式就是Pub/Sub模式
例子:日志系统,第一个程序将日志发送到消息队列中;然后启动两个消费者,一个消费者把日志存储到磁盘,另一个消费者把日志打印在屏幕上
Exchanges 事实上,在RabbitMQ中消息不会直接发送给队列,而是会发送给交换机。交换机负责接受消息并推入队列。
交换机的类型 直接(direct),主题(topic),标题(headers),扇出(fanout)
无名交换机 当我们用空字符串,表示无名交换机时:目标队列实际上是由routeKey
绑定的key指定的
1 channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes());
临时队列 我们可以创建一个随机名称的新队列。一旦消费者全断开连接,队列将被删除
1 String queueName = channel.queueDeclare().getQueue();
绑定(binding) 就是交换机和队列之间的映射关系
Fanout交换机 Fanout这种类型的Exchange,会将接受到的消息广播 到它知道的所有队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ReceiveLog1 { public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare("logs" , BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs" , "" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费者1已接收到消息:" + new String (message.getBody())); }; channel.basicConsume(queueName, true , deliverCallback, (CancelCallback) null ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class EmitLog { public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare("logs" , BuiltinExchangeType.FANOUT); Scanner scanner = new Scanner (System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("logs" , "" , null , msg.getBytes()); System.out.println("msg = " + msg); } } }
此时,Fanout类型的交换机会把接受到的消息广播 到所有绑定的队列上去
Direct交换机
我们可以声明Direct类型的交换机,发送到交换机的消息,会根据Routing key分配到不同的队列中
消费者,声明完Direct类型的交换机后,还根据routingKey为交换机绑定了队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ReceiveLogDirect01 { public static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console" , false , false , false , null ); channel.queueBind("console" , EXCHANGE_NAME, "info" ); channel.queueBind("console" , EXCHANGE_NAME, "warning" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费者1已接收到消息:" + new String (message.getBody())); }; channel.basicConsume("console" , true , deliverCallback, (CancelCallback) null ); } }
现在发送给direct_logs交换机的消息,只要routingKey为info或者warning,都会被发送到console队列
Topic交换机 对于Direct交换机来说,一个routingKey只能路由到唯一的队列。
Topic的要求 发送到Topic类型交换机的消息的routingKey不能随意写,必须满足一定的要求。它必须是一个单词列表,以点分割 。比如”quick.orange.rabbit“。单词列表最多不能超过255个字节。
有两个替换符需要注意:
*可以代替一个单词,比如:
#可以代替零个或多个单词,比如:
Topic匹配案例
routingKey 接收队列 quick.orange.rabbit
Q1,Q2 lazy.orange.elephant
Q1,Q2 quick.orange.fox
Q1 lazy.brown.fox
Q2 lazy.pink.rabbit
Q2(只会接收一次) quick.brown.fox
不匹配任何绑定,被丢弃 quick.orange.male.rabbit
不匹配任何绑定,被丢弃 lazy.orange.male.rabbit
Q2
两种特殊队列绑定:
#:这个队列将接收交换机的所有消息,类似Fanout 没有#和*:这个队列的绑定类型就是Direct了 实战代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ReceiveLogTopic1 { public static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1" , false , false , false , null ); channel.queueBind("Q1" , EXCHANGE_NAME, "*.orange.*" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费者1已接收到消息:" + new String (message.getBody()) + "绑定键为:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q1" , true , deliverCallback, (CancelCallback) null ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class EmitLog { public static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); Map<String, String> map = new HashMap <>(); map.put("quick.orange.rabbit" , "Q1,Q2" ); map.put("lazy.orange.elephant" , "Q1,Q2" ); map.put("quick.orange.fox" , "Q1" ); map.put("lazy.brown.fox" , "Q2" ); map.put("lazy.pink.rabbit" , "Q2(只会接收一次)" ); map.put("quick.brown.fox" , "不匹配任何绑定,被丢弃" ); map.put("quick.orange.male.rabbit" , "不匹配任何绑定,被丢弃" ); map.put("lazy.orange.male.rabbit" , "Q2" ); for (Map.Entry<String, String> entry : map.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); channel.basicPublish(EXCHANGE_NAME, key, null , value.getBytes()); } } }
死信队列 死信,就是无法被消费的消息。消费者取出消息后,可能由于某些原因导致消息无法被消费 ,这样消息没有后续处理,就成为了死信。
为保证订单消息数据不丢失,需要用到死信队列机制。
应用场景:
当消息消费异常时,将消息投入死信队列(等后续恢复了再消费) 用户下单后,指定时间未付款,订单自动取消 死信的来源 消息TTL过期 队列达到最大长度 消息被拒绝(basicReject,且requeue为false)(basicNack) 死信实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties = new AMQP .BasicProperties().builder().expiration("5000" ).build(); for (int i = 0 ; i < 10 ; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "normal" , properties, message.getBytes()); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); HashMap<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" , "dead" ); arguments.put("x-max-length" , 6 ); channel.queueDeclare(NORMAL_QUEUE, false , false , false , arguments); channel.queueDeclare(DEAD_QUEUE, false , false , false , null ); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal" ); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead" ); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String (message.getBody()); if (msg.equals("info5" )) { System.out.println("message = " + msg + "此消息被拒绝" ); channel.basicReject(message.getEnvelope().getDeliveryTag(), false ); } else { System.out.println("message = " + msg); channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); } }; channel.basicConsume(NORMAL_QUEUE, false , deliverCallback, (CancelCallback) null ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("message = " + new String (message.getBody())); }; channel.basicConsume(DEAD_QUEUE, true , deliverCallback, (CancelCallback) null ); } }
总结:声明队列时的参数 1 2 3 4 arguments.put("x-message-ttl" , 10000 ); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" , "dead" ); arguments.put("x-max-length" , 6 );
延迟队列 延迟队列其实就是死信队列的一种。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景 订单十分钟内未支付,则自动取消 新创建的店铺,十天都没有上传过商品,则自动提醒 用户注册成功后,三天都没有登录,则短信提醒 用户发起退款,三天没人处理,则通知相关运营人员 预定会议后,开始前十分钟提醒各人员参会 具体实现 可以直接看下一章:《整合SpringBoot》
整合SpringBoot 依赖 1 2 3 4 5 6 7 8 9 10 11 12 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency >
修改配置文件 1 2 3 4 5 6 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
队列TTL
配置类 创建:交换机,队列,绑定
1 2 3 4 5 6 7 8 9 10 public class MqConstant { public static final String NORMAL_EXCHANGE = "X" ; public static final String TTL_10_QUEUE_KEY = "XA" ; public static final String TTL_40_QUEUE_KEY = "XB" ; public static final String TTL_10_QUEUE = "QA" ; public static final String TTL_40_QUEUE = "QB" ; public static final String DEAD_EXCHANGE = "Y" ; public static final String DEAD_QUEUE_KEY = "YD" ; public static final String DEAD_QUEUE = "QD" ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Configuration public class TtlQueueConfig { @Bean public DirectExchange normalExchange () { return new DirectExchange (NORMAL_EXCHANGE); } @Bean public DirectExchange deadExchange () { return new DirectExchange (DEAD_EXCHANGE); } @Bean public Queue queueTtl10 () { return QueueBuilder.nonDurable(TTL_10_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_QUEUE_KEY) .ttl(10000 ) .build(); } @Bean public Queue queueTtl40 () { return QueueBuilder.nonDurable(TTL_40_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_QUEUE_KEY) .ttl(40000 ) .build(); } @Bean public Queue queueDead () { return QueueBuilder.nonDurable(DEAD_QUEUE).build(); } @Bean public Binding queueABindingX (Queue queueTtl10, DirectExchange normalExchange) { return BindingBuilder.bind(queueTtl10).to(normalExchange).with(TTL_10_QUEUE_KEY); } @Bean public Binding queueBBindingX (Queue queueTtl40, DirectExchange normalExchange) { return BindingBuilder.bind(queueTtl40).to(normalExchange).with(TTL_40_QUEUE_KEY); } @Bean public Binding queueDBindingY (Queue queueDead, DirectExchange deadExchange) { return BindingBuilder.bind(queueDead).to(deadExchange).with(DEAD_QUEUE_KEY); } }
生产者 发送消息:http://localhost:8080/ttl/sendMsg/hello
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") public String sendMsg (@PathVariable String msg) { log.info("当前时间:{},发送消息给队列:{}" , new Date (), msg); rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, TTL_10_QUEUE_KEY, "来自TTL为10s的队列" + msg); rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, TTL_40_QUEUE_KEY, "来自TTL为40s的队列" + msg); return "发送成功" ; } }
消费者 1 2 3 4 5 6 7 8 9 10 @Slf4j @Component public class DeadQueueConsumer { @RabbitListener(queues = DEAD_QUEUE) public void receiveDead (Message message, Channel channel) { String msg = new String (message.getBody()); log.info("当前时间:{},收到死信队列消息:{}" , new Date (), msg); } }
延迟队列优化 QC不设置过期时间,由生产者指定过期时间
1 2 3 4 public class MqConstant { public static final String Queue_C = "QC" ; public static final String QUEUE_C_KEY = "XC" ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class TtlQueueConfig { @Bean public Queue queueC () { return QueueBuilder.nonDurable(Queue_C) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_QUEUE_KEY) .build(); } @Bean public Binding queueCBingX (Queue queueC, DirectExchange normalExchange) { return BindingBuilder.bind(queueC).to(normalExchange).with(QUEUE_C_KEY); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, QUEUE_C_KEY, "来自queueC的消息" + msg, new MessagePostProcessor () { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000" ); return message; } }); rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, QUEUE_C_KEY, "来自queueC的消息" + msg, message -> { message.getMessageProperties().setExpiration("10000" ); return message; });
但是这么做是有问题的 :
RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列。
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
RabbitMQ插件实现延迟队列 安装插件 为了解决上述提到的问题,使用了插件
插件官网:https://www.rabbitmq.com/community-plugins.html
下载插件:rabbitmq_delayed_message_exchange
将插件放在/usr/lib/rabbitmq/lib/rabbitmq_server-3.12.2/plugins
目录中,执行命令
1 sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装成功后,添加交换机时可以看到,出现了新的Type:x-delayed-message
此时整体原理将简单很多。消息将在交换机处延迟,时间到了之后发给死信队列。
配置类 1 2 3 public static final String DELAYED_QUEUE_NAME = "delayed.queue" ;public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange" ;public static final String DELAYED_ROUTING_KEY = "delayed.routingkey" ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Bean public Queue delayedQueue () { return new Queue (DELAYED_QUEUE_NAME); } @Bean public CustomExchange delayedExchange () { HashMap<String, Object> map = new HashMap <>(); map.put("x-delayed-type" , "direct" ); return new CustomExchange (DELAYED_EXCHANGE_NAME, "x-delayed-message" , true , false , map); } @Bean public Binding bindingDelayedQueue (Queue delayedQueue, CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }
生产者 1 2 3 4 5 rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, message -> { message.getMessageProperties().setDelay(ttl); return message; });
消费者 1 2 3 4 5 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDead (Message message, Channel channel) { String msg = new String (message.getBody()); log.info("当前时间:{},收到死信队列消息:{}" , new Date (), msg); }
总结 延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列。
来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis 的zset,利用 Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景
发布确认(高级) 在生产环境中由于一些不明原因,导致RabbitMQ重启。
在RabbitMQ重启期间,生产者消息投递失败,导致消息丢失。
发布确认SpringBoot版本 配置文件 NONE:禁用发布确认模式,默认值 CORRELATED:发布消息到交换机成功后,触发回调函数 SIMPLE:有两种效果其一,和CORRELATED一样触发回调方法; 其二,在发布消息成功后使用rabbitTemplate
调用 waitForConfirms
或 waitForConfirmsOrDie
方法等待 broker
节点返回发送结果,根据返回结果来判定下一步的逻辑(同步确认消息)。要注意的点是waitForConfirmsOrDie
方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker 1 spring.rabbitmq.publisher-confirm-type =correlated
基础代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange" ; public static final String CONFIRM_QUEUE_NAME = "confirm_queue" ; public static final String CONFIRM_ROUTING_KEY = "key1" ; @Bean public DirectExchange confirmExchange () { return new DirectExchange (CONFIRM_EXCHANGE_NAME); } @Bean public Queue confirmQueue () { return QueueBuilder.nonDurable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange (Queue confirmQueue, DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") public void sendMsg (@PathVariable String msg) { rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg); log.info("发送消息:{}" , msg); } }
1 2 3 4 5 6 7 8 9 10 @Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMsg (Message message) { String msg = new String (message.getBody()); log.info("接收到的消息:{}" , msg); } }
自定义回调方法 自定义回调方法,注入到rabbitTemplate
中 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j @Component public class MyCallBack implements RabbitTemplate .ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData == null ? "" : correlationData.getId(); if (ack) { log.info("交换机已收到Id为:{}的消息" , id); } else { log.info("交换机未收到Id为:{}的消息,原因是:{}" , id, cause); } } }
比如发消息时故意将exchangeName
写错,会打印原因:找不到交换机
回退消息 在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息。
但假如队列没收到消息(比如routingkey写错了),我们是无法得知的。消息会被直接丢弃。
通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
配置文件 1 2 spring.rabbitmq.publisher-returns =true
自定义回调方法 继续实现RabbitTemplate.ReturnsCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j @Component public class MyCallBack implements RabbitTemplate .ConfirmCallback, RabbitTemplate.ReturnsCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnsCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData == null ? "" : correlationData.getId(); if (ack) { log.info("交换机已收到Id为:{}的消息" , id); } else { log.info("交换机未收到Id为:{}的消息,原因是:{}" , id, cause); } } @Override public void returnedMessage (ReturnedMessage returned) { String msg = new String (returned.getMessage().getBody()); String exchange = returned.getExchange(); String replyText = returned.getReplyText(); String routingKey = returned.getRoutingKey(); log.info("消息{},被交换机{}退回,原因:{},路由key:{}" , msg, exchange, replyText, routingKey); } }
备份交换机 代码构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static final String BACKUP_EXCHANGE_NAME = "backup_exchange" ;public static final String BACKUP_QUEUE_NAME = "backup_queue" ;public static final String WARNING_QUEUE_NAME = "warning_queue" ;@Bean public FanoutExchange backupExchange () { return new FanoutExchange (BACKUP_EXCHANGE_NAME); } @Bean public Queue backupQueue () { return QueueBuilder.nonDurable(BACKUP_QUEUE_NAME).build(); } @Bean public Queue warningQueue () { return QueueBuilder.nonDurable(WARNING_QUEUE_NAME).build(); } @Bean public Binding backupQueue2BackupExchange (Queue backupQueue, FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueue2BackupExchange (Queue warningQueue, FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); }
1 2 3 4 5 6 7 8 @Bean public DirectExchange confirmExchange () { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true ) .withArgument("alternate-exchange" , BACKUP_EXCHANGE_NAME) .build(); }
1 2 3 4 5 6 7 8 9 10 @Slf4j @Component public class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarning (Message message) { String s = new String (message.getBody()); log.error("发现不可路由消息:{}" , s); } }
结果分析 当mandatory参数与备份交换机同时开启,经测试,备份交换机优先级高
RabbitMQ其他知识点 幂等性 幂等性的概念 同一操作发起一次请求或者多次请求,结果是一致的,不会因为多次点击而产生了副作用。
比如:用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常 用户再次点击按钮,此时会进行第二次扣款,返回结果成功 用户查询余额发现多扣钱了,流水记录也变成了两条 重复消费 场景MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断 该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者 造成消费者消费了重复的消息 解决思路 使用全局ID,或者写个唯一标识,比如:时间戳、UUID、按自己的规则生成一个全局唯一 id… 每次消费消息时用该 id 先判断该消息是否已消费过 消费端的幂等性保障 在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性。
消息永远不会被消费多次,即使我们收到了一样的消息。
主流的幂等性有两种操作:
唯一ID+指纹码机制,利用数据库主键去重 利用redis的原子性去实现 唯一ID+指纹码 指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性。利用查询语句进行判断这个id是否存在数据库中。
优势:实现简单,拼接然后查询判断是否重复
劣势:高并发时,如果是单个数据库就会有写入性能瓶颈。当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
利用redis的原子性 利用redis执行setnx命令,天然具有幂等性。从而实现不重复消费
优先级队列 优先级越高的消息,越先被消费
如何添加 实战 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public class priorityConfig { @Bean public Queue priQueue () { return QueueBuilder.nonDurable("pri_queue" ).maxPriority(10 ).build(); } @Bean public DirectExchange priExchange () { return ExchangeBuilder.directExchange("pri_exchange" ).durable(false ).build(); } @Bean public Binding priBinding (Queue priQueue, DirectExchange priExchange) { return BindingBuilder.bind(priQueue).to(priExchange).with("pri_key" ); } }
1 2 3 String msg = "aaa" ;Message message = MessageBuilder.withBody(msg.getBytes()).setPriority(5 ).build();rabbitTemplate.send("pri_exchange" , "pri_key" , message);
堕性队列 正常情况:消息保存在内存中
堕性队列:消息保存在磁盘上
当消费者由于各种原因(比如下线,宕机,或者由于维护而关闭等),致使长时间不能消费,造成消息堆积时,堕性队列就很有必要了。
1 2 3 4 5 6 QueueBuilder.nonDurable("lazy.queue" ).lazy().build(); Map<String, Object> args = new HashMap (); args.put("x-queue-mode" , "lazy" ); new Queue ("lazy.queue" ,true ,false ,false ,args);
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB
RabbitMQ集群 生产者大量发送消息,单机版的RabbitMQ性能捉襟见肘。
如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办? 单台RabbitMQ服务器可以满足每秒1000条消息的吞吐量,如果应用需要满足每秒 10 万条消息的吞吐量,该怎么办? clustering 修改三台主机名称 配置各台主机的host文件,使其能相互识别 1 2 3 4 5 vim /etc/hosts 10.211.55.74 node1 10.211.55.75 node2 10.211.55.76 node3
确保各个节点使用的cookie是同一个值(搭建的集群要求erlang底层的cookie是同一个值) 1 2 3 scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
重启RabbitMQ 1 2 3 4 rabbitmq-server -detached
1 2 3 4 5 6 7 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app
1 2 3 4 5 6 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node2 rabbitmqctl start_app
后续操作可以在任意一台机器上运行
查看集群状态 1 rabbitmqctl cluster_status
为集群创建账户 1 2 3 4 5 rabbitmqctl add_user admin 123 rabbitmqctl set_user_tags admin administrator rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
解除任意一台机器 1 2 3 4 5 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status
1 2 rabbitmqctl forget_cluster_node rabbit@node2
镜像队列 使用镜像队列的原因 使用集群的时候,在哪个机器创建的队列,队列实际上就存在哪台机器上。如果那一台机器宕机了,相应队列也会消失。
从Web UI看,某台机器宕机了,该节点成为红色。在队列选项中查看,队列还在,但是状态为Down。
使用镜像队列,可以将队列镜像到集群中的其他节点上。如果某节点宕机了,队列能自动切换到另一个节点上,保证服务的可用性。
搭建步骤 启动三台集群节点 随便找一台节点,添加policyha-mode: exactly (备机模式:指定,也即指定备份的个数) ha-params: 2 (指定备份个数:两份) ha-sync-mode: automatic (同步的模式:自动) 在某节点上创建队列(比如节点1),自动备份到另一个节点 假如停掉node1,node2成为镜像队列 就算集群只剩下一台机器了,依然能消费队列中的内容 具体到Java选择连接MQ的部分,需要使用负载均衡软件(比如Haproxy + Keepalive),这里不在继续讨论
Federation Exchange 不重要,略