【flink】之集成mybatis对mysql进行读写
背景:
在现代大数据应用中,数据的高效处理和存储是核心需求之一。Flink作为一款强大的流处理框架,能够处理大规模的实时数据流,提供丰富的数据处理功能,如窗口操作、连接操作、聚合操作等。而MyBatis则是一款优秀的持久层框架,能够简化数据库操作,提高开发效率。将这两者结合使用,可以实现高效的数据处理和存储。
介绍:
MyBatis简介
MyBatis是一款基于Java的持久层框架,它可以使用XML配置文件或注解来定义数据库操作。MyBatis提供了简单的API来执行SQL语句,以及更高级的API来处理复杂的数据库操作。其核心是SQL映射,可以将关系型数据库的表映射到Java对象中,从而实现对数据库的操作。此外,MyBatis还提供了一些高级功能,如动态SQL、缓存等,以提高开发效率和性能。
Flink简介
Flink是一款流处理框架,可以处理大规模的实时数据流。Flink支持各种数据源和数据接收器,如Kafka、HDFS、TCP等。Flink的核心是流计算模型,可以实现对数据流的有状态计算,从而实现对实时数据的处理。Flink提供了丰富的数据处理功能,如窗口操作、连接操作、聚合操作等,以满足不同的应用需求。
目的:
Flink集成MyBatis的目的
Flink集成MyBatis的主要目的是将MyBatis作为Flink的数据源,通过Flink处理实时数据流,实现高效的数据处理和存储。使用MyBatis定义数据库操作,以实现高效的数据存储和查询;使用Flink处理实时数据流,以实现高效的数据处理和分析。
准备:
添加依赖
<!--添加spring依赖--><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aspects</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.2.RELEASE</version></dependency><!--添加mybatis相关依赖--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.5.4</version></dependency><dependency><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId><version>2.0.7</version></dependency><!--添加连接池和mysql驱动依赖--><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>3.4.5</version><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!-- 加上这个才能辨认到*.yml文件 如果配置文件不使用yaml,则不需要引用此依赖--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId><version>2.17.2</version></dependency>
代码示例:
配置文件设置
config.properties文件配置
local.url=jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull
local.username=root
local.password=
local.maximumPoolSize=10
或者配置yml文件,(二选其一)如下:
local:url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNullusername: rootpassword:maximumPoolSize: 5
配置文件加载
package com.iterge.flink.utils;import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;import java.io.IOException;
import java.util.Properties;
import java.util.Set;/*** @author iterge* @version 1.0* @date 2024/10/18 14:34* @description spring环境初始化*/public class SpringEnv {private static volatile boolean inited = false;//配置文件地址private static final String applicationLocation = "/application.yml";public static void init() {if (!inited) {System.out.println("...........................spring init start ...........................");//加载配置文件AnnotationConfigApplicationContext springContext = new AnnotationConfigApplicationContext();springContext.scan("com.iterge.flink");springContext.refresh();System.out.println("...........................spring init end ...........................");System.out.println("...........................config init start ...........................");//loadProperties();loadYamlProperties();System.out.println("...........................config init start ...........................");inited = true;}}/*** 加载配置文件*/private static void loadProperties() {try {Resource resource = new ClassPathResource(applicationLocation);Properties properties = PropertiesLoaderUtils.loadProperties(resource);Set<String> keys = properties.stringPropertyNames();for (String key : keys) {System.setProperty(key, properties.getProperty(key));}} catch (IOException e) {throw new RuntimeException(e.getMessage());}}/*** 加载yml文件*/private static void loadYamlProperties() {try {Resource resource = new ClassPathResource(applicationLocation);YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();yamlPropertiesFactoryBean.setResources(resource);Properties properties = yamlPropertiesFactoryBean.getObject();assert properties != null;Set<String> keys = properties.stringPropertyNames();for (String key : keys) {System.setProperty(key, properties.getProperty(key));}}catch (Exception e){throw new RuntimeException(e.getMessage());}}
}
数据源配置&加载
package com.iterge.flink.datasource;import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import javax.sql.DataSource;/*** @author iterge* @version 1.0* @date 2024/10/12 15:33* @description 本地数据源加载配置*/@Configuration
@Lazy
@MapperScan(basePackages = "com.iterge.flink.mapper",sqlSessionFactoryRef = "localDataSourceSqlSessionFactory",lazyInitialization = "true")
public class LocalDatasourceConfig {@Value("${local.url}")private String url;@Value("${local.username}")private String user;@Value("${local.password}")private String password;@Value("${local.maximumPoolSize:10}")private Integer maxPoolSize;@Bean("localDataSource")public DataSource localDataSource() {return DataSourceHelper.createDataSource(url, user, password, "localDataSource", 5, maxPoolSize);}@Bean("localDataSourceSqlSessionFactory")public SqlSessionFactory localDataSourceSqlSessionFactory(@Qualifier("localDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean bean = new SqlSessionFactoryBean();bean.setDataSource(dataSource);// mapper的xml形式文件位置必须要配置,不然将报错:no statement (这种错误也可能是mapper的xml中,namespace与项目的路径不一致导致)bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));return bean.getObject();}
}
package com.iterge.flink.datasource;import com.zaxxer.hikari.HikariDataSource;/*** @author iterge* @version 1.0* @date 2024/10/12 15:44* @description 数据源创建工具*/
public class DataSourceHelper {public static HikariDataSource createDataSource(String jdbcUrl,String user,String password,String poolName,Integer minIdle,Integer maxPoolSize) {HikariDataSource dataSource = new HikariDataSource();dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");dataSource.setJdbcUrl(jdbcUrl);dataSource.setUsername(user);dataSource.setPassword(password);dataSource.setIdleTimeout(120000);dataSource.setMinimumIdle(minIdle);dataSource.setMaximumPoolSize(maxPoolSize);dataSource.setMaxLifetime(600000);dataSource.setRegisterMbeans(false);dataSource.setConnectionTimeout(2000);dataSource.setPoolName(poolName);return dataSource;}}
创建实体类
package com.iterge.flink.entity;import lombok.Data;/*** @author iterge* @date 2024/10/12 16:00:50*/@Data
public class User {private Integer id;private String name;
}
创建mapper
package com.iterge.flink.mapper;import com.iterge.flink.entity.User;
import org.apache.ibatis.annotations.Mapper;/*** @author iterge* @version 1.0* @date 2024/10/12 15:59* @description 用户对象dao*/@Mapper
public interface UserMapper {int insertOne(User user);}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.iterge.flink.mapper.UserMapper"><insert id="insertOne" keyProperty="id" useGeneratedKeys="true" parameterType="com.iterge.flink.entity.User">insert into t_user(name) values(#{name})</insert></mapper>
上下文获取工具
package com.iterge.flink.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** @author iterge* @version 1.0* @date 2024/10/12 16:20* @description 上下文文获取工具*/@Slf4j
@Component
public class ContextUtil implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext context) throws BeansException {ContextUtil.applicationContext = context;}public static ApplicationContext getContext() {return applicationContext;}public static Object getBean(String name) {if (getContext() == null) {log.error("spring context can not be found");return null;}if (StringUtils.isBlank(name)) {log.error("bean name can not be null");return false;}return getContext().getBean(name);}
}
创建flink任务
package com.iterge.flink.job;import com.iterge.flink.entity.User;
import com.iterge.flink.mapper.UserMapper;
import com.iterge.flink.utils.ContextUtil;
import com.iterge.flink.utils.SpringEnv;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/**** @author FlinkMybatisDemo* @date 2024/10/12 11:17* @version 1.0* @description 整合mybatis*
*/@Slf4j
public class FlinkMybatisDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");SingleOutputStreamOperator<String> process = stringDataStreamSource.process(new ProcessFunction<String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);SpringEnv.init();}@Overridepublic void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {log.info("message={}",s);User u = new User();u.setName(s);UserMapper mapper = ContextUtil.getContext().getBean(UserMapper.class);mapper.insertOne(u);collector.collect(s);}});process.print();env.execute("mybatis-demo");}
}
代码地址:
GitCode - 全球开发者的开源社区,开源代码托管平台
相关文章:
【flink】之集成mybatis对mysql进行读写
背景: 在现代大数据应用中,数据的高效处理和存储是核心需求之一。Flink作为一款强大的流处理框架,能够处理大规模的实时数据流,提供丰富的数据处理功能,如窗口操作、连接操作、聚合操作等。而MyBatis则是一款优秀的持…...
Java设计模式—观察者模式详解
引言 模式角色 UML图 示例代码 应用场景 优点 缺点 结论 引言 观察者模式(Observer Pattern)是一种行为设计模式,它定义了对象之间的一对多依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知…...

【Cri-Dockerd】安装cri-dockerd
cri-dockerd的作用: 在k8s1.24之前。k8s会通过dockershim来调用docker进行容器运行时containerd,并且会自动安装dockershim,但是从1.24版本之前k8s为了降低容器运行时的调用的复杂度和效率,直接调用containerd了,并且…...

GCC及GDB的使用
参考视频及博客 https://www.bilibili.com/video/BV1EK411g7Li/?spm_id_from333.999.0.0&vd_sourceb3723521e243814388688d813c9d475f https://www.bilibili.com/video/BV1ei4y1V758/?buvidXU932919AEC08339E30CE57D39A2BABF6A44F&from_spmidsearch.search-result.0…...

大数据新视界 -- 大数据大厂之大数据重塑影视娱乐产业的未来(4 - 3)
💖💖💖亲爱的朋友们,热烈欢迎你们来到 青云交的博客!能与你们在此邂逅,我满心欢喜,深感无比荣幸。在这个瞬息万变的时代,我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

数据结构——基础知识补充
1.队列 1.普通队列 queue.Queue 是 Python 标准库 queue 模块中的一个类,适用于多线程环境。它实现了线程安全的 FIFO(先进先出)队列。 2.双端队列 双端队列(Deque,Double-Ended Queue)是一种具有队列和…...
只有.git文件夹时如何恢复项目
有时候误删文件但由于.git是隐藏文件夹而幸存,或者项目太大,单单甩给你一个.git文件夹让你自己恢复整个项目,该怎么办呢? 不用担心,只要进行以下步骤,即可把原项目重新搭建起来: 创建一个文件…...
anchor、anchor box、bounding box之间关系
最近学YOLO接触到这些概念,一下子有点蒙,简单总结一下。 anchor和anchor box Anchor:表示一组预定义的尺寸比例,用来代表常见物体的宽高比。可以把它看成是一个模板或规格,定义了物体框的“形状”和“比例”ÿ…...

代码随想录算法训练营第三十天 | 452.用最少数量的箭引爆气球 435.无重叠区间 763.划分字母区间
LeetCode 452.用最少数量的箭引爆气球: 文章链接 题目链接:452.用最少数量的箭引爆气球 思路: 气球的区间有重叠部分,只要弓箭从重叠部分射出来,那么就能减少所使用的弓箭数 **局部最优:**只要有重叠部分…...

海亮科技亮相第84届中国教装展 尽显生于校园 长于校园教育基因
10月25日,第84届中国教育装备展示会(以下简称“教装展”)在昆明滇池国际会展中心开幕。作为国内教育装备领域规模最大、影响最广的专业展会,本届教装展以“数字赋能教育,创新引领未来”为主题,为教育领域新…...
C语言数据结构学习:栈
C语言 数据结构学习 汇总入口: C语言数据结构学习:[汇总] 1. 栈 栈,实际上是一种特殊的线性表。这里使用的是链表栈,链表栈的博客:C语言数据结构学习:单链表 2. 栈的特点 只能在一端进行存取操作&#x…...

如何快速分析音频中的各种频率成分
从视频中提取音频 from moviepy.editor import VideoFileClip# Load the video file and extract audio video_path "/mnt/data/WeChat_20241026235630.mp4" video_clip VideoFileClip(video_path)# Extract audio and save as a temporary file for further anal…...
MongoDB 6.0 主从复制配置
以下是 MongoDB 6.0 版本配置主从的详细安装步骤: 1. 安装 MongoDB:可以从官网下载 MongoDB 6.0 的安装包并进行安装,或者使用相应的包管理工具进行安装。 2. 配置主节点:在主节点的 MongoDB 配置文件(默认路径为 …...

NPU 神经网络处理单元
Ⅰ 什么是 NPU? 当前正处于神经网络和机器学习处理需求爆发的初期。传统的 CPU(中央处理器)/GPU(图形处理器)可以执行类似任务,但专门为神经网络优化的 NPU(神经处理单元)比 CPU/GP…...

安宝特分享 | AR技术引领:跨国工业远程协作创新模式
在当今高度互联的工业环境中,跨国合作与沟通变得日益重要。然而,语言障碍常常成为高效协作的绊脚石。安宝特AR眼镜凭借其强大的多语言自动翻译和播报功能,正在改变这一局面,让远程协作变得更加顺畅。 01 多语言翻译优势 安宝特A…...

Vulkan 开发(五):Vulkan 逻辑设备
图片来自《Vulkan 应用开发指南》 Vulkan 开发系列文章: 1. 开篇,Vulkan 概述 2. Vulkan 实例 3. Vulkan 物理设备 4. Vulkan 设备队列 在 Vulkan 中,逻辑设备(Logical Device)是与物理设备(Physical D…...
Kafka 解决消息丢失、乱序与重复消费
一、引言 在分布式系统中,Apache Kafka 作为一种高吞吐量的分布式发布订阅消息系统,被广泛应用于日志收集、流式处理、消息队列等场景。然而,在实际使用过程中,可能会遇到消息丢失、乱序、重复消费等问题,这些问题可能…...

计算机专业毕业生面试工具推荐:白瓜面试
随着毕业季的临近,计算机专业的毕业生们即将步入职场,面试成为了他们必须面对的挑战。在这个过程中,选择合适的面试工具可以大大提高求职成功率。今天,我要向大家推荐一款专为计算机专业毕业生设计的面试工具——白瓜面试。 为什…...

数字IC开发:布局布线
数字IC开发:布局布线 前端经过DFT,综合后输出网表文件给后端,由后端通过布局布线,将网表转换为GDSII文件;网表文件只包含单元器件及其连接等信息,GDS文件则包含其物理位置,具体的走线࿱…...

高空作业未系安全带监测系统 安全带穿戴识别预警系统
在各类高空作业场景中,安全带是保障作业人员生命安全的关键防线。然而,由于人为疏忽或其他原因,作业人员未正确系挂安全带的情况时有发生,这给高空作业带来了巨大的安全隐患。为有效解决这一问题,高空作业未系安全带监…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...

【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

算法岗面试经验分享-大模型篇
文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer (1)资源 论文&a…...

MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
站群服务器的应用场景都有哪些?
站群服务器主要是为了多个网站的托管和管理所设计的,可以通过集中管理和高效资源的分配,来支持多个独立的网站同时运行,让每一个网站都可以分配到独立的IP地址,避免出现IP关联的风险,用户还可以通过控制面板进行管理功…...

C++ 设计模式 《小明的奶茶加料风波》
👨🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...