当前位置: 首页 > news >正文

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!

引言:为什么需要实时同步?

MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:

  • 构建高性能搜索服务

  • 实时数据分析与大屏展示

  • 提升复杂查询效率

传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。

一、技术选型与核心原理

1.1 核心组件
  • MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。

  • Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级mysql-binlog-connector-java)。

  • Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。

1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch

二、环境准备与配置

2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # 必须为ROW模式
2.2 创建ES索引
PUT /user
{"mappings": {"properties": {"id": {"type": "integer"},"name": {"type": "text"},"email": {"type": "keyword"},"create_time": {"type": "date"}}}
}

三、Java代码实现

3.1 Maven依赖
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.3</version>
</dependency>
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX = "user";public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient = ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");client.setServerId(1001); // 唯一ID,避免冲突client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row -> {// 假设表结构为:id, name, email, create_timeString json = String.format("{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",row[0], row[1], row[2], row[3]);IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似,代码略(完整源码见文末链接)
}

四、源码深度解析

4.1 Binlog监听流程
  • BinaryLogClient:核心类,负责连接MySQL并监听Binlog。

  • 事件类型判断:根据WriteRowsEventDataUpdateRowsEventDataDeleteRowsEventData区分增、改、删操作。

4.2 数据转换关键点
  • Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。

  • ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。

4.3 异常处理与优化
  • 重试机制:ES写入失败时,可加入重试队列。

  • 批量提交:攒批写入ES提升性能(需权衡实时性)。

  • 事务一致性:确保Binlog位置持久化,避免数据丢失。

五、方案优缺点对比

方案实时性复杂度资源消耗
定时全量同步低(分钟级)
基于触发器高(需改表)
Binlog监听

六、总结与扩展

本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:

  • 实时性:毫秒级延迟,满足大部分业务场景。

  • 无侵入:无需修改MySQL表结构。

  • 可扩展:可轻松适配其他数据源(如PostgreSQL)。

扩展方向

  • 使用Kafka作为中间层,解耦生产与消费。

  • 增加监控报警,保障数据一致性。

  • 支持DDL变更自动同步(如表结构修改)。

 

相关文章:

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!

引言&#xff1a;为什么需要实时同步&#xff1f; MySQL擅长事务处理&#xff0c;而Elasticsearch&#xff08;ES&#xff09;则专注于搜索与分析。将MySQL数据实时同步到ES&#xff0c;可以充分发挥两者的优势&#xff0c;例如&#xff1a; 构建高性能搜索服务 实时数据分析…...

Spring-事务

Spring 事务 事务的基本概念 &#x1f539; 什么是事务&#xff1f; 事务是一组数据库操作&#xff0c;它们作为一个整体&#xff0c;要么全部成功&#xff0c;要么全部回滚。 常见的事务场景&#xff1a; 银行转账&#xff08;扣款和存款必须同时成功&#xff09; 订单系统…...

Git系列之git tag和ReleaseMilestone

以下是关于 Git Tag、Release 和 Milestone 的深度融合内容&#xff0c;并补充了关于 Git Tag 的所有命令、详细解释和指令实例&#xff0c;条理清晰&#xff0c;结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交&#xff08;commit&#xf…...

考研机试常见基本题型

1、求100以内的素数 sqrt()函数在cmath头文件中。 #include <iostream> #include <cmath> using namespace std;int main() {int count 0; // 用于统计素数的个数// 遍历 100 到 200 之间的每一个数for (int num 100; num < 200; num) {bool isPrime true…...

Android AudioFlinger(四)—— 揭开PlaybackThread面纱

前言&#xff1a; 继上一篇Android AudioFlinger&#xff08;三&#xff09;—— AndroidAudio Flinger 之设备管理我们知道PlaybackThread继承自Re’fBase&#xff0c; 在被第一次引用的时候就会调用onFirstRef&#xff0c;实现如下&#xff1a; void AudioFlinger::Playbac…...

C语言基础系列【20】内存管理

博主介绍&#xff1a;程序喵大人 35- 资深C/C/Rust/Android/iOS客户端开发10年大厂工作经验嵌入式/人工智能/自动驾驶/音视频/游戏开发入门级选手《C20高级编程》《C23高级编程》等多本书籍著译者更多原创精品文章&#xff0c;首发gzh&#xff0c;见文末&#x1f447;&#x1f…...

JavaScript基础-递增和递减运算符

在JavaScript编程中&#xff0c;递增&#xff08;&#xff09;和递减&#xff08;--&#xff09;运算符是用于对数值进行加一或减一操作的基础工具。它们简洁且强大&#xff0c;但如果不正确地使用&#xff0c;可能会导致混淆或错误。本文将详细介绍这两种运算符的不同形式及其…...

计算机毕业设计SpringBoot+Vue.js社区医疗综合服务平台(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

3.6c语言

#define _CRT_SECURE_NO_WARNINGS #include <math.h> #include <stdio.h> int main() {int sum 0,i,j;for (j 1; j < 1000; j){sum 0;for (i 1; i < j; i){if (j % i 0){sum i;} }if (sum j){printf("%d是完数\n", j);}}return 0; }#de…...

Unity开发——CanvasGroup组件介绍和应用

CanvasGroup是Unity中用于控制UI的透明度、交互性和渲染顺序的组件。 一、常用属性的解释 1、alpha&#xff1a;控制UI的透明度 类型&#xff1a;float&#xff0c;0.0 ~1.0&#xff0c; 其中 0.0 完全透明&#xff0c;1.0 完全不透明。 通过调整alpha值可以实现UI的淡入淡…...

深度学习驱动的跨行业智能化革命:技术突破与实践创新

第一章 深度学习的技术范式演进与核心架构 1.1 从传统机器学习到深度神经网络的跨越 深度学习的核心在于通过多层次非线性变换自动提取数据特征,其发展历程可划分为三个阶段:符号主义时代的规则驱动(1950s-1980s)、连接主义时代的浅层网络(1990s-2000s)以及深度学习时代…...

php配置虚拟主机

在PHP中配置虚拟主机&#xff0c;通常是通过Apache或Nginx等Web服务器来进行设置的。下面我将分别介绍如何在Apache和Nginx中配置PHP虚拟主机。 1. Apache 配置虚拟主机 Apache是最常用的Web服务器之一&#xff0c;配置虚拟主机的步骤如下&#xff1a; 步骤一&#xff1a;确保A…...

RESTful API 设计指南

RESTful API 介绍 大佬的总结&#xff1a;RESTful API 设计指南 - 阮一峰的网络日志 json-server github地址 这里介绍一个快速搭建 REST API 服务的工具包 接口测试工具 介绍几个接口测试工具 apipost apifox postman https://www.apipost.cn/ (中文) https://www.apifox…...

在虚拟机上安装Hadoop

以下是在虚拟机上安装Hadoop的一般步骤&#xff1a; 准备工作 - 安装虚拟机软件&#xff1a;如VMware Workstation或VirtualBox等。 - 创建虚拟机&#xff1a;选择合适的操作系统镜像&#xff0c;如Ubuntu或CentOS等Linux发行版&#xff0c;为虚拟机分配足够的CPU、内存和磁盘…...

大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。

大白话JavaScript实现一个函数&#xff0c;将字符串中的每个单词首字母大写。 答题思路 理解需求&#xff1a;要写一个函数&#xff0c;它能接收一个字符串&#xff0c;然后把这个字符串里每个单词的第一个字母变成大写。分解步骤 拆分单词&#xff1a;一般单词之间是用空格隔…...

【VUE2】第三期——样式冲突、组件通信、异步更新

目录 1 scoped解决样式冲突 2 data写法 3 组件通信 3.1 父子关系 3.1.1 父向子传值 props 3.1.2 子向父传值 $emit 3.2 非父子关系 3.2.1 event bus 事件总线 3.2.2 跨层级共享数据 provide&inject 4 props 4.1 介绍 4.2 props校验完整写法 5 v-model原理 …...

深度学习代码解读——自用

代码来自&#xff1a;GitHub - ChuHan89/WSSS-Tissue 借助了一些人工智能 2_generate_PM.py 功能总结 该代码用于 生成弱监督语义分割&#xff08;WSSS&#xff09;所需的伪掩码&#xff08;Pseudo-Masks&#xff09;&#xff0c;是 Stage2 训练的前置步骤。其核心流程为&a…...

Linux 配置静态 IP

一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址&#xff0c;每次启动虚拟机服务都是不一样的 IP&#xff0c;因此要配置静态 IP 地址避免每次都发生变化&#xff0c;下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念&#xff1a; 动态 IP…...

Oxidized收集H3C交换机网络配置报错,not matching configured prompt (?-mix:^(<CD>)$)

背景&#xff1a;问题如上标题&#xff0c;H3C所有交换机配置的model都是comware 解决方案&#xff1a; 1、找到compare.rb [rootoxidized model]# pwd /usr/local/lib/ruby/gems/3.1.0/gems/oxidized-0.29.1/lib/oxidized/model [rootoxidized model]# ll comware.rb -rw-r--…...

RAG技术深度解析:从基础Agent到复杂推理Deep Search的架构实践

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

MinIO Docker 部署:仅开放一个端口

MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...