拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 持久化查询模型

持久化查询模型

白鹭 - 2022-12-16 2181 0 2

一、概述

Axon 框架帮助我们构建事件驱动的微服务系统。在Axon 框架指南中,我们通过一个简单的Axon Spring Boot 应用程序了解了Axon,该应用程序包括构建一个示例Order模型供我们更新和查询。在Axon 框架中调度查询时,我们添加了所有支持的查询。

本文将着眼于持久化Axon 框架的查询模型我们将介绍使用MongoDB 存储投影,以及测试的挑战以及如何使流与查询模型保持同步。

2.持久性考虑

为了创建一个使用数据库来持久化查询模型的处理程序,我们实现了OrdersEventHandler接口。在生产环境中,我们不想每次都从头构建查询模型通过Axon框架,我们可以选择如何持久化模型,选择什么取决于涉及的数据。如果我们想要自由文本搜索,我们可能想要使用Elasticsearch。当我们有非结构化数据时,我们可能想使用MongoDB。当实体之间有很多关系时,我们可能希望使用像Neo4J 这样的图数据库。

2.1.代币商店

在通过事件构建查询模型时,Axon 使用TokenStore进行跟踪。理想情况下,令牌存储保存在与查询模型相同的数据库中以确保一致性。使用持久性令牌存储还将确保我们可以运行多个实例,其中每个实例只需要处理部分事件。拆分为多个实例适用于segments,其中一个实例可以声明所有或部分段以进行处理。如果我们使用JPA 或JDBC 进行持久化,请使用[JpaTokenStore](https://apidocs.axoniq.io/4.6/org/axonframework/eventhandling/tokenstore/jpa/JpaTokenStore.html)JdbcTokenStore两种令牌存储实现都可以在Axon 框架中使用,无需扩展。

2.2.构建查询模型

在启动时,流式事件处理器将开始从事件存储中读取事件。使用持久性TokenStore,处理器从它之前离开的地方开始。否则,默认情况下,它将从头开始。对于每个事件,处理器将调用事件处理程序注释的方法。

让我们进一步构建订单应用程序,并允许以多种方式创建和更新订单。通过更新内存模型在InMemoryOrdersEventHandler中处理ProductAddedEvent

@EventHandler
 public void on(ProductAddedEvent event) {
 orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
 order.addProduct(event.getProductId());
 return order;
 });
 }

这里内存映射中的顺序将使用addProduct函数更新。我们可以将数据存储在数据库中,而不是内存模型。

3. Mongo 扩展

让我们使用MongoDB 来持久化我们的查询模型。我们使用Axon 框架mongo 扩展来持久化Mongo 中的令牌存储。由于我们已经添加了[axon-bom](https://search.maven.org/search?q=a:axon-bom),因此在将扩展添加到我们的pom.xml时不需要指定版本:

<dependency>
 <groupId>org.axonframework.extensions.mongo</groupId>
 <artifactId>axon-mongo</artifactId>
 </dependency>

3.1.代币商店

有了适当的依赖关系,我们可以配置Axon 以使用MongoTokenStore

@Bean
 public TokenStore getTokenStore(MongoClient client, Serializer serializer){
 return MongoTokenStore.builder()
 .mongoTemplate(
 DefaultMongoTemplate.builder()
 .mongoDatabase(client)
 .build()
 )
 .serializer(serializer)
 .build();
 }

3.2.事件句柄类

名为mongo的Spring Profile 可以在事件处理程序的实现之间切换。mongo配置文件处于活动状态时,将使用MongoOrdersEventHandler以及令牌存储配置。这一起构成了事件处理程序类

@Service
 @ProcessingGroup("orders")
 @Profile("mongo")
 public class MongoOrdersEventHandler implements OrdersEventHandler {
 // all methods regarding updating an querying the projection
 }

同时,我们在InMemoryOrdersEventHandler中添加了@Profile("!mongo"),因此两者不会同时处于活动状态。Spring 配置文件是有条件地启用组件的绝佳方式。

我们将在构造函数中使用依赖注入来获取MongoClientQueryUpdateEmitter.我们使用MongoClient创建MongoCollection 和索引。我们注入QueryUpdateEmitter以启用订阅查询:

public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
 orders = client
 .getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
 .getCollection(ORDER_COLLECTION_NAME);
 orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME),
 new IndexOptions().unique(true));
 this.emitter = emitter;
 }

请注意,我们将订单ID 设置为唯一。这样,我们就可以确定不会有两个文档具有相同的订单ID。

MongoOrdersEventHandler使用ordersmongo 集合来处理查询。我们需要使用documentToOrder()方法将Mongo 文档映射到订单:

@QueryHandler
 public List<Order> handle(FindAllOrderedProductsQuery query) {
 List<Order> orderList = new ArrayList<>();
 orders
 .find()
 .forEach(d -> orderList.add(documentToOrder(d)));
 return orderList;
 }

3.3.复杂查询

为了能够处理TotalProductsShippedQuery,我们添加了一个**shippedProductFilter来过滤出已发货的订单并拥有产品:**

private Bson shippedProductFilter(String productId){
 return and(
 eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()),
 exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))
 );
 }

然后在提取和添加产品计数的查询处理程序中使用此过滤器:

@QueryHandler
 public Integer handle(TotalProductsShippedQuery query) {
 AtomicInteger result = new AtomicInteger();
 orders
 .find(shippedProductFilter(query.getProductId()))
 .map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
 .map(d -> d.getInteger(query.getProductId(), 0))
 .forEach(result::addAndGet);
 return result.get();
 }

此查询将获取所有已发货的产品并检索这些文档中的所有产品。然后它将计算查询的特定产品并返回总数。

4. 测试持久化查询模型

使用持久模型进行测试会使事情变得更加困难,因为我们**希望每个测试都有一个隔离的环境。
**

4.1.单元测试

对于MongoOrdersEventHandler,我们需要确保删除集合,这样我们就不会保留之前测试的状态我们通过实现getHandler()方法来做到这一点:

@Override
 protected OrdersEventHandler getHandler() {
 mongoClient.getDatabase("axonframework").drop();
 return new MongoOrdersEventHandler(mongoClient, emitter);
 }

使用@BeforeEach注释方法,我们可以确保每个测试都重新开始。在这种情况下,我们使用嵌入式Mongo 进行测试。另一种选择是使用测试容器。在这方面,测试查询模型与其他需要数据库的应用程序测试没有什么不同。

4.2.集成测试

对于集成测试,我们使用类似的方法。但是,由于集成测试使用OrdersEventHandler接口,我们依赖于已实现的reset()方法

reset()方法的实现是:

@Override
 public void reset(List<Order> orderList) {
 orders.deleteMany(new Document());
 orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
 }

reset()方法确保只有列表中的订单是集合的一部分。该方法在OrderQueryServiceIntegrationTest中的每个测试之前执行:

@BeforeEach
 void setUp() {
 orderId = UUID.randomUUID().toString();
 Order order = new Order(orderId);
 handler.reset(Collections.singletonList(order));
 }

至于测试查询,我们至少需要一个订单。通过已经存储一个订单,它使测试本身更容易。

5.结论

在本文中,我们展示了如何持久化查询模型。我们学习了如何使用MongoDB 查询和测试模型。


标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *