使用 RabbitMQ 进行消息传递

本指南将引导您完成设置发布和订阅消息的 RabbitMQ AMQP 服务器以及创建 Spring Boot 应用程序以与该 RabbitMQ 服务器交互的过程。

使用 RabbitMQ 进行消息传递

你将建造什么

您将构建一个应用程序,该应用程序使用 Spring AMQP 发布消息​​RabbitTemplate​​并使用​​MessageListenerAdapter​​.

你需要什么

  • 约15分钟
  • 最喜欢的文本编辑器或 IDE
  • JDK 11或更高版本
  • Gradle 4+或Maven 3.2+
  • 您还可以将代码直接导入 IDE:
  • 弹簧工具套件 (STS)
  • IntelliJ IDEA
  • 设置 RabbitMQ 服务器。请参阅设置 RabbitMQ 代理。

如何完成本指南

像大多数 Spring入门指南一样,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,您最终都会得到工作代码。

从头开始,请继续设置 RabbitMQ 代理。

跳过基础知识,请执行以下操作:

  • 下载并解压本指南的源代码库,或使用Git克隆它:git clone https://github.com/spring-guides/gs-messaging-rabbitmq.git
  • 光盘进入gs-messaging-rabbitmq/initial
  • 跳转到从 Spring Initializr 开始。

完成后,您可以对照中的代码检查结果​​gs-messaging-rabbitmq/complete​​。

设置 RabbitMQ 代理

在构建消息传递应用程序之前,您需要设置一个服务器来处理接收和发送消息。

RabbitMQ 是一个 AMQP 服务器。该服务器可在https://www.rabbitmq.com/download.html免费获得。您可以手动下载它,或者,如果您使用带有 Homebrew 的 Mac,则可以在终端窗口中运行以下命令:

brew install rabbitmq复制

通过在终端窗口中运行以下命令,解压缩服务器并使用默认设置启动它:

rabbitmq-server复制

您应该会看到类似于以下内容的输出:

            RabbitMQ 3.1.3. Copyright (C) 2007-2013 VMware, Inc.
## ## Licensed under the MPL. See https://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker... completed with 6 plugins.复制

如果您在本地运行 Docker,您还可以使用Docker Compose快速启动 RabbitMQ 服务器。Github 项目​​docker-compose.yml​​的根目录中有一个。​​complete​​这非常简单,如下面的清单所示:

rabbitmq:
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"复制

使用当前目录中的此文件,您可以运行​​docker-compose up​​以使 RabbitMQ 在容器中运行。

从 Spring Initializr 开始

您可以使用这个预先初始化的项目并单击 Generate 下载 ZIP 文件。此项目配置为适合本教程中的示例。

手动初始化项目:

  1. 导航到https://start.spring.io。该服务提取应用程序所需的所有依赖项,并为您完成大部分设置。
  2. 选择 Gradle 或 Maven 以及您要使用的语言。本指南假定您选择了 Java。
  3. 单击Dependencies并为 RabbitMQ 选择Spring
  4. 单击生成
  5. 下载生成的 ZIP 文件,该文件是根据您的选择配置的 Web 应用程序的存档。
如果您的 IDE 具有 Spring Initializr 集成,您可以从您的 IDE 完成此过程。
你也可以从 Github 上 fork 项目并在你的 IDE 或其他编辑器中打开它。

创建 RabbitMQ 消息接收器

对于任何基于消息传递的应用程序,您都需要创建一个响应已发布消息的接收器。以下清单(来自​​src/main/java/com.example.messagingrabbitmq/Receiver.java​​)显示了如何执行此操作:

package com.example.messagingrabbitmq;

import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

private CountDownLatch latch = new CountDownLatch(1);

public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}

public CountDownLatch getLatch() {
return latch;
}

}复制

这​​Receiver​​是一个 POJO,它定义了接收消息的方法。当您注册它以接收消息时,您可以将其命名为任何您想要的名称。

为了方便起见,这个 POJO 也有一个CountDownLatch. 这让它发出已收到消息的信号。这是您不太可能在生产应用程序中实现的东西。

注册监听器并发送消息

Spring AMQP​​RabbitTemplate​​提供了使用 RabbitMQ 发送和接收消息所需的一切。但是,您需要:

  • 配置消息侦听器容器。
  • 声明队列、交换器以及它们之间的绑定。
  • 配置一个组件发送一些消息来测试监听器。
Spring Boot 会自动创建连接工厂和 RabbitTemplate,从而减少您必须编写的代码量。

您将使用​​RabbitTemplate​​​来发送消息,并且您将​​Receiver​​​使用消息侦听器容器注册一个以接收消息。连接工厂驱动两者,让它们连接到 RabbitMQ 服务器。以下清单(来自​​src/main/java/com.example.messagingrabbitmq/MessagingRabbitApplication.java​​)显示了如何创建应用程序类:

package com.example.messagingrabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MessagingRabbitmqApplication {

static final String topicExchangeName = "spring-boot-exchange";

static final String queueName = "spring-boot";

@Bean
Queue queue() {
return new Queue(queueName, false);
}

@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}

@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}

public static void main(String[] args) throws InterruptedException {
SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
}

}复制

​@SpringBootApplication​​是一个方便的注释,它添加了以下所有内容:

  • ​@Configuration​​: 将类标记为应用程序上下文的 bean 定义源。
  • ​@EnableAutoConfiguration​​:告诉 Spring Boot 根据类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果spring-webmvc位于类路径上,则此注释将应用程序标记为 Web 应用程序并激活关键行为,例如设置DispatcherServlet.
  • ​@ComponentScan​​: 告诉 Spring 在包中查找其他组件、配置和服务com/example,让它找到控制器。

该​​main()​​方法使用 Spring Boot 的​​SpringApplication.run()​​方法来启动应用程序。您是否注意到没有一行 XML?也没有​​web.xml​​文件。这个 Web 应用程序是 100% 纯 Java,您不必处理任何管道或基础设施的配置。

方法中定义的 bean​​listenerAdapter()​​​被注册为容器中的消息监听器(定义在 中​​container()​​​)。它侦听​​spring-boot​​​队列中的消息。因为​​Receiver​​​该类是 POJO,所以需要将其包装在 中​​MessageListenerAdapter​​​,您可以在其中指定它调用​​receiveMessage​​.

JMS 队列和 AMQP 队列具有不同的语义。例如,JMS 仅将排队的消息发送给一个消费者。虽然 AMQP 队列做同样的事情,但 AMQP 生产者并不直接将消息发送到队列。相反,一条消息被发送到一个交换器,该交换器可以发送到单个队列或扇出到多个队列,模拟 JMS 主题的概念。

消息侦听器容器和接收器 bean 是您侦听消息所需的全部内容。要发送消息,您还需要一个 Rabbit 模板。

该​​queue()​​​方法创建一个 AMQP 队列。该​​exchange()​​​方法创建主题交换。该方法将这两者绑定在一起,定义发布到交换​​binding()​​​时发生的行为。​​RabbitTemplate​

Spring AMQP 要求将Queue、TopicExchange和Binding声明为顶级 Spring bean 以便正确设置。

在这种情况下,我们使用主题交换,并且队列与路由键绑定​​foo.bar.#​​​,这意味着以 开头的路由键发送的任何消息都会​​foo.bar.​​被路由到队列。

发送测试消息

在此示例中,测试消息由 a 发送​​CommandLineRunner​​,它还等待接收器中的闩锁并关闭应用程序上下文。以下清单(来自​​src/main/java/com.example.messagingrabbitmq/Runner.java​​)显示了它是如何工作的:

package com.example.messagingrabbitmq;

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Runner implements CommandLineRunner {

private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;

public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}

@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}

}复制

​foo.bar.baz​​请注意,模板使用与绑定匹配的路由键将消息路由到交换器。

在测试中,您可以模拟运行器,以便可以单独测试接收器。

运行应用程序

该​​main()​​方法通过创建 Spring 应用程序上下文来启动该过程。这将启动消息侦听器容器,该容器开始侦听消息。有一个​​Runner​​bean,然后会自动运行。它从应用程序上下文中检索并在队列​​RabbitTemplate​​中发送​​Hello from RabbitMQ!​​消息。​​spring-boot​​最后,它关闭 Spring 应用程序上下文,应用程序结束。

构建一个可执行的 JAR

您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必要依赖项、类和资源的单个可执行 JAR 文件并运行它。构建可执行 jar 可以在整个开发生命周期、跨不同环境等中轻松地作为应用程序交付、版本化和部署服务。

如果您使用 Gradle,则可以使用​​./gradlew bootRun​​. 或者,您可以使用构建 JAR 文件​​./gradlew build​​,然后运行 ​​JAR 文件,如下所示:

java -jar build/libs/gs-messaging-rabbitmq-0.1.0.jar

如果您使用 Maven,则可以使用​​./mvnw spring-boot:run​​. 或者,您可以使用构建 JAR 文件,​​./mvnw clean package​​然后运行该 JAR 文件,如下所示:

java -jar 目标/gs-messaging-rabbitmq-0.1.0.jar
此处描述的步骤创建了一个可运行的 JAR。您还可以构建经典的 WAR 文件。

您应该看到以下输出:

    Sending message...
Received <Hello from RabbitMQ!>复制

概括

发表评论

相关文章