MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!
引言:为什么需要实时同步?
MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:
-
构建高性能搜索服务
-
实时数据分析与大屏展示
-
提升复杂查询效率
传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。
一、技术选型与核心原理
1.1 核心组件
-
MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。
-
Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级
mysql-binlog-connector-java)。 -
Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。
1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch
二、环境准备与配置
2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW # 必须为ROW模式
2.2 创建ES索引
PUT /user
{"mappings": {"properties": {"id": {"type": "integer"},"name": {"type": "text"},"email": {"type": "keyword"},"create_time": {"type": "date"}}}
}
三、Java代码实现
3.1 Maven依赖
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.3</version>
</dependency>
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX = "user";public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient = ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");client.setServerId(1001); // 唯一ID,避免冲突client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row -> {// 假设表结构为:id, name, email, create_timeString json = String.format("{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",row[0], row[1], row[2], row[3]);IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似,代码略(完整源码见文末链接)
}
四、源码深度解析
4.1 Binlog监听流程
-
BinaryLogClient:核心类,负责连接MySQL并监听Binlog。
-
事件类型判断:根据
WriteRowsEventData、UpdateRowsEventData、DeleteRowsEventData区分增、改、删操作。
4.2 数据转换关键点
-
Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。
-
ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。
4.3 异常处理与优化
-
重试机制:ES写入失败时,可加入重试队列。
-
批量提交:攒批写入ES提升性能(需权衡实时性)。
-
事务一致性:确保Binlog位置持久化,避免数据丢失。
五、方案优缺点对比
| 方案 | 实时性 | 复杂度 | 资源消耗 |
|---|---|---|---|
| 定时全量同步 | 低(分钟级) | 低 | 高 |
| 基于触发器 | 高 | 高(需改表) | 中 |
| Binlog监听 | 高 | 中 | 低 |
六、总结与扩展
本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:
-
实时性:毫秒级延迟,满足大部分业务场景。
-
无侵入:无需修改MySQL表结构。
-
可扩展:可轻松适配其他数据源(如PostgreSQL)。
扩展方向:
-
使用Kafka作为中间层,解耦生产与消费。
-
增加监控报警,保障数据一致性。
-
支持DDL变更自动同步(如表结构修改)。
相关文章:
MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!
引言:为什么需要实时同步? MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如: 构建高性能搜索服务 实时数据分析…...
Spring-事务
Spring 事务 事务的基本概念 🔹 什么是事务? 事务是一组数据库操作,它们作为一个整体,要么全部成功,要么全部回滚。 常见的事务场景: 银行转账(扣款和存款必须同时成功) 订单系统…...
Git系列之git tag和ReleaseMilestone
以下是关于 Git Tag、Release 和 Milestone 的深度融合内容,并补充了关于 Git Tag 的所有命令、详细解释和指令实例,条理清晰,结合实际使用场景和案例。 1. Git Tag 1.1 定义 • Tag 是 Git 中用于标记特定提交(commit…...
考研机试常见基本题型
1、求100以内的素数 sqrt()函数在cmath头文件中。 #include <iostream> #include <cmath> using namespace std;int main() {int count 0; // 用于统计素数的个数// 遍历 100 到 200 之间的每一个数for (int num 100; num < 200; num) {bool isPrime true…...
Android AudioFlinger(四)—— 揭开PlaybackThread面纱
前言: 继上一篇Android AudioFlinger(三)—— AndroidAudio Flinger 之设备管理我们知道PlaybackThread继承自Re’fBase, 在被第一次引用的时候就会调用onFirstRef,实现如下: void AudioFlinger::Playbac…...
C语言基础系列【20】内存管理
博主介绍:程序喵大人 35- 资深C/C/Rust/Android/iOS客户端开发10年大厂工作经验嵌入式/人工智能/自动驾驶/音视频/游戏开发入门级选手《C20高级编程》《C23高级编程》等多本书籍著译者更多原创精品文章,首发gzh,见文末👇…...
JavaScript基础-递增和递减运算符
在JavaScript编程中,递增()和递减(--)运算符是用于对数值进行加一或减一操作的基础工具。它们简洁且强大,但如果不正确地使用,可能会导致混淆或错误。本文将详细介绍这两种运算符的不同形式及其…...
计算机毕业设计SpringBoot+Vue.js社区医疗综合服务平台(源码+文档+PPT+讲解)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
3.6c语言
#define _CRT_SECURE_NO_WARNINGS #include <math.h> #include <stdio.h> int main() {int sum 0,i,j;for (j 1; j < 1000; j){sum 0;for (i 1; i < j; i){if (j % i 0){sum i;} }if (sum j){printf("%d是完数\n", j);}}return 0; }#de…...
Unity开发——CanvasGroup组件介绍和应用
CanvasGroup是Unity中用于控制UI的透明度、交互性和渲染顺序的组件。 一、常用属性的解释 1、alpha:控制UI的透明度 类型:float,0.0 ~1.0, 其中 0.0 完全透明,1.0 完全不透明。 通过调整alpha值可以实现UI的淡入淡…...
深度学习驱动的跨行业智能化革命:技术突破与实践创新
第一章 深度学习的技术范式演进与核心架构 1.1 从传统机器学习到深度神经网络的跨越 深度学习的核心在于通过多层次非线性变换自动提取数据特征,其发展历程可划分为三个阶段:符号主义时代的规则驱动(1950s-1980s)、连接主义时代的浅层网络(1990s-2000s)以及深度学习时代…...
php配置虚拟主机
在PHP中配置虚拟主机,通常是通过Apache或Nginx等Web服务器来进行设置的。下面我将分别介绍如何在Apache和Nginx中配置PHP虚拟主机。 1. Apache 配置虚拟主机 Apache是最常用的Web服务器之一,配置虚拟主机的步骤如下: 步骤一:确保A…...
RESTful API 设计指南
RESTful API 介绍 大佬的总结:RESTful API 设计指南 - 阮一峰的网络日志 json-server github地址 这里介绍一个快速搭建 REST API 服务的工具包 接口测试工具 介绍几个接口测试工具 apipost apifox postman https://www.apipost.cn/ (中文) https://www.apifox…...
在虚拟机上安装Hadoop
以下是在虚拟机上安装Hadoop的一般步骤: 准备工作 - 安装虚拟机软件:如VMware Workstation或VirtualBox等。 - 创建虚拟机:选择合适的操作系统镜像,如Ubuntu或CentOS等Linux发行版,为虚拟机分配足够的CPU、内存和磁盘…...
大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。
大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。 答题思路 理解需求:要写一个函数,它能接收一个字符串,然后把这个字符串里每个单词的第一个字母变成大写。分解步骤 拆分单词:一般单词之间是用空格隔…...
【VUE2】第三期——样式冲突、组件通信、异步更新
目录 1 scoped解决样式冲突 2 data写法 3 组件通信 3.1 父子关系 3.1.1 父向子传值 props 3.1.2 子向父传值 $emit 3.2 非父子关系 3.2.1 event bus 事件总线 3.2.2 跨层级共享数据 provide&inject 4 props 4.1 介绍 4.2 props校验完整写法 5 v-model原理 …...
深度学习代码解读——自用
代码来自:GitHub - ChuHan89/WSSS-Tissue 借助了一些人工智能 2_generate_PM.py 功能总结 该代码用于 生成弱监督语义分割(WSSS)所需的伪掩码(Pseudo-Masks),是 Stage2 训练的前置步骤。其核心流程为&a…...
Linux 配置静态 IP
一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址,每次启动虚拟机服务都是不一样的 IP,因此要配置静态 IP 地址避免每次都发生变化,下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念: 动态 IP…...
Oxidized收集H3C交换机网络配置报错,not matching configured prompt (?-mix:^(<CD>)$)
背景:问题如上标题,H3C所有交换机配置的model都是comware 解决方案: 1、找到compare.rb [rootoxidized model]# pwd /usr/local/lib/ruby/gems/3.1.0/gems/oxidized-0.29.1/lib/oxidized/model [rootoxidized model]# ll comware.rb -rw-r--…...
RAG技术深度解析:从基础Agent到复杂推理Deep Search的架构实践
重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
算法岗面试经验分享-大模型篇
文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer (1)资源 论文&a…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
