当前位置: 首页 > article >正文

多个source、多个sink

关键配置sink的plugin_input [source_data1, source_data2]对应模型┌──────────┐│ Source A │──┐└──────────┘ │├──▶ Sink┌──────────┐ ││ Source B │──┘└──────────┘执行语句# ds-st-demo10-2-mysql2pgsql.confsh /data/tools/seatunnel/seatunnel-2.3.12/bin/seatunnel.sh --config /data/tools/seatunnel/myconf/ds-st-demo10-2-mysql2pgsql.conf -i -DJvmOption-Xms2G -Xmx2G -m local建表-- ds-st-demo10-2-mysql2pgsql.confCREATE TABLE public.t_8_100w_imp_st_ds_demo10 (id BIGINT PRIMARY KEY,user_name VARCHAR(2000),sex VARCHAR(20),decimal_f NUMERIC(32, 6),phone_number VARCHAR(20),age INT,create_time TIMESTAMP,description TEXT,address VARCHAR(2000) DEFAULT 未知,my_status INT);COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.id IS 主键;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.user_name IS 名字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.sex IS 性别男女;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.decimal_f IS 大数字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.phone_number IS 电话;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.age IS 字符串年龄转数字;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.create_time IS 新增时间;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.description IS 大文本;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.address IS 空地址转默认值未知;COMMENT ON COLUMN public.t_8_100w_imp_st_ds_demo10.my_status IS 状态;conf配置env {# 任务名字业务中可以弄表idjob.name ds-st-demo10.conf# 最大批线程数并行度线程数parallelism 5# 任务模式BATCH:批处理模式STREAMING:流处理模式job.mode BATCH}source {# 第一个数据集jdbc {# 给这个数据集起个名字plugin_output source_data1url jdbc:mysql://ip:port/cs1driver com.mysql.cj.jdbc.Driveruser rootpassword ***# sqlquery select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id 10# 并行读取配置# 分片的字段支持String、Number(int, bigint, decimal, ...)、Datepartition_column id# 表的分割大小行数每个分片的数据行默认8096行。最后分片数表的总行数 / split.sizesplit.size 50000# 分片数匹配并行度parallelism2.3.12已不推荐配置了用split.size来代替# partition_num 5# 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大需适当调整运行内存大小。)fetch_size 10000# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {useUnicode truecharacterEncoding utf8# 时区不同数据库参数不一样serverTimezone Asia/Shanghai# 使用游标提高大结果集性能useCursorFetch true# 每次获取行数defaultFetchSize 10000}}# 第二个数据集jdbc {# 给这个数据集起个名字plugin_output source_data2url jdbc:mysql://ip:port/cs1driver com.mysql.cj.jdbc.Driveruser rootpassword ***#query select id,name as user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address from t_8_100w where id 10 and id 20# 并行读取配置# 分片的字段支持String、Number(int, bigint, decimal, ...)、Datepartition_column id# 表的分割大小行数每个分片的数据行默认8096行。最后分片数表的总行数 / split.sizesplit.size 50000# 分片数匹配并行度parallelism2.3.12已不推荐配置了用split.size来代替# partition_num 5# 最大批处理数:查询的行提取大小(指定当前任务每次执行时读取数据条数,该值(默认1000)受运行内存影响,若该值较大或单条数据量较大需适当调整运行内存大小。)fetch_size 10000# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {useUnicode truecharacterEncoding utf8# 时区不同数据库参数不一样serverTimezone Asia/Shanghai# 使用游标提高大结果集性能useCursorFetch true# 每次获取行数defaultFetchSize 10000}}}# 清洗转换简单的清洗转换直接在source的query的sql中处理了就行transform {# 1. 字段映射sql中做了实际生成中不在这里处理。直接在source的query的sql中处理了就行# 还可以用FieldMapper 插件来映射字段# 转换age为数字类型pgsql必须转# 2. 手机号脱敏13812341234 - 138****1234# 3. 年龄转换字符串转整数实际生产中不用转换也没有内置的转换插件可以直接保存成功# 4. 性别转换1-男2-女# 5. 数据过滤只保留 age 25 的记录。# 6. 地址默认值空地址设为未知}sink {jdbc {# 接收的最终数据集汇聚到一个结果中plugin_input [source_data1, source_data2]url jdbc:postgresql://ip:5432/source_dbdriver org.postgresql.Driveruser postgrespassword 123456## query # 自动生成sql的配置和query参数互斥# 生成自动插入sql。如果目标库没有表也会自动建表generate_sink_sql true# database必须要因为generate_sink_sqltrue。database source_db# 自动生成sql时table必须要。table public.t_8_100w_imp_st_ds_demo10# 生成类似INSERT INTO …… ON CONFLICT (主键) DO UPDATE SET …… 的sql# enable_upsert true# 判断值唯一的健此选项用于支持在自动生成 SQL 时进行 insertdelete 和 update 操作。# primary_keys [id]# 表结构处理策略表不存在时报错任务失败一般用CREATE_SCHEMA_WHEN_NOT_EXIST表不存在时创建表表存在时跳过操作保留数据schema_save_mode ERROR_WHEN_SCHEMA_NOT_EXIST# 插入数据的处理策略# APPEND_DATA保留表结构和数据追加新数据不删除现有数据(一般用这个)# DROP_DATA保留表结构删除表中所有数据清空表——实现清空重灌# CUSTOM_PROCESSING :用户定义处理。需要配合custom_sql使用data_save_mode DROP_DATA# 当 data_save_mode 选择 CUSTOM_PROCESSING 时您应该填写 CUSTOM_SQL 参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。#可以实现同步删除执行前置update、truncate的sql等#这个sql未执行不知道为啥。#这个sql已经执行。原因因为generate_sink_sqltrue的原因。才会执行custom_sql。只有自动生成sql的时候这个才会执行custom_sql update source_db.public.t_8_100w_imp_st_ds_demo10 set my_status 23# 批量写入条数batch_size 10000# 批次提交间隔batch_interval_ms 500# 重试次数max_retries 3# 连接参数# 连接超时时间300msconnection_check_timeout_sec 300# 其他jdbc的参数properties {# PostgreSQL专用参数# PostgreSQL的批量优化注意大小写reWriteBatchedInserts true# 如果需要时区设置options -c timezoneAsia/Shanghai}}}结果(汇聚了19条数据)2026-01-15 14:28:15,952 INFO [s.c.s.s.c.ClientExecuteCommand] [main] -***********************************************Job Statistic Information***********************************************Start Time : 2026-01-15 14:28:11End Time : 2026-01-15 14:28:15Total Time(s) : 4Total Read Count : 19Total Write Count : 19Total Failed Count : 0***********************************************

相关文章:

多个source、多个sink

关键配置:sink的:plugin_input ["source_data1", "source_data2"]对应模型┌──────────┐│ Source A │──┐└──────────┘ │├──▶ Sink┌──────────┐ ││ Source B │──┘└──────…...

BiliTools:你的跨平台B站资源智能下载助手,轻松保存高清视频与无损音频

BiliTools:你的跨平台B站资源智能下载助手,轻松保存高清视频与无损音频 【免费下载链接】BiliTools A cross-platform bilibili toolbox. 跨平台哔哩哔哩工具箱,支持下载视频、番剧等等各类资源 项目地址: https://gitcode.com/GitHub_Tren…...

python协同过滤算法的基于python二手物品交易网站系统

目录同行可拿货,招校园代理 ,本人源头供货商协同过滤算法在二手物品交易网站中的应用用户行为数据收集基于用户的协同过滤基于物品的协同过滤混合推荐策略冷启动问题处理实时推荐更新推荐结果评估代码实现示例系统功能整合性能优化项目技术支持源码获取详细视频演示 &#xff1…...

实用指南:如何通过Energy Star X轻松提升Windows 11电池续航40%

实用指南:如何通过Energy Star X轻松提升Windows 11电池续航40% 【免费下载链接】EnergyStarX 🔋 Improve your Windows 11 devices battery life. A WinUI 3 GUI for https://github.com/imbushuo/EnergyStar. 项目地址: https://gitcode.com/gh_mirr…...

LibreCAD:完全免费的2D CAD软件终极指南,告别昂贵许可证

LibreCAD:完全免费的2D CAD软件终极指南,告别昂贵许可证 【免费下载链接】LibreCAD LibreCAD is a cross-platform 2D CAD program written in C17. It can read DXF/DWG files and can write DXF/PDF/SVG files. It supports point/line/circle/ellipse…...

Pixel Aurora Engine真实案例:用‘蒸汽朋克猫武士’生成整套游戏美术资源

Pixel Aurora Engine真实案例:用蒸汽朋克猫武士生成整套游戏美术资源 1. 项目背景与工具介绍 Pixel Aurora Engine(像素极光引擎)是一款基于AI扩散模型的高端像素艺术生成工具。它采用复古的8-bit游戏机风格界面,却能产出专业级…...

Qwen3.5-9B惊艳效果:上传物理实验图→识别仪器→生成操作步骤视频脚本

Qwen3.5-9B惊艳效果:上传物理实验图→识别仪器→生成操作步骤视频脚本 1. 模型能力概览 Qwen3.5-9B是一款拥有90亿参数的开源大语言模型,在多模态理解和逻辑推理方面表现出色。这个模型最令人惊艳的能力在于它能够: 准确识别实验仪器&…...

游戏服务器检测扣除消耗防算数溢出的安全判断及解决方法

游戏服务器检测扣除消耗防算数溢出的安全判断及解决方法 数量 > (类型最大值 / 价格) 负数存在风险 价格 > (类型最大值 / 数量) || 价格 < (最小值 / 数量&#xff09; 游戏服务器在处理道具消耗时需防止数值溢出问题。当检测扣除消耗时&#xff0c;应进行双重安全判…...

人工智能|大模型——模型——大模型蒸馏详解(定义/原理/关键技术/落地)

摘要大模型蒸馏&#xff08;Model Distillation&#xff09;&#xff0c;即知识蒸馏&#xff08;Knowledge Distillation&#xff09;&#xff0c;是一种将大型教师模型&#xff08;如BERT、GPT-4o、DeepSeek-R1&#xff09;的“隐含知识”高效迁移至轻量级学生模型&#xff08…...

千问3.5-2B科研助手应用:论文插图内容解析、实验数据图趋势简述生成

千问3.5-2B科研助手应用&#xff1a;论文插图内容解析、实验数据图趋势简述生成 1. 科研场景下的视觉语言模型应用 在科研工作中&#xff0c;论文插图和实验数据图是研究成果展示的重要载体。传统的人工解读和分析过程往往耗时费力&#xff0c;特别是当需要处理大量图表时。千…...

网站 SEO 标题要包含关键词吗

网站 SEO 标题要包含关键词吗&#xff1f;探讨最佳实践和SEO优化策略 在当今互联网时代&#xff0c;网站的SEO优化已经成为提升网站流量和用户体验的重要手段。其中&#xff0c;网站标题的优化也至关重要。网站 SEO 标题要包含关键词吗&#xff1f;这个问题备受争议&#xff0c…...

IO-Link物理层深度解析:编码机制与接口设计实战

1. IO-Link物理层编码机制详解 第一次接触IO-Link的开发者往往会被它的11bit编码规则绕晕。我刚开始调试STM32的IO-Link主站时&#xff0c;就因为在UART配置上少勾选了一个校验位&#xff0c;导致从站设备死活不响应。后来用逻辑分析仪抓包才发现&#xff0c;原来发送的0xF1在…...

开发笔记:VSCode + Qt + clangd 明明能正常运行却满屏红波浪线

目录 开发笔记&#xff1a;VSCode Qt clangd 明明能正常运行却满屏红波浪线 前言 一、问题现象 二、根本原因&#xff1a;两套工具互不沟通 三、完整解决方案 方案 1&#xff1a;配置 .clangd&#xff08;最推荐、最根治&#xff09; 方案 2&#xff1a;自动生成 comp…...

2026年知网AIGC检测卡在20%降不下去怎么办?这3招解决

直接说方案&#xff0c;不绕弯子。知网AIGC检测不通过、降AIGC率、降AI这个问题&#xff0c;核心是找准降不下去的原因&#xff0c;再用对工具。 我花了一个月测出来的结论&#xff1a;用嘎嘎降AI&#xff08;www.aigcleaner.com&#xff09; 全文上传&#xff0c;基本能解决大…...

C++ 控制流完整性(CFI):防御面向返回编程(ROP)攻击的编译器加固方案

各位来宾&#xff0c;各位技术同仁&#xff0c;大家好&#xff01;今天&#xff0c;我们齐聚一堂&#xff0c;探讨一个在现代软件安全领域至关重要的话题&#xff1a;C 控制流完整性&#xff08;CFI&#xff09;及其在防御面向返回编程&#xff08;ROP&#xff09;攻击中的作用…...

请解释 Linux 系统中的内核模块管理,并描述如何加载和卸载模块。

在 Linux 系统中&#xff0c;内核模块&#xff08;Kernel Modules&#xff09; 是可以在不重新编译或重启内核的情况下&#xff0c;动态添加到运行中内核的代码片段。它们通常用于支持新的硬件设备、文件系统或网络协议。 这种机制使得 Linux 内核保持精简&#xff08;核心功能…...

基于S7-200 PLC与组态王技术的温室大棚控制方案:包含梯形图原理图、IO分配及组态画面详解

基于S7-200 PLC和组态王温室大棚控制 我们主要的后发送的产品有&#xff0c;带解释的梯形图接线图原理图图纸&#xff0c;io分配&#xff0c;组态画面菜农张叔上周还给我打电话吐槽&#xff1a;“小王啊&#xff0c;上周那场降温加突然转晴&#xff0c;我三点爬起来盖半层棉被…...

融智学三大基本定律——信息世界的根本法则体系:为跨模态知识处理、人机协同等前沿领域提供原理支撑

融智学三大基本定律——信息世界的根本法则体系摘要&#xff1a;融智学三大基本定律构成信息处理的核心理论体系。第一定律&#xff08;实部序位关系唯一守恒&#xff09;确立本质信息的稳定性&#xff1b;第二定律&#xff08;实部序位同义并列对应转换&#xff09;实现多元表…...

解决Canal 连接数据库超时问题

根本原因&#xff1a;DNS 反向解析导致超时Caused by: java.net.SocketTimeoutException: Timeout occurred, failed to read total 4 bytes in 5000 milliseconds, actual read only 0 bytesat com.alibaba.otter.canal.parse.driver.mysql.socket.BioSocketChannel.read(BioS…...

Ostrakon-VL零售AI降本方案:替代人工巡检,单店年省8万元

Ostrakon-VL零售AI降本方案&#xff1a;替代人工巡检&#xff0c;单店年省8万元 1. 零售巡检的痛点与AI解决方案 在传统零售运营中&#xff0c;门店巡检是一项耗时耗力的日常工作。店长或督导人员需要每天检查&#xff1a; 商品陈列是否整齐货架缺货情况价签是否正确店铺环境…...

SDMatte部署避坑指南:首次加载延迟、模型切换等待、端口冲突解决方案

SDMatte部署避坑指南&#xff1a;首次加载延迟、模型切换等待、端口冲突解决方案 1. 为什么选择SDMatte进行图像抠图 SDMatte是一款专为高质量图像抠图设计的AI模型&#xff0c;特别适合处理那些传统抠图工具难以应对的复杂场景。想象一下&#xff0c;你需要把玻璃杯从背景中…...

AnythingtoRealCharacters2511镜像免配置部署教程:Docker+ComfyUI开箱即用方案

AnythingtoRealCharacters2511镜像免配置部署教程&#xff1a;DockerComfyUI开箱即用方案 想快速将动漫人物变成真实照片&#xff1f;这个教程教你10分钟搞定专业级动漫转真人效果&#xff0c;无需任何技术背景&#xff01; 1. 为什么选择这个镜像&#xff1f; 如果你曾经尝试…...

RAG系统的需求分析

这个是一个基于私有知识库的智能对话平台&#xff0c;允许用户上传文档构建专属知识库&#xff0c;并通过自然语言交互的方式查询和获取知识。它结合了大语言模型和向量检索技术&#xff0c;让用户通过对话的形式与自己的知识库进行高效交互应用场景个人用户场景:学习助手&…...

GLM-4.1V-9B-Base应用场景:建筑图纸关键结构识别与中文描述生成

GLM-4.1V-9B-Base应用场景&#xff1a;建筑图纸关键结构识别与中文描述生成 1. 建筑行业的AI视觉革命 在建筑设计领域&#xff0c;图纸解读一直是项耗时费力的工作。设计师需要花费大量时间分析图纸中的结构细节&#xff0c;撰写技术说明文档。传统的人工识别方式不仅效率低下…...

电子测试岗面试翻车实录:我的硬件知识与英语短板,以及如何逆袭”

一&#xff1a;首先进行英文的自我介绍Hello, my name isxxx .你好&#xff0c;我叫xxx。I’m 20 years old, and I’m currently a third-year student majoring inElectronic Information Engineering at xxxx我今年20岁&#xff0c;目前是xxx电子信息工程专业的大三学生。My…...

设备管理系统是什么?如何建立设备管理体系?

在现代企业的运转中&#xff0c;生产设备无疑是核心资产。无论是制造业的数控机床&#xff0c;还是建筑工地的重型机械&#xff0c;甚至是医疗机构的精密仪器&#xff0c;设备的稳定运行直接决定了企业的生产效率、产品质量和成本控制。然而&#xff0c;许多企业在设备管理上仍…...

OFA-COCO蒸馏版部署教程:Windows WSL2环境下PyTorch服务调试全流程

OFA-COCO蒸馏版部署教程&#xff1a;Windows WSL2环境下PyTorch服务调试全流程 1. 引言&#xff1a;为什么选择OFA图像描述模型&#xff1f; 你有没有遇到过这样的场景&#xff1f;手头有一堆图片&#xff0c;需要为它们配上文字说明&#xff0c;一张张手动写描述&#xff0c…...

【计算机视觉实战】第10章 | 单阶段目标检测YOLO与SSD:实时检测的极致追求

欢迎来到《计算机视觉实战》系列教程的第十章。在第九章我们学习了Faster R-CNN等两阶段检测器&#xff0c;它们精度高但速度慢。本章我们将学习单阶段检测器&#xff08;One-stage Detector&#xff09;&#xff0c;特别是YOLO和SSD&#xff0c;它们在保持可观精度的同时实现了…...

Phi-4-mini-reasoning实战教程:批量处理CSV数学题库生成标准答案

Phi-4-mini-reasoning实战教程&#xff1a;批量处理CSV数学题库生成标准答案 1. 引言 数学老师们经常面临一个共同挑战&#xff1a;批改大量数学作业和试卷需要花费大量时间。传统方法需要逐题检查&#xff0c;效率低下且容易出错。今天&#xff0c;我们将介绍如何利用Phi-4-…...

豪鹏科技2025年财报透视:毛利率提升5.2个百分点,费用管控成效显著

豪鹏科技2025年财报透视&#xff1a;毛利率提升5.2个百分点&#xff0c;费用管控成效显著豪鹏科技2025年业绩表现亮眼&#xff0c;全年实现营业收入57亿元至60亿元&#xff0c;同比增长11.58%至17.45%&#xff1b;归母净利润1.95亿元至2.2亿元&#xff0c;同比大幅增长113.69%至…...