什么是MQ

MQ,全称为message queue,消息队列,本质就是一个先进先出的队列,是一种跨进程的通信机制,用于上下游传递信息。

为什么要使用MQ

MQ的优点

  1. 流量削峰
  2. 异步处理
  3. 应用解耦

MQ分类

  1. ActiveMQ
    • 优点。单机吞吐量万级,时效性为ms级,可用性高,基于主从架构实现高可用性,丢失数据的概率较低。
    • 缺点。早期项目,目前社区活跃度低,高吞吐量场景较少使用。
  2. Kafka
    • 优点。在大数据的消息传输方面,举足轻重。方面性能卓越,单机写入TPS约在每秒百万条,最大的优点就是吞吐量很高,时效性为ms级,可用性很高。而且是Kafka是分布式的,一个数据多个副本。
    • 缺点。Kafka单机超过64个队列(分区),Load就会发生明显的飙高,发送消息响应时间变长,消息失败不支持重试。支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢。
  3. RocketMQ
    • 优点。RocketMQ出自阿里巴巴,而且用Java语言实现的,对于学习Java的人来说还能自己定制。单机吞吐量达十万级,可用性很高,分布式架构,消息可以做到零丢失,功能较为完善,扩展性也很好,支持10亿级别的消息堆积,不会因为消息堆积而导致性能下降。
    • 缺点。支持的客户端语言不多,目前是Java和C++,而且C++方面也还不成熟,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量的代码。
  4. RabbitMQ
    • 优点。RabbitMQ由erlang实现,由于erlang的高并发特性,性能较好,吞吐量到万级,功能比较完备,稳定易用跨平台,而且支持多种语言,社区活跃度也很高。
    • 缺点。商业版需要收费,学习成本较高。

RabbitMQ的概念

RabbitMQ是一个消息中间件,接收并转发消息,它只接收存储和转发消息数据,并不会处理消息。

四大核心概念。

  1. 生产者

    产生数据并发送消息的程序。

  2. 交换机

    交换机一方面接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

  3. 队列

    队列是RabbitMQ中的使用的一个数据结构,用于存储消息。队列仅受主机的内存和磁盘限制的约束,本质上是一个消息缓冲区。生产者将消息发送到队列,消费者从队列中接收消息。

  4. 消费者

    接收并处理消息的程序。

RabbitMQ安装

RabbitMQ的安装可以参考官方的文档。

https://www.rabbitmq.com/download.html

里面有多种安装的方式,建议使用docker来进行安装,不用考虑这么多环境的问题,只要安装了docker,就只需要一条命令即可。

1
2
3
4
5
docker run -id --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password \
-p 15672:15672 -p 5672:5672 \
rabbitmq:3.8-management

安装的是管理版本,所以可以访问地址

http://IP:15672/

来进行登录管理RabbitMQ的web界面。

服务器记得开放15672和5672这两个端口,15672是web管理界面的接口,5672是RabbitMQ的服务端口。

如果没设置参数,默认的账号和密码都是guest

官方的教程也是挺好的,可以参考一下。

https://www.rabbitmq.com/getstarted.html

Hello World

引入MQ客户端的依赖。

1
2
3
4
5
<!-- RabbitMQ的客户端依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>

首先先创建一个常量类,用于存储一些配置参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yww.rabbitmq;

/**
* <p>
* 连接的参数
* </p>
*
* @author yww
**/
public class Constant {

/**
* 队列名称
*/
public static final String QUEUE_NAME = "Hello";
/**
* 主机名
*/
public static final String HOSTNAME = "119.29.119.110";
/**
* 用户名
*/
public static final String USERNAME = "username";
/**
* 密码
*/
public static final String PASSWORD = "password";

}

生产者发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.yww.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 生产者
* </p>
*
* @author yww
**/
public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接的配置,IP,用户名,密码
factory.setHost(Constant.HOSTNAME);
factory.setUsername(Constant.USERNAME);
factory.setPassword(Constant.PASSWORD);
// 创建连接
Connection connection = factory.newConnection();
// 通过连接来创建一个信道
Channel channel = connection.createChannel();
/*
* 通过一个信道来生成一个队列
* 1. 队列的名称
* 2. true表示声明的是一个持久化队列,该队列会在服务器重启后依旧存活
* 3. true表示声明的是一个独占队列,被限制在这个连接中。
* 4. 最后一个消费者断开连接之后,该队列是否自动删除。true表示自动删除,false表示不自动删除
* 5. 其他参数
*/
channel.queueDeclare(Constant.QUEUE_NAME, false, false, false, null);

String message = "Hello World!";
/*
* 发送一个消息
* 1. 发送到哪个交换机
* 2. 路由的key值是哪个,本次是队列的名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/
channel.basicPublish("", Constant.QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功");
}

}

运行该程序就能发送一条消息到指定队列了。

消费者消费消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.yww.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 消费者
* </p>
*
* @author yww
**/
public class Consumer {

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constant.HOSTNAME);
factory.setUsername(Constant.USERNAME);
factory.setPassword(Constant.PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 消费者接收的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息为:");
String messageMessage = new String(message.getBody());
System.out.println(messageMessage);
};

// 消费者被取消消费的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消了消费!");
System.out.println(consumerTag);
};
/*
* 消费者消费消息
* 1. 队列的名称
* 2. 消费成功之后是否要自动应答。true表示自动应答,false表示要手动应答
* 3. 消费者接收的回调
* 4. 消费者被取消消费的回调
*/
channel.basicConsume(Constant.QUEUE_NAME, true, deliverCallback, cancelCallback);
}

}

此时就能看到消费者获取到了这个消息。

Work Queues

Work Queues,工作队列或者说是任务队列,简单的理解为一个队列的任务可以分配给多个消费者消费。

它的主要思想是避免立即执行资源密集任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

循环调度

生产者和消费者获取连接,信道的方式是一样的,所以可以抽取出来作为一个工具类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.yww.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 获取信道工具类
* </p>
*
* @author yww
**/
public class MqUtil {

/**
* 队列名称
*/
public static final String QUEUE_NAME = "Hello";
/**
* 主机名
*/
public static final String HOSTNAME = "119.29.119.110";
/**
* 用户名
*/
public static final String USERNAME = "username";
/**
* 密码
*/
public static final String PASSWORD = "password";


/**
* 获取信道
* @return Channel
*/
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOSTNAME);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
return connection.createChannel();
}

}

这次先编写两个消费者(工作线程),两个消费者除了打印不一样,其他都一样,所以就不写全了,启动两次或者分两个类启动都行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.yww.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 第一个工作线程
* </p>
*
* @author yww
* @version 1.0
* @date 2021/12/25 22:28
**/
public class Worker1 {

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtil.getChannel();

DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:");
System.out.println(new String(message.getBody()));
};

CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消消费这个消息");
System.out.println(consumerTag);
};
System.out.println("当前工作线程为**1**,正在等待接收消息--------");
// 接收消息
channel.basicConsume(MqUtil.QUEUE_NAME, true, deliverCallback, cancelCallback);
}

}

接下来编写生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yww.rabbitmq;

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 生产者
* 发送大量的消息
* </p>
*
* @author yww
**/
public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtil.getChannel();

// 信道指定队列
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
// 从控制台中接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.nextLine();
channel.basicPublish("", MqUtil.QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成,本次的消息为----" + message);
}
}

}

接下来进行发送测试,发送几个消息。

1
2
3
4
5
6
7
8
AA
发送消息完成,本次的消息为----AA
BB
发送消息完成,本次的消息为----BB
CC
发送消息完成,本次的消息为----CC
DD
发送消息完成,本次的消息为----DD

可以看到两个消费者的情况。

1
2
3
4
5
6
7
8
9
10
11
当前工作线程为**1**,正在等待接收消息--------
接收到的消息:
AA
接收到的消息:
CC

当前工作线程为**2**,正在等待接收消息--------
接收到的消息:
BB
接收到的消息:
DD

从这里可以看出一些规律。

每个消息是只能被消费一次的。

消息的消费的轮询的,就是说每个消费者消费了一次消息之后,就轮到下一个消费者去消费,不会连续消费。

消息应答

消费者完成一个任务需要一段时间,在这段时间内如果它挂掉了,由于RabbitMQ一旦向消费者传递消息后就会立即将该消息标记为删除,所以就会丢失了这段消息的处理。所以为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制。

消息应答就是说,消费者在接收到消息并处理了之后,在通知RabbitMQ处理完成,RabbitMQ才会把该消息删除。

自动应答

这种方式表示消息发送后就会立即被认为已经传输完成,就会有可能出现上述的消息丢失的情况。这种模式会出现消息丢失的情况,但是可以适用于高吞吐量的场景,毕竟速度和安全并不能全都要。上述的代码演示都是可以看到是自动应答的。

这种模式尽量不使用,因为在大多数情况下,每一个消息都是很重要的,所以尽量使用自定义的应答方式。

消息应答的方法

  • Channel.basicAck()

    肯定确认。表示MQ已经知道该消息已经被成功处理了,可以将其丢弃。

    1
    2
    3
    4
    5
    6
    7
    /**
    * 确认一个或多个收到的消息
    * @param deliveryTag
    * @param multiple 是否批量应答,true表示批量应答消息
    * @throws java.io.IOException 如果遇到错误
    */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • Channel.basicNack()

    用于否定确认。

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * 拒绝接收到的一条或多条消息。
    * @param deliveryTag
    * @param multiple 是否批量应答,true表示批量应答消息
    * @param requeue 如果被拒绝的消息应该被重新排队而不是丢弃/死信,则使用true
    * @throws java.io.IOException 如果遇到错误
    */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • Channel.basicReject()

    用于否定确认。

    1
    2
    3
    4
    5
    6
    7
    /**
    * 拒绝接收到的一条消息。
    * @param deliveryTag
    * @param requeue true表示如果被拒绝的消息应该被重新排队而不是被丢弃/死信,则使用true
    * @throws java.io.IOException 如果遇到错误
    */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

关于参数multiple这个批量应答的处理。

不建议使用批量应答,因为不能保证之前的消息是完成的,批量应答可能会出现应答了没完成的消息。

消息自动重新入队

当消费者由于一些特殊情况导致ACK确认未成功发送给RabbitMQ,RabbitMQ就会让该消息重新排队(上述的requeue参数选择了消息不被丢弃),这就可以解决消息丢失的问题了。

消息应答的测试

手动应答是消费者设置的,所以生产者不需要特殊的变化,跟上一例子一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Producer {

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtil.getChannel();

// 信道指定队列
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
// 从控制台中接收信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.nextLine();
channel.basicPublish("", MqUtil.QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成,本次的发送的消息为----" + message);
}
}

}

接下来编写消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.yww.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 消费者1
* </p>
*
* @author yww
**/
public class consumer1 {

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtil.getChannel();
System.out.println("消费者**1**等待接收消息, 该消费者处理消息时间较短");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者**1**开始消费消息");
String deliverMessage = new String(message.getBody());
// 模拟消费消息需要一秒钟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者**1**接收到了消息,消息为: " + deliverMessage);
// 手动应答(不批量)
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者**1**取消了消费消息,当前消息信息为:");
System.out.println(consumerTag);
};

// 第二个参数标识为false,表示不使用自动应答
channel.basicConsume(MqUtil.QUEUE_NAME, false, deliverCallback, cancelCallback);
}

}

消费者二就调整一下时间,设置一个较久的时间,便于我们在暂停期间停止该进程,使其消费失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.yww.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 消费者2
* </p>
*
* @author yww
**/
public class consumer2 {

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MqUtil.getChannel();
System.out.println("消费者**2**等待接收消息, 该消费者处理消息时间较长");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者**2**开始消费消息");
String deliverMessage = new String(message.getBody());
// 模拟消费消息需要二十秒钟
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者**2**接收到了消息,消息为: " + deliverMessage);
// 手动应答(不批量)
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};

CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者**2**取消了消费消息,当前消息信息为:");
System.out.println(consumerTag);
};

// 第二个参数标识为false,表示不使用自动应答
channel.basicConsume(MqUtil.QUEUE_NAME, false, deliverCallback, cancelCallback);

}

}

接下来开始测试。

开启生产者和这两个消费者,当消费者2出现开始消费的字样,即进入了接收消费的回调之后,停止程序运行,模拟消息丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Producer的控制台
aa
发送消息完成,本次的发送的消息为----aa
bb
发送消息完成,本次的发送的消息为----bb
cc
发送消息完成,本次的发送的消息为----cc

Comsumer1的控制台
消费者**1**等待接收消息, 该消费者处理消息时间较短
消费者**1**开始消费消息
消费者**1**接收到了消息,消息为: bb
-------------------此时关闭消费者2-------------------------
消费者**1**开始消费消息
消费者**1**接收到了消息,消息为: cc

Comsumer2的控制台
消费者**2**等待接收消息, 该消费者处理消息时间较长
消费者**2**开始消费消息
消费者**2**接收到了消息,消息为: aa
消费者**2**开始消费消息
-------------------此时关闭消费者2-------------------------

从这个测试就可以看到,cc这个消息原本应该是在消费者2中消费的,但是我停下了消费者2的进程之后,会发现这个消息转交到了消费者1中执行。

RabbitMQ持久化

消息应答是消费者出现问题的时候的解决方法,但是当RabbitMQ服务宕机之后,消息还是可能会丢失,所以这就需要启动RabbitMQ的持久化机制了。

队列实现持久化

之前是有提到过的,队列不开启持久化的话,MQ服务宕机重启的时候,队列就会丢失(因为是临时存储的),所以需要在声明队列的时候就要开启持久化。

主要就是生产者存放消息时声明队列的那个方法。

1
2
3
4
5
6
/*
* 队列持久化的开启主要是第二个参数(durable)
* true表示声明的是一个持久化队列,该队列会在服务器重启后依旧存活
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

消息实现持久化

根据上述所说,当消息不开启持久化,服务宕机之后,即使队列不会丢失,消息也会丢失的,所以也需要开启持久化。

主要就是发送消息的那个方法。

1
2
3
4
5
6
7
8
   /*
* 消息持久化的开启主要是第三个参数(props)
* 这个参数是比较多的,需要在这里配置消息是持久化的。
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

// 需要配置一个常数(MessageProperties.PERSISTENT_TEXT_PLAIN)
channel.basicPublish("", Constant.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

持久化不代表不丢失

队列和消息都实现持久化之后,并不能保证消息不会丢失,因为有很多特殊情况会影响结果,比如持久化到磁盘的过程中服务宕机,还是会可能丢失数据的,所以这种方法并不能避免一些特殊情况。所以还需要参考下述的发布确认。

发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认

开启发布确认是在生产者端的,所以只需要在生产者端标记信道,开启即可,主要的方法就是这个。

1
2
3
4
5
6
7
8
9
/**
* 在这个信道上开启发布确认。
*/
Confirm.SelectOk confirmSelect() throws IOException;

/**
* 在生产者端开启即可
*/
channel.confirmSelect();

这个方法是将信道标记成confirm,还需要配合下面的方法实现确认。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 开启等待直到上次调用依赖发布的所有消息都被broker返回ack或者是nack。
* 如果在非confirm信道上调用该方法,会抛出InterruptedException异常。
*/
boolean waitForConfirms() throws InterruptedException;

/**
* 开启等待直到自上次调用以来所有发布的消息都被broker返回ack或者是nack,或者是直到超时结束。
* 如果超时过期就会抛出TimeoutException异常。
* 如果在非confirm信道上调用该方法,会抛出InterruptedException异常。
*/
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

/**
* 开启等待直到上次调用依赖发布的所有消息都被broker返回ack或者是nack。
* 如果消息被broker返回了nack,会抛出IOException异常。
* 如果在非confirm信道上调用该方法,会抛出InterruptedException异常。
*/
void waitForConfirmsOrDie() throws IOException, InterruptedException;

/**
* 开启等待直到上次调用依赖发布的所有消息都被broker返回ack或者是nack,或者是直到超时结束。
* 如果消息被broker返回了nack,会抛出IOException异常。
* 如果超时过期就会抛出TimeoutException异常。
* 如果在非confirm信道上调用该方法,会抛出InterruptedException异常。
*/
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

一般都会使用

1
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

单个确认发布

最简单的一种发布确认方式,是一种同步的确认发布,也就是说发布一个消息之后,只有它被确认的时候才会返回,如果在指定时间范围内没有被确认,就会抛出异常。

这种方式是最花费时间的,因为每一个发布的消息都要等待确认才能发下一个消息,吞吐量会小很多,不过这种方式也是最安全的,能保证消息不会丢失。

测试如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.yww.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.yww.rabbitmq.MqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 单个确认发布
* </p>
*
* @author yww
**/
public class PublishOne {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MqUtil.getChannel();
// 开启发布确认
channel.confirmSelect();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
int total = 0;
long beginTime = System.currentTimeMillis();
// 发布消息测试
for (int i = 0; i < 100; i++) {
String msg = String.valueOf(i);
channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes());
// 发布确认,返回true则表示成功发布,返回false可以重新发布,也可以使用超时时间。
boolean flag = channel.waitForConfirms();
if (flag) {
total++;
}
}
long endTime = System.currentTimeMillis();
System.out.println("发布了100条单独确认消息耗时为:" + (endTime - beginTime) + "ms");
System.out.println("成功发布了消息数为:" + total);
}

}

测试结果。

1
2
发布了1000条单独确认消息耗时为:29474ms
成功发布了消息数为:1000

批量发布确认

单个确认的效率很慢,所以可以先发布一批消息然后一起确认可以极大提高吞吐量,这种方案也是同步的发布确认方式。

当然问题很明显,当出现故障的时候,不知道具体是哪一个消息出现了问题。

测试如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.yww.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.yww.rabbitmq.MqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 批量确认发布
* </p>
*
* @author yww
**/
public class PublishBatch {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MqUtil.getChannel();
// 开启发布确认
channel.confirmSelect();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
// 批量确认消息数
int batchSize = 100;
// 未确认消息个数
int total = 0;
long beginTime = System.currentTimeMillis();
// 发布消息测试
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes());
total++;
// 如果发布的消息数等于设置的批量确认消息数,就进行发布确认
if (total == batchSize) {
channel.waitForConfirms();
total = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (total > 0) {
channel.waitForConfirms();
}
long endTime = System.currentTimeMillis();
System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms");
System.out.println("未确认的消息数为:" + total);
}

}

测试结果如下。

1
2
发布了1000条批量确认消息耗时为:292ms
未确认的消息数为:0

异步确认发布

异步确认发布在比同步的方式最重要的是效率能高很多,而且因为也是单个确认的,所以也是很可靠的。

这里就是通过监听器来帮忙监听哪些消息是成功的或者是失败的,而不用在逻辑里面等待确认。

添加监听器主要是下面这个方法。

1
2
3
	void addConfirmListener(ConfirmListener listener);
// 一般使用下面这个方法,能知道哪些成功,哪些失败
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);

测试如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.yww.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.yww.rabbitmq.MqUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 异步确认发布
* </p>
*
* @author yww
**/
public class PublishAsync {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MqUtil.getChannel();
// 开启发布确认
channel.confirmSelect();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
// 监听器消息确认成功的回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("这个确认消息为" + deliveryTag);
};
// 监听器消息确认失败的回调(这里可以直接重发,或者是先记录下来)
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("这个未确认消息为" + deliveryTag);
};
// 准备一个监听器,监听哪些消息发布成功或者失败(异步通知)
channel.addConfirmListener(ackCallback, nackCallback);
long beginTime = System.currentTimeMillis();
// 发布消息测试
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
// 这里只负责发送消息,不负责确认,确认消息由监听器负责
channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes());
}
long endTime = System.currentTimeMillis();
System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms");
}

}

测试结果如下。

1
2
发布了1000条批量确认消息耗时为:33ms
(...省略了成功打印)

异步未确认消息的处理

从上述的例子可以知道监听器和发布消息的线程不是同一个,所以这个未确认的消息的处理最好还是先存储起来。

一个简单的使用例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.yww.rabbitmq.publish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.yww.rabbitmq.MqUtil;

import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

/**
* <p>
* 处理异步方式未被确认的消息
* </p>
*
* @author yww
**/
public class publishDeal {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MqUtil.getChannel();
channel.confirmSelect();
channel.queueDeclare(MqUtil.QUEUE_NAME, false, false, false, null);
// 准备一个队列存储消息
ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
// 删除掉已经确认的消息
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
// 如果是批量的就一起清理
if (multiple) {
// 获取到确认的消息
ConcurrentNavigableMap<Long, String> confirmed = map.headMap(deliveryTag);
confirmed.clear();
} else {
map.remove(deliveryTag);
}
System.out.println("确认的消息为:" + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
String nackMsg = map.get(deliveryTag);
System.out.println("这个未确认消息的内容为:" + nackMsg);
};
channel.addConfirmListener(ackCallback, nackCallback);
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String msg = String.valueOf(i);
long num = channel.getNextPublishSeqNo();
channel.basicPublish("", MqUtil.QUEUE_NAME, null, msg.getBytes());
// 记录发送的所有消息
map.put(num, msg);
}
long endTime = System.currentTimeMillis();
System.out.println("发布了1000条批量确认消息耗时为:" + (endTime - beginTime) + "ms");
}

}

不公平调度

刚刚学习的时候是可以知道RabbitMQ默认是循环调度,也就是轮询的,这种调度方法,有优点也有缺点,有时候公平并不是一件好事,所以我们可以设置这个消费者的工作内容。

主要是在消费者中设置信道的这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
   /**
* 设置工作量
* 可以设置0到65535之间的数
* @param prefetchCount 设置该线程最多持有的任务数,如果是0则代表无限制
*/
void basicQos(int prefetchCount) throws IOException;

/**
* 这样表示这个工作线程,在处理并确认一条消息的时间段
* 不能向该工作线程发送新的消息,将会把消息分派给不忙的工作线程
*/
channel.basicQos(1);

注意当所有工作线程都很忙,你的队列有可能会被填满。

所以需要你添加更多的工作线程,或者是使用其他的策略。

上述只是简单实用,这个方法的完整形式如下(以下为机翻)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  /**
* 请求特定的“服务质量”设置。
* <p>
* 这些设置限制了服务器在需要确认之前将交付给消费者的数据量。
* 因此,它们提供了一种用户发起的流控制方法。
* <p>
* 注意预取计数必须在0到65535之间(unsigned short in AMQP 0-9-1)。
*
* @param prefetchSize 服务器将传递的最大内容量(以字节为单位),如果无限制,则为0
* @param prefetchCount 服务器将传递的最大消息数,如果无限制,则为0
* @param global 如果设置应应用于整个通道而不是每个使用者,则为true
* @throws java.io.IOException 如果遇到错误
* @see com.rabbitmq.client.AMQP.Basic.Qos
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

Publish/Subscribe

在上述的示例中,使用了工作队列,工作队列会将每个任务交付给一个工作线程。当然我们也可以实现向多个消费者传递一条消息,这种模式被称为Publish/Subscribe,也叫发布/订阅

交换机(Exchanges)

RabbitMQ消息传递模型的核心思想是生产者从不直接向队列中发送任何消息。实际上生产者经常甚至根本不知道消息是否会被传送到哪个队列。

实际情况是生产者只能将消息发送到交换机中,交换机的任务也很简单,一方面他接收来自生产者的消息,另一方面将消息推入队列。

是应该把这些消息放到特定队列,还是说把他们丢到许多队列中,或者说是丢弃,这些就是由交换机的类型来决定了。

交换机有几种可用的类型。

  1. direct(直接)
  2. topic(主题)
  3. headers(标题)
  4. fanout(扇出)

无名交换机

之前使用的就是无名的交换机。

1
channel.basicPublish("", "hello", null, "hello world".getBytes());

第一个参数是交换机的名称,空字符串就表示默认或者说是无名交换机。消息能路由发送到队列中其实就是由routingKey指定的。

临时队列

临时队列用于临时存储消息,一旦断开了消费者的连接,队列就会被自动删除。

创建一个随机名称的临时队列。

1
String queueName = channel.queueDeclare().getQueue();