RabbitMQ队列的选择

RabbitMQ队列的选择

文章目录

前言一、Rabbit MQ队列的选择1.1、Classic1.2、Quorum1.3、Stream1.4、懒队列

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别

前言

本篇是Rabbit MQ高级特性的学习笔记,记录RabbitMQ的Classic,Quorum,Stream队列,懒队列的特性和运用场景

一、Rabbit MQ队列的选择

Rabbit MQ默认提供了三种队列:

Classic:经典队列,在单机模式下是最常用的。Quorum:仲裁队列,通常用于集群环境下,保证集群的高可用性。Stream:流式队列,是自3.9.0版本开始引入的新特性,这种队列类型的消息是持久化到磁盘并且具备分布式备份的。

1.1、Classic

经典队列是Rabbit MQ默认的队列,与Spring Boot整合时,使用new Queue,创建的队列就是普通队列: 经典队列在创建时除了指定队列的名称,还有额外的四个选项: 对应的是页面上的: durable代表了队列是否进行持久化,如果开启持久化,则会将队列保存到磁盘上,将来Rabbit MQ重启后,可以从磁盘恢复,否则队列只存在于内存中,服务重启后会自动删除。这里的durable只是设置了队列的持久化。消息和交换机,同样可以设置持久化。如果消息设置了持久化,而队列未设置持久化,重启之后,消息依旧会丢失。所以如果要保证消息的零丢失,消息和队列都需要设置持久化。

// 队列声明

channel.queueDeclare("safe_queue", true, false, false, null); // Durability=true

// 消息发布

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()

.deliveryMode(2) // 持久化消息

.build();

exclusive代表了队列是否排他。当属性为true时,表示仅允许声明了该队列的连接进行访问,其他连接无法访问该队列。通常用于单消费者模式,确保队列只能被特定消费者访问。 autoDelete代表了队列是否自动删除,如果设置为true,表示该队列在最后一个消费者断开连接之后,进行删除操作。通常用于临时队列的场景。 arguments可以指定更多的参数,比如指定死信交换机和路由键,消息过期时间,最大长度,是否为懒队列等。

1.2、Quorum

Quorum是针对镜像队列的一种优化,目前已经取代了镜像队列,作为Rabbit MQ集群部署保证高可用性的解决方案。传统的镜像队列,是将消息副本存储在一组节点上,以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上,并使这些节点上的队列保持同步。当一个节点失败时,其他节点上的队列不受影响,因为它们上面都有消息的备份。 镜像队列使用主从模式,所有消息写入和读取均通过主节点,并异步复制到镜像节点。主节点故障时需重新选举,期间队列不可用。而仲裁队列基于Raft分布式共识算法,所有节点组成仲裁组。消息需被多数节点持久化后才确认成功,Leader故障时自动触发选举。 相比较于传统的主从模式,避免了发生网络分区时的脑裂问题(基于Raft分布式共识算法避免)。 和普通队列的区别: 相比较于普通队列,仲裁队列增加了一个对于有毒消息的处理。什么是有毒消息?首先,消费者从队列中获取到了元素,队列会将该元素删除,但是消费者消费失败了,会给队列nack,并且可以设置消息重新入队。这样可能存在因为业务代码的问题,某条消息一直处理不成功的问题。仲裁队列会记录消息的重新投递次数,判断是否超过了设置的阈值,如果超过了就直接丢弃,或者放入死信队列人工处理。 如果需要声明一个仲裁队列,只需要加入参数:

@Configuration

public class QuorumConfig {

@Bean

public Queue quorumQueue() {

Map params = new HashMap<>();

params.put("x-queue-type","quorum");

return new Queue(MyConstants.QUEUE_QUORUM,true,false,false,params);

}

}

仲裁队列适用于集群环境下,队列长期存在,并且对于消息可靠性要求高,允许牺牲一部分性能(因为raft算法,消息需被多数节点持久化后才确认成功)的场景。

1.3、Stream

在传统的队列模型中,同一条消息只能被一个消费者消费(一个队列如果有多个消费者,是工作分发的机制。消息1->消费者1,消息2->消费者2,消息3->消费者1,不能两个消费者读同一条消息。),并且消息是阅后即焚的(消费者接收到消息后,队列中的该消息就删除,如果消费者拒绝签收并且设置了重新入队,再把消息重新放入队列中),无法重复从队列中获取相同的消息。并且在当队列中积累的消息过多时,性能下降会非常明显。 Stream队列正是解决了以上的这些问题。Stream队列的核心是用aof文件的形式存储队列,将消息以aof的方式追加到文件中。允许用户在日志的任何一个连接点开始重新读取数据。(需要用户自己记录偏移量) 声明一个stream队列:

@Configuration

public class StreamConfig {

@Bean

public Queue streamQueue() {

Map params = new HashMap<>();

params.put("x-queue-type","stream");

params.put("x-max-length-bytes", 20_000_000_000L); // 指定队列的大小

params.put("x-stream-max-segment-size-bytes", 100_000_000); // 文件分片存储,每一片的大小

//必须设置持久化为true,同时独占和自动删除模式为false

return new Queue(MyConstants.QUEUE_STREAM,true,false,false,params);

}

}

声明消费者:

public void stremReceiver(Channel channel,String message){

try {

channel.basicQos(100);

Consumer myconsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

System.out.println("========================");

String routingKey = envelope.getRoutingKey();

System.out.println("routingKey >"+routingKey);

String contentType = properties.getContentType();

System.out.println("contentType >"+contentType);

long deliveryTag = envelope.getDeliveryTag();

System.out.println("deliveryTag >"+deliveryTag);

System.out.println("content:"+new String(body,"UTF-8"));

// (process the message components here ...)

//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。

//没有答复过的消息,服务器会一直不停转发。

channel.basicAck(deliveryTag, false);

}

};

Map consumeParam = new HashMap<>();

//first: 从日志队列中第一个可消费的消息开始消费

//last: 消费消息日志中最后一个消息

//next: 相当于不指定offset,消费不到消息。

//Offset: 一个数字型的偏

//Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。

//例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)

consumeParam.put("x-stream-offset","last");

channel.basicConsume(MyConstants.QUEUE_STREAM, false,consumeParam, myconsumer);

} catch (IOException e) {

e.printStackTrace();

}

System.out.println("quorumReceiver received message : "+ message);

}

1.4、懒队列

Rabbit MQ对于常规队列的处理是,将消息优先存在于内存中,在合适的时机再持久化到磁盘上,而懒队列则相反,懒队列会尽可能早的将消息内容保存到磁盘当中,并且只有在用户请求到时,才临时从磁盘加载到内存当中。懒队列的设计也是为了应对消息堆积问题的。 声明懒队列的方式,只需要加入参数,相应的,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。

Map args = new HashMap();

args.put("x-queue-mode", "lazy");

懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。

相关推荐

吃荔枝会导致“酒驾”?当心!不能这样吃→
beat365中国在线体育

吃荔枝会导致“酒驾”?当心!不能这样吃→

📅 07-31 👁️ 3284
国际足总世界杯梦幻阵容
365买球官网入口

国际足总世界杯梦幻阵容

📅 07-07 👁️ 9116
阴阳师雨女怎么样 雨女技能属性图鉴
365bet网站

阴阳师雨女怎么样 雨女技能属性图鉴

📅 07-26 👁️ 8558
卢布兑人民币汇率
365买球官网入口

卢布兑人民币汇率

📅 07-12 👁️ 7649
媲的解释
365买球官网入口

媲的解释

📅 07-13 👁️ 9177
网络间谍活动的危害、特点与防治建议
beat365中国在线体育

网络间谍活动的危害、特点与防治建议

📅 07-27 👁️ 7988
为什么有的女孩手很小
beat365中国在线体育

为什么有的女孩手很小

📅 06-30 👁️ 2636
公文写作指南:副标题格式你真的掌握了吗?
beat365中国在线体育

公文写作指南:副标题格式你真的掌握了吗?

📅 07-19 👁️ 7338
第四章 网络监听
365bet网站

第四章 网络监听

📅 08-18 👁️ 7080