【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档
Flink 读取 ClickHouse 数据两种驱动
- ClickHouse 官方提供Clickhouse JDBC.【建议使用】
- 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver
ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护
ClickHouse 官方提供Clickhouse JDBC的包名:
com.clickhouse.jdbc.*有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.
因此加载包的时候一定要注意导入的包名
引入依赖
<!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>
使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包
<!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>
自定义Source
测试表映射实体类,该表仅有一个name字段
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}
Flink Clickhouse Source
public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {// Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}
自定义Sink
需额外引入依赖
<!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>
Java 对sql语句处理的两个对象
- PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过
占位符'?'实现,可以防止sql注入】 - Statement对象:只能对静态的sql语句进行处理
核心代码
/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps: PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}
对sql语句进行预编译
@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}
ClickHouse读写工具类

public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}
测试一下
public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}
此时表中仅一行记录

读取没有问题!

写入没有问题!

相关文章:
【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…...
linux 查看服务启动时间
文章目录 linux 查看服务启动时间参数解析 linux 查看服务启动时间 [root104 ~]# ps -o lstart -p ps -ef |grep -v grep |grep "zookeeper"|awk {print$2}STARTED Fri Dec 15 16:54:10 2023参数解析 linux 命令中 ps -ef 详解 ps -ef表示查看全格式的进程。 ps …...
[RK-Linux] 移植Linux-5.10到RK3399(六)| 检查GMAC(RTL8211F)配置使能千兆以太网
ROC-RK3399-PC Pro 使用 RTL8211F PHY 芯片作为以太网收发器。 RTL8211F是一种高性能的千兆以太网物理层收发器(PHY),广泛用于台式机、笔记本电脑、网络交换机等设备中。主要特点: 采用低功耗28nm CMOS技术,功耗低。支持千兆速率(10/100/1000Mbps)。支持全双工和半双工…...
博途WinCC专业版C/S架构入门指南
WinCC Professional V16 支持客户机/服务器架构,但目前只支持单个服务器或单对冗余服务器/多个客户机的模式,还不能支持像WinCC V7.5 SP1中的多个服务器/多个客户机的分布式架构。 博途工控人平时在哪里技术交流博途工控人社群 博途工控人平时在哪里技…...
大数据生态圈kafka在物联网中的应用测试
背景 由物联网项目中使用到了Tbox应用管理车辆,在上报数据的过程中,需要将终端产生的数据通过kafka的produce topic customer对数据进行处理后,放置到mysql中。完成数据二进制到json转换工作。 Kafka的使用 查看kafka的topic ./kafka-topi…...
ChatGPT使用:一个发包机器人的提示词
发包机器人: 设想:目前项目组有n条打包线会输出多个包,用户想获取最新的包是比较困难的,难点在于 1. 分支多:trunk,release,outer等,至少有3个分支; 2. 多平台&#x…...
Axure元件库的使用
1.基本元件库 1.1Axure的画布范围 Axure是一个绘制项目原型图的软件,它里面的基本原件有: 1.1元件的呈现范围 首先我们要了解基本元件的作用范围在哪里? 浏览效果: 可以看出当我们的基本元件放在画布区域内是可以完全呈现出来…...
Unity中Shader URP最简Shader框架(整理总结篇)
文章目录 前言一、精简 ShaderGraph 所有冗余代码后的最简 URP Shader二、我们来对比一下 URP Shader 与 BuildInRP Shader 的对应关系 与 区别1、"RenderPipeline""UniversalPipeline"2、面片剔除、深度测试、深度写入、颜色混合 和 BRP 下一致3、必须引入…...
AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference
AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference 1. 源由2. 规格3. 分析3.1 喜欢3.2 不便3.3 建议 4. 总结5. 参考资料 1. 源由 AT32 F435飞控在xFlight开源飞控之AT32F435计划一文中已经大体阐述了一些移植历史。 之前整体上看,就是航模飞控新MCU的移植…...
ntp时间同步配置中 server、pool和peer的区别
在 NTP(Network Time Protocol)的配置中,server、pool 和 peer 是用于指定时间同步关系的关键字,它们在角色和行为上有一些区别。 server: server 关键字用于指定一个或多个 NTP 服务器,这些服务器将提供时…...
JMeter安装RabbitMQ测试插件
整体流程如下:先下载AMQP插件源码,可以通过antivy在本地编译成jar包,再将jar包导入JMeter目录下,重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序,使开发人员更轻…...
基于ssm日用品网站设计论文
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本日用品网站就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息&…...
coco数据集格式的RandomCrop
transforms.py文件的改进 添加 RandomCrop 函数 class RandomCrop(object):"""随机裁剪图像以及bboxes"""def __init__(self, output_size):self.output_size output_sizedef __call__(self, image, target):height, width image.shape[-2:]…...
机器学习-KL散度的直观理解+代码
KL散度 直观理解:KL散度是一种衡量两个分布之间匹配程度的方法。通常在概率和统计中,我们会用更简单的近似分布来代替观察到的数据或复杂的分布,KL散度帮我们衡量在选择近似值时损失了多少信息。 在信息论或概率论中,KL散度&#…...
【教程】制作 iOS 推送证书
目录 证书类型 MAC Key Store 消息推送控制台 制作证书 创建苹果 App ID 使用appuploder制作 .p12文件 创建证书 如需向 iOS 设备推送数据,您首先需要在消息推送控制台上配置 iOS 推送证书。iOS 推送证书用于推送通知,本文将介绍消息推送服务支…...
ToolLLM model 以及LangChain AutoGPT Xagent在调用外部工具Tools的表现对比浅析
文章主要谈及主流ToolLLM 以及高口碑Agent 在调用Tools上的一些对比,框架先上,内容会不断丰富与更新。 第一部分,ToolLLM model 先来说主打Function Call 的大模型们 OpenAI GPT 宇宙第一LLM,它的functionCall都知道࿰…...
【MySQL学习之基础篇】约束
文章目录 1. 概述2. 基础约束3. 外键约束3.1. 介绍3.2. 外键的添加3.3. 外键删除和更新行为 1. 概述 概念: 约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的: 保证数据库中数据的正确、有效性和完整性。 分类&#x…...
【DataSophon】大数据管理平台DataSophon-1.2.1基本使用
🦄 个人主页——🎐开着拖拉机回家_Linux,大数据运维-CSDN博客 🎐✨🍁 🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁&am…...
基于redisson实现发布订阅(多服务间用避坑)
前言 今天要分享的是基于Redisson实现信息发布与订阅(以前分享过直接基于redis的实现),如果你是在多服务间基于redisson做信息传递,并且有服务压根就收不到信息,那你一定要看完。 今天其实重点是避坑࿰…...
Java 源码、反码、补码 位运算
文章目录 1. 源码、反码、补码1.1 原码1.2 反码1.3 补码1.4 byte的最大值1.5 byte的最小值 2. 位运算2.1 & 与2.2 | 或2.3 ~ 非2.4 ^ 异或2.5 << 左移 (没有无符号左移)2.6 >> 右移 (有符号右移)2.7 >>>…...
【2024最严苛功能压力测试】:在金融合规文档生成、医疗术语推理、代码安全审计三大高危场景下,Claude与Gemini谁扛住了0误判红线?
更多请点击: https://intelliparadigm.com 第一章:【2024最严苛功能压力测试】:在金融合规文档生成、医疗术语推理、代码安全审计三大高危场景下,Claude与Gemini谁扛住了0误判红线? 测试设计原则 本测试采用“双盲对…...
Swift 项目集成 MJRefresh 终极指南:SPM包管理与桥接文件配置详解
Swift 项目集成 MJRefresh 终极指南:SPM包管理与桥接文件配置详解 【免费下载链接】MJRefresh An easy way to use pull-to-refresh. 项目地址: https://gitcode.com/gh_mirrors/mj/MJRefresh MJRefresh 是一款简单易用的下拉刷新框架,能帮助 Swi…...
【Linux保姆级教程】curl命令最全用法详解
在Linux日常运维、后端开发、接口调试工作中,有一个命令几乎无人不知、无人不用,它就是curl命令。curl被称为网络传输瑞士军刀,无需打开浏览器,纯命令行即可发送网络请求,支持HTTP/HTTPS/FTP等数十种协议。不管是测试接…...
从特斯拉事故看自动驾驶数据存储与系统安全设计
1. 事故背景与NTSB调查报告的核心价值2016年发生的那起特斯拉Model S与白色半挂卡车相撞的致命事故,相信很多关注汽车技术发展的朋友都还记得。当时这起事故引发了业界对自动驾驶辅助系统安全性的第一轮大规模公开讨论。一年多后,美国国家运输安全委员会…...
基于MCP协议构建AI知识库:解决会话失忆,实现知识持久化
1. 项目概述:让AI拥有自己的“亚历山大图书馆”如果你和我一样,长期与Claude Code、Cursor这类AI编程助手打交道,一定会遇到一个核心痛点:会话失忆。每次开启一个新对话,AI助手就像一张白纸,它对你项目的历…...
AI安全自动化测试:FuzzyAI模糊测试框架实战指南
1. 项目概述:当AI安全遇上自动化“模糊测试” 在大型语言模型(LLM)如ChatGPT、Claude、Gemini等日益普及的今天,我们享受其强大能力的同时,也面临着一个严峻的挑战:如何确保它们的安全与可控?你…...
【最新版】Windows 环境OpenClaw 本地 AI 智能体搭建指南
OpenClaw(小龙虾)Windows 一键部署保姆级教程|10 分钟搭建数字员工 在开源 AI 智能体快速普及的当下,OpenClaw(小龙虾)凭借本地运行 零代码操控 自动执行任务的能力,收获大量用户关注&#x…...
手机主板级维修
在智能手机高度普及的今天,一块主板几乎承载了用户所有的数字生活——从个人照片、工作文档到社交聊天记录。当设备遭遇进水、重摔或系统崩溃时,普通软件扫描往往束手无策,而“手机数据恢复”中的主板级维修技术,正成为破解这类“…...
华为会议转任务AI精准识别整理,省事更清晰,轻松搞定工作落地
"找2026华为会议转任务AI的朋友,你要的精准识别整理、落地工作的真实测评来了。不管你是做学术研究要整访谈、转讲座,还是开会长音频要扒任务,我测了大半个月,直接给你掏实底。我接触太多做学术的朋友,都踩过AI转…...
ngx_http_create_request
1 定义 ngx_http_create_request 函数 定义在 ./nginx-1.24.0/src/http/ngx_http_request.cngx_http_request_t * ngx_http_create_request(ngx_connection_t *c) {ngx_http_request_t *r;ngx_http_log_ctx_t *ctx;ngx_http_core_loc_conf_t *clcf;r ngx_http_…...
