使用TCP方式拉取Canal数据
1 Canal对接Kafka联调
1.1 配置修改
canal.properties

修改 zk:
canal.zkServers = 10.51.50.219:2181
instance.properties
开启配置项:
canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:
test_javaedge_01是kafka 的 topictest_db.users要监控的数据库、表- 当
test_db.users表发生变化时,Canal 将会把变化的数据推送到名为test_javaedge_01:test_db.users的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users
开启一个消费者
[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01
datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据
现在 serverMode 改回tcp。重启
javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %
canal 同步程序
package com.javaedge.canal;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;public class CanalClientApp {public static void main(String[] args) throws Exception {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),"example",null, null);while (true) {connector.connect();connector.subscribe("test_db.users");Message message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size()>0) {for (CanalEntry.Entry entry : entries) {String tableName = entry.getHeader().getTableName();CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT) {for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();HashMap<Object, Object> map = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());map.put(key, column.getValue());}System.out.println("tableName=" + tableName + " map=" + JSON.toJSONString(map));}}}}}}
}
运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!
数据链路
MySQL -》canal server(tcp)-》canal client-》kafka。
相关文章:
使用TCP方式拉取Canal数据
1 Canal对接Kafka联调 1.1 配置修改 canal.properties 修改 zk: canal.zkServers 10.51.50.219:2181instance.properties 开启配置项: canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项: test_javaedge_01 是kafka 的 topicte…...
Docker安装mysql实战说明
安装前准备 在安装MySQL之前,你需要确保已经正确安装和配置了Docker,可以通过以下命令检查Docker是否已正确安装: docker --version如果Docker已经成功安装,你将看到Docker的版本信息。 下载mysql的镜像 Docker Hub是一个存储…...
前端DOM操作精解:基础概念、方法与最佳实践
引言 本文将深入探讨前端开发中的DOM操作,包括基础概念、常用方法和最佳实践。通过清晰易懂的解释和实际案例分析,我们将一起了解如何最有效地使用DOM操作来提升前端应用的用户体验。 一、DOM操作入门 在深入探讨DOM操作之前,我们先要理解…...
python sorted函数详解2023.9.11
sorted函数详解 1. 输入和输出2. key传入函数 1. 输入和输出 help(sorted) Help on built-in function sorted in module builtins: sorted(iterable, /, *, keyNone, reverseFalse)Return a new list containing all items from the iterable in ascending order.A custom k…...
Spring Reactive:响应式编程与WebFlux的深度探索
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
Qt应用开发(基础篇)——工具按钮类 QToolButton
一、前言 QToolButton类继承于QAbstractButton,该部件为命令或选项提供了一个快速访问按钮,通常用于QToolBar中。 按钮基类 QAbstractButton QToolButton是一个特殊的按钮,一般显示文本,只显示图标,结合toolBar使用。它…...
【数据结构面试题】栈与队列的相互实现
目录 1.队列实现栈 1.1创建栈 1.2判断是否为空 1.3入栈 1.4出栈 1.5获取栈顶元素 1.6完整代码 2. 用栈实现队列 2.1创建队列 2.2判断是否为空 2.3入队列 2.4出队列 2.5获取队头元素 2.6完整代码 1.队列实现栈 用队列实现栈https://leetcode.cn/problems/impleme…...
华为认证和红帽认证哪个比较好考呢
华为认证和红帽认证的考试难度、学习内容、适用范围等方面都有所不同,因此哪个比较好考要视具体情况而定: 考试难度:红帽认证的考试难度较高,需要考生具备较高的技术水平和实践经验;而华为认证则更注重基础知识的考察…...
[Java]_[中级]_[使用okhttp3和HttpClient代理访问外部网络]
场景 Java的http库常用的有HttpClient和Okhttp3, 如果公司有限制网络访问,需要代理才可以访问外网,那么如何使用代理Proxy? <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient<…...
ubuntu 20.04 docker 安装 mysql
要在Ubuntu 20.04上安装Docker并运行MySQL容器,您可以按照以下步骤操作: 1.更新系统包列表: sudo apt update2.安装Docker: sudo apt install docker.io3.启动Docker服务并设置其开机自启动: sudo systemctl start…...
C++在C语言基础上的优化
目录 一、命名空间 1、命名空间的定义 2、命名空间的使用 二、输入&输出 三、缺省参数 1、缺省参数的概念 2、缺省参数的分类 四、函数重载 五、引用 1.引用的概念 2.引用的特性 3、引用和指针的区别 六、内联函数 七、基于范围的for循环 一、命名空间 命名空…...
分享一个python实验室设备预约管理系统 实验室设备维修系统源码 lw 调试
💕💕作者:计算机源码社 💕💕个人简介:本人七年开发经验,擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等,大家有这一块的问题可以一起交流! 💕&…...
兵者多诡(HCTF2016)
环境:https://github.com/MartinxMax/CTFer_Zero_one 题目简介 解题过程 登录首页 提交png图片上传抓包,可以看到是向upload文件提交数据 在fp参数中尝试伪协议读取home.php文件 http://127.0.0.1:88/HCTF2016-LFI/home.php?fpphp://filter/readconvert.base64…...
【JAVA-Day04】Java关键字和示例:深入了解常用关键字的用法
Java关键字和示例:深入了解常用关键字的用法 摘要Java 关键字、标识符和命名规范一、Java 关键字常用关键字DEMO1. 示例代码使用 if 和 else 关键字:2. 示例代码使用 for 循环:3. 示例代码使用 switch 关键字:4. 示例代码使用 wh…...
Android请求网络报错:not permitted by network security policy
一、错误记录 https的接口请求正常的, 请求http的接口时报错:not permitted by network security policy 二、问题分析 原因: 由于 Android P(版本27以上) 限制了明文流量的网络请求,非加密的流量请求都会被系统禁止掉。如果当…...
python报错:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1
python报错:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1 问题分析 说明:requests包引入了urllib3,而新版本的urllib3 需要OpenSSL 1.1.1以上版本,否则报错: ImportError: urllib3 v2.0 only supports Ope…...
如何使用adb command来设置cpu频率和核数
透過ADB Shell設定CPU開核與freq的command與用法如下: # Disable PPM echo 0 > /proc/ppm/enabled # Enable PPM (Default) echo 1 > /proc/ppm/enabled echo 0 > /proc/ppm/enabled Fixed # Core for each cluster echo X Y > /proc/ppm/policy/ut_fix_core_num …...
236. 二叉树的最近公共祖先
236. 二叉树的最近公共祖先 题目-中等难度示例1. dfs 题目-中等难度 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为:“对于有根树 T 的两个节点 p、q,最近公共祖先表示为一个节点 x,满足 x 是 p…...
Git常见问题:git pull 和 git pull --rebase二者区别
git pull 和 git pull --rebase 都是从远程仓库获取最新的更改并将其合并到本地分支。但它们之间的区别在于合并方式。以下是它们之间的主要区别: git pull: 当你执行 git pull 时,Git 会执行以下两个操作: git fetchÿ…...
关于HarmonyOS元服务的主题演讲与合作签约
一、感言 坚持中,总会有很多意想不到的收获。 前几次参与HDC时更多的是观众、开发者、专家的身份,以参观、学习、交流为主。 通过几年的努力,和HarmonyOS功能成长,在2023年的HDC大会中,有了我的演讲,并带领…...
springboot-vue+nodejs的公考在线刷题学习平台的设计与实现
目录技术栈选择核心模块设计关键实现步骤扩展功能建议示例代码片段(Spring Boot Controller)注意事项项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作技术栈选择 后端框架:Spring Boot&#…...
L1-012 计算指数、L1-013 计算阶乘和、 L1-014 简单题、 L1-015 跟奥巴马一起画方块、 L1-016 查验身份证
L1-012 计算指数、L1-013 计算阶乘和、L1-014 简单题、 L1-015 跟奥巴马一起画方块、 L1-016 查验身份证L1-012 计算指数题目描述输入格式输出格式输入样例输出样例解题思路C 代码双引号 " " 的作用拼接过程示例L1-013 计算阶乘和题目描述输入格式输出格式输入样例输…...
Source Han Serif CN:7种字重如何改变你的中文排版体验?
Source Han Serif CN:7种字重如何改变你的中文排版体验? 【免费下载链接】source-han-serif-ttf Source Han Serif TTF 项目地址: https://gitcode.com/gh_mirrors/so/source-han-serif-ttf 你是否曾为寻找合适的中文字体而烦恼?商业字…...
告别数据迷宫:手把手教你用DataHub搭建企业级元数据搜索中心(支持MySQL/Airflow/Superset)
告别数据迷宫:手把手教你用DataHub搭建企业级元数据搜索中心(支持MySQL/Airflow/Superset) 当数据资产像野草一样在组织内疯长时,工程师们常常发现自己被困在由数百个数据表、数十个BI看板和错综复杂的调度任务构成的迷宫中。上周…...
纯本地运行!AgentCPM深度研报助手,手把手教你离线生成研究报告
纯本地运行!AgentCPM深度研报助手,手把手教你离线生成研究报告 1. 为什么选择本地研报生成工具? 在信息爆炸的时代,专业研究报告的撰写面临三大痛点: 时间压力:从零开始撰写一份深度报告平均需要40-60小…...
springboot-vue+nodejs的农产品扶贫助农系统的开发与实现
目录技术栈选择系统架构设计核心功能模块开发阶段划分关键代码示例(Spring Boot)前端组件示例(Vue)注意事项项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作技术栈选择 Spring Bo…...
如何快速解密Navicat加密密码?这款开源工具让数据库连接迁移更简单
如何快速解密Navicat加密密码?这款开源工具让数据库连接迁移更简单 【免费下载链接】navicat_password_decrypt 忘记navicat密码时,此工具可以帮您查看密码 项目地址: https://gitcode.com/gh_mirrors/na/navicat_password_decrypt 在数据库管理工作中&#…...
从零开始掌握KLayout版图设计:5个步骤打造专业集成电路设计流程
从零开始掌握KLayout版图设计:5个步骤打造专业集成电路设计流程 【免费下载链接】klayout KLayout Main Sources 项目地址: https://gitcode.com/gh_mirrors/kl/klayout KLayout版图设计工具是开源EDA领域的明星产品,为集成电路设计工程师提供了一…...
OpenClaw技能开发入门:为nanobot编写自定义文件处理器
OpenClaw技能开发入门:为nanobot编写自定义文件处理器 1. 为什么需要自定义技能 去年夏天,我发现自己每周都要花两小时手动整理项目文档——把分散在各处的Markdown文件合并、去重、重新编号。当我第三次在重复劳动中睡着时,终于决定用Open…...
从汽车以太网到智能座舱:TSN的CBS和抢占式TAS如何保障你的行车安全与娱乐体验
从汽车以太网到智能座舱:TSN的CBS和抢占式TAS如何保障行车安全与娱乐体验 当你在高速公路上开启自动驾驶功能,车辆需要同时处理毫米波雷达的实时数据、中控屏的4K视频流以及车窗升降的指令——这些需求对网络传输的要求截然不同。传统车载网络架构正面临…...
