当前位置: 首页 > news >正文

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!

引言:为什么需要实时同步?

MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:

  • 构建高性能搜索服务

  • 实时数据分析与大屏展示

  • 提升复杂查询效率

传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。

一、技术选型与核心原理

1.1 核心组件
  • MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。

  • Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级mysql-binlog-connector-java)。

  • Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。

1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch

二、环境准备与配置

2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # 必须为ROW模式
2.2 创建ES索引
PUT /user
{"mappings": {"properties": {"id": {"type": "integer"},"name": {"type": "text"},"email": {"type": "keyword"},"create_time": {"type": "date"}}}
}

三、Java代码实现

3.1 Maven依赖
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.3</version>
</dependency>
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX = "user";public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient = ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");client.setServerId(1001); // 唯一ID,避免冲突client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row -> {// 假设表结构为:id, name, email, create_timeString json = String.format("{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",row[0], row[1], row[2], row[3]);IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似,代码略(完整源码见文末链接)
}

四、源码深度解析

4.1 Binlog监听流程
  • BinaryLogClient:核心类,负责连接MySQL并监听Binlog。

  • 事件类型判断:根据WriteRowsEventDataUpdateRowsEventDataDeleteRowsEventData区分增、改、删操作。

4.2 数据转换关键点
  • Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。

  • ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。

4.3 异常处理与优化
  • 重试机制:ES写入失败时,可加入重试队列。

  • 批量提交:攒批写入ES提升性能(需权衡实时性)。

  • 事务一致性:确保Binlog位置持久化,避免数据丢失。

五、方案优缺点对比

方案实时性复杂度资源消耗
定时全量同步低(分钟级)
基于触发器高(需改表)
Binlog监听

六、总结与扩展

本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:

  • 实时性:毫秒级延迟,满足大部分业务场景。

  • 无侵入:无需修改MySQL表结构。

  • 可扩展:可轻松适配其他数据源(如PostgreSQL)。

扩展方向

  • 使用Kafka作为中间层,解耦生产与消费。

  • 增加监控报警,保障数据一致性。

  • 支持DDL变更自动同步(如表结构修改)。

 

相关文章:

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!

引言&#xff1a;为什么需要实时同步&#xff1f; MySQL擅长事务处理&#xff0c;而Elasticsearch&#xff08;ES&#xff09;则专注于搜索与分析。将MySQL数据实时同步到ES&#xff0c;可以充分发挥两者的优势&#xff0c;例如&#xff1a; 构建高性能搜索服务 实时数据分析…...

Spring-事务

Spring 事务 事务的基本概念 &#x1f539; 什么是事务&#xff1f; 事务是一组数据库操作&#xff0c;它们作为一个整体&#xff0c;要么全部成功&#xff0c;要么全部回滚。 常见的事务场景&#xff1a; 银行转账&#xff08;扣款和存款必须同时成功&#xff09; 订单系统…...

Git系列之git tag和ReleaseMilestone

以下是关于 Git Tag、Release 和 Milestone 的深度融合内容&#xff0c;并补充了关于 Git Tag 的所有命令、详细解释和指令实例&#xff0c;条理清晰&#xff0c;结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交&#xff08;commit&#xf…...

考研机试常见基本题型

1、求100以内的素数 sqrt()函数在cmath头文件中。 #include <iostream> #include <cmath> using namespace std;int main() {int count 0; // 用于统计素数的个数// 遍历 100 到 200 之间的每一个数for (int num 100; num < 200; num) {bool isPrime true…...

Android AudioFlinger(四)—— 揭开PlaybackThread面纱

前言&#xff1a; 继上一篇Android AudioFlinger&#xff08;三&#xff09;—— AndroidAudio Flinger 之设备管理我们知道PlaybackThread继承自Re’fBase&#xff0c; 在被第一次引用的时候就会调用onFirstRef&#xff0c;实现如下&#xff1a; void AudioFlinger::Playbac…...

C语言基础系列【20】内存管理

博主介绍&#xff1a;程序喵大人 35- 资深C/C/Rust/Android/iOS客户端开发10年大厂工作经验嵌入式/人工智能/自动驾驶/音视频/游戏开发入门级选手《C20高级编程》《C23高级编程》等多本书籍著译者更多原创精品文章&#xff0c;首发gzh&#xff0c;见文末&#x1f447;&#x1f…...

JavaScript基础-递增和递减运算符

在JavaScript编程中&#xff0c;递增&#xff08;&#xff09;和递减&#xff08;--&#xff09;运算符是用于对数值进行加一或减一操作的基础工具。它们简洁且强大&#xff0c;但如果不正确地使用&#xff0c;可能会导致混淆或错误。本文将详细介绍这两种运算符的不同形式及其…...

计算机毕业设计SpringBoot+Vue.js社区医疗综合服务平台(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

3.6c语言

#define _CRT_SECURE_NO_WARNINGS #include <math.h> #include <stdio.h> int main() {int sum 0,i,j;for (j 1; j < 1000; j){sum 0;for (i 1; i < j; i){if (j % i 0){sum i;} }if (sum j){printf("%d是完数\n", j);}}return 0; }#de…...

Unity开发——CanvasGroup组件介绍和应用

CanvasGroup是Unity中用于控制UI的透明度、交互性和渲染顺序的组件。 一、常用属性的解释 1、alpha&#xff1a;控制UI的透明度 类型&#xff1a;float&#xff0c;0.0 ~1.0&#xff0c; 其中 0.0 完全透明&#xff0c;1.0 完全不透明。 通过调整alpha值可以实现UI的淡入淡…...

深度学习驱动的跨行业智能化革命:技术突破与实践创新

第一章 深度学习的技术范式演进与核心架构 1.1 从传统机器学习到深度神经网络的跨越 深度学习的核心在于通过多层次非线性变换自动提取数据特征,其发展历程可划分为三个阶段:符号主义时代的规则驱动(1950s-1980s)、连接主义时代的浅层网络(1990s-2000s)以及深度学习时代…...

php配置虚拟主机

在PHP中配置虚拟主机&#xff0c;通常是通过Apache或Nginx等Web服务器来进行设置的。下面我将分别介绍如何在Apache和Nginx中配置PHP虚拟主机。 1. Apache 配置虚拟主机 Apache是最常用的Web服务器之一&#xff0c;配置虚拟主机的步骤如下&#xff1a; 步骤一&#xff1a;确保A…...

RESTful API 设计指南

RESTful API 介绍 大佬的总结&#xff1a;RESTful API 设计指南 - 阮一峰的网络日志 json-server github地址 这里介绍一个快速搭建 REST API 服务的工具包 接口测试工具 介绍几个接口测试工具 apipost apifox postman https://www.apipost.cn/ (中文) https://www.apifox…...

在虚拟机上安装Hadoop

以下是在虚拟机上安装Hadoop的一般步骤&#xff1a; 准备工作 - 安装虚拟机软件&#xff1a;如VMware Workstation或VirtualBox等。 - 创建虚拟机&#xff1a;选择合适的操作系统镜像&#xff0c;如Ubuntu或CentOS等Linux发行版&#xff0c;为虚拟机分配足够的CPU、内存和磁盘…...

大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。

大白话JavaScript实现一个函数&#xff0c;将字符串中的每个单词首字母大写。 答题思路 理解需求&#xff1a;要写一个函数&#xff0c;它能接收一个字符串&#xff0c;然后把这个字符串里每个单词的第一个字母变成大写。分解步骤 拆分单词&#xff1a;一般单词之间是用空格隔…...

【VUE2】第三期——样式冲突、组件通信、异步更新

目录 1 scoped解决样式冲突 2 data写法 3 组件通信 3.1 父子关系 3.1.1 父向子传值 props 3.1.2 子向父传值 $emit 3.2 非父子关系 3.2.1 event bus 事件总线 3.2.2 跨层级共享数据 provide&inject 4 props 4.1 介绍 4.2 props校验完整写法 5 v-model原理 …...

深度学习代码解读——自用

代码来自&#xff1a;GitHub - ChuHan89/WSSS-Tissue 借助了一些人工智能 2_generate_PM.py 功能总结 该代码用于 生成弱监督语义分割&#xff08;WSSS&#xff09;所需的伪掩码&#xff08;Pseudo-Masks&#xff09;&#xff0c;是 Stage2 训练的前置步骤。其核心流程为&a…...

Linux 配置静态 IP

一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址&#xff0c;每次启动虚拟机服务都是不一样的 IP&#xff0c;因此要配置静态 IP 地址避免每次都发生变化&#xff0c;下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念&#xff1a; 动态 IP…...

Oxidized收集H3C交换机网络配置报错,not matching configured prompt (?-mix:^(<CD>)$)

背景&#xff1a;问题如上标题&#xff0c;H3C所有交换机配置的model都是comware 解决方案&#xff1a; 1、找到compare.rb [rootoxidized model]# pwd /usr/local/lib/ruby/gems/3.1.0/gems/oxidized-0.29.1/lib/oxidized/model [rootoxidized model]# ll comware.rb -rw-r--…...

RAG技术深度解析:从基础Agent到复杂推理Deep Search的架构实践

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言&#xff1a;本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中&#xff0c;跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南&#xff0c;你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案&#xff0c;并结合内网…...

实战设计模式之模板方法模式

概述 模板方法模式定义了一个操作中的算法骨架&#xff0c;并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下&#xff0c;重新定义算法中的某些步骤。简单来说&#xff0c;就是在一个方法中定义了要执行的步骤顺序或算法框架&#xff0c;但允许子类…...

LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》

&#x1f9e0; LangChain 中 TextSplitter 的使用详解&#xff1a;从基础到进阶&#xff08;附代码&#xff09; 一、前言 在处理大规模文本数据时&#xff0c;特别是在构建知识库或进行大模型训练与推理时&#xff0c;文本切分&#xff08;Text Splitting&#xff09; 是一个…...

Tauri2学习笔记

教程地址&#xff1a;https://www.bilibili.com/video/BV1Ca411N7mF?spm_id_from333.788.player.switch&vd_source707ec8983cc32e6e065d5496a7f79ee6 官方指引&#xff1a;https://tauri.app/zh-cn/start/ 目前Tauri2的教程视频不多&#xff0c;我按照Tauri1的教程来学习&…...

智能体革命:企业如何构建自主决策的AI代理?

OpenAI智能代理构建实用指南详解 随着大型语言模型&#xff08;LLM&#xff09;在推理、多模态理解和工具调用能力上的进步&#xff0c;智能代理&#xff08;Agents&#xff09;成为自动化领域的新突破。与传统软件仅帮助用户自动化流程不同&#xff0c;智能代理能够自主执行工…...

前端打包工具简单介绍

前端打包工具简单介绍 一、Webpack 架构与插件机制 1. Webpack 架构核心组成 Entry&#xff08;入口&#xff09; 指定应用的起点文件&#xff0c;比如 src/index.js。 Module&#xff08;模块&#xff09; Webpack 把项目当作模块图&#xff0c;模块可以是 JS、CSS、图片等…...

PostgreSQL 对 IPv6 的支持情况

PostgreSQL 对 IPv6 的支持情况 PostgreSQL 全面支持 IPv6 网络协议&#xff0c;包括连接、存储和操作 IPv6 地址。以下是详细说明&#xff1a; 一、网络连接支持 1. 监听 IPv6 连接 在 postgresql.conf 中配置&#xff1a; listen_addresses 0.0.0.0,:: # 监听所有IPv4…...