什么是MQ
MQ,全称为message queue
,消息队列,本质就是一个先进先出的队列,是一种跨进程的通信机制,用于上下游传递信息。
为什么要使用MQ
MQ的优点
- 流量削峰
- 异步处理
- 应用解耦
MQ分类
ActiveMQ
- 优点。单机吞吐量万级,时效性为ms级,可用性高,基于主从架构实现高可用性,丢失数据的概率较低。
- 缺点。早期项目,目前社区活跃度低,高吞吐量场景较少使用。
Kafka
- 优点。在大数据的消息传输方面,举足轻重。方面性能卓越,单机写入TPS约在每秒百万条,最大的优点就是吞吐量很高,时效性为ms级,可用性很高。而且是Kafka是分布式的,一个数据多个副本。
- 缺点。Kafka单机超过64个队列(分区),Load就会发生明显的飙高,发送消息响应时间变长,消息失败不支持重试。支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢。
RocketMQ
- 优点。RocketMQ出自阿里巴巴,而且用Java语言实现的,对于学习Java的人来说还能自己定制。单机吞吐量达十万级,可用性很高,分布式架构,消息可以做到零丢失,功能较为完善,扩展性也很好,支持10亿级别的消息堆积,不会因为消息堆积而导致性能下降。
- 缺点。支持的客户端语言不多,目前是Java和C++,而且C++方面也还不成熟,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量的代码。
RabbitMQ
- 优点。RabbitMQ由erlang实现,由于erlang的高并发特性,性能较好,吞吐量到万级,功能比较完备,稳定易用跨平台,而且支持多种语言,社区活跃度也很高。
- 缺点。商业版需要收费,学习成本较高。
RabbitMQ的概念
RabbitMQ是一个消息中间件,接收并转发消息,它只接收存储和转发消息数据,并不会处理消息。
四大核心概念。
生产者
。
产生数据并发送消息的程序。
交换机
。
交换机一方面接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
队列
队列是RabbitMQ中的使用的一个数据结构,用于存储消息。队列仅受主机的内存和磁盘限制的约束,本质上是一个消息缓冲区。生产者将消息发送到队列,消费者从队列中接收消息。
消费者
接收并处理消息的程序。
RabbitMQ安装
RabbitMQ的安装可以参考官方的文档。
https://www.rabbitmq.com/download.html
里面有多种安装的方式,建议使用docker来进行安装,不用考虑这么多环境的问题,只要安装了docker,就只需要一条命令即可。
1 2 3 4 5
| docker run -id --name rabbitmq \ -e RABBITMQ_DEFAULT_USER=username \ -e RABBITMQ_DEFAULT_PASS=password \ -p 15672:15672 -p 5672:5672 \ rabbitmq:3.8-management
|
安装的是管理版本,所以可以访问地址
http://IP:15672/
来进行登录管理RabbitMQ的web界面。
服务器记得开放15672和5672这两个端口,15672是web管理界面的接口,5672是RabbitMQ的服务端口。
如果没设置参数,默认的账号和密码都是guest
官方的教程也是挺好的,可以参考一下。
https://www.rabbitmq.com/getstarted.html
Hello World
引入MQ客户端的依赖。
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> </dependency>
|
首先先创建一个常量类,用于存储一些配置参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yww.rabbitmq;
public class Constant {
public static final String QUEUE_NAME = "Hello";
public static final String HOSTNAME = "119.29.119.110";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
}
|
生产者发送消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.yww.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(Constant.HOSTNAME); factory.setUsername(Constant.USERNAME); factory.setPassword(Constant.PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(Constant.QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", Constant.QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送成功"); }
}
|
运行该程序就能发送一条消息到指定队列了。
消费者消费消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.yww.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(Constant.HOSTNAME); factory.setUsername(Constant.USERNAME); factory.setPassword(Constant.PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到的消息为:"); String messageMessage = new String(message.getBody()); System.out.println(messageMessage); };
CancelCallback cancelCallback = consumerTag -> { System.out.println("消费者取消了消费!"); System.out.println(consumerTag); };
channel.basicConsume(Constant.QUEUE_NAME, true, deliverCallback, cancelCallback); }
}
|
此时就能看到消费者获取到了这个消息。
Work Queues
Work Queues,工作队列或者说是任务队列,简单的理解为一个队列的任务可以分配给多个消费者消费。
它的主要思想是避免立即执行资源密集任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
循环调度
生产者和消费者获取连接,信道的方式是一样的,所以可以抽取出来作为一个工具类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.yww.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class MqUtil {
public static final String QUEUE_NAME = "Hello";
public static final String HOSTNAME = "119.29.119.110";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOSTNAME); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); return connection.createChannel(); } }
|
这次先编写两个消费者(工作线程),两个消费者除了打印不一样,其他都一样,所以就不写全了,启动两次或者分两个类启动都行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.yww.rabbitmq;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Worker1 {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MqUtil.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到的消息:"); System.out.println(new String(message.getBody())); };
CancelCallback cancelCallback = consumerTag -> { System.out.println("消费者取消消费这个消息"); System.out.println(consumerTag); }; System.out.println("当前工作线程为**1**,正在等待接收消息--------"); channel.basicConsume(MqUtil.QUEUE_NAME, true, deliverCallback, cancelCallback); }
}
|
接下来编写生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yww.rabbitmq;
import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MqUtil.getChannel();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); channel.basicPublish("", MqUtil.QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息完成,本次的消息为----" + message); } }
}
|
接下来进行发送测试,发送几个消息。
1 2 3 4 5 6 7 8
| AA 发送消息完成,本次的消息为----AA BB 发送消息完成,本次的消息为----BB CC 发送消息完成,本次的消息为----CC DD 发送消息完成,本次的消息为----DD
|
可以看到两个消费者的情况。
1 2 3 4 5 6 7 8 9 10 11
| 当前工作线程为**1**,正在等待接收消息-------- 接收到的消息: AA 接收到的消息: CC
当前工作线程为**2**,正在等待接收消息-------- 接收到的消息: BB 接收到的消息: DD
|
从这里可以看出一些规律。
每个消息是只能被消费一次的。
消息的消费的轮询的,就是说每个消费者消费了一次消息之后,就轮到下一个消费者去消费,不会连续消费。
消息应答
消费者完成一个任务需要一段时间,在这段时间内如果它挂掉了,由于RabbitMQ一旦向消费者传递消息后就会立即将该消息标记为删除,所以就会丢失了这段消息的处理。所以为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制。
消息应答就是说,消费者在接收到消息并处理了之后,在通知RabbitMQ处理完成,RabbitMQ才会把该消息删除。
自动应答
这种方式表示消息发送后就会立即被认为已经传输完成,就会有可能出现上述的消息丢失的情况。这种模式会出现消息丢失的情况,但是可以适用于高吞吐量的场景,毕竟速度和安全并不能全都要。上述的代码演示都是可以看到是自动应答的。
这种模式尽量不使用,因为在大多数情况下,每一个消息都是很重要的,所以尽量使用自定义的应答方式。
消息应答的方法
Channel.basicNack()
用于否定确认。
1 2 3 4 5 6 7 8
|
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
|
Channel.basicReject()
用于否定确认。
1 2 3 4 5 6 7
|
void basicReject(long deliveryTag, boolean requeue) throws IOException;
|
关于参数multiple这个批量应答的处理。
不建议使用批量应答,因为不能保证之前的消息是完成的,批量应答可能会出现应答了没完成的消息。
消息自动重新入队
当消费者由于一些特殊情况导致ACK确认未成功发送给RabbitMQ,RabbitMQ就会让该消息重新排队(上述的requeue参数选择了消息不被丢弃),这就可以解决消息丢失的问题了。
消息应答的测试
手动应答是消费者设置的,所以生产者不需要特殊的变化,跟上一例子一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Producer {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MqUtil.getChannel();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.nextLine(); channel.basicPublish("", MqUtil.QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息完成,本次的发送的消息为----" + message); } } }
|
接下来编写消费者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.yww.rabbitmq;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class consumer1 {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MqUtil.getChannel(); System.out.println("消费者**1**等待接收消息, 该消费者处理消息时间较短"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费者**1**开始消费消息"); String deliverMessage = new String(message.getBody()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者**1**接收到了消息,消息为: " + deliverMessage); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); };
CancelCallback cancelCallback = consumerTag -> { System.out.println("消费者**1**取消了消费消息,当前消息信息为:"); System.out.println(consumerTag); };
channel.basicConsume(MqUtil.QUEUE_NAME, false, deliverCallback, cancelCallback); }
}
|
消费者二就调整一下时间,设置一个较久的时间,便于我们在暂停期间停止该进程,使其消费失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.yww.rabbitmq;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class consumer2 {
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MqUtil.getChannel(); System.out.println("消费者**2**等待接收消息, 该消费者处理消息时间较长"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("消费者**2**开始消费消息"); String deliverMessage = new String(message.getBody()); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者**2**接收到了消息,消息为: " + deliverMessage); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); };
CancelCallback cancelCallback = consumerTag -> { System.out.println("消费者**2**取消了消费消息,当前消息信息为:"); System.out.println(consumerTag); };
channel.basicConsume(MqUtil.QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
|
接下来开始测试。
开启生产者和这两个消费者,当消费者2出现开始消费的字样,即进入了接收消费的回调之后,停止程序运行,模拟消息丢失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Producer的控制台 aa 发送消息完成,本次的发送的消息为----aa bb 发送消息完成,本次的发送的消息为----bb cc 发送消息完成,本次的发送的消息为----cc
Comsumer1的控制台 消费者**1**等待接收消息, 该消费者处理消息时间较短 消费者**1**开始消费消息 消费者**1**接收到了消息,消息为: bb -------------------此时关闭消费者2------------------------- 消费者**1**开始消费消息 消费者**1**接收到了消息,消息为: cc
Comsumer2的控制台 消费者**2**等待接收消息, 该消费者处理消息时间较长 消费者**2**开始消费消息 消费者**2**接收到了消息,消息为: aa 消费者**2**开始消费消息 -------------------此时关闭消费者2-------------------------
|
从这个测试就可以看到,cc
这个消息原本应该是在消费者2中消费的,但是我停下了消费者2的进程之后,会发现这个消息转交到了消费者1中执行。
RabbitMQ持久化
消息应答是消费者出现问题的时候的解决方法,但是当RabbitMQ服务宕机之后,消息还是可能会丢失,所以这就需要启动RabbitMQ的持久化机制了。
队列实现持久化
之前是有提到过的,队列不开启持久化的话,MQ服务宕机重启的时候,队列就会丢失(因为是临时存储的),所以需要在声明队列的时候就要开启持久化。
主要就是生产者存放消息时声明队列的那个方法。
1 2 3 4 5 6
|
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
|
消息实现持久化
根据上述所说,当消息不开启持久化,服务宕机之后,即使队列不会丢失,消息也会丢失的,所以也需要开启持久化。
主要就是发送消息的那个方法。
1 2 3 4 5 6 7 8
|
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
channel.basicPublish("", Constant.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
持久化不代表不丢失
队列和消息都实现持久化之后,并不能保证消息不会丢失,因为有很多特殊情况会影响结果,比如持久化到磁盘的过程中服务宕机,还是会可能丢失数据的,所以这种方法并不能避免一些特殊情况。所以还需要参考下述的发布确认。
发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack
的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
开启发布确认
开启发布确认是在生产者端的,所以只需要在生产者端标记信道,开启即可,主要的方法就是这个。
1 2 3 4 5 6 7 8 9
|
Confirm.SelectOk confirmSelect() throws IOException;
channel.confirmSelect();
|
这个方法是将信道标记成confirm
,还需要配合下面的方法实现确认。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
boolean waitForConfirms() throws InterruptedException;
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
void waitForConfirmsOrDie() throws IOException, InterruptedException;
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
|
一般都会使用
1
| boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
|
单个确认发布
最简单的一种发布确认方式,是一种同步的确认发布,也就是说发布一个消息之后,只有它被确认的时候才会返回,如果在指定时间范围内没有被确认,就会抛出异常。
这种方式是最花费时间的,因为每一个发布的消息都要等待确认才能发下一个消息,吞吐量会小很多,不过这种方式也是最安全的,能保证消息不会丢失。
测试如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.yww.rabbitmq.publish;
import com.rabbitmq.client.Channel; import com.yww.rabbitmq.MqUtil;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class PublishOne {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = MqUtil.getChannel(); channel.confirmSelect(); channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); int total = 0; long beginTime = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { String msg = String.valueOf(i); channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { total++; } } long endTime = System.currentTimeMillis(); System.out.println("发布了100条单独确认消息耗时为:" + (endTime - beginTime) + "ms"); System.out.println("成功发布了消息数为:" + total); }
}
|
测试结果。
1 2
| 发布了1000条单独确认消息耗时为:29474ms 成功发布了消息数为:1000
|
批量发布确认
单个确认的效率很慢,所以可以先发布一批消息然后一起确认可以极大提高吞吐量,这种方案也是同步的发布确认方式。
当然问题很明显,当出现故障的时候,不知道具体是哪一个消息出现了问题。
测试如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.yww.rabbitmq.publish;
import com.rabbitmq.client.Channel; import com.yww.rabbitmq.MqUtil;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class PublishBatch {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = MqUtil.getChannel(); channel.confirmSelect(); channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); int batchSize = 100; int total = 0; long beginTime = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String msg = String.valueOf(i); channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes()); total++; if (total == batchSize) { channel.waitForConfirms(); total = 0; } } if (total > 0) { channel.waitForConfirms(); } long endTime = System.currentTimeMillis(); System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms"); System.out.println("未确认的消息数为:" + total); }
}
|
测试结果如下。
1 2
| 发布了1000条批量确认消息耗时为:292ms 未确认的消息数为:0
|
异步确认发布
异步确认发布在比同步的方式最重要的是效率能高很多,而且因为也是单个确认的,所以也是很可靠的。
这里就是通过监听器来帮忙监听哪些消息是成功的或者是失败的,而不用在逻辑里面等待确认。
添加监听器主要是下面这个方法。
1 2 3
| void addConfirmListener(ConfirmListener listener);
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
|
测试如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.yww.rabbitmq.publish;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.yww.rabbitmq.MqUtil;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class PublishAsync {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = MqUtil.getChannel(); channel.confirmSelect(); channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { System.out.println("这个确认消息为" + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("这个未确认消息为" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); long beginTime = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String msg = String.valueOf(i); channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes()); } long endTime = System.currentTimeMillis(); System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms"); }
}
|
测试结果如下。
1 2
| 发布了1000条批量确认消息耗时为:33ms (...省略了成功打印)
|
异步未确认消息的处理
从上述的例子可以知道监听器和发布消息的线程不是同一个,所以这个未确认的消息的处理最好还是先存储起来。
一个简单的使用例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.yww.rabbitmq.publish;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.yww.rabbitmq.MqUtil;
import java.io.IOException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeoutException;
public class publishDeal {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = MqUtil.getChannel(); channel.confirmSelect(); channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null); ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = map.headMap(deliveryTag); confirmed.clear(); } else { map.remove(deliveryTag); } System.out.println("确认的消息为:" + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String nackMsg = map.get(deliveryTag); System.out.println("这个未确认消息的内容为:" + nackMsg); }; channel.addConfirmListener(ackCallback, nackCallback); long beginTime = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String msg = String.valueOf(i); long num = channel.getNextPublishSeqNo(); channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes()); map.put(num, msg); } long endTime = System.currentTimeMillis(); System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms"); }
}
|
不公平调度
刚刚学习的时候是可以知道RabbitMQ默认是循环调度,也就是轮询的,这种调度方法,有优点也有缺点,有时候公平并不是一件好事,所以我们可以设置这个消费者的工作内容。
主要是在消费者中设置信道的这个函数。
1 2 3 4 5 6 7 8 9 10 11 12
|
void basicQos(int prefetchCount) throws IOException;
channel.basicQos(1);
|
注意当所有工作线程都很忙,你的队列有可能会被填满。
所以需要你添加更多的工作线程,或者是使用其他的策略。
上述只是简单实用,这个方法的完整形式如下(以下为机翻)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
|
Publish/Subscribe
在上述的示例中,使用了工作队列,工作队列会将每个任务交付给一个工作线程。当然我们也可以实现向多个消费者传递一条消息,这种模式被称为Publish/Subscribe
,也叫发布/订阅
。
交换机(Exchanges)
RabbitMQ消息传递模型的核心思想是生产者从不直接向队列中发送任何消息。实际上生产者经常甚至根本不知道消息是否会被传送到哪个队列。
实际情况是生产者只能将消息发送到交换机中,交换机的任务也很简单,一方面他接收来自生产者的消息,另一方面将消息推入队列。
是应该把这些消息放到特定队列,还是说把他们丢到许多队列中,或者说是丢弃,这些就是由交换机的类型来决定了。
交换机有几种可用的类型。
- direct(直接)
- topic(主题)
- headers(标题)
- fanout(扇出)
无名交换机
之前使用的就是无名的交换机。
1
| channel.basicPublish("", "hello", null, "hello world".getBytes());
|
第一个参数是交换机的名称,空字符串就表示默认或者说是无名交换机。消息能路由发送到队列中其实就是由routingKey
指定的。
临时队列
临时队列用于临时存储消息,一旦断开了消费者的连接,队列就会被自动删除。
创建一个随机名称的临时队列。
1
| String queueName = channel.queueDeclare().getQueue();
|