使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表
使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表
package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//
// Configuration conf = new Configuration();
// conf.setInteger("rest.port",3335);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建Flink-MySQL-CDC的SourceTableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +" id INT primary key," +" name STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = 'xxxx'," +" 'database-name' = 'student'," +" 'table-name' = 'table_name'," +"'server-time-zone' = 'Asia/Shanghai'," +"'scan.startup.mode' = 'initial'" +")");// 2. 注册SinkTable: sink_sensor
// tableEnv.executeSql("" +
// "CREATE TABLE kafka_binlog ( " +
// " user_id INT, " +
// " user_name STRING, " +
// "`proc_time` as PROCTIME()" +
// ") WITH ( " +
// " 'connector' = 'kafka', " +
// " 'topic' = 'test2', " +
// " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
// " 'format' = 'json' " +
// ")" +
// "");//upsert-kafkatableEnv.executeSql("" +"CREATE TABLE kafka_binlog ( " +" user_id INT, " +" user_name STRING, " +"`proc_time` as PROCTIME()," +" PRIMARY KEY (user_id) NOT ENFORCED" +") WITH ( " +" 'connector' = 'upsert-kafka', " +" 'topic' = 'test2', " +" 'properties.bootstrap.servers' = 'hadoop102:9092', " +" 'key.format' = 'json' ," +" 'value.format' = 'json' " +")" +"");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into kafka_binlog select * from table_name");tableEnv.executeSql("select * from kafka_binlog").print();env.execute();}}
相关文章:
使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表
使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.…...

ios开发 swift5 苹果系统自带的图标 SF Symbols
文章目录 1.官网app的下载和使用2.使用代码 1.官网app的下载和使用 苹果官网网址:SF Symbols 通过上面的网址可以下载dmg, 安装到自己的mac上 貌似下面这样不能展示出动画,还是要使用动画的代码 .bounce.up.byLayer2.使用代码 UIKit UIImage(system…...

Linux内核源码分析 (3)调度器的实现
Linux内核源码分析 (3)调度器的实现 文章目录 Linux内核源码分析 (3)调度器的实现一、概述二、调度器数据结构1、task_struct中与调度有关的的成员2、调度器类3、就绪队列4、调度实体 三、处理优先级1、优先级的内核表示2、计算优先级3、计算负荷权重 四、核心调度器1、周期性调…...

网络安全法+网络安全等级保护
网络安全法 网络安全法21条 网络安全法31条 网络安全等级保护 网络安全等级保护分为几级? 一个中心,三重防护 等级保护2.0网络拓扑图 安全区域边界 安全计算环境 等保安全产品 物理机房安全设计...
持续集成对软件项目管理的作用
l、对项目目标管理的作用 软件项目的目标是开发出可运行的、客户满意的软件系统持续集成有统一的代 码库。要求开发人员定期地、不断地向代码库提交代码。新近提交的代码会经过编 译与测试.与代码库中旧有的代码相整合,形成安全稳定运行的代码库&…...

【Qt QAxObject】使用 QAxObject 高效任意读写 Excel 表
1. 用什么操作 Excel 表 Qt 的官网库中是不包含 Microsoft Excel 的操作库,关于对 Microsoft Excel 的操作库可选的有很多,包含基于 Windows 系统本身的 ActiveX、Qt Xlsx、xlsLib、LibXL、qtXLS、BasicExcel、Number Duck。 库.xls.xlsx读写平台Qt Xls…...

java八股文面试[多线程]——自旋锁
优点: 1. 自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗 ,这些操作会导致线程发生两次上下文切换&…...

分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)
正文 1、解决的应用场景是分布式事务,每个服务有独立的数据库。 2、例如:A服务的数据库是A1,B服务的数据库是B2,A服务通过feign接口调用B服务,B涉及提交数据到B2,业务是在B提交数据之后,在A服…...

PDF可以修改内容吗?有什么注意的事项?
PDF是一种跨平台的电子文档格式,可以在各种设备上轻松阅读和共享。许多人喜欢将文档转换为PDF格式以确保格式的一致性和易读性。但是,PDF文件一般被认为是“只读”文件,即无法编辑。那么,PDF文件是否可以修改呢? 答案是…...

自动泊车的自动驾驶控制算法
1. 自动泊车系统 自动泊车系统(AutomatedParkingASSiSt,APA)利用车辆搭载的传感器感知车辆周边环境,扫描满足当前车辆停放的障碍物空间车位或线车位,并通过人机交互(HumanMachine Interface,HMI)获取驾驶员对目标车位的选择或自动确定目标车位,自动规划泊车路径,通过控制器向车…...
Java doc等文件生成PDF、多个PDF合并
之前写过一遍文章是 图片生成PDF。 今天继续来对 doc等文件进行pdf合并以及多个pdf合并为一个pdf。 兄弟们,还是开箱即用。 1、doc生成pdf 依赖 <!-- doc生成pdf --><dependency><groupId>com.aspose</groupId><artifactId>aspose…...

【C++】list类的模拟实现
🏖️作者:malloc不出对象 ⛺专栏:C的学习之路 👦个人简介:一名双非本科院校大二在读的科班编程菜鸟,努力编程只为赶上各位大佬的步伐🙈🙈 目录 前言一、list类的模拟实现1.1 list的…...

机械臂+2d相机实现复合机器人定位抓取
硬件参数 机械臂:艾利特 相机:海康相机 2d识别库:lindmod,github可以搜到 光源:磐鑫光源 软件参数 系统:windows / Linux 开发平台:Qt 开发语言:C 开发视觉库:OpenCV …...

网络编程 http 相关基础概念
文章目录 表单是什么http请求是什么http请求的结构和说明关于http方法 GET和POST区别http常见状态码http响应http 请求是无状态的含义html是什么 (前端内容,了解即可)html 常见标签 (前端内容,了解即可)关于…...
LatexEasy公式渲染教程
LatexEasy使用简单的URL渲染公式为图片 https://r.latexeasy.com/image.svg?1-sin^2(x) 使用单个HTML图像标签将公式添加到任何现有网站 <img src"https://r.latexeasy.com/image.svg?1-sin^2(x)" />...

十年测试工程师叙述自动化测试学习思路
自动化测试介绍 自动化测试(Automated Testing),是指把以人为驱动的测试行为转化为机器执行的过程。实际上自动化测试往往通过一些测试工具或框架,编写自动化测试用例,来模拟手工测试过程。比如说,在项目迭代过程中,持…...

SpringAOP详解(下)
proxyFactory代理对象创建方式和代理对象调用方法过程: springaop创建动态代理对象和代理对象调用方法过程: 一、TargetSource的使用 Lazy注解,当加在属性上时,会产生一个代理对象赋值给这个属性,产生代理对象的代码为…...
主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582)
主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) 漏洞描述影响版本安全版本如何修复可供参考的资料主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) CVE编号 : CVE-2023-37582 利用情况 : EXP 已公开 …...

Element table根据字段合并表格(可多字段合并),附带拖拽列动态合并
效果如图,姓名 数值1 字段进行自动合并 封装合并列js - tableMerge.js // 获取列合并的行数 // params // tableData: 表格数据 // mergeId: 合并的列的字段名 export const tagRowSpan (tableData, mergeId) >{const tagArr [];let pos 0;tableData.map((i…...

C++标准库STL容器详解
目录 C标准模板库STL容器容器分类容器通用接口 顺序容器vectorlistdeque 容器适配器queuestackpriority_queue 关联容器:红黑树setmultisetmapmultimap 关联容器:哈希表unordered_set和unordered_multisetunordered_map和unordered_multimap 附1…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...

DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...

vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...

ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...