MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!
引言:为什么需要实时同步?
MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:
-
构建高性能搜索服务
-
实时数据分析与大屏展示
-
提升复杂查询效率
传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。
一、技术选型与核心原理
1.1 核心组件
-
MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。
-
Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级
mysql-binlog-connector-java
)。 -
Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。
1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch
二、环境准备与配置
2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW # 必须为ROW模式
2.2 创建ES索引
PUT /user
{"mappings": {"properties": {"id": {"type": "integer"},"name": {"type": "text"},"email": {"type": "keyword"},"create_time": {"type": "date"}}}
}
三、Java代码实现
3.1 Maven依赖
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.3</version>
</dependency>
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX = "user";public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient = ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");client.setServerId(1001); // 唯一ID,避免冲突client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row -> {// 假设表结构为:id, name, email, create_timeString json = String.format("{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",row[0], row[1], row[2], row[3]);IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似,代码略(完整源码见文末链接)
}
四、源码深度解析
4.1 Binlog监听流程
-
BinaryLogClient:核心类,负责连接MySQL并监听Binlog。
-
事件类型判断:根据
WriteRowsEventData
、UpdateRowsEventData
、DeleteRowsEventData
区分增、改、删操作。
4.2 数据转换关键点
-
Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。
-
ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。
4.3 异常处理与优化
-
重试机制:ES写入失败时,可加入重试队列。
-
批量提交:攒批写入ES提升性能(需权衡实时性)。
-
事务一致性:确保Binlog位置持久化,避免数据丢失。
五、方案优缺点对比
方案 | 实时性 | 复杂度 | 资源消耗 |
---|---|---|---|
定时全量同步 | 低(分钟级) | 低 | 高 |
基于触发器 | 高 | 高(需改表) | 中 |
Binlog监听 | 高 | 中 | 低 |
六、总结与扩展
本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:
-
实时性:毫秒级延迟,满足大部分业务场景。
-
无侵入:无需修改MySQL表结构。
-
可扩展:可轻松适配其他数据源(如PostgreSQL)。
扩展方向:
-
使用Kafka作为中间层,解耦生产与消费。
-
增加监控报警,保障数据一致性。
-
支持DDL变更自动同步(如表结构修改)。
相关文章:

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!
引言:为什么需要实时同步? MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如: 构建高性能搜索服务 实时数据分析…...

Spring-事务
Spring 事务 事务的基本概念 🔹 什么是事务? 事务是一组数据库操作,它们作为一个整体,要么全部成功,要么全部回滚。 常见的事务场景: 银行转账(扣款和存款必须同时成功) 订单系统…...

Git系列之git tag和ReleaseMilestone
以下是关于 Git Tag、Release 和 Milestone 的深度融合内容,并补充了关于 Git Tag 的所有命令、详细解释和指令实例,条理清晰,结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交(commit…...

考研机试常见基本题型
1、求100以内的素数 sqrt()函数在cmath头文件中。 #include <iostream> #include <cmath> using namespace std;int main() {int count 0; // 用于统计素数的个数// 遍历 100 到 200 之间的每一个数for (int num 100; num < 200; num) {bool isPrime true…...

Android AudioFlinger(四)—— 揭开PlaybackThread面纱
前言: 继上一篇Android AudioFlinger(三)—— AndroidAudio Flinger 之设备管理我们知道PlaybackThread继承自Re’fBase, 在被第一次引用的时候就会调用onFirstRef,实现如下: void AudioFlinger::Playbac…...

C语言基础系列【20】内存管理
博主介绍:程序喵大人 35- 资深C/C/Rust/Android/iOS客户端开发10年大厂工作经验嵌入式/人工智能/自动驾驶/音视频/游戏开发入门级选手《C20高级编程》《C23高级编程》等多本书籍著译者更多原创精品文章,首发gzh,见文末👇…...

JavaScript基础-递增和递减运算符
在JavaScript编程中,递增()和递减(--)运算符是用于对数值进行加一或减一操作的基础工具。它们简洁且强大,但如果不正确地使用,可能会导致混淆或错误。本文将详细介绍这两种运算符的不同形式及其…...

计算机毕业设计SpringBoot+Vue.js社区医疗综合服务平台(源码+文档+PPT+讲解)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...

3.6c语言
#define _CRT_SECURE_NO_WARNINGS #include <math.h> #include <stdio.h> int main() {int sum 0,i,j;for (j 1; j < 1000; j){sum 0;for (i 1; i < j; i){if (j % i 0){sum i;} }if (sum j){printf("%d是完数\n", j);}}return 0; }#de…...

Unity开发——CanvasGroup组件介绍和应用
CanvasGroup是Unity中用于控制UI的透明度、交互性和渲染顺序的组件。 一、常用属性的解释 1、alpha:控制UI的透明度 类型:float,0.0 ~1.0, 其中 0.0 完全透明,1.0 完全不透明。 通过调整alpha值可以实现UI的淡入淡…...

深度学习驱动的跨行业智能化革命:技术突破与实践创新
第一章 深度学习的技术范式演进与核心架构 1.1 从传统机器学习到深度神经网络的跨越 深度学习的核心在于通过多层次非线性变换自动提取数据特征,其发展历程可划分为三个阶段:符号主义时代的规则驱动(1950s-1980s)、连接主义时代的浅层网络(1990s-2000s)以及深度学习时代…...

php配置虚拟主机
在PHP中配置虚拟主机,通常是通过Apache或Nginx等Web服务器来进行设置的。下面我将分别介绍如何在Apache和Nginx中配置PHP虚拟主机。 1. Apache 配置虚拟主机 Apache是最常用的Web服务器之一,配置虚拟主机的步骤如下: 步骤一:确保A…...

RESTful API 设计指南
RESTful API 介绍 大佬的总结:RESTful API 设计指南 - 阮一峰的网络日志 json-server github地址 这里介绍一个快速搭建 REST API 服务的工具包 接口测试工具 介绍几个接口测试工具 apipost apifox postman https://www.apipost.cn/ (中文) https://www.apifox…...

在虚拟机上安装Hadoop
以下是在虚拟机上安装Hadoop的一般步骤: 准备工作 - 安装虚拟机软件:如VMware Workstation或VirtualBox等。 - 创建虚拟机:选择合适的操作系统镜像,如Ubuntu或CentOS等Linux发行版,为虚拟机分配足够的CPU、内存和磁盘…...

大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。
大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。 答题思路 理解需求:要写一个函数,它能接收一个字符串,然后把这个字符串里每个单词的第一个字母变成大写。分解步骤 拆分单词:一般单词之间是用空格隔…...

【VUE2】第三期——样式冲突、组件通信、异步更新
目录 1 scoped解决样式冲突 2 data写法 3 组件通信 3.1 父子关系 3.1.1 父向子传值 props 3.1.2 子向父传值 $emit 3.2 非父子关系 3.2.1 event bus 事件总线 3.2.2 跨层级共享数据 provide&inject 4 props 4.1 介绍 4.2 props校验完整写法 5 v-model原理 …...

深度学习代码解读——自用
代码来自:GitHub - ChuHan89/WSSS-Tissue 借助了一些人工智能 2_generate_PM.py 功能总结 该代码用于 生成弱监督语义分割(WSSS)所需的伪掩码(Pseudo-Masks),是 Stage2 训练的前置步骤。其核心流程为&a…...

Linux 配置静态 IP
一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址,每次启动虚拟机服务都是不一样的 IP,因此要配置静态 IP 地址避免每次都发生变化,下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念: 动态 IP…...

Oxidized收集H3C交换机网络配置报错,not matching configured prompt (?-mix:^(<CD>)$)
背景:问题如上标题,H3C所有交换机配置的model都是comware 解决方案: 1、找到compare.rb [rootoxidized model]# pwd /usr/local/lib/ruby/gems/3.1.0/gems/oxidized-0.29.1/lib/oxidized/model [rootoxidized model]# ll comware.rb -rw-r--…...

RAG技术深度解析:从基础Agent到复杂推理Deep Search的架构实践
重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…...

6.过拟合处理:确保模型泛化能力的实践指南——大模型开发深度学习理论基础
在深度学习开发中,过拟合是一个常见且具有挑战性的问题。当模型在训练集上表现优秀,但在测试集或新数据上性能大幅下降时,就说明模型“记住”了训练数据中的噪声而非学习到泛化规律。本文将从实际开发角度系统讲解如何应对过拟合,…...

【玩转23种Java设计模式】结构型模式篇:组合模式
软件设计模式(Design pattern),又称设计模式,是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了可重用代码、让代码更容易被他人理解、保证代码可靠性、程序的重用性。 汇总目录链接&…...

专业工具,提供多种磁盘分区方案
随着时间的推移,电脑的磁盘空间往往会越来越紧张,许多人都经历过磁盘空间不足的困扰。虽然通过清理垃圾文件可以获得一定的改善,但随着文件和软件的增多,磁盘空间仍然可能显得捉襟见肘。在这种情况下,将其他磁盘的闲置…...

SELinux 概述
SELinux 概述 概念 SELinux(Security-Enhanced Linux)是美国国家安全局在 Linux 开源社区的帮助下开发的一个强制访问控制(MAC,Mandatory Access Control)的安全子系统。它确保服务进程仅能访问它们应有的资源。 例…...

【十三】Golang 通道
💢欢迎来到张胤尘的开源技术站 💥开源如江河,汇聚众志成。代码似星辰,照亮行征程。开源精神长,传承永不忘。携手共前行,未来更辉煌💥 文章目录 通道通道声明初始化缓冲机制无缓冲通道代码示例 带…...

DeepSeek专题:DeepSeek-V2核心知识点速览
AIGCmagic社区知识星球是国内首个以AIGC全栈技术与商业变现为主线的学习交流平台,涉及AI绘画、AI视频、大模型、AI多模态、数字人以及全行业AIGC赋能等100应用方向。星球内部包含海量学习资源、专业问答、前沿资讯、内推招聘、AI课程、AIGC模型、AIGC数据集和源码等…...

Oracle19c进入EM Express(Oracle企业管理器)详细步骤
以下是使用Oracle 19c进入Oracle Enterprise Manager Database Express(EM Express)的详细步骤: ### **步骤 1:确认EM Express配置状态** 1. **登录数据库服务器** 使用Oracle用户或管理员权限账户登录操作系统。 2. **查看EM…...

游戏引擎学习第140天
回顾并为今天的内容做准备 目前代码的进展到了声音混音的部分。昨天我详细解释了声音的处理方式,声音在技术上是一个非常特别的存在,但在游戏中进行声音混音的需求其实相对简单明了,所以今天的任务应该不会太具挑战性。 今天我们会编写一个…...

C++--迭代器(iterator)介绍---主要介绍vector和string中的迭代器
目录 一、迭代器(iterator)的定义 二、迭代器的类别 三、使用迭代器 3.1 迭代器运算符 3.2 迭代器的简单应用:使用迭代器将string对象的第一个字母改为大写 3.3 将迭代器从一个元素移动到另外一个元素 3.4 迭代器运算 3.5 迭代器的复…...

RuleOS:区块链开发的“新引擎”,点燃Web3创新之火
RuleOS:区块链开发的“新引擎”,点燃Web3创新之火 在区块链技术的浪潮中,RuleOS宛如一台强劲的“新引擎”,为个人和企业开发去中心化应用(DApp)注入了前所未有的动力。它以独特的设计理念和强大的功能特性&…...