flink同步mysql数据表到pg库

1.关闭防火墙和selinux
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
vi /etc/selinux/config
修改为disabled

2.安装java8
yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version

3.下载和部署postgresql
下载地址:http://www.postgresql.org/ftp/source/
我安装的最新版本,可以看需求安装pg库版本

上传到服务器
tar -zxvf postgresql-16.2.tar.gz
./configure
如果出现以下报错解决方法:
yum -y install gcc

如果出现以下错误解决方法:
yum -y install libicu libicu-devel libunwind readline-devel zlib-devel

make
make install
useradd postgres
groupadd postgres
useradd -g postgres postgres
id postgrescd /usr/local/pgsql/
mkdir data
chown postgres:postgres datacd /home/postgres/
ll -al

vi .bash_profile
添加
export PGHOME=/usr/local/pgsql/
export PGDATA=/usr/local/pgsql/data
PATH=$PATH:$HOME/bin:$PGHOME/bin

source .bash_profile
su postgres
initdb

cd /usr/local/pgsql/data/
vi postgresql.conf
修改为listen_addresses = '*'

启动pg库
pg_ctl -D /usr/local/pgsql/data -l logfile start
service postgresql start

su – postgres
psql

4.下载和部署mysql
yum -y install wget
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz

mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xzmv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir data
groupadd mysql
chown -R mysql.mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
mkdir data
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize

圈起来的部分为后面数据库登陆的初始密码
vi /etc/my.cnf
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4

cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin

登陆
mysql -u root -p
输入向前初始化给的初始密码

修改mysql密码

登陆

5.下载部署flink
flink下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz

下载flink sql所需驱动
1. flink-connector-jdbc-3.1.1-1.17.jar 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
2. flink-sql-connector-mysql-cdc-3.0.1.jar 下载地址:https://github.com/ververica/flink-cdc-connectors/releases
3. flink-sql-connector-postgres-cdc-3.0.1.jar 下载地址:https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/3.0.1
4. mysql-connector-java-8.0.28.jar 下载地址:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/



上传flink压缩包并解压
tar -zxvf flink-1.18.1-bin-scala_2.12.tgz
进入flink的lib目录上传四个依赖
cd flink-1.18.1/lib/
需要修改一下flink配置文件
不然会出现以下情况

vi flink-1.18.1/conf/flink-conf.yaml
原本是localhost修改为ip

./flink-1.18.1/bin/start-cluster.sh
访问 192.168.207.193:8081 (默认是8081端口 可在配置文件里修改)

6.实时同步mysql数据到postgresql
数据库先创建一个库,在库里创建表再添加数据
create database ljq;use ljqCREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);;insert into players (player_id,team_id,player_name,height) values
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');select * from players;


在pg库创建一个库,在库里创建一个表不插入数据
create database ljq;
\c ljqCREATE TABLE players3 (
player_id INT NOT NULL,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);

启动flink-sql
./flink-1.18.1/bin/sql-client.sh embedded
根据需要同步的数据创建源表
CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.207.193',
'port' = '3306',
'username' = 'root',
'password' = 'linux123',
'database-name' = 'ljq',
'table-name' = 'players',
'server-time-zone' = 'Asia/Shanghai'
);

查看表时出现报错! 
解决方法:修改为允许所有ip访问
use mysql;
select user,host from user;
update user set host = '%' where user = 'root';
flush privileges;
select user,host from user;


重新查看表
select * from nbaplayers;·

创建结果表
CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height NUMERIC(3,2),
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.207.193:5432/ljq??currentSchema=public&reWriteBatchedInserts=true',
'username' = 'postgres',
'password' = 'linux123',
'table-name' = 'players3'
);
执行从源表插入结果表操作,生成同步作业
INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;
Web端查看

查看是否同步数据到pg库的players3表

测试一下增删改是否同步
1.增
mysql执行
insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');

pg库查看

2.删
mysql执行
delete from players where player_id = 10001;

pg库查看

3.改
mysql执行
update players set player_name='高大的姚明' where player_id=1001;

pg库查看

注:字符串小数点精确问题


相关文章:
flink同步mysql数据表到pg库
1.关闭防火墙和selinux systemctl stop firewalld systemctl disable firewalld systemctl status firewalldvi /etc/selinux/config 修改为disabled2.安装java8 yum list java-1.8* yum install java-1.8.0-openjdk* -yjava -version3.下载和部署postgresql 下载地址&#…...
AndroidStudio-常用布局
一、线性布局LinearLayout 线性布局内部的各视图有两种排列方式: 1.orientation属性值为horizontal时,内部视图在水平方向从左往右排列。 2.orientation属性值为vertical时,内部视图在垂直方向从上往下排列。 如果不指定orientation属性,…...
Vue全栈开发旅游网项目(10)-用户管理后端接口开发
1.异步用户登录\登出接口开发 1.设计公共响应数据类型 文件地址:utils/response404.py from django.http import JsonResponseclass BadRequestJsonResponse(JsonResponse):status_code 400def __init__(self, err_list, *args, **kwargs):data {"error_c…...
[Android]查找java类中声明为native方法的具体实现方法
在android代码中,经常可以看到native方法,需要查看其对应的C方法,这些方法是一一对应的,对应关系是在jni注册里关联起来的。 比较直观的是这样的例子, Parcel.java //Java层的方法里调用了native方法nativeWriteInt…...
Exploring Defeasible Reasoning in Large Language Models: A Chain-of-Thought A
文章目录 题目摘要简介准备工作数据集生成方法实验结论 题目 探索大型语言模型中的可废止推理:思路链 论文地址:http://collegepublications.co.uk/downloads/LNGAI00004.pdf#page136 摘要 许多大型语言模型 (LLM) 经过大量高质量数据语料库的训练&…...
uniapp在app模式下组件传值
在 UniApp 编译成 App 后,传递参数可以通过多种方式实现,常见的方式有以下几种: 1. 通过 URL 参数传递(适用于 WebView) 如果你的 App 页面通过 WebView 渲染,可以像在 Web 端一样通过 URL 传递参数。例如…...
Docker解决暴露2375端口引发的安全漏洞
docker的暴露api端口2375,没有任何安全防护,我们通过linux系统防火墙(iptables)来进行ip访问限制 # 查看iptables所有规则 iptables -L -nv # 只允许某个ip访问2375端口 iptables -I INPUT -s 127.0.0.1 -p tcp --dport 2375 -j A…...
HTML5+CSS前端开发【保姆级教学】+新闻文章初体验
Hello,各位编程猿们!上一篇文章介绍了前端以及软件的安装,这一篇我们要继续讲解页面更多知识点,教大家做一篇新闻题材的文章 新闻文章 当我们点开浏览器经常看到各种各样的文章,今天我们就来看看大家最喜欢关注的体育…...
『VUE』26. props实现子组件传递数据给父组件(详细图文注释)
目录 本节内容示例代码总结 欢迎关注 『VUE』 专栏,持续更新中 欢迎关注 『VUE』 专栏,持续更新中 本节内容 父组件传子组件–props 子组件传父组件–自定义事件 本节讲子组件传父组件–通过props里的方法传递,就是父亲写了一个函数,给子组件调用,然后…...
RHCE-DNS域名解析服务器
一、DNS简介 DNS ( Domain Name System )是互联网上的一项服务,它作为将域名和 IP 地址相互映射的一个分布式 数据库,能够使人更方便的访问互联网。 DNS 系统使用的是网络的查询,那么自然需要有监听的 port 。 DNS 使…...
移民统计年鉴(1996-2021年)
年鉴中包含了以下几个方面的数据: 移民数量:记录了每年全球移民的总数,以及不同国家和地区的移民流入和流出情况。 移民类型:区分了经济移民、难民、家庭团聚等不同类型的移民。 移民原因:分析了推动移民的各种因素&…...
MFC1(note)
引言 在学习SDK后我们发现,写消息好麻烦,处理消息更麻烦 处理消息效率低发送消息效率低 所以把SDK中这些消息全部封装好 MFC封装了windows 的大部分API 这里说一下QT架构跨平台 MFC用得如何取决于你SDK的水平 创建 如果打开没有MFC 一般勾选以下…...
1.1 关于游戏编程
1.1.1、游戏中客户端和服务器的交互 游戏通常采用客户端-服务器模式。在这种模式下,服务器负责处理游戏的核心逻辑、数据存储和玩家间的交互,而客户端则负责呈现游戏画面、接收玩家输入并与服务器通信。 客户端和服务器的作用和功能 客户端&a…...
光流法与直接法在SLAM中的应用
本文总结视觉SLAM中常用的光流法与直接法 1、Lucas-Kanade光流法 相机所拍摄到的图像随相机视角的变化而变化,这种变化也可以理解为图像中像素的反向移动。“光流”(Optical Flow)是指通过分析连续图像帧来估计场景中像素或特征点的运动的技…...
C++模板特化实战:在使用开源库boost::geometry::index::rtree时,用特化来让其支持自己的数据类型
用自己定义的数据结构作为rtree的key。 // rTree的key struct OverlapKey {using BDPoint boost::geometry::model::point<double, 3, boost::geometry::cs::cartesian>; //双精度的点using MyRTree boost::geometry::index::rtree<OverlapKey, boost::geometry::in…...
让空间计算触手可及,VR手套何以点石成金?
引言 如何让一位母亲与她去世的小女儿“重逢”?韩国MBC电视台《I Met You》节目实现了一个“不可能”心愿。 在空旷的绿幕中,母亲Jang Ji-sung透过VR头显,看到了三年前因白血病去世的女儿Nayeon。当她伸出双手,居然能摸到女儿的…...
穿越数据迷宫:C++哈希表的奇幻旅程
文章目录 前言📔一、unordered系列关联式容器📕1.1 unordered 容器概述📕1.2 哈希表在 unordered 容器中的实现原理📕1.3 unordered 容器的特点 📔二、unordered_set 和 unordered_map 的基本操作📕2.1 un…...
SMT32 智能环境监测系统 嵌入式初学者课堂小组作业
一、应用知识 1,开发语言:C语言 2,开发工具:Keil uVision5、Setup_JLinkARM_V415e 3,开发平台:XCOM V2.0 4,开发知识:温湿度传感DHT11、STM32F4xx中文参考手册 5,文…...
20241114给荣品PRO-RK3566开发板刷Rockchip原厂的Android13下适配RJ45以太网卡
20241114给荣品PRO-RK3566开发板刷Rockchip原厂的Android13下适配RJ45以太网卡 2024/11/14 15:44 缘起:使用EVB2的方案,RJ45加进去怎么也不通。 实在没有办法,只能将荣品的SDK:rk-android13-20240713.tgz 解压缩,编译之…...
JVM这个工具的使用方法
JVM(Java虚拟机)是Java程序运行的基础环境,它提供了内存管理、线程管理和性能监控等功能。吃透JVM诊断方法,可以帮助开发者更有效地解决Java应用在运行时遇到的问题。以下是一些常见的JVM诊断方法: 使用JConsole: JCon…...
开源像素艺术工具推荐:Pixel Fashion Atelier vs Automatic1111定制化对比
开源像素艺术工具推荐:Pixel Fashion Atelier vs Automatic1111定制化对比 1. 工具概览 1.1 Pixel Fashion Atelier简介 Pixel Fashion Atelier是一款基于Stable Diffusion与Anything-v5的图像生成工作站。它采用独特的复古日系RPG界面设计,将AI图像生…...
分享一份2026金三银四Java面试通关宝典!
金三银四快到了,不少人找LZ咨询,问我现在的面试需要提前准备什么?为了造福更多的开发者,也为了让更多的小伙伴通过面试;LZ近期也一直想着怎么才能帮到大家。所以近期在各大渠道整合大厂相关面试题,并结合了…...
Simulink与Plecs联合仿真实现三相桥式电路能量双向流动
simulinkplecs联合仿真源件,三相桥式电路,采用母线电压外环与电流内环控制,可整流也可逆变并网,实现能量双向流动,采用SVPWM调制方式。 1.plecssimulink 2.SVPWM 3.双闭环 支持simulink2022以下版本,联系跟…...
告别重复劳动:用快马生成自动化脚本,实现dify多环境一键部署与高效管理
在团队协作中,dify的部署工作常常成为效率瓶颈。每次新版本发布或环境迁移时,手动配置docker-compose文件、处理版本差异、备份数据等重复操作不仅耗时,还容易出错。最近尝试用InsCode(快马)平台生成自动化脚本集,意外发现部署效率…...
MoveBase导航实战:Livox MID360与FAST-LIO+AMCL混合定位的调优与避障策略
1. Livox MID360雷达与FAST-LIO的实战配置 第一次用Livox MID360雷达时,我被它的非重复扫描模式惊艳到了——这种固态激光雷达能实现360无死角覆盖,特别适合狭小空间导航。但要让它在MoveBase系统中稳定工作,需要先解决几个关键配置问题。 雷…...
计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现 基于SpringBoot框架的高校实训室资源预约与信息化管理平台的设计与实现 实验室智能调度与实训过程管理系统
计算机毕业设计springboot基于java技术的计算机实训室管理系统的设计与实现k8svdqb1 (配套有源码 程序 mysql数据库 论文) 本套源码可以在文本联xi,先看具体系统功能演示视频领取,可分享源码参考。随着高校信息化建设的深入推进,传…...
win11 WSL ubuntu24.04 安装两个、重命名
导出: wsl --export Ubuntu-24.04 D:\Ubuntu-24.04.tar导入新镜像: wsl --import Ubuntu-24.04-2 D:\Ubuntu-24.04-2\Ubuntu-24.04-2 D:\Ubuntu-24.04.tar...
深度解析ConcurrentHashMap设计演进:从分段锁到无锁化的并发之路
在Java并发编程领域,ConcurrentHashMap绝对是“并发容器扛鼎之作”——它既解决了HashMap并发环境下的数据不一致(死循环、数据丢失)问题,又突破了Hashtable全表锁的性能瓶颈,成为高并发场景下K-V存储的首选。自JDK1.5…...
英雄联盟智能工具League Akari:提升游戏体验的终极指南
英雄联盟智能工具League Akari:提升游戏体验的终极指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit League Akari是…...
OpenClaw自动化测试:Qwen3-32B批量执行LeetCode题目
OpenClaw自动化测试:Qwen3-32B批量执行LeetCode题目 1. 为什么需要自动化编程能力测试 作为一名长期关注AI编程辅助工具的技术博主,我一直在寻找能够客观评估大模型编程能力的方法。传统的单次对话测试往往带有偶然性,无法系统性地反映模型…...
