当前位置: 首页 > news >正文

使用TCP方式拉取Canal数据

1 Canal对接Kafka联调

1.1 配置修改

canal.properties

修改 zk:

canal.zkServers = 10.51.50.219:2181

instance.properties

开启配置项:

canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项:

  • test_javaedge_01 是kafka 的 topic
  • test_db.users 要监控的数据库、表
  • test_db.users 表发生变化时,Canal 将会把变化的数据推送到名为 test_javaedge_01:test_db.users 的 MQ Topic 中。
canal.mq.dynamicTopic=test_javaedge_01:test_db\\.users

开启一个消费者

[root@javaedge-kafka-dev bin]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_javaedge_01

datagrip 新增数据:

消费到该数据:

2 使用TCP方式拉取Canal数据

现在 serverMode 改回tcp。重启

javaedge@JavaEdgedeMac-mini deployer % jps
71002 CanalLauncher
javaedge@JavaEdgedeMac-mini deployer %

canal 同步程序

package com.javaedge.canal;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.base.CaseFormat;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;public class CanalClientApp {public static void main(String[] args) throws Exception {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111),"example",null, null);while (true) {connector.connect();connector.subscribe("test_db.users");Message message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size()>0) {for (CanalEntry.Entry entry : entries) {String tableName = entry.getHeader().getTableName();CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT) {for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();HashMap<Object, Object> map = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {String key = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName());map.put(key, column.getValue());}System.out.println("tableName=" + tableName + "  map=" + JSON.toJSONString(map));}}}}}}
}

运行程序。操作 user 数据表,新增一行数据:

程序输出:

显然,后续不管你想把数据同步到哪儿去,都完全自由!

数据链路

MySQL -》canal server(tcp)-》canal client-》kafka。

相关文章:

使用TCP方式拉取Canal数据

1 Canal对接Kafka联调 1.1 配置修改 canal.properties 修改 zk&#xff1a; canal.zkServers 10.51.50.219:2181instance.properties 开启配置项&#xff1a; canal.mq.dynamicTopic 是 Canal 的 MQ 动态 Topic 配置项&#xff1a; test_javaedge_01 是kafka 的 topicte…...

Docker安装mysql实战说明

安装前准备 在安装MySQL之前&#xff0c;你需要确保已经正确安装和配置了Docker&#xff0c;可以通过以下命令检查Docker是否已正确安装&#xff1a; docker --version如果Docker已经成功安装&#xff0c;你将看到Docker的版本信息。 下载mysql的镜像 Docker Hub是一个存储…...

前端DOM操作精解:基础概念、方法与最佳实践

引言 本文将深入探讨前端开发中的DOM操作&#xff0c;包括基础概念、常用方法和最佳实践。通过清晰易懂的解释和实际案例分析&#xff0c;我们将一起了解如何最有效地使用DOM操作来提升前端应用的用户体验。 一、DOM操作入门 在深入探讨DOM操作之前&#xff0c;我们先要理解…...

python sorted函数详解2023.9.11

sorted函数详解 1. 输入和输出2. key传入函数 1. 输入和输出 help(sorted) Help on built-in function sorted in module builtins: sorted(iterable, /, *, keyNone, reverseFalse)Return a new list containing all items from the iterable in ascending order.A custom k…...

Spring Reactive:响应式编程与WebFlux的深度探索

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…...

Qt应用开发(基础篇)——工具按钮类 QToolButton

一、前言 QToolButton类继承于QAbstractButton&#xff0c;该部件为命令或选项提供了一个快速访问按钮&#xff0c;通常用于QToolBar中。 按钮基类 QAbstractButton QToolButton是一个特殊的按钮&#xff0c;一般显示文本&#xff0c;只显示图标&#xff0c;结合toolBar使用。它…...

【数据结构面试题】栈与队列的相互实现

目录 1.队列实现栈 1.1创建栈 1.2判断是否为空 1.3入栈 1.4出栈 1.5获取栈顶元素 1.6完整代码 2. 用栈实现队列 2.1创建队列 2.2判断是否为空 2.3入队列 2.4出队列 2.5获取队头元素 2.6完整代码 1.队列实现栈 用队列实现栈https://leetcode.cn/problems/impleme…...

华为认证和红帽认证哪个比较好考呢

华为认证和红帽认证的考试难度、学习内容、适用范围等方面都有所不同&#xff0c;因此哪个比较好考要视具体情况而定&#xff1a; 考试难度&#xff1a;红帽认证的考试难度较高&#xff0c;需要考生具备较高的技术水平和实践经验&#xff1b;而华为认证则更注重基础知识的考察…...

[Java]_[中级]_[使用okhttp3和HttpClient代理访问外部网络]

场景 Java的http库常用的有HttpClient和Okhttp3, 如果公司有限制网络访问&#xff0c;需要代理才可以访问外网&#xff0c;那么如何使用代理Proxy&#xff1f; <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient<…...

ubuntu 20.04 docker 安装 mysql

要在Ubuntu 20.04上安装Docker并运行MySQL容器&#xff0c;您可以按照以下步骤操作&#xff1a; 1.更新系统包列表&#xff1a; sudo apt update2.安装Docker&#xff1a; sudo apt install docker.io3.启动Docker服务并设置其开机自启动&#xff1a; sudo systemctl start…...

C++在C语言基础上的优化

目录 一、命名空间 1、命名空间的定义 2、命名空间的使用 二、输入&输出 三、缺省参数 1、缺省参数的概念 2、缺省参数的分类 四、函数重载 五、引用 1.引用的概念 2.引用的特性 3、引用和指针的区别 六、内联函数 七、基于范围的for循环 一、命名空间 命名空…...

分享一个python实验室设备预约管理系统 实验室设备维修系统源码 lw 调试

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人七年开发经验&#xff0c;擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等&#xff0c;大家有这一块的问题可以一起交流&#xff01; &#x1f495;&…...

兵者多诡(HCTF2016)

环境:https://github.com/MartinxMax/CTFer_Zero_one 题目简介 解题过程 登录首页 提交png图片上传抓包&#xff0c;可以看到是向upload文件提交数据 在fp参数中尝试伪协议读取home.php文件 http://127.0.0.1:88/HCTF2016-LFI/home.php?fpphp://filter/readconvert.base64…...

【JAVA-Day04】Java关键字和示例:深入了解常用关键字的用法

Java关键字和示例&#xff1a;深入了解常用关键字的用法 摘要Java 关键字、标识符和命名规范一、Java 关键字常用关键字DEMO1. 示例代码使用 if 和 else 关键字&#xff1a;2. 示例代码使用 for 循环&#xff1a;3. 示例代码使用 switch 关键字&#xff1a;4. 示例代码使用 wh…...

Android请求网络报错:not permitted by network security policy

一、错误记录 https的接口请求正常的&#xff0c; 请求http的接口时报错&#xff1a;not permitted by network security policy 二、问题分析 原因&#xff1a; 由于 Android P(版本27以上) 限制了明文流量的网络请求&#xff0c;非加密的流量请求都会被系统禁止掉。如果当…...

python报错:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1

python报错&#xff1a;ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1 问题分析 说明&#xff1a;requests包引入了urllib3&#xff0c;而新版本的urllib3 需要OpenSSL 1.1.1以上版本&#xff0c;否则报错&#xff1a; ImportError: urllib3 v2.0 only supports Ope…...

如何使用adb command来设置cpu频率和核数

透過ADB Shell設定CPU開核與freq的command與用法如下: # Disable PPM echo 0 > /proc/ppm/enabled # Enable PPM (Default) echo 1 > /proc/ppm/enabled echo 0 > /proc/ppm/enabled Fixed # Core for each cluster echo X Y > /proc/ppm/policy/ut_fix_core_num …...

236. 二叉树的最近公共祖先

236. 二叉树的最近公共祖先 题目-中等难度示例1. dfs 题目-中等难度 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p…...

Git常见问题:git pull 和 git pull --rebase二者区别

git pull 和 git pull --rebase 都是从远程仓库获取最新的更改并将其合并到本地分支。但它们之间的区别在于合并方式。以下是它们之间的主要区别&#xff1a; git pull&#xff1a; 当你执行 git pull 时&#xff0c;Git 会执行以下两个操作&#xff1a; git fetch&#xff…...

关于HarmonyOS元服务的主题演讲与合作签约

一、感言 坚持中&#xff0c;总会有很多意想不到的收获。 前几次参与HDC时更多的是观众、开发者、专家的身份&#xff0c;以参观、学习、交流为主。 通过几年的努力&#xff0c;和HarmonyOS功能成长&#xff0c;在2023年的HDC大会中&#xff0c;有了我的演讲&#xff0c;并带领…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

算法:模拟

1.替换所有的问号 1576. 替换所有的问号 - 力扣&#xff08;LeetCode&#xff09; ​遍历字符串​&#xff1a;通过外层循环逐一检查每个字符。​遇到 ? 时处理​&#xff1a; 内层循环遍历小写字母&#xff08;a 到 z&#xff09;。对每个字母检查是否满足&#xff1a; ​与…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...