1.概述
Spring Batch是用于开发强大的批处理应用程序的强大框架。在之前的教程中,我们介绍了Spring Batch。
在本教程中,我们将在上一个教程的基础上,学习如何使用Spring Boot设置和创建基本的批处理驱动的应用程序。
2. Maven依赖
首先,让我们将spring-boot-starter-batch
到我们的pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.4.0</version>
</dependency>
我们还将添加org.hsqldb
依赖关系,该依赖关系也可以从Maven Central获得:
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.5.1</version>
<scope>runtime</scope>
</dependency>
3.定义一个简单的Spring Batch作业
我们将构建一个工作,从CSV文件导入咖啡清单,使用自定义处理器对其进行转换,并将最终结果存储在内存数据库中。
3.1。入门
让我们从定义应用程序入口点开始:
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}
如我们所见,这是一个标准的Spring Boot应用程序。由于我们希望在可能的情况下使用默认配置值,因此我们将使用一组非常简单的应用程序配置属性。
我们将在src/main/resources/application.properties
文件中定义以下属性:
file.input=coffee-list.csv
此属性包含我们输入的咖啡清单的位置。每行都包含我们咖啡的品牌,产地和一些特征:
Blue Mountain,Jamaica,Fruity
Lavazza,Colombia,Strong
Folgers,America,Smokey
我们将看到,这是一个平面CSV文件,这意味着Spring可以在不进行任何特殊自定义的情况下对其进行处理。
接下来,我们将添加一个SQL脚本schema-all.sql
来创建我们的coffee
桌来存储数据:
DROP TABLE coffee IF EXISTS;
CREATE TABLE coffee (
coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
brand VARCHAR(20),
origin VARCHAR(20),
characteristics VARCHAR(30)
);
通常,Spring Boot会在启动过程中自动运行此脚本。
3.2。咖啡领域类
随后,我们将需要一个简单的域类来保存我们的咖啡项目:
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
// getters and setters
}
如前所述,我们的Coffee
对象包含三个属性:
- 一个品牌
- 起源
- 一些其他特征
4.作业配置
现在,在关键组件上,我们的工作配置。我们将逐步进行,建立我们的配置并解释其中的每个部分:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${file.input}")
private String fileInput;
// ...
}
首先,我们从标准的Spring @Configuration
类开始。接下来,我们在类中@EnableBatchProcessing
值得注意的是,这使我们可以访问许多支持工作的有用bean,并可以节省很多日常工作。
此外,使用此批注还使我们能够访问两个有用的工厂,稍后将在构建作业配置和作业步骤时使用它们。
对于初始配置的最后一部分,我们包括对先前声明file.input
4.1。我们工作的读者和作家
现在,我们可以继续在我们的配置中定义一个阅读器bean:
@Bean
public FlatFileItemReader reader() {
return new FlatFileItemReaderBuilder().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper() {{
setTargetType(Coffee.class);
}})
.build();
}
简而言之,上面定义的阅读器bean将查找名为coffee-list.csv
的文件,并将每个订单项解析为Coffee
对象。
同样,我们定义一个writer bean:
@Bean
public JdbcBatchItemWriter writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}
这次,我们包含了将单个咖啡项目插入到数据库中所需的SQL语句,该语句由Coffee
对象的Java bean属性驱动。方便地, dataSource
@EnableBatchProcessing
注释自动创建的。
4.2。把我们的工作放在一起
最后,我们需要添加实际的作业步骤和配置:
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter writer) {
return stepBuilderFactory.get("step1")
.<Coffee, Coffee> chunk(10)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}
如我们所见,我们的工作相对简单,由step1
方法中定义的一个步骤组成。
让我们看一下此步骤正在执行的操作:
- 首先,我们配置步骤,以便使用
chunk(10)
声明一次最多写入十条记录 - 然后,我们使用读取器bean读取咖啡数据,该读取器bean是使用
reader
方法设置的 - 接下来,我们将每个咖啡项目传递给自定义处理器,在其中应用一些自定义业务逻辑
- 最后,我们使用之前看到的编写器将每个咖啡项目写入数据库
另一方面,我们的importUserJob
包含我们的作业定义,其中包含使用内置RunIdIncrementer
类的ID。我们还设置了一个JobCompletionNotificationListener,
用来在作业完成时得到通知。
为了完成我们的作业配置,我们列出了每个步骤(尽管此作业只有一个步骤)。现在,我们完成了完美的配置!
5.定制咖啡处理器
让我们详细了解一下我们先前在作业配置中定义的自定义处理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}
特别感兴趣的是, ItemProcessor
接口为我们提供了一种在作业执行过程中应用某些特定业务逻辑的机制。
为了简单**CoffeeItemProcessor
,我们定义了CoffeeItemProcessor,它接受一个输入Coffee
对象并将每个属性转换为uppercase** 。
6.工作完成
此外,我们还将编写一个JobCompletionNotificationListener
以在工作完成时提供一些反馈:
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! JOB FINISHED! Time to verify the results");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}
在上面的示例中,我们重写afterJob
方法,并检查作业是否成功完成。此外,我们运行一个简单查询以检查每个咖啡项目是否已成功存储在数据库中。
7.做好工作
现在我们已经准备就绪,可以开始工作了,这是有趣的部分。让我们继续工作吧:
...
17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -
!!! JOB FINISHED! Time to verify the results
17:41:16.336 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database.
17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database.
17:41:16.337 [main] INFO cbbJobCompletionNotificationListener -
Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database.
...
如我们所见,我们的工作成功运行,每项咖啡都按预期存储在数据库中。
8.结论
在本文中,我们学习了如何使用Spring Boot创建一个简单的Spring Batch作业。首先,我们从定义一些基本配置开始。
然后,我们看到了如何添加文件读取器和数据库写入器。最后,我们看了如何应用一些自定义处理并检查我们的作业是否成功执行。
0 评论