当前位置: 首页 > news >正文

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建Flink-MySQL-CDC的SourceTableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +"  id INT primary key," +"  name STRING" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop102'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = 'xxxx'," +"  'database-name' = 'student'," +"  'table-name' = 'table_name'," +"'server-time-zone' = 'Asia/Shanghai'," +"'scan.startup.mode' = 'initial'" +")");// 2. 注册SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");//upsert-kafkatableEnv.executeSql("" +"CREATE TABLE kafka_binlog ( " +"  user_id INT, " +"  user_name STRING, " +"`proc_time` as PROCTIME()," +"  PRIMARY KEY (user_id) NOT ENFORCED" +") WITH ( " +"  'connector' = 'upsert-kafka', " +"  'topic' = 'test2', " +"  'properties.bootstrap.servers' = 'hadoop102:9092', " +"  'key.format' = 'json' ," +"  'value.format' = 'json' " +")" +"");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into kafka_binlog select * from table_name");tableEnv.executeSql("select * from kafka_binlog").print();env.execute();}}

相关文章:

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表 package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.…...

ios开发 swift5 苹果系统自带的图标 SF Symbols

文章目录 1.官网app的下载和使用2.使用代码 1.官网app的下载和使用 苹果官网网址:SF Symbols 通过上面的网址可以下载dmg, 安装到自己的mac上 貌似下面这样不能展示出动画,还是要使用动画的代码 .bounce.up.byLayer2.使用代码 UIKit UIImage(system…...

Linux内核源码分析 (3)调度器的实现

Linux内核源码分析 (3)调度器的实现 文章目录 Linux内核源码分析 (3)调度器的实现一、概述二、调度器数据结构1、task_struct中与调度有关的的成员2、调度器类3、就绪队列4、调度实体 三、处理优先级1、优先级的内核表示2、计算优先级3、计算负荷权重 四、核心调度器1、周期性调…...

网络安全法+网络安全等级保护

网络安全法 网络安全法21条 网络安全法31条 网络安全等级保护 网络安全等级保护分为几级? 一个中心,三重防护 等级保护2.0网络拓扑图 安全区域边界 安全计算环境 等保安全产品 物理机房安全设计...

持续集成对软件项目管理的作用

l、对项目目标管理的作用 软件项目的目标是开发出可运行的、客户满意的软件系统持续集成有统一的代 码库。要求开发人员定期地、不断地向代码库提交代码。新近提交的代码会经过编 译与测试.与代码库中旧有的代码相整合,形成安全稳定运行的代码库&…...

【Qt QAxObject】使用 QAxObject 高效任意读写 Excel 表

1. 用什么操作 Excel 表 Qt 的官网库中是不包含 Microsoft Excel 的操作库,关于对 Microsoft Excel 的操作库可选的有很多,包含基于 Windows 系统本身的 ActiveX、Qt Xlsx、xlsLib、LibXL、qtXLS、BasicExcel、Number Duck。 库.xls.xlsx读写平台Qt Xls…...

java八股文面试[多线程]——自旋锁

优点: 1. 自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗 ,这些操作会导致线程发生两次上下文切换&…...

分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)

正文 1、解决的应用场景是分布式事务,每个服务有独立的数据库。 2、例如:A服务的数据库是A1,B服务的数据库是B2,A服务通过feign接口调用B服务,B涉及提交数据到B2,业务是在B提交数据之后,在A服…...

PDF可以修改内容吗?有什么注意的事项?

PDF是一种跨平台的电子文档格式,可以在各种设备上轻松阅读和共享。许多人喜欢将文档转换为PDF格式以确保格式的一致性和易读性。但是,PDF文件一般被认为是“只读”文件,即无法编辑。那么,PDF文件是否可以修改呢? 答案是…...

自动泊车的自动驾驶控制算法

1. 自动泊车系统 自动泊车系统(AutomatedParkingASSiSt,APA)利用车辆搭载的传感器感知车辆周边环境,扫描满足当前车辆停放的障碍物空间车位或线车位,并通过人机交互(HumanMachine Interface,HMI)获取驾驶员对目标车位的选择或自动确定目标车位,自动规划泊车路径,通过控制器向车…...

Java doc等文件生成PDF、多个PDF合并

之前写过一遍文章是 图片生成PDF。 今天继续来对 doc等文件进行pdf合并以及多个pdf合并为一个pdf。 兄弟们&#xff0c;还是开箱即用。 1、doc生成pdf 依赖 <!-- doc生成pdf --><dependency><groupId>com.aspose</groupId><artifactId>aspose…...

【C++】list类的模拟实现

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;C的学习之路 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录 前言一、list类的模拟实现1.1 list的…...

机械臂+2d相机实现复合机器人定位抓取

硬件参数 机械臂&#xff1a;艾利特 相机&#xff1a;海康相机 2d识别库&#xff1a;lindmod&#xff0c;github可以搜到 光源&#xff1a;磐鑫光源 软件参数 系统&#xff1a;windows / Linux 开发平台&#xff1a;Qt 开发语言&#xff1a;C 开发视觉库&#xff1a;OpenCV …...

网络编程 http 相关基础概念

文章目录 表单是什么http请求是什么http请求的结构和说明关于http方法 GET和POST区别http常见状态码http响应http 请求是无状态的含义html是什么 &#xff08;前端内容&#xff0c;了解即可&#xff09;html 常见标签 &#xff08;前端内容&#xff0c;了解即可&#xff09;关于…...

LatexEasy公式渲染教程

LatexEasy使用简单的URL渲染公式为图片 https://r.latexeasy.com/image.svg?1-sin^2(x) 使用单个HTML图像标签将公式添加到任何现有网站 <img src"https://r.latexeasy.com/image.svg?1-sin^2(x)" />...

十年测试工程师叙述自动化测试学习思路

自动化测试介绍 自动化测试(Automated Testing)&#xff0c;是指把以人为驱动的测试行为转化为机器执行的过程。实际上自动化测试往往通过一些测试工具或框架&#xff0c;编写自动化测试用例&#xff0c;来模拟手工测试过程。比如说&#xff0c;在项目迭代过程中&#xff0c;持…...

SpringAOP详解(下)

proxyFactory代理对象创建方式和代理对象调用方法过程&#xff1a; springaop创建动态代理对象和代理对象调用方法过程&#xff1a; 一、TargetSource的使用 Lazy注解&#xff0c;当加在属性上时&#xff0c;会产生一个代理对象赋值给这个属性&#xff0c;产生代理对象的代码为…...

主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582)

主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) 漏洞描述影响版本安全版本如何修复可供参考的资料主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582) CVE编号 : CVE-2023-37582 利用情况 : EXP 已公开 …...

Element table根据字段合并表格(可多字段合并),附带拖拽列动态合并

效果如图&#xff0c;姓名 数值1 字段进行自动合并 封装合并列js - tableMerge.js // 获取列合并的行数 // params // tableData: 表格数据 // mergeId: 合并的列的字段名 export const tagRowSpan (tableData, mergeId) >{const tagArr [];let pos 0;tableData.map((i…...

C++标准库STL容器详解

目录 C标准模板库STL容器容器分类容器通用接口 顺序容器vectorlistdeque 容器适配器queuestackpriority_queue 关联容器&#xff1a;红黑树setmultisetmapmultimap 关联容器&#xff1a;哈希表unordered_set和unordered_multisetunordered_map和unordered_multimap 附1&#xf…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

LRU 缓存机制详解与实现(Java版) + 力扣解决

&#x1f4cc; LRU 缓存机制详解与实现&#xff08;Java版&#xff09; 一、&#x1f4d6; 问题背景 在日常开发中&#xff0c;我们经常会使用 缓存&#xff08;Cache&#xff09; 来提升性能。但由于内存有限&#xff0c;缓存不可能无限增长&#xff0c;于是需要策略决定&am…...