当前位置: 首页 > news >正文

SpringBoot:使用Spring Batch实现批处理任务

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency><groupId>org.hsqldb</groupId><artifactId>hsqldb</artifactId><scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;@Entity
public class Person {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String firstName;private String lastName;// getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;@Configuration
public class BatchConfiguration {@Beanpublic FlatFileItemReader<Person> reader() {return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited().names(new String[]{"firstName", "lastName"}).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}}).build();}
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {@Overridepublic Person process(Person person) throws Exception {final String firstName = person.getFirstName().toUpperCase();final String lastName = person.getLastName().toUpperCase();final Person transformedPerson = new Person();transformedPerson.setFirstName(firstName);transformedPerson.setLastName(lastName);return transformedPerson;}
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;@Configuration
public class BatchConfiguration {@Beanpublic JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()).sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(jdbcTemplate.getJdbcTemplate().getDataSource()).build();}
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;@Beanpublic Job importUserJob(JobCompletionNotificationListener listener, Step step1) {return jobBuilderFactory.get("importUserJob").listener(listener).flow(step1).end().build();}@Beanpublic Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();}
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;@Component
public class JobCompletionNotificationListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("Job Started");}@Overridepublic void afterJob(JobExecution jobExecution) {System.out.println("Job Ended");}
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class BatchApplication implements CommandLineRunner {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate Job job;public static void main(String[] args) {SpringApplication.run(BatchApplication.class, args);}@Overridepublic void run(String... args) throws Exception {jobLauncher.run(job, new JobParameters());}
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {return jobBuilderFactory.get("multiStepJob").listener(listener).start(step1).next(step2).end().build();
}@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step2").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor()).writer(writer).taskExecutor(taskExecutor()).build();
}@Bean
public TaskExecutor taskExecutor() {SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();taskExecutor.setConcurrencyLimit(10);return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

相关文章:

SpringBoot:使用Spring Batch实现批处理任务

引言 在企业级应用中&#xff0c;批处理任务是不可或缺的一部分。它们通常用于处理大量数据&#xff0c;如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分&#xff0c;专为批处理任务设计&#xff0c;提供了简化的配置和强大的功能。本文将介绍如何使用Spr…...

用JQueryUI库在.net MVC中配置datepicker(时间日期控件)

原文参考&#xff1a;如何在MVC中添加jQuery Datepicker_mvc datepicker-CSDN博客 好文章被埋没了&#xff0c;可能和时间发的早有关。 1.首先我们引入JQuery和JQuery UI <!-- ... --> <link rel"stylesheet" href"https://code.jquery.com/ui/1.12…...

算法:链表

目录 链表的技巧和操作总结 常用技巧&#xff1a; 链表中的常用操作 题目一&#xff1a;反转一个单链表 题目二&#xff1a;链表的中间结点 题目三&#xff1a;返回倒数第k个结点 题目四&#xff1a;合并两个有序链表 题目五&#xff1a;移除链表元素 题目六&#xff…...

Redis基础教程(一):redis配置

&#x1f49d;&#x1f49d;&#x1f49d;首先&#xff0c;欢迎各位来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里不仅可以有所收获&#xff0c;同时也能感受到一份轻松欢乐的氛围&#xff0c;祝你生活愉快&#xff01; &#x1f49d;&#x1f49…...

短视频矩阵系统:打造品牌影响力的新方式

一、短视频矩阵概念 短视频营销革命&#xff1a;一站式解决策略&#xff01;短视频矩阵系统是一款专为企业营销设计的高效工具&#xff0c;旨在通过整合和优化众多短视频平台资源&#xff0c;为企业呈现一个全面的短视频营销策略。该系统致力于协助企业以迅速且高效的方式制作…...

品牌推广的三个阶段与核心内容,一篇文章全掌握!

在竞争激烈的市场环境中&#xff0c;品牌推广是企业成功的关键。精心策划的推广策略能够帮助企业在消费者心中树立独特的品牌形象&#xff0c;进而促进销售增长。 作为一家手工酸奶品牌的创始人&#xff0c;目前全国也复制了100多家门店&#xff0c;我理解的品牌推广分为3个阶…...

队列与循环队列

目录 1. 前言&#xff1a; 2. 队列 2.1 队列的概念 2.2 队列的实现 2.3 队列的声明 2.4 队列的初始化 2.5 队列的入队 2.6 队列的出队 2.7 队列获取队头元素 2.8 队列获取队尾元素 2.9 队列获取有效数据个数 2.10 队列判断是否为空 2.11 打印队列 2.12 销毁队列 …...

python基础问题记录

文章目录 前言一、python中类的注意点二、模块与包1. 模块2. 包 总结 前言 本专栏主要记录python中一些语法问题。 一、python中类的注意点 类属性&#xff1a;在类中定义的属性 在类中直接写明的变量是类属性&#xff0c;属于公共属性。 访问&#xff1a;类属性可以通过类或…...

Qt之饼图(Pie Graph)

[TOC](Qt之饼图(Pie Graph)) 饼图名为Pie Graph&#xff0c;用于显示一个数据系列中各项的大小与各项总和的比例。本文基于QtCharts实现饼图的显示。 1.实现过程 1.1环境配置 &#xff08;1&#xff09;首先想要使用QtCharts模块&#xff0c;需要在安装qt时选择勾选安装QtCha…...

Java项目Git提交规范

在Java项目中&#xff0c;遵循良好的Git提交规范有助于提高代码的可维护性、可读性和团队协作效率。以下是一些常见的Git提交规范建议&#xff1a; 文章目录 提交信息格式提交信息示例提交频率分支管理代码审查工具和自动化提交前检查清单 提交信息格式 提交类型&#xff1a;使…...

flink-触发器Trigger和移除器Evictor

窗口原理与机制 图片链接&#xff1a;https://blog.csdn.net/qq_35590459/article/details/132177154 数据流进入算子前&#xff0c;被提交给WindowAssigner&#xff0c;决定元素被放到哪个或哪些窗口&#xff0c;同时可能会创建新窗口或者合并旧的窗口。每一个窗口都拥有一个…...

【力扣 28】找出字符串中第一个匹配项的下标 C++题解(字符串匹配)

给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haystack “s…...

软件构造 | Design Patterns for Reuse and Maintainability

Design Patterns for Reuse and Maintainability &#xff08;面向可复用性和可维护性的设计模式&#xff09; Open-Closed Principle (OCP) ——对扩展的开放&#xff0c;对修改已有代码的封 Why reusable design patterns A design… …enables flexibility to change …...

Python数据分析-股票分析和可视化(深证指数)

一、内容简介 股市指数作为衡量股市整体表现的重要工具&#xff0c;不仅反映了市场的即时状态&#xff0c;也提供了经济健康状况的关键信号。在全球经济体系中&#xff0c;股市指数被广泛用于预测经济活动&#xff0c;评估投资环境&#xff0c;以及制定财政和货币政策。在中国…...

Linux如何安装openjdk1.8

文章目录 Centosyum安装jdk和JRE配置全局环境变量验证ubuntu使用APT(适用于Ubuntu 16.04及以上版本)使用PPA(可选,适用于需要特定版本或旧版Ubuntu)Centos yum安装jdk和JRE yum install java-1.8.0-openjdk-devel.x86_64 安装后的目录 配置全局环境变量 vim /etc/pr…...

【LLVM】LTO学习

看这篇文章&#xff0c;文中的代码都是错的&#xff0c;给出的命令行也是错的。 真不如参考文献中也是华为的外国员工写的PPT。 但是&#xff0c;上述的文件中的指令也存在报错&#xff0c;还是官方文档看着舒服。...

事务的特性-原子性(Atomicity)、一致性(Consistency)、隔离性(Asolation)、持久性(Durability)

一、引言 1、数据库管理系统DBMS为保证定义的事务是一个逻辑工作单元&#xff0c;达到引入事务的目的&#xff0c;实现的事务机制要保证事务具有原子性、一致性、隔离性和持久性&#xff0c;事务的这四个特性也统称为事务的ACID特性 2、当事务保持了ACID特性&#xff0c;才能…...

redis哨兵模式(Redis Sentinel)

哨兵模式的背景 当主服务器宕机后&#xff0c;需要手动把一台从服务器切换为主服务器&#xff0c;这就需要人工干预&#xff0c;费事费力&#xff0c;还会造成一段时间内服务不可用。这不是一种推荐的方式。 为了解决单点故障和提高系统的可用性&#xff0c;需要一种自动化的监…...

【牛客】牛客小白月赛97 题解 A - E

文章目录 A - 三角形B - 好数组C - 前缀平方和序列D - 走一个大整数迷宫E - 前缀和前缀最大值 A - 三角形 map存一下每个数出现了多少次&#xff0c;再遍历map #include <bits/stdc.h>using namespace std;#define int long long using i64 long long;typedef pair<…...

Spring Boot中泛型参数的灵活运用:最佳实践与性能优化

泛型是Java中一种强大的特性&#xff0c;它提供了编写通用代码的能力&#xff0c;使得代码更加灵活和可复用。在Spring Boot应用程序中&#xff0c;泛型参数的灵活运用可以带来诸多好处&#xff0c;包括增强代码的可读性、提高系统的健壮性以及优化系统的性能。本文将深入探讨在…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏

一、引言 在深度学习中&#xff0c;我们训练出的神经网络往往非常庞大&#xff08;比如像 ResNet、YOLOv8、Vision Transformer&#xff09;&#xff0c;虽然精度很高&#xff0c;但“太重”了&#xff0c;运行起来很慢&#xff0c;占用内存大&#xff0c;不适合部署到手机、摄…...

上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式

简介 在我的 QT/C 开发工作中&#xff0c;合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式&#xff1a;工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...

ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]

报错信息&#xff1a;libc.so.6: cannot open shared object file: No such file or directory&#xff1a; #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...