一、概述
Axon 框架帮助我们构建事件驱动的微服务系统。在Axon 框架指南中,我们通过一个简单的Axon Spring Boot 应用程序了解了Axon,该应用程序包括构建一个示例Order
模型供我们更新和查询。在Axon 框架中调度查询时,我们添加了所有支持的查询。
本文将着眼于持久化Axon 框架的查询模型。我们将介绍使用MongoDB 存储投影,以及测试的挑战以及如何使流与查询模型保持同步。
2.持久性考虑
为了创建一个使用数据库来持久化查询模型的处理程序,我们实现了OrdersEventHandler
接口。在生产环境中,我们不想每次都从头构建查询模型。通过Axon框架,我们可以选择如何持久化模型,选择什么取决于涉及的数据。如果我们想要自由文本搜索,我们可能想要使用Elasticsearch。当我们有非结构化数据时,我们可能想使用MongoDB。当实体之间有很多关系时,我们可能希望使用像Neo4J 这样的图数据库。
2.1.代币商店
在通过事件构建查询模型时,Axon 使用TokenStore
进行跟踪。理想情况下,令牌存储保存在与查询模型相同的数据库中以确保一致性。使用持久性令牌存储还将确保我们可以运行多个实例,其中每个实例只需要处理部分事件。拆分为多个实例适用于segments,其中一个实例可以声明所有或部分段以进行处理。如果我们使用JPA 或JDBC 进行持久化,请使用[JpaTokenStore](https://apidocs.axoniq.io/4.6/org/axonframework/eventhandling/tokenstore/jpa/JpaTokenStore.html)
或JdbcTokenStore。两种令牌存储实现都可以在Axon 框架中使用,无需扩展。
2.2.构建查询模型
在启动时,流式事件处理器将开始从事件存储中读取事件。使用持久性TokenStore
,处理器从它之前离开的地方开始。否则,默认情况下,它将从头开始。对于每个事件,处理器将调用事件处理程序注释的方法。
让我们进一步构建订单应用程序,并允许以多种方式创建和更新订单。通过更新内存模型在InMemoryOrdersEventHandler
中处理ProductAddedEvent
:
@EventHandler public void on(ProductAddedEvent event) { orders.computeIfPresent(event.getOrderId(), (orderId, order) -> { order.addProduct(event.getProductId()); return order; }); }
这里内存映射中的顺序将使用addProduct
函数更新。我们可以将数据存储在数据库中,而不是内存模型。
3. Mongo 扩展
让我们使用MongoDB 来持久化我们的查询模型。我们使用Axon 框架mongo 扩展来持久化Mongo 中的令牌存储。由于我们已经添加了[axon-bom](https://search.maven.org/search?q=a:axon-bom)
,因此在将扩展添加到我们的pom.xml
时不需要指定版本:
<dependency> <groupId>org.axonframework.extensions.mongo</groupId> <artifactId>axon-mongo</artifactId> </dependency>
3.1.代币商店
有了适当的依赖关系,我们可以配置Axon 以使用MongoTokenStore
:
@Bean public TokenStore getTokenStore(MongoClient client, Serializer serializer){ return MongoTokenStore.builder() .mongoTemplate( DefaultMongoTemplate.builder() .mongoDatabase(client) .build() ) .serializer(serializer) .build(); }
3.2.事件句柄类
名为mongo
的Spring Profile 可以在事件处理程序的实现之间切换。当mongo
配置文件处于活动状态时,将使用MongoOrdersEventHandler
以及令牌存储配置。这一起构成了事件处理程序类:
@Service @ProcessingGroup("orders") @Profile("mongo") public class MongoOrdersEventHandler implements OrdersEventHandler { // all methods regarding updating an querying the projection }
同时,我们在InMemoryOrdersEventHandler
中添加了@Profile("!mongo")
,因此两者不会同时处于活动状态。Spring 配置文件是有条件地启用组件的绝佳方式。
我们将在构造函数中使用依赖注入来获取MongoClient
和QueryUpdateEmitter.
我们使用MongoClient
创建MongoCollection 和索引。我们注入QueryUpdateEmitter
以启用订阅查询:
public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) { orders = client .getDatabase(AXON_FRAMEWORK_DATABASE_NAME) .getCollection(ORDER_COLLECTION_NAME); orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME), new IndexOptions().unique(true)); this.emitter = emitter; }
请注意,我们将订单ID 设置为唯一。这样,我们就可以确定不会有两个文档具有相同的订单ID。
MongoOrdersEventHandler
使用orders
mongo 集合来处理查询。我们需要使用documentToOrder()
方法将Mongo 文档映射到订单:
@QueryHandler public List<Order> handle(FindAllOrderedProductsQuery query) { List<Order> orderList = new ArrayList<>(); orders .find() .forEach(d -> orderList.add(documentToOrder(d))); return orderList; }
3.3.复杂查询
为了能够处理TotalProductsShippedQuery,
我们添加了一个**shippedProductFilter
来过滤出已发货的订单并拥有产品:**
private Bson shippedProductFilter(String productId){ return and( eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()), exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId)) ); }
然后在提取和添加产品计数的查询处理程序中使用此过滤器:
@QueryHandler public Integer handle(TotalProductsShippedQuery query) { AtomicInteger result = new AtomicInteger(); orders .find(shippedProductFilter(query.getProductId())) .map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class)) .map(d -> d.getInteger(query.getProductId(), 0)) .forEach(result::addAndGet); return result.get(); }
此查询将获取所有已发货的产品并检索这些文档中的所有产品。然后它将计算查询的特定产品并返回总数。
4. 测试持久化查询模型
使用持久模型进行测试会使事情变得更加困难,因为我们**希望每个测试都有一个隔离的环境。
**
4.1.单元测试
对于MongoOrdersEventHandler,
我们需要确保删除集合,这样我们就不会保留之前测试的状态。我们通过实现getHandler()
方法来做到这一点:
@Override protected OrdersEventHandler getHandler() { mongoClient.getDatabase("axonframework").drop(); return new MongoOrdersEventHandler(mongoClient, emitter); }
使用@BeforeEach
注释方法,我们可以确保每个测试都重新开始。在这种情况下,我们使用嵌入式Mongo 进行测试。另一种选择是使用测试容器。在这方面,测试查询模型与其他需要数据库的应用程序测试没有什么不同。
4.2.集成测试
对于集成测试,我们使用类似的方法。但是,由于集成测试使用OrdersEventHandler
接口,我们依赖于已实现的reset()
方法。
reset()
方法的实现是:
@Override public void reset(List<Order> orderList) { orders.deleteMany(new Document()); orderList.forEach(o -> orders.insertOne(orderToDocument(o))); }
reset()
方法确保只有列表中的订单是集合的一部分。该方法在OrderQueryServiceIntegrationTest
中的每个测试之前执行:
@BeforeEach void setUp() { orderId = UUID.randomUUID().toString(); Order order = new Order(orderId); handler.reset(Collections.singletonList(order)); }
至于测试查询,我们至少需要一个订单。通过已经存储一个订单,它使测试本身更容易。
5.结论
在本文中,我们展示了如何持久化查询模型。我们学习了如何使用MongoDB 查询和测试模型。
0 评论