当前位置: 首页 > news >正文

springboot集成canal,将数据发送至接口

前置条件:

  1. mysql开启binlog日志
  2. 部署canal服务

Springboot代码:

接口工具类:

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class HttpMethodUtil {@Asyncpublic int httpPost(String url,String Content) {// 添加请求头信息Map<String, String> heads = new HashMap<>();// 使用json发送请求,下面的是必须的heads.put("Content-Type", "application/json;charset=UTF-8");HttpResponse response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();JSONObject jsonObject = JSON.parseObject(response.body());int code = (int) jsonObject.get("code");if (code!=0){response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();}jsonObject = JSON.parseObject(response.body());code = (int) jsonObject.get("code");return code;}

canal工具类:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class CanalUtil {@Value("${canal-monitor-mysql.hostname}")String canalMonitorHost;@Value("${canal-monitor-mysql.port}")Integer canalMonitorPort;@Value("${canal-monitor-mysql.database}")String canalMonitorDatabaseName;private final static int BATCH_SIZE = 100;@ResourceHttpMethodUtil httpMethodUtil;/*** 启动服务*/@PostConstructpublic void startMonitorSQL() {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!" + canalMonitorDatabaseName);//订阅数据库表,全部表q// connector.subscribe(canalMonitorTableName + "\\..*");connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 打印canal server解析binlog获得的实体类信息*/private void handleDATAChange(List<Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChage.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");log.info("DELETE:{}", rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}}}}}}

application.properties配置文件:

canal-monitor-mysql.hostname: 192.168.21.11
canal-monitor-mysql.port: 11111
canal-monitor-mysql.database: datbases_namelogging.file.name=./log/rizhi.log         
logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
logging.logback.rollingpolicy.max-history=30

相关文章:

springboot集成canal,将数据发送至接口

前置条件&#xff1a; mysql开启binlog日志部署canal服务 Springboot代码&#xff1a; 接口工具类&#xff1a; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; imp…...

Selenum八种常用定位(案例解析)

Selenium是一个备受推崇的工具。它有着丰富的功能&#xff0c;让我们能够与网页互动&#xff0c;执行各种任务&#xff0c;能为测试工程师和开发人员提供了很大的便利。 要充分利用Selenium&#xff0c;就需要了解如何正确定位网页上的元素。 接下来我将带大家共同探讨Seleni…...

Web前端接入Microsoft Azure AI文本翻译

Azure 文本翻译是 Azure AI 翻译服务的一项基于云的 REST API 功能。 文本翻译 API 支持实时快速准确地进行源到目标文本翻译。 文本翻译软件开发工具包 (SDK) 是一组库和工具&#xff0c;可用于轻松地将文本翻译 REST API 功能集成到应用程序中。 文本翻译 SDK 可跨 C#/.NET、…...

容联七陌助力鱼跃医疗升级智能联络中心,让客户服务更“鱼跃”

在当今高度竞争的市场环境中&#xff0c;企业的客户服务质量对于维护品牌形象和保持竞争优势至关重要。而随着人工智能、大数据等技术快速发展&#xff0c;智能化客户服务正在成为各行各业发展的重要部分。 鱼跃医疗是一家致力于“用科技律动生命"的国内知名医疗制造企业…...

【Redis系列】在Centos7上安装Redis5.0保姆级教程!

哈喽&#xff0c; 大家好&#xff0c;我是小浪。那么最近也是在忙秋招&#xff0c;很长一段时间没有更新文章啦&#xff0c;最近呢也是秋招闲下来&#xff0c;当然秋招结果也不是很理想&#xff0c;嗯……这里就不多说啦&#xff0c;回归正题&#xff0c;从今天开始我们就开始正…...

线性代数-Python-03:矩阵的变换 - 手写Matrix Transformation及numpy中的用法

文章目录 一、代码仓库二、旋转矩阵的推导及图形学中的矩阵变换2.1 让横坐标扩大a倍&#xff0c;纵坐标扩大b倍2.2 关于x轴翻转2.3 关于y轴翻转2.4 关于原点翻转&#xff08;x轴&#xff0c;y轴均翻转&#xff09;2.5 沿x方向错切2.6 沿y方向错切2.7 旋转2.8 单位矩阵2.9 矩阵的…...

【单片机基础】按键状态机实现短按、长按、双击、三击和N击

下载地址&#xff1a; 【CSDNNaiva】源码&#xff1a;HK32F030M-按键扫描-短按长按检测【CSDNNaiva】源码&#xff1a;HK32F030M-ADC-EXTI-TM1624-USART-EEPROM-TiMBase-按键长按(231024) 参考资料 [1] 【CSDNPillarPeng】【按键】[独立按键] - 1&#xff1a; 单击&#xff0c…...

Ubuntu虚拟机部署OpenStack

1、部署环境 系统&#xff1a;ubuntu-22.04.3-desktop-amd64DevStack版本&#xff1a;2024.1VMware Workstation&#xff1a;8G内存、4核处理器、100G硬盘/1、网络NAT模式/1 2、Ubuntu环境设置 点击show applications&#xff0c;选择Software&Updates 跟换Ubuntu的镜像…...

ES在企业项目中的实战总结,彻底掌握ES的使用

通过之前两篇文章 了解了ES的核心概念和基础使用学习进阶的DSL语法处理复杂的查询 这段时间通过在本企业代码中对ES框架的使用&#xff0c;总结了不少经验。主要分为三点 企业封装了ES原生的api&#xff0c;需要使用企业项目提供的接口实现 -------简单使用&#xff08;本章节目…...

QT的Qporcess功能的使用

具体实现代码如下&#xff1a; #include <QProgressBar>//必须要包含的头文件 #include <QProcess>// 创建一个QProgressBar对象QProgressBar *progressBar new QProgressBar(this);QProcess *proces;process_shownew process;// 设置进度条的最小值和最大值prog…...

【图灵诸葛】jvm笔记

2023年10月23日14:04:44 jvm 1.jdk体系结构图回顾(Av333129672,P1) jdk jre 底层是hotspot jvm 2.java虚拟机内部组成(Av333129672,P2) 堆 方法区 执行引擎 类加载 本地方法栈 线程栈&#xff08;虚拟机栈&#xff09; 3.java虚拟机栈讲解(Av333129672,P3) 程序计数器&#xf…...

数据安全小课堂开讲啦!看这里!

数据安全小课堂开讲啦&#xff01;看这里&#xff01; 1、什么是数据&#xff1f; 《数据安全法》第三条明确&#xff0c;本法所称的数据&#xff0c;就是指任何以电子或者其他方式对信息的记录。小到个人使用手机、电脑等电子产品时浏览的网页、下载的应用、存储的文件&…...

单片机矩阵键盘

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是矩阵键盘&#xff1f;1.独立键盘2.矩阵键盘变化1变化2变化3 3. 通过变型&#xff0c;举一反三&#xff0c;就可以实现4*4的矩阵键盘扫描 二、使用步骤…...

横坐标日期等间隔绘图 python示例代码

有两列数据&#xff0c;一列是日期&#xff0c;另一列是数值。日期是递增的&#xff0c;但是间隔不是均匀的。比如1月1日至2月1日有10组数据&#xff0c;2月1日至3月1日有100组数据&#xff0c;3月1日至4月1日有1000组数据。我想绘折线图&#xff0c;横坐标是日期&#xff0c;纵…...

photoshop2024免费插件Portraiture3

随着手机摄影的普及&#xff0c;修图可以说是现代人的必备生活技能之一了&#xff0c;现在谁发个朋友圈不把自己的照片修的美美的呢&#xff1f;那么如何拥有一张氛围感满满的照片呢&#xff1f;这不得不提图片处理软件中的王牌——photoshop。作为专业的图片处理软件&#xff…...

NewStarCTF2023week4-More Fast(GC回收)

打开链接&#xff0c;存在很多个类&#xff0c;很明显是php反序列化漏洞利用&#xff0c;需要构造pop链 &#xff0c; 关于pop链构造的详细步骤教学&#xff0c;请参考我之前的博客&#xff0c;真的讲得很详细也容易理解&#xff1a; http://t.csdnimg.cn/wMYNB 如果你是刚接…...

和鲸赞助丨第16届中国R会议暨2023 X-AGI大会通知

第16届中国 R 会议暨2023 X-AGI大会将于11月25-30日在中国人民大学召开&#xff0c;探讨数据科学和人工智能的相关进展&#xff0c;本次会议将采用线上会议和线下会议相结合的方式举办。 在过去的15年里&#xff0c;中国R会议一直致力于探讨数据科学在各学科、各行业的探索和实…...

Python第三方库 - Flask(python web框架)

1 Flask 1.1 认识Flask Web Application Framework&#xff08; Web 应用程序框架&#xff09;或简单的 Web Framework&#xff08; Web 框架&#xff09;表示一个库和模块的集合&#xff0c;使 Web 应用程序开发人员能够编写应用程序&#xff0c;而不必担心协议&#xff0c;线…...

c# sqlite 修改字段类型

因为sqlite不支持直接修改字段类型&#xff0c; 所以只能创建新的表&#xff0c;再将原始数据复制过去。具体操作步骤如下&#xff1a; 第一步&#xff0c; 将表“tableName”的名称修改为 “oldTable” string queryString string.Format("ALTER TABLE {0} RENAME TO …...

[Pytorch] 保存模型与加载模型

1、保存模型 # 定义模型 model BPNetModel(n_featuren_feature,n_hiddenn_hidden,n_outputn_output) #调用网络# 保存模型 torch.save(model, BPNetModel0.pth) 2、加载模型 import torch## 读取模型 model torch.load(BPNetModel0.pth) 3、保存模型参数 #调用网络 mode…...

【2D与3D SLAM中的扫描匹配算法全面解析】

引言 扫描匹配(Scan Matching)是同步定位与地图构建(SLAM)系统中的核心组件&#xff0c;它通过对齐连续的传感器观测数据来估计机器人的运动。本文将深入探讨2D和3D SLAM中的各种扫描匹配算法&#xff0c;包括数学原理、实现细节以及实际应用中的性能对比&#xff0c;特别关注…...

Java高级 |【实验八】springboot 使用Websocket

隶属文章&#xff1a;Java高级 | &#xff08;二十二&#xff09;Java常用类库-CSDN博客 系列文章&#xff1a;Java高级 | 【实验一】Springboot安装及测试 |最新-CSDN博客 Java高级 | 【实验二】Springboot 控制器类相关注解知识-CSDN博客 Java高级 | 【实验三】Springboot 静…...

2025年全国I卷数学压轴题解答

第19题第3问: b b b 使得存在 t t t, 对于任意的 x x x, 5 cos ⁡ x − cos ⁡ ( 5 x t ) < b 5\cos x-\cos(5xt)<b 5cosx−cos(5xt)<b, 求 b b b 的最小值. 解: b b b 的最小值 b m i n min ⁡ t max ⁡ x g ( x , t ) b_{min}\min_{t} \max_{x} g(x,t) bmi…...

短视频时长预估算法调研

weighted LR o d d s T p 1 − p ( 1 − p ) o d d s T p ( T p o d d s ∗ p ) o d d s p o d d s T o d d s odds \frac{Tp}{1-p} \newline (1-p)odds Tp \newline (Tp odds * p) odds \newline p \frac{odds}{T odds} \newline odds1−pTp​(1−p)oddsTp(Tpodds…...

python3GUI--基于PyQt5+DeepSort+YOLOv8智能人员入侵检测系统(详细图文介绍)

文章目录 一&#xff0e;前言二&#xff0e;技术介绍1.PyQt52.DeepSort3.卡尔曼滤波4.YOLOv85.SQLite36.多线程7.入侵人员检测8.ROI区域 三&#xff0e;核心功能1.登录注册1.登录2.注册 2.主界面1.主界面简介2.数据输入3.参数配置4.告警配置5.操作控制台6.核心内容显示区域7.检…...

【Go语言基础【6】】字符串格式化说明

文章目录 零、格式化常用场景一、Go 字符串格式化核心概念二、常用格式化占位符1. 整数类型2. 浮点数类型3. 字符串与布尔类型4. 指针与通用类型 三、宽度与精度控制1. 宽度控制2. 精度控制&#xff08;浮点数/字符串&#xff09; 零、格式化常用场景 数值转字符串&#xff1a…...

jieba实现和用RNN实现中文分词的区别

Jieba 分词和基于 RNN 的分词在技术路线、实现机制、性能特点上有显著差异&#xff0c;以下是核心对比&#xff1a; 1. 技术路线对比 维度Jieba 分词RNN 神经网络分词范式传统 NLP&#xff08;规则 统计&#xff09;深度学习&#xff08;端到端学习&#xff09;核心依赖词典…...

慢慢欣赏linux 之 last = switch_to(prev, next)分析

last switch_to(prev, next); 为什么需要定义last作为调用switch_to之前的prev的引用 原因如下&#xff1a; struct task_struct * switch_to(struct task_struct *prev,struct task_struct *next) {... ...return cpu_switch_to(prev, next);> .global cpu_switch_tocpu_…...

Git 使用大全:从入门到精通

Git 是目前最流行的分布式版本控制系统&#xff0c;被广泛应用于软件开发中。本文将全面介绍 Git 的各种功能和使用方法&#xff0c;包含大量代码示例和实践建议。 文章目录 Git 基础概念版本控制系统Git 的特点Git 的三个区域Git 文件状态 Git 安装与配置安装 GitLinuxmacOSWi…...

js 比较两个对象的值,不相等就push对象的key

在JavaScript中&#xff0c;比较两个对象&#xff08;object&#xff09;的值并找出不相等的key&#xff0c;可以通过多种方法实现。下面是一些常用的方法&#xff1a; 方法1&#xff1a;使用JSON.stringify 这种方法适用于简单的对象&#xff0c;其中对象的值是基本类型或可…...