Streamsets-JDBC模式使用更新时间字段数据同步
StreamSets的开源地址:https://github.com/streamsets/datacollector-oss
Streamsets官网地址:https://streamsets.com/
Streamsets文档地址:https://docs.streamsets.com/portal/datacollector/3.16.x/help/index.html
我又来写Streamsets了,各种原因好久没研究Cassandra了。
本次分享主要介绍Streamsets的JDBC模式、为什么使用时间字段同步数据、遇到的问题和解决方案。
解决方案并不是最完美但也是基于当前条件下最优解,如有疑问,欢迎热烈讨论。
提供的脚本毫无保留,可直接使用。
Streamsets在3.22.2之后就闭源了,更高阶的特性已包装为平台产品。
结合周边讨论和网上的资料来看,Streamsets的活跃度不高,在网上搜的资料太少啦,又随着项目闭源,活跃度更低了,归其原因我分析Streamsets是一个大而全的数据同步工具,整合了市面上基本所有的数据源,但是每个公司不可能用到里面所有的数据源,真正能用到大部分数据源的公司,规模肯定大到不会依赖这种外部的工具,自己手写同步的自由度和效率要更好。
Streamsets对于我们的优势在于开箱即用,相比于手搓代码来实现业务细节,Streamsets将数据同步的每个阶段独立开来,将业务变动最大的数据清洗部分以处理器的形式开放出来,数据的转换和转换的实时配置并生效,直观的监控指标。
版本为Streamsets的3.16.0的离线版本,部署到内网时的最新版本为3.16.0,所以方案和问题的解决方案均以3.16.0为基础。
JDBC模式介绍:
JDBC模式的增量模式只支持新增的数据和不需要修改的数据,且官方建议的offsetColumn为PrimaryKey,如:ID。
Incremental mode
When the JDBC Query Consumer performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. By default, JDBC Query Consumer uses incremental mode.
SELECT * FROM <table_name> WHERE <primaryKey> > ${OFFSET} ORDER BY <primaryKey>
这样支持的场景为不断的增量数据,无法捕获数据的更新。
但是正常的业务系统一般不存在只新增不更新的场景。
全量同步模式每次加载所有的数据,当表的数据量较大时,同步所需的时间和延迟不能接受。
修改为通过update_time来捕获数据变化:
SELECT * FROM user WHERE update_time > ${OFFSET} ORDER BY update_time
在配置管道时将OffsetColumn指定为update_time,业务系统使用mybatis-plus在数据新增和更新时补充创建时间和更新时间。数据库的时间精度为秒。
使用update_time的好处是对于开发者和运维人员可读性更好,在进行历史数据的同步和数据对接时更方便。
该方案看似非常合理,业务侧只要控制好update_time的逻辑,每次数据变化时update_time是不断滚动向前的,滚动查询不断的进行数据同步。
但是too young too simple。
按照Streamsets的处理逻辑,在两种场景下会丢数据。
分别是当单次同步的数据量超过maxBatchSize时,概率性丢数据和并发写入数据库时概率性丢数据。
这两种丢数据的场景是不可控的,时间不可控,完全看运气。但是不确定往往是最可怕的。
为什么会丢数据?
第一种场景:单次同步的数据量超过maxBatchSize
Offset的更新逻辑和jdbc-protolib源码中的逻辑:
origin会当根据sql查询的数据读取不超过配置的maxBatchSize的数量,并将最新的update_time赋值给offset。
// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}// 超过maxBatchSize的数据不发送到下一阶段,留到下次操作时处理。while (continueReading(rowCount, batchSize) && (haveNext = resultSet.next())) {final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}// 后续收尾工作}}return nextSourceOffset;}
结合Streamsets的Offset的更新逻辑和jdbc-protolib源码中的逻辑,当一秒内出现多条数据时,会因为精度问题导致数据丢失。
第二种场景:数据并发写入数据库时。
业务侧代码使用mybatis-plus作为ORM来处理数据的读写,当有大数据量写入数据时,如:Excel导入或高并发的数据写入。
mybatis-plus的内置处理逻辑为分批次提交,每次提交1000,所以单个线程写入的qps为1000。
以Excel导入为例,如果批量保存方法没有加@Transaction注解,会大大增加数据丢失的概率。
原因为结合mybatis的处理+没加@Transaction注解导致1000个insert语句一次性发给数据库,这1000条sql语句是以非事务的方式执行,每条数据都是一个完整的事务,执行完毕自动提交,立即可见。
这时当Streamsets触发查询操作时,时机恰好出现在一秒内的前半段,而一秒内的后半段还在数据写入,导致后半段的数据丢失。

解决方案:
如果你拿到的是Streamsets的安装包,那第一种场景无法通过配置和升级的方式解决,因为使用的方式和增量模式的设计初衷不符。
有一份折中方案,但不保熟:
1.能力范围内update_time的精度越细越好,越细会有一定的性能损耗,但丢数据的概率大大降低。
2.评估每次同步的数据量大小,maxBatchSize的大小要大于单次同步的数据量。注意内存大小,小心OOM,(插一句:oracle的批量更新会存在连接泄露,需注意。如果有源码顺手改之。)
可以下载一份Streamsets的源码,改之。
代码如下:
// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}while ((haveNext = resultSet.next())) {if(continueReading(rowCount, batchSize)){final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}} else {// 当超过maxBatchSize时,继续查找最后一秒的数据。if(!nextSourceOffset.equals(initialOffset) && nextSourceOffset.equals(resultSet.getString(offsetColumn))){if(null != record) batchMaker.addRecord(record);}// 后续收尾工作}}return nextSourceOffset;}
第二种场景出现的原因是在同一秒内同时出现写入和查询操作,查询时无法取出应取出的数据。
解决的思路为错峰,通过配置手段将查询动作和写入动作错开。
// oracle
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < SYSDATE - INTERVAL '1' SECOND order by update_time;
// mysql
select * from user where update_time < '${offset}' and update_time < DATE_SUB(now(), INTERVAL 1 SECOND) order by update_time;
// dm
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < CURRENT_TIMESTAMP- INTERVAL '1' SECOND order by update_time;
// kingbase
select * from user where update_time < '${offset}' and update_time < current_timestamp - INTERVAL '1' SECOND order by update_time;
需要特别注意:因为数据库中存储的时间有可能为业务服务的时间,要保证数据库和业务服务的时区和时间要保持一致。
通道示意图:
新版Streamsets的布局,我的不长这样。
源:无特殊配置
Jython处理器:根据源传过来的数据查询目标表,对数据进行标记。
流选择器:根据数据的标记分发数据,标记为insert的走新增通道,标记为update的走修改通道。
目标:一个配置为INSERT,另一个配置为UPDATE。
Jython脚本:
import java.sql.DriverManager as DriverManager
import java.lang.Class as Class
import timeurl = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
Class.forName("com.mysql.jdbc.Driver")
username = "root"
password = "passwd"
batch_size = 1000primary_key = "id"
table_name = "t_target"
ids = []
db_ids = set()
records = sdc.records
conn = None
stmt = None
rs = None
if len(records) != 0:try:conn = DriverManager.getConnection(url,username,password)if conn is not None:stmt = conn.createStatement()start_time = time.time()for record in records:id = record.value[primary_key]ids.append(id)num_batches = len(ids) // batch_size + (1 if len(ids) % batch_size != 0 else 0)for i in range(num_batches):start_index = i * batch_sizeend_index = min((i+1) * batch_size,len(ids))batch_ids = ids[start_index,end_index]sql = "select ' + primary_key + ' from " + table_name + " where '+ primary_key +' in ('"for j,id in enumerate(batch_ids):if j != 0:sql += "','"sql += str(id)sql += "')"rs = stmt.executeQuery(sql)while rs.next():id = rs.getString(primary_key)db_ids.add(id)end_time = time.time()sdc.log.info('from '+ table_name + 'query:' + str(len(ids)) + 'rows cost:'+str(end_time - start_time) + 's')for record in records:id = record.value[primary_key]if id in db_ids:record.value['insert_or_update'] = 'update'else:record.value['insert_or_update'] = 'insert'sdc.output.write(record)except Exception as e:raise RuntimeError(e)finally:if rs:rs.close()if stmt:stmt.close()if conn:conn.close()
else:sdc.log.trace('no more data')
结语:
截止到此,也算一套完整的解决方案。拷贝之后可直接食用。
后面有时间会分享一些定位时发现的问题和小技巧。
- 国产化数据库达梦和人大金仓的适配。
- 国产化服务器加密环境的打包和部署方案。
- 为Streamsets减负,轻量化安装包。
- JDBC模式的性能优化小技巧。
- 穿插一些Streamsets组件的实现原理。
- Streamsets CDC模式的配置。
- 手写一份Streamsets的Stage,用以支撑国产化的需求
相关文章:
Streamsets-JDBC模式使用更新时间字段数据同步
StreamSets的开源地址:https://github.com/streamsets/datacollector-oss Streamsets官网地址:https://streamsets.com/ Streamsets文档地址:https://docs.streamsets.com/portal/datacollector/3.16.x/help/index.html 我又来写Streamsets了…...
Nodejs-- 网络编程
网络编程 构建tcp服务 TCP tcp全名为传输控制协议。再osi模型中属于传输层协议。 tcp是面向连接的协议,在传输之前需要形成三次握手形成会话 只有会话形成了,服务端和客户端才能想发送数据,在创建会话的过程中,服务端和客户…...
React@16.x(14)context 举例 - Form 表单
目录 1,目标2,实现2.1,index.js2.2,context.js2.2,Form.Input2.3,Form.Button 3,使用 1,目标 上篇文章说到,context 上下文一般用于第3方组件库,因为使用场景…...
十几款基于ChatGPT的免费神器,每个都是王炸!
十几款基于ChatGPT的免费神器,每个都是王炸! 1、ChatGPT ChatGPT非常强大,但注册需要魔法和国外的手机号,大部分人都没法使用。还好有一些基于API开发的体验版,我收集了一些可以直接使用的站点分享给大家,…...
devicemotion 或者 deviceorientation在window.addEventListener 事件中不生效,没有输出内容
问题:devicemotion 或者 deviceorientation 在window.addEventListener 事件中不生效,没有输出内容 原因: 1、必须在Https协议下才可使用 2、必须用户手动点击click事件中调用 ,进行权限申请 源码: <!DOCTYPE h…...
java单元测试如何断言异常
在junit单元测试中,我们可以使用 org.junit.Assert.assertThrows 包下的 assertThrows() 方法 这个方法返回了一个泛型的异常 public static <T extends Throwable> assertThrows(Class<T> expectedType, Executable executable) 假设我们有以下…...
C语言| n的阶乘相加
逻辑性较强,建议记住。 分析思路: 假如n4:m m * i; sum sum m; 1)当i1时,m1, sum1。 2)当i2时,m12, sum112。 3)当i3时,m123, sum112123。 4)当i4时&…...
cwiseMax、cwiseMin函数
一、cwiseMax含义 cwiseMax是Eigen库中的一个函数,用于求两个矩阵或向量的逐元素最大值。它的作用类似于std::max函数,但是可以同时处理多个元素,且支持矩阵和向量。 举例: 例如,对于两个向量a和b,cwiseMax…...
【thinkphp问题栏】tp5.1重写URL,取消路径上的index.php
在Apache运行thinkphp5.1时,发现系统默认生成的.htaccess不生效。 首先先查看怎么修改伪静态 1、修改Apache的配置文件 在Apache的安装目录下,打开config/httpd.conf。 搜索rewrite.so,将前面的#删掉,表示开启URL重写功能 2、…...
缓冲字符流
BufferedReader/BufferedWriter增加了缓存机制,大大提高了读写文本文件的效率。 字符输入缓冲流 BufferedReader是针对字符输入流的缓冲流对象,提供了更方便的按行读取的方法:readLine();在使用字符流读取文本文件时,我们可以使…...
Django中使用Celery和APScheduler实现定时任务
在之前的文章我们已经学习了Celery和APScheduler的基本使用,下面让我们来了解一下如何在Django中使用Celery和APScheduler Celery 1.前提工作 python 3.7 pip install celery pip install eventlet #5.0版本以下 pip install importlib-metadata4.8.3(…...
Kivy.uix.textinput
一个小小的输入框,纵上下数页文档已不能全不概括,当去源码慢慢寻找,才知道其中作用,才能运用灵活。 Text Input — Kivy 2.3.0 documentation # -*- encoding: utf-8 -*-Text Input .. versionadded:: 1.0.4.. image:: images/te…...
基于IoTDB 平台的学习和研究
Apache IoTDB(物联网数据库)是一个针对物联网领域的高性能原生数据库,适用于数据管理和分析,并可在边缘计算和云端部署。由于它轻量级的架构、高性能和丰富的功能集,以及与Apache Hadoop、Spark和Flink的深度集成&…...
nessus plugins目录为空的问题
想要避免这种问题,可以将nessus服务设置为手动,并且先停止nessus服务。 批处理脚本: 下面的/~/Nessus/plugin_feed_info.inc替换成你配置好的 plugin_feed_info.inc 所在的路径 service nessusd stop; cp /~/Nessus/plugin_feed_info.inc …...
FDW(Foreign Data Wrapper)
在上一篇博客里,最末尾提到了 FDW。pg 实现了数百个 fdw 插件,用于访问外部数据。 FDW 到底是什么呢? 标准 FDW(Foreign Data Wrapper)遵循了 SQL/MED 标准,标准全称:ISO/IEC 9075-9 Managem…...
Flutter开发指南
Flutter开发指南(Android 开发角度) 与Android 的对比 1.Android 的View 与Flutter 的对应关系: a.在android 中,view 是屏幕显示的基础,比如 button,文本,列表,输入框都是 view。…...
SpringCloud学习笔记万字整理(无广版在博客)
在此感谢黑马程序员的SpringCloud课程 所有笔记、生活分享首发于个人博客 想要获得最佳的阅读体验(无广告且清爽),请访问本篇笔记 认识微服务 随着互联网行业的发展,对服务的要求也越来越高,服务架构也从单体架构逐渐…...
c++(七)
c(七) 内联函数内联函数的特点为什么要有内联函数内联函数是如何工作的呢 类型转换异常处理智能指针单例模式懒汉模式饿汉模式 VS中数据库的相关配置 内联函数 修饰类的成员函数,关键字:inline inline 返回值类型 函数名(参数列…...
SQL语言
SQL语言 导航 文章目录 SQL语言导航一、SQL概述SQL 二、数据库定义SQL 数据类型 三、数据操作视图更新 四、SQL的授权五、存储过程六、嵌入式SQL主语言与数据库通信 七、动态SQL 一、SQL概述 SQL 支持三级模式结构 视图->外模式 基本表->模式 存储文件->内模式 二…...
【PPT】修改新建文本框默认字体
【PPT】修改新建文本框默认字体...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
React第五十七节 Router中RouterProvider使用详解及注意事项
前言 在 React Router v6.4 中,RouterProvider 是一个核心组件,用于提供基于数据路由(data routers)的新型路由方案。 它替代了传统的 <BrowserRouter>,支持更强大的数据加载和操作功能(如 loader 和…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
