当前位置: 首页 > news >正文

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建Flink-MySQL-CDC的SourceTableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +"  id INT primary key," +"  name STRING" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop102'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = 'xxxx'," +"  'database-name' = 'student'," +"  'table-name' = 'table_name'," +"'server-time-zone' = 'Asia/Shanghai'," +"'scan.startup.mode' = 'initial'" +")");// 2. 注册SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");//upsert-kafkatableEnv.executeSql("" +"CREATE TABLE kafka_binlog ( " +"  user_id INT, " +"  user_name STRING, " +"`proc_time` as PROCTIME()," +"  PRIMARY KEY (user_id) NOT ENFORCED" +") WITH ( " +"  'connector' = 'upsert-kafka', " +"  'topic' = 'test2', " +"  'properties.bootstrap.servers' = 'hadoop102:9092', " +"  'key.format' = 'json' ," +"  'value.format' = 'json' " +")" +"");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into kafka_binlog select * from table_name");tableEnv.executeSql("select * from kafka_binlog").print();env.execute();}}

相关文章:

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.…...

ios开发 swift5 苹果系统自带的图标 SF Symbols

文章目录 1.官网app的下载和使用2.使用代码 1.官网app的下载和使用 苹果官网网址:SF Symbols 通过上面的网址可以下载dmg, 安装到自己的mac上 貌似下面这样不能展示出动画,还是要使用动画的代码 .bounce.up.byLayer2.使用代码 UIKit UIImage(system…...

Linux内核源码分析 (3)调度器的实现

Linux内核源码分析 (3)调度器的实现 文章目录 Linux内核源码分析 (3)调度器的实现一、概述二、调度器数据结构1、task_struct中与调度有关的的成员2、调度器类3、就绪队列4、调度实体 三、处理优先级1、优先级的内核表示2、计算优先级3、计算负荷权重 四、核心调度器1、周期性调…...

网络安全法+网络安全等级保护

网络安全法 网络安全法21条 网络安全法31条 网络安全等级保护 网络安全等级保护分为几级? 一个中心,三重防护 等级保护2.0网络拓扑图 安全区域边界 安全计算环境 等保安全产品 物理机房安全设计...

持续集成对软件项目管理的作用

l、对项目目标管理的作用 软件项目的目标是开发出可运行的、客户满意的软件系统持续集成有统一的代 码库。要求开发人员定期地、不断地向代码库提交代码。新近提交的代码会经过编 译与测试.与代码库中旧有的代码相整合,形成安全稳定运行的代码库&…...

【Qt QAxObject】使用 QAxObject 高效任意读写 Excel 表

1. 用什么操作 Excel 表 Qt 的官网库中是不包含 Microsoft Excel 的操作库,关于对 Microsoft Excel 的操作库可选的有很多,包含基于 Windows 系统本身的 ActiveX、Qt Xlsx、xlsLib、LibXL、qtXLS、BasicExcel、Number Duck。 库.xls.xlsx读写平台Qt Xls…...

java八股文面试[多线程]——自旋锁

优点: 1. 自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗 ,这些操作会导致线程发生两次上下文切换&…...

分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)

正文 1、解决的应用场景是分布式事务,每个服务有独立的数据库。 2、例如:A服务的数据库是A1,B服务的数据库是B2,A服务通过feign接口调用B服务,B涉及提交数据到B2,业务是在B提交数据之后,在A服…...

PDF可以修改内容吗?有什么注意的事项?

PDF是一种跨平台的电子文档格式,可以在各种设备上轻松阅读和共享。许多人喜欢将文档转换为PDF格式以确保格式的一致性和易读性。但是,PDF文件一般被认为是“只读”文件,即无法编辑。那么,PDF文件是否可以修改呢? 答案是…...

自动泊车的自动驾驶控制算法

1. 自动泊车系统 自动泊车系统(AutomatedParkingASSiSt,APA)利用车辆搭载的传感器感知车辆周边环境,扫描满足当前车辆停放的障碍物空间车位或线车位,并通过人机交互(HumanMachine Interface,HMI)获取驾驶员对目标车位的选择或自动确定目标车位,自动规划泊车路径,通过控制器向车…...

Java doc等文件生成PDF、多个PDF合并

之前写过一遍文章是 图片生成PDF。 今天继续来对 doc等文件进行pdf合并以及多个pdf合并为一个pdf。 兄弟们&#xff0c;还是开箱即用。 1、doc生成pdf 依赖 <!-- doc生成pdf --><dependency><groupId>com.aspose</groupId><artifactId>aspose…...

【C++】list类的模拟实现

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;C的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录 前言一、list类的模拟实现1.1 list的…...

机械臂+2d相机实现复合机器人定位抓取

硬件参数 机械臂&#xff1a;艾利特 相机&#xff1a;海康相机 2d识别库&#xff1a;lindmod&#xff0c;github可以搜到 光源&#xff1a;磐鑫光源 软件参数 系统&#xff1a;windows / Linux 开发平台&#xff1a;Qt 开发语言&#xff1a;C 开发视觉库&#xff1a;OpenCV …...

网络编程 http 相关基础概念

文章目录 表单是什么http请求是什么http请求的结构和说明关于http方法 GET和POST区别http常见状态码http响应http 请求是无状态的含义html是什么 &#xff08;前端内容&#xff0c;了解即可&#xff09;html 常见标签 &#xff08;前端内容&#xff0c;了解即可&#xff09;关于…...

LatexEasy公式渲染教程

LatexEasy使用简单的URL渲染公式为图片 https://r.latexeasy.com/image.svg?1-sin^2(x) 使用单个HTML图像标签将公式添加到任何现有网站 <img src"https://r.latexeasy.com/image.svg?1-sin^2(x)" />...

十年测试工程师叙述自动化测试学习思路

自动化测试介绍 自动化测试(Automated Testing)&#xff0c;是指把以人为驱动的测试行为转化为机器执行的过程。实际上自动化测试往往通过一些测试工具或框架&#xff0c;编写自动化测试用例&#xff0c;来模拟手工测试过程。比如说&#xff0c;在项目迭代过程中&#xff0c;持…...

SpringAOP详解(下)

proxyFactory代理对象创建方式和代理对象调用方法过程&#xff1a; springaop创建动态代理对象和代理对象调用方法过程&#xff1a; 一、TargetSource的使用 Lazy注解&#xff0c;当加在属性上时&#xff0c;会产生一个代理对象赋值给这个属性&#xff0c;产生代理对象的代码为…...

主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582)

主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) 漏洞描述影响版本安全版本如何修复可供参考的资料主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) CVE编号 : CVE-2023-37582 利用情况 : EXP 已公开 …...

Element table根据字段合并表格(可多字段合并),附带拖拽列动态合并

效果如图&#xff0c;姓名 数值1 字段进行自动合并 封装合并列js - tableMerge.js // 获取列合并的行数 // params // tableData: 表格数据 // mergeId: 合并的列的字段名 export const tagRowSpan (tableData, mergeId) >{const tagArr [];let pos 0;tableData.map((i…...

C++标准库STL容器详解

目录 C标准模板库STL容器容器分类容器通用接口 顺序容器vectorlistdeque 容器适配器queuestackpriority_queue 关联容器&#xff1a;红黑树setmultisetmapmultimap 关联容器&#xff1a;哈希表unordered_set和unordered_multisetunordered_map和unordered_multimap 附1&#xf…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...

怎么开发一个网络协议模块(C语言框架)之(六) ——通用对象池总结(核心)

+---------------------------+ | operEntryTbl[] | ← 操作对象池 (对象数组) +---------------------------+ | 0 | 1 | 2 | ... | N-1 | +---------------------------+↓ 初始化时全部加入 +------------------------+ +-------------------------+ | …...

rm视觉学习1-自瞄部分

首先先感谢中南大学的开源&#xff0c;提供了很全面的思路&#xff0c;减少了很多基础性的开发研究 我看的阅读的是中南大学FYT战队开源视觉代码 链接&#xff1a;https://github.com/CSU-FYT-Vision/FYT2024_vision.git 1.框架&#xff1a; 代码框架结构&#xff1a;readme有…...

Linux-进程间的通信

1、IPC&#xff1a; Inter Process Communication&#xff08;进程间通信&#xff09;&#xff1a; 由于每个进程在操作系统中有独立的地址空间&#xff0c;它们不能像线程那样直接访问彼此的内存&#xff0c;所以必须通过某种方式进行通信。 常见的 IPC 方式包括&#…...

【java】【服务器】线程上下文丢失 是指什么

目录 ■前言 ■正文开始 线程上下文的核心组成部分 为什么会出现上下文丢失&#xff1f; 直观示例说明 为什么上下文如此重要&#xff1f; 解决上下文丢失的关键 总结 ■如果我想在servlet中使用线程&#xff0c;代码应该如何实现 推荐方案&#xff1a;使用 ManagedE…...

「Java基本语法」变量的使用

变量定义 变量是程序中存储数据的容器&#xff0c;用于保存可变的数据值。在Java中&#xff0c;变量必须先声明后使用&#xff0c;声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例&#xff1a;声明与初始化 public class VariableDemo {publi…...

el-amap-bezier-curve运用及线弧度设置

文章目录 简介示例线弧度属性主要弧度相关属性其他相关样式属性完整示例链接简介 ‌el-amap-bezier-curve 是 Vue-Amap 组件库中的一个组件,用于在 高德地图 上绘制贝塞尔曲线。‌ 基本用法属性path定义曲线的路径,可以是多个弧线段的组合。stroke-weight线条的宽度。stroke…...