拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 SEDA 与Spring 集成和Apache Camel

SEDA 与Spring 集成和Apache Camel

白鹭 - 2022-10-03 2157 0 2

一、简介

SEDA,即Staged Event-Driven Architecture,是Matt Welsh 在其博士论文中提出的一种建筑风格。论文它的主要优点是可扩展性、对高并发流量的支持和可维护性。

在本教程中,我们将使用SEDA 来计算句子中的唯一词,并使用两个单独的实现:Spring Integration 和Apache Camel。

2. 色达

SEDA 解决了特定于在线服务的几个非功能性要求

  1. 高并发:架构必须支持尽可能多的并发请求。

  2. 动态内容:软件系统必须经常支持复杂的业务用例,需要许多步骤来处理用户请求和生成响应。

  3. 负载鲁棒性:在线服务的用户流量是不可预测的,架构需要优雅地处理流量的变化。

为了满足这些要求,SEDA 将复杂的服务分解为事件驱动的阶段这些阶段与队列间接连接,因此可以彼此完全解耦。此外,每个阶段都有一个扩展机制来应对其传入的负载:

seda-with-spring-integration-and-apache-camel.png

上图来自Matt Welsh 的论文,描述了使用SEDA 实现的Web 服务器的整体结构。每个矩形代表传入HTTP 请求的单个处理阶段。这些阶段可以独立地从其传入队列中消费任务,进行一些处理或I/O 工作,然后将消息传递给下一个队列。

2.1。成分

为了更好地理解SEDA 的组件,让我们看一下Matt Welsh 论文中的这张图表如何显示单个阶段的内部工作原理:

seda-with-spring-integration-and-apache-camel-1.png

正如我们所见,每个SEDA 阶段都有以下组件:

  • 事件:**事件是包含阶段执行其处理所需的任何数据的数据结构**。例如,对于HTTP Web 服务器,事件可能包含用户数据——例如主体、标头和请求参数——以及基础设施数据,例如用户的IP、请求时间戳等。

  • 事件队列:这包含舞台的传入事件。

  • 事件处理程序:事件处理程序是舞台的程序逻辑。这可能是一个简单的路由阶段,将数据从其事件队列转发到其他相关事件队列,或者是一个更复杂的阶段,以某种方式处理数据。事件处理程序可以单独或批量读取事件——后者在批处理有性能优势时很有帮助,例如使用一个查询更新多个数据库记录。

  • 传出事件:根据业务用例和流的整体结构,每个阶段都可以将新事件发送到零个或多个事件队列。创建和发送传出消息是在事件处理程序方法中完成的。

  • 线程池:线程是一种众所周知的并发机制。在SEDA 中,线程是针对每个阶段进行本地化和定制的。换句话说,每个阶段都维护一个线程池。因此,与每个请求一个线程的模型不同,每个用户请求都由SEDA 下的多个线程处理。该模型允许我们根据其复杂性独立调整每个阶段。

  • 控制器:SEDA 控制器是管理资源消耗的任何机制,例如线程池大小、事件队列大小、调度等。控制器负责SEDA 的弹性行为一个简单的控制器可能会管理每个线程池中的活动线程数。更复杂的控制器可以实现复杂的性能调整算法,在运行时监控整个应用程序并调整各种参数。此外,控制器将性能调整逻辑与业务逻辑分离。这种关注点的分离使得我们的代码更容易维护。

通过将所有这些组件组合在一起,SEDA 提供了一个强大的解决方案来处理高波动的流量负载。

3. 样本问题

在接下来的部分中,我们将创建两个使用SEDA 解决相同问题的实现。

我们的示例问题很简单:计算每个单词在给定字符串中出现不区分大小写的次数

让我们将单词定义为不带空格的字符序列,我们将忽略标点符号等其他复杂情况。我们的输出将是一个映射,其中包含作为键的单词和作为值的计数。例如,给定输入“ My name is Hesam”,输出将是:

{
 "my": 1,
 "name": 1,
 "is": 1,
 "hesam": 1
 }

3.1。使问题适应SEDA

让我们从SEDA 阶段的角度来看我们的问题。由于可扩展性是SEDA 的核心目标,通常最好设计专注于特定操作的小阶段,尤其是在我们有I/O 密集型任务的情况下此外,拥有小阶段有助于我们更好地调整每个阶段的规模。

为了解决我们的字数问题,我们可以通过以下阶段实现解决方案:

seda-with-spring-integration-and-apache-camel-2.png

现在我们已经有了舞台设计,让我们在接下来的部分中使用两种不同的企业集成技术来实现它。在此表中,我们可以预览SEDA 在我们的实现中将如何显示:

SEDA 组件弹簧集成阿帕奇骆驼



事件

org.springframework.messaging.Messageorg.apache.camel.Exchange|
|

事件队列

|

org.springframework.integration.channel

| 由URI 字符串定义的端点|
|

事件处理程序

| 功能接口实例| Camel 处理器、Camel 实用程序类和Functions |
|

线程池

|TaskExecutor的Spring 抽象| SEDA 端点中的开箱即用支持|

4. 使用Spring Integration 的解决方案

对于我们的第一个实现,我们将使用Spring Integration。Spring Integration 基于Spring 模型构建以支持流行的企业集成模式

Spring Integration 具有三个主要组件:

  1. 消息是包含标头和正文的数据结构。

  2. 通道将消息从一个端点传送到另一个端点。Spring Integration 中有两种渠道:

    • 点对点:只有一个端点可以消费该通道中的消息。

    • 发布订阅:多个端点可以消费此通道中的消息。

  3. 端点将消息路由到执行某些业务逻辑的应用程序组件。Spring Integration 中有多种端点,例如转换器、路由器、服务激活器和过滤器。

让我们看一下我们的Spring Integration 解决方案的概述:

seda-with-spring-integration-and-apache-camel-3.png

4.1。依赖项

让我们开始为Spring Integration、 Spring Boot TestSpring Integration Test添加依赖项:

<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-integration</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.springframework.integration</groupId>
 <artifactId>spring-integration-test</artifactId>
 <scope>test</scope>
 </dependency>
 </dependencies>

4.2.消息网关

消息传递网关是一种代理,它隐藏了向集成流发送消息的复杂性。让我们为我们的Spring 集成流程设置一个:

@MessagingGateway
 public interface IncomingGateway {
 @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
 public Map<String, Long> countWords(String input);
 }

稍后,我们将能够使用此网关方法来测试我们的整个流程:

incomingGateway.countWords("My name is Hesam");

Spring 将“My name is Hesam”输入包装在org.springframework.messaging.Message的一个实例中,并将其传递给receiveTextChannel,然后从returnResponseChannel给我们最终结果。

4.3.消息渠道

在本节中,我们将了解如何设置网关的初始消息通道receiveTextChannel

在SEDA 下,通道需要通过关联的线程池进行扩展,所以让我们从创建线程池开始:

@Bean("receiveTextChannelThreadPool")
 TaskExecutor receiveTextChannelThreadPool() {
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 executor.setCorePoolSize(1);
 executor.setMaxPoolSize(5);
 executor.setThreadNamePrefix("receive-text-channel-thread-pool");
 executor.initialize();
 return executor;
 }

接下来,我们将使用我们的线程池来创建我们的通道:

@Bean(name = "receiveTextChannel")
 MessageChannel getReceiveTextChannel() {
 return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
 .get();
 }

MessageChannels是一个Spring Integration 类,可以帮助我们创建各种类型的通道。在这里,我们使用executor()方法创建一个ExecutorChannel,这是一个由线程池管理的通道。

我们的其他通道和线程池的设置方式与上述相同。

4.4.接收文本阶段

设置好我们的频道后,我们就可以开始实施我们的阶段了。让我们创建我们的初始阶段:

@Bean
 IntegrationFlow receiveText() {
 return IntegrationFlows.from(receiveTextChannel)
 .channel(splitWordsChannel)
 .get();
 }

IntegrationFlows是一个流畅的Spring Integration API,用于创建IntegrationFlow对象,代表我们流程的各个阶段。 from()方法配置我们舞台的传入通道,而channel()配置传出通道。

在此示例中,我们的阶段将网关的输入消息传递给splitWordsChannel这个阶段在生产应用程序中可能更复杂且I/O 密集,从持久队列或通过网络读取消息。

4.5.分词阶段

我们的下一个阶段有一个单一的责任:将我们的输入String拆分为句子中单个单词的String数组:

@Bean
 IntegrationFlow splitWords() {
 return IntegrationFlows.from(splitWordsChannel)
 .transform(splitWordsFunction)
 .channel(toLowerCaseChannel)
 .get();
 }

除了我们之前使用的from()channel()调用之外,这里我们还使用了transform(),它将提供的Function应用于我们的输入消息。我们的splitWordsFunction实现非常简单:

final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");

4.6.转换为小写阶段

此阶段将String数组中的每个单词转换为小写:

@Bean
 IntegrationFlow toLowerCase() {
 return IntegrationFlows.from(toLowerCaseChannel)
 .split()
 .transform(toLowerCase)
 .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
 .outputProcessor(buildMessageWithListPayload))
 .channel(countWordsChannel)
 .get();
 }

我们在这里使用的第一个新的IntegrationFlows方法是split()split()方法使用拆分器模式将输入消息的每个元素作为单独的消息发送到toLowerCase

我们看到的下一个新方法是aggregate(),它实现了聚合器模式。**聚合器模式有两个基本参数:**

  1. 发布策略,它决定了何时将消息组合成一条消息

  2. 处理器,它决定如何将消息组合成单个消息

我们的发布策略函数使用listSizeReached,它告诉聚合器在收集到输入数组的所有元素后开始聚合:

final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();

buildMessageWithListPayload处理器然后将我们的小写结果打包到List

final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
 MessageBuilder.withPayload(messageGroup.streamMessages()
 .map(Message::getPayload)
 .toList())
 .build();

4.7.数词阶段

我们的最后阶段将我们的单词计数打包到一个Map中,其中键是来自原始输入的单词,值是每个单词的出现次数:

@Bean
 IntegrationFlow countWords() {
 return IntegrationFlows.from(countWordsChannel)
 .transform(convertArrayListToCountMap)
 .channel(returnResponseChannel)
 .get();
 }

在这里,我们使用convertArrayListToCountMap函数将计数打包为Map

final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

4.8.测试我们的流程

我们可以将初始消息传递给我们的网关方法来测试我们的流程:

public class SpringIntegrationSedaIntegrationTest {
 @Autowired
 TestGateway testGateway;
 @Test
 void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
 Map<String, Long> actual = testGateway.countWords("My name is Hesam");
 Map<String, Long> expected = new HashMap<>();
 expected.put("my", 1L);
 expected.put("name", 1L);
 expected.put("is", 1L);
 expected.put("hesam", 1L);
 assertEquals(expected, actual);
 }
 }

5. Apache Camel 的解决方案

Apache Camel 是一个流行且功能强大的开源集成框架。它基于四个主要概念:

  1. Camel 上下文:Camel 运行时将不同的部分粘在一起。

  2. 路由:路由决定了一条消息应该如何被处理以及它接下来应该去哪里。

  3. 处理器:这些是各种企业集成模式的即用型实现。

  4. 组件:组件是通过JMS、HTTP、文件IO 等集成外部系统的扩展点。

Apache Camel 有一个专门用于SEDA 功能的组件,使构建SEDA 应用程序变得简单。

5.1。依赖项

让我们为Apache CamelApache Camel Test添加所需的Maven 依赖项:

<dependencies>
 <dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-core</artifactId>
 <version>3.18.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-test-junit5</artifactId>
 <version>3.18.0</version>
 <scope>test</scope>
 </dependency>
 </dependencies>

5.2.定义SEDA 端点

首先,我们需要定义端点。端点是使用URI 字符串定义的组件。SEDA 端点必须以“ seda:[endpointName]”开头:

static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
 static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
 static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
 static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
 static final String returnResponse = "mock:result";

正如我们所看到的,每个端点都配置为有五个并发消费者。这相当于每个端点最多有5 个线程。

为了测试,returnResponse是一个模拟端点。

5.3.扩展RouteBuilder

接下来,让我们定义一个扩展Apache Camel 的RouteBuilder并覆盖其configure() 方法的类。此类连接所有SEDA 端点:

public class WordCountRoute extends RouteBuilder {
 @Override
 public void configure() throws Exception {
 }
 }

在接下来的部分中,我们将使用我们从RouteBuilder继承的便捷方法向该configure()方法添加行来定义我们的阶段。

5.4.接收文本阶段

此阶段从SEDA 端点接收消息并将它们路由到下一阶段,无需任何处理:

from(receiveTextUri).to(splitWordsUri);

在这里,我们使用继承from()方法指定传入端点,并to()设置传出端点。

5.5.分词阶段

让我们实现将输入文本拆分为单个单词的阶段:

from(splitWordsUri)
 .transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
 .to(toLowerCaseUri);

transform()方法将我们的Function应用于我们的输入消息,将其拆分为一个数组。

5.6.转换为小写阶段

我们的下一个任务是将输入中的每个单词转换为小写。因为我们需要将转换函数应用于消息中的每个String与数组本身,所以我们将使用split()方法来拆分输入消息以进行处理,然后将结果聚合回ArrayList

from(toLowerCaseUri)
 .split(body(), new ArrayListAggregationStrategy())
 .transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
 .end()
 .to(countWordsUri);

end()方法标志着拆分过程的结束。一旦列表中的每个项目都被转换,Apache Camel 就会应用我们指定的聚合策略ArrayListAggregationStrategy

ArrayListAggregationStrategy扩展了Apache Camel 的AbstractListAggregationStrategy来定义应该聚合消息的哪一部分。在这种情况下,消息正文是新小写的单词:

class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
 @Override
 public String getValue(Exchange exchange) {
 return exchange.getIn()
 .getBody(String.class);
 }
 }

5.7.数词阶段

最后一个阶段使用转换器将数组转换为单词到单词计数的映射:

from(countWordsUri)
 .transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
 .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
 .to(returnResponse);

5.8.测试我们的路线

让我们测试一下我们的路线:

public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
 @Test
 public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
 throws InterruptedException {
 Map<String, Long> expected = new HashMap<>();
 expected.put("my", 1L);
 expected.put("name", 1L);
 expected.put("is", 1L);
 expected.put("hesam", 1L);
 getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
 template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");
 assertMockEndpointsSatisfied();
 }
 @Override
 protected RoutesBuilder createRouteBuilder() throws Exception {
 RoutesBuilder wordCountRoute = new WordCountRoute();
 return wordCountRoute;
 }
 }

CamelTestSupport超类提供了许多字段和方法来帮助我们测试我们的流程。我们使用getMockEndpoint()expectedBodiesReceived()来设置我们的预期结果,并template.sendBody()将测试数据提交到我们的模拟端点。最后,我们使用assertMockEndpointsSatisfied()来测试我们的期望是否与实际结果相符。

六,结论

在本文中,我们了解了SEDA 及其组件和用例。之后,我们探索了如何使用SEDA 解决相同的问题,首先使用Spring Integration,然后使用Apache Camel。


标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *