当前位置: 首页 > news >正文

springboot集成canal,将数据发送至接口

前置条件:

  1. mysql开启binlog日志
  2. 部署canal服务

Springboot代码:

接口工具类:

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class HttpMethodUtil {@Asyncpublic int httpPost(String url,String Content) {// 添加请求头信息Map<String, String> heads = new HashMap<>();// 使用json发送请求,下面的是必须的heads.put("Content-Type", "application/json;charset=UTF-8");HttpResponse response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();JSONObject jsonObject = JSON.parseObject(response.body());int code = (int) jsonObject.get("code");if (code!=0){response = HttpRequest.post(url).headerMap(heads, false).body(Content).timeout(5 * 1000).execute();}jsonObject = JSON.parseObject(response.body());code = (int) jsonObject.get("code");return code;}

canal工具类:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class CanalUtil {@Value("${canal-monitor-mysql.hostname}")String canalMonitorHost;@Value("${canal-monitor-mysql.port}")Integer canalMonitorPort;@Value("${canal-monitor-mysql.database}")String canalMonitorDatabaseName;private final static int BATCH_SIZE = 100;@ResourceHttpMethodUtil httpMethodUtil;/*** 启动服务*/@PostConstructpublic void startMonitorSQL() {while (true) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");try {//打开连接connector.connect();log.info("数据库检测连接成功!" + canalMonitorDatabaseName);//订阅数据库表,全部表q// connector.subscribe(canalMonitorTableName + "\\..*");connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {handleDATAChange(message.getEntries());}// 提交确认connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();//防止频繁访问数据库链接: 线程睡眠 10秒try {Thread.sleep(10 * 1000);} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 打印canal server解析binlog获得的实体类信息*/private void handleDATAChange(List<Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//RowChange对象,包含了一行数据变化的所有特征CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChage.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChage.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");log.info("DELETE:{}", rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");JSONObject json = new JSONObject();for (CanalEntry.Column column : rowData.getAfterColumnsList()) {json.put(column.getName(), column.getValue());}log.info(json.toJSONString());String url = "http://192.168.21.11:8000/api/wirte";int code = httpMethodUtil.httpPost(url,json.toJSONString());if(code==0){log.info(json.toJSONString());}else {log.error(json.toJSONString());}}}}}}

application.properties配置文件:

canal-monitor-mysql.hostname: 192.168.21.11
canal-monitor-mysql.port: 11111
canal-monitor-mysql.database: datbases_namelogging.file.name=./log/rizhi.log         
logging.pattern.dateformat=yyyy-MM-dd # 设置日期格式化
logging.logback.rollingpolicy.max-history=30

相关文章:

springboot集成canal,将数据发送至接口

前置条件&#xff1a; mysql开启binlog日志部署canal服务 Springboot代码&#xff1a; 接口工具类&#xff1a; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; imp…...

Selenum八种常用定位(案例解析)

Selenium是一个备受推崇的工具。它有着丰富的功能&#xff0c;让我们能够与网页互动&#xff0c;执行各种任务&#xff0c;能为测试工程师和开发人员提供了很大的便利。 要充分利用Selenium&#xff0c;就需要了解如何正确定位网页上的元素。 接下来我将带大家共同探讨Seleni…...

Web前端接入Microsoft Azure AI文本翻译

Azure 文本翻译是 Azure AI 翻译服务的一项基于云的 REST API 功能。 文本翻译 API 支持实时快速准确地进行源到目标文本翻译。 文本翻译软件开发工具包 (SDK) 是一组库和工具&#xff0c;可用于轻松地将文本翻译 REST API 功能集成到应用程序中。 文本翻译 SDK 可跨 C#/.NET、…...

容联七陌助力鱼跃医疗升级智能联络中心,让客户服务更“鱼跃”

在当今高度竞争的市场环境中&#xff0c;企业的客户服务质量对于维护品牌形象和保持竞争优势至关重要。而随着人工智能、大数据等技术快速发展&#xff0c;智能化客户服务正在成为各行各业发展的重要部分。 鱼跃医疗是一家致力于“用科技律动生命"的国内知名医疗制造企业…...

【Redis系列】在Centos7上安装Redis5.0保姆级教程!

哈喽&#xff0c; 大家好&#xff0c;我是小浪。那么最近也是在忙秋招&#xff0c;很长一段时间没有更新文章啦&#xff0c;最近呢也是秋招闲下来&#xff0c;当然秋招结果也不是很理想&#xff0c;嗯……这里就不多说啦&#xff0c;回归正题&#xff0c;从今天开始我们就开始正…...

线性代数-Python-03:矩阵的变换 - 手写Matrix Transformation及numpy中的用法

文章目录 一、代码仓库二、旋转矩阵的推导及图形学中的矩阵变换2.1 让横坐标扩大a倍&#xff0c;纵坐标扩大b倍2.2 关于x轴翻转2.3 关于y轴翻转2.4 关于原点翻转&#xff08;x轴&#xff0c;y轴均翻转&#xff09;2.5 沿x方向错切2.6 沿y方向错切2.7 旋转2.8 单位矩阵2.9 矩阵的…...

【单片机基础】按键状态机实现短按、长按、双击、三击和N击

下载地址&#xff1a; 【CSDNNaiva】源码&#xff1a;HK32F030M-按键扫描-短按长按检测【CSDNNaiva】源码&#xff1a;HK32F030M-ADC-EXTI-TM1624-USART-EEPROM-TiMBase-按键长按(231024) 参考资料 [1] 【CSDNPillarPeng】【按键】[独立按键] - 1&#xff1a; 单击&#xff0c…...

Ubuntu虚拟机部署OpenStack

1、部署环境 系统&#xff1a;ubuntu-22.04.3-desktop-amd64DevStack版本&#xff1a;2024.1VMware Workstation&#xff1a;8G内存、4核处理器、100G硬盘/1、网络NAT模式/1 2、Ubuntu环境设置 点击show applications&#xff0c;选择Software&Updates 跟换Ubuntu的镜像…...

ES在企业项目中的实战总结,彻底掌握ES的使用

通过之前两篇文章 了解了ES的核心概念和基础使用学习进阶的DSL语法处理复杂的查询 这段时间通过在本企业代码中对ES框架的使用&#xff0c;总结了不少经验。主要分为三点 企业封装了ES原生的api&#xff0c;需要使用企业项目提供的接口实现 -------简单使用&#xff08;本章节目…...

QT的Qporcess功能的使用

具体实现代码如下&#xff1a; #include <QProgressBar>//必须要包含的头文件 #include <QProcess>// 创建一个QProgressBar对象QProgressBar *progressBar new QProgressBar(this);QProcess *proces;process_shownew process;// 设置进度条的最小值和最大值prog…...

【图灵诸葛】jvm笔记

2023年10月23日14:04:44 jvm 1.jdk体系结构图回顾(Av333129672,P1) jdk jre 底层是hotspot jvm 2.java虚拟机内部组成(Av333129672,P2) 堆 方法区 执行引擎 类加载 本地方法栈 线程栈&#xff08;虚拟机栈&#xff09; 3.java虚拟机栈讲解(Av333129672,P3) 程序计数器&#xf…...

数据安全小课堂开讲啦!看这里!

数据安全小课堂开讲啦&#xff01;看这里&#xff01; 1、什么是数据&#xff1f; 《数据安全法》第三条明确&#xff0c;本法所称的数据&#xff0c;就是指任何以电子或者其他方式对信息的记录。小到个人使用手机、电脑等电子产品时浏览的网页、下载的应用、存储的文件&…...

单片机矩阵键盘

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是矩阵键盘&#xff1f;1.独立键盘2.矩阵键盘变化1变化2变化3 3. 通过变型&#xff0c;举一反三&#xff0c;就可以实现4*4的矩阵键盘扫描 二、使用步骤…...

横坐标日期等间隔绘图 python示例代码

有两列数据&#xff0c;一列是日期&#xff0c;另一列是数值。日期是递增的&#xff0c;但是间隔不是均匀的。比如1月1日至2月1日有10组数据&#xff0c;2月1日至3月1日有100组数据&#xff0c;3月1日至4月1日有1000组数据。我想绘折线图&#xff0c;横坐标是日期&#xff0c;纵…...

photoshop2024免费插件Portraiture3

随着手机摄影的普及&#xff0c;修图可以说是现代人的必备生活技能之一了&#xff0c;现在谁发个朋友圈不把自己的照片修的美美的呢&#xff1f;那么如何拥有一张氛围感满满的照片呢&#xff1f;这不得不提图片处理软件中的王牌——photoshop。作为专业的图片处理软件&#xff…...

NewStarCTF2023week4-More Fast(GC回收)

打开链接&#xff0c;存在很多个类&#xff0c;很明显是php反序列化漏洞利用&#xff0c;需要构造pop链 &#xff0c; 关于pop链构造的详细步骤教学&#xff0c;请参考我之前的博客&#xff0c;真的讲得很详细也容易理解&#xff1a; http://t.csdnimg.cn/wMYNB 如果你是刚接…...

和鲸赞助丨第16届中国R会议暨2023 X-AGI大会通知

第16届中国 R 会议暨2023 X-AGI大会将于11月25-30日在中国人民大学召开&#xff0c;探讨数据科学和人工智能的相关进展&#xff0c;本次会议将采用线上会议和线下会议相结合的方式举办。 在过去的15年里&#xff0c;中国R会议一直致力于探讨数据科学在各学科、各行业的探索和实…...

Python第三方库 - Flask(python web框架)

1 Flask 1.1 认识Flask Web Application Framework&#xff08; Web 应用程序框架&#xff09;或简单的 Web Framework&#xff08; Web 框架&#xff09;表示一个库和模块的集合&#xff0c;使 Web 应用程序开发人员能够编写应用程序&#xff0c;而不必担心协议&#xff0c;线…...

c# sqlite 修改字段类型

因为sqlite不支持直接修改字段类型&#xff0c; 所以只能创建新的表&#xff0c;再将原始数据复制过去。具体操作步骤如下&#xff1a; 第一步&#xff0c; 将表“tableName”的名称修改为 “oldTable” string queryString string.Format("ALTER TABLE {0} RENAME TO …...

[Pytorch] 保存模型与加载模型

1、保存模型 # 定义模型 model BPNetModel(n_featuren_feature,n_hiddenn_hidden,n_outputn_output) #调用网络# 保存模型 torch.save(model, BPNetModel0.pth) 2、加载模型 import torch## 读取模型 model torch.load(BPNetModel0.pth) 3、保存模型参数 #调用网络 mode…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

docker 部署发现spring.profiles.active 问题

报错&#xff1a; org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...