站义 拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 RabbitMQ 中的通道和连接

RabbitMQ 中的通道和连接

by 白鹭 - 2022-10-19 2027 0 2

一、简介

在这个快速教程中,我们将展示如何使用与两个核心概念相关的RabbitMQ 的API:连接和通道。

2. RabbitMQ 快速回顾

RabbitMQ 是AMQP(高级消息队列协议)的流行实现,被各种规模的公司广泛用于处理他们的消息传递需求。

从应用程序的角度来看,我们通常关注AMQP 的主要实体:虚拟主机、交换和队列。正如我们在之前的文章中已经讨论过这些概念一样,在这里,我们将重点关注两个讨论较少的概念的细节:连接和通道。

3. 连接

客户端与RabbitMQ 代理交互的第一步是建立连接。AMPQ 是一种应用层协议,因此这种连接发生在传输层协议之上。这可以是常规TCP 连接或使用TLS 加密的连接。Connection 的主要作用是提供一个安全的管道,客户端可以通过它与代理进行交互。

这意味着在连接建立期间,客户端必须向服务器提供有效的凭据。服务器可能支持不同的凭证类型,包括常规用户名/密码、SASL、X.509 密码或任何支持的机制。

除了安全之外,连接建立阶段还负责协商AMPQ 协议的某些方面。此时,如果客户端和/或服务器无法就协议版本或调整参数值达成一致,则不会建立连接,并且将关闭传输级连接。

3.1。在Java 应用程序中创建连接

使用Java 时,与RabbitMQ 浏览器通信的标准方法是使用amqp-clientJava 库。我们可以使用添加相应的Maven 依赖项将这个库添加到我们的项目中:

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.16.0</version>
 </dependency>

该工件的最新版本可在Maven Central上获得。

这个库使用工厂模式来创建新的连接。首先,我们创建一个新的ConnectionFactory实例并设置创建连接所需的所有参数。至少,这需要通知RabbitMQ 主机的地址:

ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("amqp.example.com");

一旦我们完成了这些参数的设置,我们使用newConnection()工厂方法来创建一个新的Connection实例:

Connection conn = factory.newConnection();

4. 渠道

简单地说,AMQP 通道是一种允许在单个连接之上多路复用多个逻辑流的机制。这允许在客户端和服务器端更好地使用资源,因为建立连接是一项相对昂贵的操作。

客户端创建一个或多个通道,以便它可以向代理发送命令。这包括与发送和/或接收消息相关的命令。

通道还提供了一些关于协议逻辑的额外保证:

  • 给定通道的命令总是按照它们发送的顺序执行。

  • 给定客户端通过单个连接打开多个通道的场景,实现可以在它们之间分配可用带宽

  • 双方都可以发出流控制命令,通知对等方应该停止发送消息。

通道的一个关键方面是它的生命周期绑定到用于创建它的连接。这意味着如果我们关闭一个连接,所有关联的通道也将被关闭。

4.1。在Java 应用程序中创建通道

使用amqp-client库的Java 应用程序使用前者的createChannel()方法从现有Connection创建新Channel

channel = conn.createChannel();

一旦我们有了Channel,我们就可以向服务器发送命令。例如,要创建队列,我们使用queueDeclare()方法:

channel.queueDeclare("example.queue", true, false, true, null);

这段代码“声明”了一个队列,这是AMQP 表示“如果不存在则创建”的方式。队列名称后面的附加参数定义了它的附加特征:

  • durable性:此声明是持久的,这意味着它将在服务器重新启动后继续存在

  • exclusive:此队列仅限于与声明它的通道关联的连接

  • autodelete:一旦不再使用,服务器将删除队列

  • args:带有用于调整队列行为的参数的可选映射;例如,我们可以使用这些参数来定义消息和死信行为的TTL

现在,要使用默认交换向此队列发布消息,我们使用basicPublish()方法:

channel.basicPublish("", queue, null, payload);

此代码使用队列名称作为其路由键向默认交换器发送消息。

5. 渠道分配策略

让我们考虑一个我们使用消息传递系统的场景:CQRS(命令查询责任分离)应用程序。简而言之,基于CQRS 的应用程序有两个独立的路径:命令和查询。命令可以更改数据但从不返回值。另一方面,查询返回值但从不修改它们。

由于命令路径从不返回任何数据,因此服务可以异步执行它们。在一个典型的实现中,我们有一个HTTP POST 端点,它在内部构建一条消息并将其发送到队列以供以后处理。

现在,对于一个必须处理数十甚至数百个并发请求的服务来说,每次都打开连接和通道并不是一个现实的选择相反,更好的方法是使用通道池。

当然,这会导致下一个问题:我们应该创建单个连接并从中创建通道还是使用多个连接?

5.1。单连接/多通道

在此策略中,我们将使用单个连接并创建一个容量等于服务可以管理的最大并发连接数的通道池。对于传统的每请求线程模型,这应该设置为与请求处理程序线程池相同的大小。

这种策略的缺点是,在较重的负载下,我们必须通过关联的通道一次发送一个命令这一事实意味着我们必须使用同步机制。这反过来又在命令路径中增加了额外的延迟,我们希望将其最小化。

5.2.每线程连接策略

另一种选择是走向另一个极端并使用Connection池,因此永远不会争用通道。对于每个Connection,我们将创建一个Channel,处理程序线程将使用该Channel 向服务器发出命令。

然而,我们从客户端移除同步是有代价的。代理必须为每个连接分配额外的资源,例如套接字描述符和状态信息。此外,服务器必须在客户端之间分配可用吞吐量。

6. 基准策略

为了评估这些候选策略,让我们为每个策略运行一个简单的基准。基准测试包括并行运行多个工作程序,这些工作程序发送一千条每条4 KB 的消息。在构建时,worker 会收到一个Connection,它将创建一个Channel来发送命令。它还接收迭代次数、有效负载大小和用于通知测试运行器它已完成发送消息的CountDownLatch

public class Worker implements Callable<Worker.WorkerResult> {
 // ... field and constructor omitted
 @Override
 public WorkerResult call() throws Exception {
 try {
 long start = System.currentTimeMillis();
 for (int i = 0; i < iterations; i++) {
 channel.basicPublish("", queue, null, payload);
 }
 long elapsed = System.currentTimeMillis() - start;
 channel.queueDelete(queue);
 return new WorkerResult(elapsed);
 } finally {
 counter.countDown();
 }
 }
 public static class WorkerResult {
 public final long elapsed;
 WorkerResult(long elapsed) {
 this.elapsed = elapsed;
 }
 }
 }

除了通过减少锁存器来表明它已经完成了它的工作外,worker 还返回一个WorkerResult实例,其中包含发送所有消息的经过时间。虽然这里我们只有一个long值,但我们可以使用扩展它来返回更多细节。

控制器根据正在评估的策略创建连接工厂和工人。对于单个连接,它会创建Connection实例并将其传递给每个工作人员:

@Override
 public Long call() {
 try {
 Connection connection = factory.newConnection();
 CountDownLatch counter = new CountDownLatch(workerCount);
 List<Worker> workers = new ArrayList<>();
 for( int i = 0 ; i < workerCount ; i++ ) {
 workers.add(new Worker("queue_" + i, connection, iterations, counter,payloadSize));
 }
 ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0,
 TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
 long start = System.currentTimeMillis();
 executor.invokeAll(workers);
 if( counter.await(5, TimeUnit.MINUTES)) {
 long elapsed = System.currentTimeMillis() - start;
 return throughput(workerCount,iterations,elapsed);
 }
 else {
 throw new RuntimeException("Timeout waiting workers to complete");
 }
 }
 catch(Exception ex) {
 throw new RuntimeException(ex);
 }
 }

对于多连接策略,我们为每个工作人员创建一个新Connection

for (int i = 0; i < workerCount; i++) {
 Connection conn = factory.newConnection();
 workers.add(new Worker("queue_" + i, conn, iterations, counter, payloadSize));
 }

throughput函数计算的基准度量将是完成所有工作人员所需的总时间除以工作人员数量:

private static long throughput(int workerCount, int iterations, long elapsed) {
 return (iterations * workerCount * 1000) / elapsed;
 }

请注意,我们需要将分子乘以1000,以便我们以秒为单位获得消息吞吐量。

7. 运行基准

这些是我们对这两种策略进行基准测试的结果。对于每个工人数量,我们已经运行了10 次基准测试,并使用平均值作为tar 特定工人/策略的吞吐量度量。按照今天的标准,环境本身是适度的:

  • CPU:双核i7 戴尔笔记本@ 3.0 GHz

  • 总内存:16 GB

  • RabbitMQ:在Docker 上运行的3.10.7(具有4 GB RAM 的docker-machine)

channels-and-connections-in-rabbitmq.png

对于这个特定的环境,我们看到单连接策略有一点优势。对于150 名工人的情况,这种优势似乎有所增加。

8. 选择策略

鉴于基准测试结果,我们无法指出明确的赢家。对于5 到100 之间的工人数量,结果或多或少是相同的。在那之后,与多个连接相关的开销似乎高于在单个连接上处理多个通道。

此外,我们必须考虑到测试工作者只做一件事:将固定消息发送到队列。现实世界的应用程序,例如我们提到的CQRS 应用程序,通常在发送消息之前和/或之后做一些额外的工作。因此,要选择最佳策略,推荐的方法是使用尽可能接近生产环境的配置运行您自己的基准测试

9. 结论

在本文中,我们探讨了RabbitMQ 中的通道和连接的概念,以及我们如何以不同的方式使用它们。


一起学习
标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *