# 安装
# Windows 下安装
环境
在安装 RabbitMQ 之前,需要配置 Erlang 环境。
Erlang 下载:
https://www.erlang.org/downloads (慢)
https://packages.erlang-solutions.com/erlang/ (快)
Erlang 安装:
双击执行安装文件。
选择安装位置(基本上都是下一步)。
安装完成。
下载地址
https://www.rabbitmq.com/download.html (下载选择页)
https://www.rabbitmq.com/changelog.html (快捷下载页)
安装
双击执行安装文件。
选择安装位置(基本上都是下一步)。
安装完成。
安装完成后,
开始
中可以看到RabbitMQ Service -start
、RabbitMQ Service -stop
、RabbitMQ Service -remove
三个快捷启动方式(如果没有,可以通过win + S
搜索RabbitMQ Service
)。配置
启动插件
切换到
[rabbitmq]\rabbitmq_server-3.8.1\sbin>
目录。执行:
rabbitmq-plugins enable rabbitmq_management
(执行一次即可)。
启动
执行
RabbitMQ Service -start
(需要启动服务时执行)。访问
浏览器访问:http://localhost:15672
默认配置
用户名:guest
密码:guset
后台端口:15672
TCP 端口:5672
# AMQP 知识
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议。
AMQP 是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品,不同的开发语言等条件的限制。目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。主要实现有 RabbitMQ。
# 概念
生产者,即消息的创建者,它将消息发送到 RabbitMQ。
消费者,连接到 RabbitMQ,订阅到队列上,对消息进行消费,其订阅类型可分为:持续订阅(basicConsumer)和单条订阅(basicGet)。
消息,它包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且 RabbitMQ 用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁。
信道,信道是生产消费者与 RabbitMQ 通信的渠道,生产者发布或是消费者订阅一个队列都是通过信道来通信的。信道是建立在 TCP 连接上的虚拟连接。RabbitMQ 在一条 TCP 上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个线程对应一个信道,信道在 rabbit 都有唯一的 ID ,保证了信道私有性,对应上唯一的线程使用。
交换器,在 RabbitMQ 中,主要有四种交换器类型:Direct,Fanout,Topic,Head。
队列,
绑定,
路由键,
队列通过路由键(routing key,某种确定的规则)绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。
# 消息的确认
消费者收到的每一条消息都必须进行确认(自动确认和自行确认)。
消费者在声明队列时,可以指定 autoAck 参数,当 autoAck=false 时,RabbitMQ 会等待消费者显式发回 ack 信号后才从内存 (和磁盘,如果是持久化消息的话) 中移去消息。否则,在 RabbitMQ 队列中消息被消费后会立即删除它。
采用消息确认机制后,只要令 autoAck=false,消费者就有足够的时间处理消息 (任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直持有消息直到消费者显式调用 basicAck 为止。
当 autoAck=false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者 ack 信号的消息。如果服务器端一直没有收到消费者的 ack 信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未 ack 的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
# 交换器类型
Direct
direct 为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列。
使用代码:
channel.basicPublish("", QueueName, null, message)
推送 direct 交换器消息到对于的队列,空字符为默认的 direct 交换器,用队列名称当做路由键。Fanout (发布 / 订阅模式)
fanout 有别于 direct 交换器,fanout 是一种发布 / 订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
和 direct 交换器不同,我们在发送消息的时候新增
channel.exchangeDeclare(ExchangeName, "fanout")
,这行代码声明 fanout 交换器。Topic (匹配订阅模式)
topic 交换器运行和 fanout 类似,但是可以更灵活的匹配自己想要订阅的信息,它使用路由键进行消息(规则)匹配。
通过使用 “*” 和 “#” ,使来自不同源头的消息到达同一个队列, ”.” 将路由键分为了几个标识符, “*” 匹配 1 个, “#” 匹配一个或多个。
例如:
channel.queueBind(queueName, ExchangeName, "#.error")
。headers
几乎和 direct 一样,不实用,可以忽略。
# 虚拟主机
虚拟消息服务器,vhost,本质上就是一个迷你版的消息队列服务器,有自己的队列、交换器和绑定,最重要的是它有自己的权限机制。Vhost 提供了逻辑上的分离,可以将众多客户端进行区分,又可以避免队列和交换器的命名冲突。Vhost 必须在连接时指定,rabbitmq 包含缺省 vhost:“/”,通过缺省用户和口令 guest 进行访问。
虚拟主机保证了用户可以在多个不同的 application 中使用 RabbitMQ。
在 RabbitMQ 中创建用户,必须要被指派给至少一个 vhost,并且只能访问被指派内的队列、交换器和绑定。Vhost 必须通过 rabbitmq 的管理控制工具创建。
# 原生 RabbitMQ
# 准备
启动 RabbitMQ 消息服务。
创建 maven 项目及添加基础依赖。
# 依赖
<dependency> | |
<groupId>com.rabbitmq</groupId> | |
<artifactId>amqp-client</artifactId> | |
<!--version--> | |
</dependency> |
# 代码
生产者端:
package com.xfc.exchange.direct; | |
import com.rabbitmq.client.BuiltinExchangeType; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import java.util.Arrays; | |
import java.util.List; | |
public class DirectProducer { | |
public final static String EXCHANGE_NAME = "direct_exchange"; | |
public static void main(String[] args) throws Exception { | |
// 1. 创建连接工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory();// 默认连接到 localhost | |
// 2. 创建连接 | |
Connection connection = connectionFactory.newConnection(); | |
// 3. 创建信道 | |
Channel channel = connection.createChannel(); | |
// 4. 设置交换器 | |
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); | |
// 自定义路由键 | |
List<String> routeKey = Arrays.asList("key1", "key2", "key3"); | |
// 循环发送消息 | |
for (int i = 0; i < routeKey.size(); i++) { | |
String key = routeKey.get(i % routeKey.size()); | |
String msg = "Hello, RabbitMq " + (i + 1); | |
// 5. 发送消息(交换器,路由键,属性参数,消息字节) | |
channel.basicPublish(EXCHANGE_NAME, key, null, msg.getBytes()); | |
System.out.println("Sent " + key + ":" + msg); | |
} | |
channel.close(); | |
connection.close(); | |
} | |
} |
消费者端:
package com.xfc.exchange.direct; | |
import com.rabbitmq.client.*; | |
import java.io.IOException; | |
public class DirectConsumer { | |
public final static String QUEUE_NAME = "queue_name"; | |
public static void main(String[] args) throws Exception { | |
// 1. 创建连接工厂 | |
ConnectionFactory connectionFactory = new ConnectionFactory();// 默认连接到 localhost | |
// 2. 创建连接 | |
Connection connection = connectionFactory.newConnection(); | |
// 3. 创建信道 | |
Channel channel = connection.createChannel(); | |
// 4. 设置交换器 | |
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); | |
// 4. 声明队列 | |
channel.queueDeclare(QUEUE_NAME, false, false, false, null); | |
// 5. 交换器和队列绑定:将队列和交换器通过路由键进行绑定 | |
channel.queueBind(QUEUE_NAME, DirectProducer.EXCHANGE_NAME, "key2"); | |
System.out.println("waiting for message........"); | |
// 6. 声明消费者 | |
final Consumer consumer = new DefaultConsumer(channel) { | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
String message = new String(body, "UTF-8"); | |
System.out.println("Received[" + envelope.getRoutingKey() + "] " + message); | |
} | |
}; | |
/* 消费者正式开始在指定队列上消费消息 */ | |
channel.basicConsume(QUEUE_NAME, true, consumer); | |
} | |
} |
注:其他交换器类型的使用方式与 direct 基本一致,可以参照 AMQP 知识 >> 交换机类型 相关内容进行尝试。
# 运行
运行消费者端代码。
运行生产者端代码。
观察控制台打印语句及 RabbitMQ 管理界面。
生产者控制台:
消费者控制台:
RabbitMQ 管理界面:
# 扩展
# 消息发布时的权衡
RabbitMQ 在设计的时候,特意让生产者和消费者 “脱钩”,也就是消息的发布和消息的消费之间是解耦的。
在 RabbitMQ 中,有不同的投递机制(生产者),但是每一种机制都对性能有一定的影响。一般来讲速度快的可靠性低,可靠性好的性能差,具体怎么使用需要根据你的应用程序来定,所以说没有最好的方式,只有最合适的方式。只有把你的项目和技术相结合,才能找到适合你的平衡。
# Spring 整合 RabbitMQ
# 准备
启动 RabbitMQ 消息服务。
创建 maven 项目。
基础依赖。
<!-- RabbitMQ 核心依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.12.RELEASE</version>
</dependency>
<!-- Spring 相关依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- 日志相关依赖 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
# 配置
配置 spring-rabbitmq.xml :