当前位置: 首页 > news >正文

springboot集成canal,将数据发送至接口

前置条件:

  1. mysql开启binlog日志
  2. 部署canal服务

Springboot代码:

接口工具类:

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class HttpMethodUtil {@Asyncpublic int httpPost(String url,String Content) {// 添加请求头信息Map<String, String> heads = new HashMap<>();// 使用json发送请求,下面的是必须的heads.put("Content-Type", "application/json;charset=UTF-8");HttpResponse response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();JSONObject jsonObject = JSON.parseObject(response.body());int code = (int) jsonObject.get("code");if (code!=0){response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();}jsonObject = JSON.parseObject(response.body());code = (int) jsonObject.get("code");return code;}

canal工具类:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class CanalUtil {@Value("${canal-monitor-mysql.hostname}")String canalMonitorHost;@Value("${canal-monitor-mysql.port}")Integer canalMonitorPort;@Value("${canal-monitor-mysql.database}")String canalMonitorDatabaseName;private final static int BATCH_SIZE = 100;@ResourceHttpMethodUtil httpMethodUtil;/*** 启动服务*/@PostConstructpublic void startMonitorSQL() {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!" + canalMonitorDatabaseName);//订阅数据库表,全部表q// connector.subscribe(canalMonitorTableName + "\\..*");connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 打印canal server解析binlog获得的实体类信息*/private void handleDATAChange(List<Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChage.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");log.info("DELETE:{}", rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}}}}}}

application.properties配置文件:

canal-monitor-mysql.hostname: 192.168.21.11
canal-monitor-mysql.port: 11111
canal-monitor-mysql.database: datbases_namelogging.file.name=./log/rizhi.log         
logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
logging.logback.rollingpolicy.max-history=30

相关文章:

springboot集成canal,将数据发送至接口

前置条件&#xff1a; mysql开启binlog日志部署canal服务 Springboot代码&#xff1a; 接口工具类&#xff1a; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; imp…...

Selenum八种常用定位(案例解析)

Selenium是一个备受推崇的工具。它有着丰富的功能&#xff0c;让我们能够与网页互动&#xff0c;执行各种任务&#xff0c;能为测试工程师和开发人员提供了很大的便利。 要充分利用Selenium&#xff0c;就需要了解如何正确定位网页上的元素。 接下来我将带大家共同探讨Seleni…...

Web前端接入Microsoft Azure AI文本翻译

Azure 文本翻译是 Azure AI 翻译服务的一项基于云的 REST API 功能。 文本翻译 API 支持实时快速准确地进行源到目标文本翻译。 文本翻译软件开发工具包 (SDK) 是一组库和工具&#xff0c;可用于轻松地将文本翻译 REST API 功能集成到应用程序中。 文本翻译 SDK 可跨 C#/.NET、…...

容联七陌助力鱼跃医疗升级智能联络中心,让客户服务更“鱼跃”

在当今高度竞争的市场环境中&#xff0c;企业的客户服务质量对于维护品牌形象和保持竞争优势至关重要。而随着人工智能、大数据等技术快速发展&#xff0c;智能化客户服务正在成为各行各业发展的重要部分。 鱼跃医疗是一家致力于“用科技律动生命"的国内知名医疗制造企业…...

【Redis系列】在Centos7上安装Redis5.0保姆级教程!

哈喽&#xff0c; 大家好&#xff0c;我是小浪。那么最近也是在忙秋招&#xff0c;很长一段时间没有更新文章啦&#xff0c;最近呢也是秋招闲下来&#xff0c;当然秋招结果也不是很理想&#xff0c;嗯……这里就不多说啦&#xff0c;回归正题&#xff0c;从今天开始我们就开始正…...

线性代数-Python-03:矩阵的变换 - 手写Matrix Transformation及numpy中的用法

文章目录 一、代码仓库二、旋转矩阵的推导及图形学中的矩阵变换2.1 让横坐标扩大a倍&#xff0c;纵坐标扩大b倍2.2 关于x轴翻转2.3 关于y轴翻转2.4 关于原点翻转&#xff08;x轴&#xff0c;y轴均翻转&#xff09;2.5 沿x方向错切2.6 沿y方向错切2.7 旋转2.8 单位矩阵2.9 矩阵的…...

【单片机基础】按键状态机实现短按、长按、双击、三击和N击

下载地址&#xff1a; 【CSDNNaiva】源码&#xff1a;HK32F030M-按键扫描-短按长按检测【CSDNNaiva】源码&#xff1a;HK32F030M-ADC-EXTI-TM1624-USART-EEPROM-TiMBase-按键长按(231024) 参考资料 [1] 【CSDNPillarPeng】【按键】[独立按键] - 1&#xff1a; 单击&#xff0c…...

Ubuntu虚拟机部署OpenStack

1、部署环境 系统&#xff1a;ubuntu-22.04.3-desktop-amd64DevStack版本&#xff1a;2024.1VMware Workstation&#xff1a;8G内存、4核处理器、100G硬盘/1、网络NAT模式/1 2、Ubuntu环境设置 点击show applications&#xff0c;选择Software&Updates 跟换Ubuntu的镜像…...

ES在企业项目中的实战总结,彻底掌握ES的使用

通过之前两篇文章 了解了ES的核心概念和基础使用学习进阶的DSL语法处理复杂的查询 这段时间通过在本企业代码中对ES框架的使用&#xff0c;总结了不少经验。主要分为三点 企业封装了ES原生的api&#xff0c;需要使用企业项目提供的接口实现 -------简单使用&#xff08;本章节目…...

QT的Qporcess功能的使用

具体实现代码如下&#xff1a; #include <QProgressBar>//必须要包含的头文件 #include <QProcess>// 创建一个QProgressBar对象QProgressBar *progressBar new QProgressBar(this);QProcess *proces;process_shownew process;// 设置进度条的最小值和最大值prog…...

【图灵诸葛】jvm笔记

2023年10月23日14:04:44 jvm 1.jdk体系结构图回顾(Av333129672,P1) jdk jre 底层是hotspot jvm 2.java虚拟机内部组成(Av333129672,P2) 堆 方法区 执行引擎 类加载 本地方法栈 线程栈&#xff08;虚拟机栈&#xff09; 3.java虚拟机栈讲解(Av333129672,P3) 程序计数器&#xf…...

数据安全小课堂开讲啦!看这里!

数据安全小课堂开讲啦&#xff01;看这里&#xff01; 1、什么是数据&#xff1f; 《数据安全法》第三条明确&#xff0c;本法所称的数据&#xff0c;就是指任何以电子或者其他方式对信息的记录。小到个人使用手机、电脑等电子产品时浏览的网页、下载的应用、存储的文件&…...

单片机矩阵键盘

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是矩阵键盘&#xff1f;1.独立键盘2.矩阵键盘变化1变化2变化3 3. 通过变型&#xff0c;举一反三&#xff0c;就可以实现4*4的矩阵键盘扫描 二、使用步骤…...

横坐标日期等间隔绘图 python示例代码

有两列数据&#xff0c;一列是日期&#xff0c;另一列是数值。日期是递增的&#xff0c;但是间隔不是均匀的。比如1月1日至2月1日有10组数据&#xff0c;2月1日至3月1日有100组数据&#xff0c;3月1日至4月1日有1000组数据。我想绘折线图&#xff0c;横坐标是日期&#xff0c;纵…...

photoshop2024免费插件Portraiture3

随着手机摄影的普及&#xff0c;修图可以说是现代人的必备生活技能之一了&#xff0c;现在谁发个朋友圈不把自己的照片修的美美的呢&#xff1f;那么如何拥有一张氛围感满满的照片呢&#xff1f;这不得不提图片处理软件中的王牌——photoshop。作为专业的图片处理软件&#xff…...

NewStarCTF2023week4-More Fast(GC回收)

打开链接&#xff0c;存在很多个类&#xff0c;很明显是php反序列化漏洞利用&#xff0c;需要构造pop链 &#xff0c; 关于pop链构造的详细步骤教学&#xff0c;请参考我之前的博客&#xff0c;真的讲得很详细也容易理解&#xff1a; http://t.csdnimg.cn/wMYNB 如果你是刚接…...

和鲸赞助丨第16届中国R会议暨2023 X-AGI大会通知

第16届中国 R 会议暨2023 X-AGI大会将于11月25-30日在中国人民大学召开&#xff0c;探讨数据科学和人工智能的相关进展&#xff0c;本次会议将采用线上会议和线下会议相结合的方式举办。 在过去的15年里&#xff0c;中国R会议一直致力于探讨数据科学在各学科、各行业的探索和实…...

Python第三方库 - Flask(python web框架)

1 Flask 1.1 认识Flask Web Application Framework&#xff08; Web 应用程序框架&#xff09;或简单的 Web Framework&#xff08; Web 框架&#xff09;表示一个库和模块的集合&#xff0c;使 Web 应用程序开发人员能够编写应用程序&#xff0c;而不必担心协议&#xff0c;线…...

c# sqlite 修改字段类型

因为sqlite不支持直接修改字段类型&#xff0c; 所以只能创建新的表&#xff0c;再将原始数据复制过去。具体操作步骤如下&#xff1a; 第一步&#xff0c; 将表“tableName”的名称修改为 “oldTable” string queryString string.Format("ALTER TABLE {0} RENAME TO …...

[Pytorch] 保存模型与加载模型

1、保存模型 # 定义模型 model BPNetModel(n_featuren_feature,n_hiddenn_hidden,n_outputn_output) #调用网络# 保存模型 torch.save(model, BPNetModel0.pth) 2、加载模型 import torch## 读取模型 model torch.load(BPNetModel0.pth) 3、保存模型参数 #调用网络 mode…...

React Scroll Parallax核心组件详解:Parallax、ParallaxBanner和ParallaxProvider

React Scroll Parallax核心组件详解&#xff1a;Parallax、ParallaxBanner和ParallaxProvider 【免费下载链接】react-scroll-parallax &#x1f52e; React hooks and components to create parallax scroll effects for banners, images or any other DOM elements. 项目地…...

数据结构与算法学习笔记

java一.数据结构简介1. 为什么要有数据结构&#xff1f;数据太多、太乱 → 无法高效处理 → 必须结构化2. 数据结构的两大分类逻辑结构&#xff1a;数据之间的关系&#xff08;怎么理解&#xff09;物理结构&#xff1a;内存中的存储方式&#xff08;怎么实现&#xff09;3. 逻…...

如何在 Linux 系统中查看和管理网络接口?

一、 查看网络接口使用 ifconfig 命令 查看活动接口&#xff1a;直接输入 ifconfig 可显示当前系统所有已激活的网络接口信息。查看所有接口&#xff1a;使用 ifconfig -a 可显示当前系统所有的网络接口&#xff08;包括未激活的&#xff09;。使用 ip 命令 查看 IP 地址&#…...

python的模块和包

模块&#xff1a;1&#xff1a;在python里一个.py文件就是一个模块&#xff08;module)2&#xff1a;模块可以包含&#xff1a;变量&#xff0c;函数&#xff0c;方法等许多内容3&#xff1a;通常把能够实现特定功能的代码&#xff0c;集中放在一个模块里4&#xff1a;模块可以…...

Vue3 + xterm.js 4.x + WebSocket 打造现代化Web终端实战指南

1. 为什么选择Vue3 xterm.js 4.x WebSocket组合&#xff1f; 在构建现代化Web终端时&#xff0c;技术选型直接影响开发效率和最终用户体验。Vue3提供了响应式编程范式和组件化开发优势&#xff0c;xterm.js 4.x是最新版本的浏览器终端模拟器&#xff0c;而WebSocket则实现了…...

5大维度解析zteOnu:让ONU设备管理效率提升300%的开源工具

5大维度解析zteOnu&#xff1a;让ONU设备管理效率提升300%的开源工具 【免费下载链接】zteOnu A tool that can open ZTE onu device factory mode 项目地址: https://gitcode.com/gh_mirrors/zt/zteOnu 问题引入&#xff1a;网络运维工程师的日常困境 你是否也曾面临这…...

MAP vs MLE:机器学习参数估计该怎么选?5个真实案例告诉你答案

MAP vs MLE&#xff1a;机器学习参数估计该怎么选&#xff1f;5个真实案例告诉你答案 在机器学习项目的参数估计环节&#xff0c;数据科学家常常面临一个关键选择&#xff1a;采用最大后验概率&#xff08;MAP&#xff09;还是最大似然估计&#xff08;MLE&#xff09;&#xf…...

解决 npm install 安装过慢

解决 npm install 安装过慢npm install --registryhttps://registry.npmmirror.com...

Phi-4-mini-reasoning企业应用:替代传统规则引擎做逻辑校验服务

Phi-4-mini-reasoning企业应用&#xff1a;替代传统规则引擎做逻辑校验服务 1. 为什么企业需要逻辑校验服务 在现代企业系统中&#xff0c;逻辑校验无处不在。从电商平台的优惠券规则验证&#xff0c;到金融系统的风控审核&#xff0c;再到制造业的工艺流程检查&#xff0c;都…...

5分钟打造个人游戏库:FitGirl Repack Launcher高效管理方案

5分钟打造个人游戏库&#xff1a;FitGirl Repack Launcher高效管理方案 【免费下载链接】Fitgirl-Repack-Launcher An Electron launcher designed specifically for FitGirl Repacks, utilizing pure vanilla JavaScript, HTML, and CSS for optimal performance and customiz…...