拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 Spring WebFlux中的背压机制

Spring WebFlux中的背压机制

白鹭 - 2021-11-24 660 0 0

1.简介

Spring WebFlux为Web应用程序提供了反应式编程。响应式设计的异步和非阻塞性质提高了性能和内存使用率。 Project Reactor提供了那些功能来有效地管理数据流。

但是,背压是这类应用中的常见问题。在本教程中,我们将解释它是什么以及如何在Spring WebFlux中应用反压机制来缓解这种情况。

2.反应流中的背压

由于响应式编程的非阻塞性质,服务器不会立即发送完整的流。它可以在数据可用时立即并发推送数据。因此,客户端等待较少的时间来接收和处理事件。但是,仍有一些问题需要克服。

软件系统中的背压是使通信量过载的能力。换句话说,信息发布者将无法处理的数据淹没了消费者。

最终,人们也将此术语用作控制和处理它的机制。它是系统控制下游力所采取的保护措施。

2.1 什么是背压?

**在“反应性流”中,**背压还定义了如何调节流元素的传输。换句话说,控制接收者可以消耗多少元素。

让我们用一个例子清楚地描述它是什么:

  • 该系统包含三个服务:发布者,使用者和图形用户界面(GUI)
  • 发布者每秒向消费者发送10000个事件
  • 使用者处理它们并将结果发送到GUI
  • GUI将结果显示给用户
  • 消费者每秒只能处理7500个事件

Spring

**以该速度,消费者无法管理事件(**背压) 。因此,系统将崩溃并且用户将看不到结果。

2.2 使用背压防止系统性故障

这里的建议是应用某种背压策略来防止系统性故障。目的是有效管理收到的额外事件:

  • 控制发送的数据流将是第一个选择。基本上,发布者需要放慢事件的速度。因此,消费者不会过载。不幸的是,这并不总是可能的,我们需要找到其他可用的选项
  • 第二个选择是缓冲额外的数据量。使用这种方法,消费者可以临时存储剩余事件,直到可以处理它们为止。这里的主要缺点是取消绑定缓冲区,导致内存崩溃
  • 丢弃多余的事件,使它们失去踪迹。即使此解决方案也不是理想的选择,使用此技术,系统也不会崩溃

Spring

2.3 控制背压

我们将重点控制发布者发出的事件。基本上,有三种策略可以遵循:

  • 仅在订阅者请求时发送新事件。这是在发射器请求时收集元素的拉动策略
  • 限制要在客户端接收的事件数。限制数量的推送策略,发布者只能一次将最大数量的项目发送给客户
  • 当使用者无法处理更多事件时,取消数据流。在这种情况下,接收方可以在任何给定时间中止传输并稍后再次订阅该流

Spring

3.在Spring WebFlux中处理背压

Spring WebFlux提供了反应流的异步非阻塞流。 Spring WebFlux中负责背压的是Project Reactor 。它在内部使用Flux功能来应用机制来控制由发射器产生的事件。

WebFlux使用TCP流量控制来调节背压(以字节为单位)。但是它不能处理消费者可以接收的逻辑元素。让我们看看幕后发生的交互流:

  • WebFlux框架负责将事件转换为字节,以便通过TCP传输/接收事件
  • 在请求下一个逻辑元素之前,使用者可能会启动并运行长时间的工作
  • 接收方处理事件时,WebFlux会在不进行确认的情况下使字节排队,因为不需要新事件
  • 由于TCP协议的性质,如果有新事件,发布者将继续将其发送到网络

Spring

总之,上图显示,对于消费者和发布者,逻辑元素中的需求可能会有所不同。 Spring WebFlux不能理想地管理作为一个整体系统进行交互的服务之间的背压。它与消费者独立处理,然后与发布者以相同方式处理。但这没有考虑到这两种服务之间的逻辑需求。

因此, Spring WebFlux不会像我们期望的那样处理背压。在下一节中,让我们看看如何在Spring WebFlux中实现反压机制!

4.使用Spring WebFlux实现背压机制

我们将使用Flux实现来处理对接收到的事件的控制。因此,我们将在读取和写入侧为请求和响应主体提供反压支持。然后,生产者将放慢速度或停下来,直到消费者的生产能力释放出来。让我们来看看如何做!

4.1 依赖关系

为了实现示例,我们只需将Spring WebFlux启动器Reactor测试依赖项添加到我们的pom.xml

<dependency>

 <groupId>org.springframework.boot</groupId>

 <artifactId>spring-boot-starter-webflux</artifactId>

 </dependency>



 <dependency>

 <groupId>io.projectreactor</groupId>

 <artifactId>reactor-test</artifactId>

 <scope>test</scope>

 </dependency>

4.2 请求新事件

第一种选择是****让消费者控制它可以处理的事件。因此,发布者一直等到接收者请求新事件。总而言之,客户端订阅Flux ,然后根据其需求处理事件:

@Test

 public void whenRequestingChunks10_thenMessagesAreReceived() {

 Flux request = Flux.range(1, 50);



 request.subscribe(

 System.out::println,

 err -> err.printStackTrace(),

 () -> System.out.println("All 50 items have been successfully processed!!!"),

 subscription -> {

 for (int i = 0; i < 5; i++) {

 System.out.println("Requesting the next 10 elements!!!");

 subscription.request(10);

 }

 }

 );



 StepVerifier.create(request)

 .expectSubscription()

 .thenRequest(10)

 .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 .thenRequest(10)

 .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

 .thenRequest(10)

 .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)

 .thenRequest(10)

 .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)

 .thenRequest(10)

 .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)

 .verifyComplete();

通过这种方法,发射器永远不会淹没接收器。换句话说,客户端处于控制之下以处理其所需的事件。

StepVerifier测试与反压有关的生产者行为。 thenRequest(n)时才能期待接下来的n个项目。

4.3 限制

第二个选项是使用Project Reactor中limitRange()它允许设置要预取的项目数。一个有趣的功能是,即使订户请求处理更多事件,该限制也适用。发射器将事件分成多个块,避免消耗超过每个请求的限制:

@Test

 public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {

 Flux<Integer> limit = Flux.range(1, 25);



 limit.limitRate(10);

 limit.subscribe(

 value -> System.out.println(value),

 err -> err.printStackTrace(),

 () -> System.out.println("Finished!!"),

 subscription -> subscription.request(15)

 );



 StepVerifier.create(limit)

 .expectSubscription()

 .thenRequest(15)

 .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

 .expectNext(11, 12, 13, 14, 15)

 .thenRequest(10)

 .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)

 .verifyComplete();

 }

4.4 取消

最终,消费者可以随时取消要接收的事件。对于此示例,我们将使用另一种方法。 Project Reactor允许实现我们自己的Subscriber或扩展BaseSubscriber 。因此,让我们看一下接收者如何在任何时候都可以重写接收到的新类来中止新事件的接收:

@Test

 public void whenCancel_thenSubscriptionFinished() {

 Flux<Integer> cancel = Flux.range(1, 10).log();



 cancel.subscribe(new BaseSubscriber<Integer>() {

 @Override

 protected void hookOnNext(Integer value) {

 request(3);

 System.out.println(value);

 cancel();

 }

 });



 StepVerifier.create(cancel)

 .expectNext(1, 2, 3)

 .thenCancel()

 .verify();

 }

5.结论

在本教程中,我们展示了反应式编程中的反压以及如何避免这种反压。 Spring WebFlux通过Project Reactor支持背压。因此,当发布者通过太多事件压倒消费者时,它可以提供可用性,鲁棒性和稳定性。总而言之,它可以防止由于需求量大而导致的系统性故障。

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *