当前位置: 首页 > news >正文

SpringBoot:使用Spring Batch实现批处理任务

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency><groupId>org.hsqldb</groupId><artifactId>hsqldb</artifactId><scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;@Entity
public class Person {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String firstName;private String lastName;// getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;@Configuration
public class BatchConfiguration {@Beanpublic FlatFileItemReader<Person> reader() {return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}}).build();}
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {@Overridepublic Person process(Person person) throws Exception {final String firstName = person.getFirstName().toUpperCase();final String lastName = person.getLastName().toUpperCase();final Person transformedPerson = new Person();transformedPerson.setFirstName(firstName);transformedPerson.setLastName(lastName);return transformedPerson;}
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;@Configuration
public class BatchConfiguration {@Beanpublic JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(jdbcTemplate.getJdbcTemplate().getDataSource()).build();}
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();}
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;@Component
public class JobCompletionNotificationListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("Job Started");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Ended");}
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BatchApplication implements CommandLineRunner {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate Job job;public static void main(String[] args) {SpringApplication.run(BatchApplication.class, args);}@Overridepublic void run(String... args) throws Exception {jobLauncher.run(job, new JobParameters());}
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {return jobBuilderFactory.get("multiStepJob").listener(listener).start(step1).next(step2).end().build();
}@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step2").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).taskExecutor(taskExecutor()).build();
}@Bean
public TaskExecutor taskExecutor() {SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();taskExecutor.setConcurrencyLimit(10);return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

相关文章:

SpringBoot:使用Spring Batch实现批处理任务

引言 在企业级应用中&#xff0c;批处理任务是不可或缺的一部分。它们通常用于处理大量数据&#xff0c;如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分&#xff0c;专为批处理任务设计&#xff0c;提供了简化的配置和强大的功能。本文将介绍如何使用Spr…...

用JQueryUI库在.net MVC中配置datepicker(时间日期控件)

原文参考&#xff1a;如何在MVC中添加jQuery Datepicker_mvc datepicker-CSDN博客 好文章被埋没了&#xff0c;可能和时间发的早有关。 1.首先我们引入JQuery和JQuery UI <!-- ... --> <link rel"stylesheet" href"https://code.jquery.com/ui/1.12…...

算法:链表

目录 链表的技巧和操作总结 常用技巧&#xff1a; 链表中的常用操作 题目一&#xff1a;反转一个单链表 题目二&#xff1a;链表的中间结点 题目三&#xff1a;返回倒数第k个结点 题目四&#xff1a;合并两个有序链表 题目五&#xff1a;移除链表元素 题目六&#xff…...

Redis基础教程(一):redis配置

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…...

短视频矩阵系统:打造品牌影响力的新方式

一、短视频矩阵概念 短视频营销革命&#xff1a;一站式解决策略&#xff01;短视频矩阵系统是一款专为企业营销设计的高效工具&#xff0c;旨在通过整合和优化众多短视频平台资源&#xff0c;为企业呈现一个全面的短视频营销策略。该系统致力于协助企业以迅速且高效的方式制作…...

品牌推广的三个阶段与核心内容,一篇文章全掌握!

在竞争激烈的市场环境中&#xff0c;品牌推广是企业成功的关键。精心策划的推广策略能够帮助企业在消费者心中树立独特的品牌形象&#xff0c;进而促进销售增长。 作为一家手工酸奶品牌的创始人&#xff0c;目前全国也复制了100多家门店&#xff0c;我理解的品牌推广分为3个阶…...

队列与循环队列

目录 1. 前言&#xff1a; 2. 队列 2.1 队列的概念 2.2 队列的实现 2.3 队列的声明 2.4 队列的初始化 2.5 队列的入队 2.6 队列的出队 2.7 队列获取队头元素 2.8 队列获取队尾元素 2.9 队列获取有效数据个数 2.10 队列判断是否为空 2.11 打印队列 2.12 销毁队列 …...

python基础问题记录

文章目录 前言一、python中类的注意点二、模块与包1. 模块2. 包 总结 前言 本专栏主要记录python中一些语法问题。 一、python中类的注意点 类属性&#xff1a;在类中定义的属性 在类中直接写明的变量是类属性&#xff0c;属于公共属性。 访问&#xff1a;类属性可以通过类或…...

Qt之饼图(Pie Graph)

[TOC](Qt之饼图(Pie Graph)) 饼图名为Pie Graph&#xff0c;用于显示一个数据系列中各项的大小与各项总和的比例。本文基于QtCharts实现饼图的显示。 1.实现过程 1.1环境配置 &#xff08;1&#xff09;首先想要使用QtCharts模块&#xff0c;需要在安装qt时选择勾选安装QtCha…...

Java项目Git提交规范

在Java项目中&#xff0c;遵循良好的Git提交规范有助于提高代码的可维护性、可读性和团队协作效率。以下是一些常见的Git提交规范建议&#xff1a; 文章目录 提交信息格式提交信息示例提交频率分支管理代码审查工具和自动化提交前检查清单 提交信息格式 提交类型&#xff1a;使…...

flink-触发器Trigger和移除器Evictor

窗口原理与机制 图片链接&#xff1a;https://blog.csdn.net/qq_35590459/article/details/132177154 数据流进入算子前&#xff0c;被提交给WindowAssigner&#xff0c;决定元素被放到哪个或哪些窗口&#xff0c;同时可能会创建新窗口或者合并旧的窗口。每一个窗口都拥有一个…...

【力扣 28】找出字符串中第一个匹配项的下标 C++题解(字符串匹配)

给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haystack “s…...

软件构造 | Design Patterns for Reuse and Maintainability

Design Patterns for Reuse and Maintainability &#xff08;面向可复用性和可维护性的设计模式&#xff09; Open-Closed Principle (OCP) ——对扩展的开放&#xff0c;对修改已有代码的封 Why reusable design patterns A design… …enables flexibility to change …...

Python数据分析-股票分析和可视化(深证指数)

一、内容简介 股市指数作为衡量股市整体表现的重要工具&#xff0c;不仅反映了市场的即时状态&#xff0c;也提供了经济健康状况的关键信号。在全球经济体系中&#xff0c;股市指数被广泛用于预测经济活动&#xff0c;评估投资环境&#xff0c;以及制定财政和货币政策。在中国…...

Linux如何安装openjdk1.8

文章目录 Centosyum安装jdk和JRE配置全局环境变量验证ubuntu使用APT(适用于Ubuntu 16.04及以上版本)使用PPA(可选,适用于需要特定版本或旧版Ubuntu)Centos yum安装jdk和JRE yum install java-1.8.0-openjdk-devel.x86_64 安装后的目录 配置全局环境变量 vim /etc/pr…...

【LLVM】LTO学习

看这篇文章&#xff0c;文中的代码都是错的&#xff0c;给出的命令行也是错的。 真不如参考文献中也是华为的外国员工写的PPT。 但是&#xff0c;上述的文件中的指令也存在报错&#xff0c;还是官方文档看着舒服。...

事务的特性-原子性(Atomicity)、一致性(Consistency)、隔离性(Asolation)、持久性(Durability)

一、引言 1、数据库管理系统DBMS为保证定义的事务是一个逻辑工作单元&#xff0c;达到引入事务的目的&#xff0c;实现的事务机制要保证事务具有原子性、一致性、隔离性和持久性&#xff0c;事务的这四个特性也统称为事务的ACID特性 2、当事务保持了ACID特性&#xff0c;才能…...

redis哨兵模式(Redis Sentinel)

哨兵模式的背景 当主服务器宕机后&#xff0c;需要手动把一台从服务器切换为主服务器&#xff0c;这就需要人工干预&#xff0c;费事费力&#xff0c;还会造成一段时间内服务不可用。这不是一种推荐的方式。 为了解决单点故障和提高系统的可用性&#xff0c;需要一种自动化的监…...

【牛客】牛客小白月赛97 题解 A - E

文章目录 A - 三角形B - 好数组C - 前缀平方和序列D - 走一个大整数迷宫E - 前缀和前缀最大值 A - 三角形 map存一下每个数出现了多少次&#xff0c;再遍历map #include <bits/stdc.h>using namespace std;#define int long long using i64 long long;typedef pair<…...

Spring Boot中泛型参数的灵活运用:最佳实践与性能优化

泛型是Java中一种强大的特性&#xff0c;它提供了编写通用代码的能力&#xff0c;使得代码更加灵活和可复用。在Spring Boot应用程序中&#xff0c;泛型参数的灵活运用可以带来诸多好处&#xff0c;包括增强代码的可读性、提高系统的健壮性以及优化系统的性能。本文将深入探讨在…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...