使用maxwell实时同步mysql数据到kafka
一、软件环境:
操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口
二、环境部署
1、安装jdk
export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH
2、安装maven
参考:https://www.cnblogs.com/wcwen1990/p/7227278.html
3、安装zookeeper
1)下载软件:
wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar zxvf zookeeper-3.4.11.tar.gz
mv zookeeper-3.4.11 /usr/local/zookeeper
2)修改环境变量
编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:
## ZooKeeper Envexport ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
运行以下命令使环境变量生效: source /etc/profile
3)重命名配置文件
初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg
mv $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
4)单机模式–修改配置文件
创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
5)启动zookeeper
bin/zkServer.sh startZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
6)验证zukeeper服务
telnet chavin.king 2181Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:/192.168.72.130:44054[0](queued=0,recved=1,sent=0)Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.
4、安装zkui
git clone https://github.com/DeemOpen/zkui.git
cd zkui
mvn clean install
修改配置文件默认值
#vim config.cfgserverPort=9090 #指定端口zkServer=192.168.1.110:2181sessionTimeout=300000
启动程序至后台
2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本
nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
用浏览器访问:
http://chavin.king:9090/
5、安装kafka
wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafkamkdir -p /usr/local/kafka/data-logs
修改配置文件
vim server.propertieslog.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties &
创建topic
bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell
查看所有topic
bin/kafka-topics.sh --list --zookeeper chavin.king:2181
启动producer
bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell
启动consumer
bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning
或者
bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092 --from-beginning --topic maxwell
6、开启mysql binlog
more /etc/my.cnf
[client]
default_character_set = utf8[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.errbinlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
7、安装maxwell
wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell
启动maxwell
nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &
8、开发kafka消费程序
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class KafkaTest {public static void main(String[] args){String topicName = "maxwell";String groupID = "example-group";Properties props = new Properties();props.put("bootstrap.servers","192.168.72.130:9092");props.put("group.id",groupID);props.put("auto.offset.reset","earliest");props.put("serializer.encoding","utf-8");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String > consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName));try{while(true){ConsumerRecords<String,String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}finally{consumer.close();}}}
ideal启动以上消费程序
9、测试
offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
至此数据同步已经可以正常进行了,是不是很简单。
相关文章:
使用maxwell实时同步mysql数据到kafka
一、软件环境: 操作系统:CentOS release 6.5 (Final) java版本: jdk1.8 zookeeper版本: zookeeper-3.4.11 kafka 版本: kafka_2.11-1.1.0.tgz maxwell版本:maxwell-1.16.0.tar.gz 注意 : 关闭所有机器的防火墙,同时注意…...
知识图谱与大数据:区别、联系与应用
目录 前言1 知识图谱1.1 定义1.2 特点1.3 应用 2 大数据2.1 定义2.2 应用 3. 区别与联系3.1 区别3.2 联系 结语 前言 在当今信息爆炸的时代,数据成为了我们生活和工作中不可或缺的资源。知识图谱和大数据是两个关键概念,它们在人工智能、数据科学和信息…...
Nagios工具
一 nagios 相关概念 Nagios 是一款开源的免费网络监视工具,能有效监控 Windows、Linux 和 Unix 的主机状态,交换机路由器等网络设置,打印机等。在系统或服务状态异常时发出邮件或短信报警第 一时间通知网站运维人员,在状态恢复后…...
微信小程序全局数据共享
文章目录 安装MobX相关的包根目录创建store文件夹,添加store.js文件绑定到页面中绑定到组件 mobx-miniprogram和mobx-miniprogram-bindings实现全局数据共享 mobx-miniprogram用来创建Store实例对象 mobx-miniprogram-bindings用来把Store中的共享数据或方法&…...
算法训练营第24天|回溯算法理论基础 LeetCode 77.组合
终于把二叉树做完了!开始新的篇章,回溯! 回溯算法理论基础 回溯算法题目分类: 1.组合 2.分割 3.子集 4.排列 5.棋盘问题 什么是回溯? 回溯叫做回溯搜索法,是一种搜索方式。回溯是递归的副产品&…...
pip永久修改镜像地址
修改命令: pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/ 效果: 会在C:\Users\PC(用户名)\AppData\Roaming\pip目录下新增或修改文件pip.ini 文件内容: [global] index-url https://pypi.tuna.tsinghua.e…...
RK3588平台开发系列讲解(硬件篇-功能外设2)
USB2.0/USB3.0 电路 RK3588 芯片内置两个USB3.0 OTG控制器(内嵌2个USB2.0 OTG,下图绿色处),1个USB3.0 HOST 控制器,2个USB2.0 HOST控制器。 这些控制器与PHY的内部复用图如下: USB3.0 OTG0 控制器支持SS/H…...
SpringBoot学习记录
SpringBoot是用于加速Spring开发的。 我们先来看看如何使用SpringBoot来创建一个基于Web的程序,可以发现相较于SpringMVC其有巨大改变。 3.开发控制器类 GetMapping("/{id}")public String getById(PathVariable Integer id){System.out.println("…...
财富池指标--通达信顾比均线实战指标免费源码
顾比均线是由两组均线构成,短期组为3、5、8、10、12、15。长期组为:30、35、40、45、50、60。顾比均线由澳大利亚的投资家戴若-顾比先生发明,因此叫顾比线。 顾比均线可以广泛运用于股票、期货和外汇交易中,只要是能运用K线图的投…...
AJAX(一):初识AJAX、http协议、配置环境、发送AJAX请求、请求时的问题
一、什么是AJAX 1.AJAX 就是异步的JS和XML。通过AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组合在一起使用的新方式。 2.XML 可扩展标记语言。XML被设计用来传输和…...
idea常用的快捷键总结:
idea常用的快捷键总结: Ctrl相关的: Ctrl F 在当前文件进行文本查找 (必备) Ctrl R 在当前文件进行文本替换 (必备) Ctrl Z 撤销 (必备) Ctrl Y 删除光标所在行 或 删除选中的…...
LeetCode 热题 100 题解(一):哈希部分
《LeetCode热题 100》 经过了两个多月,终于刷完了代码随想录的题目,现在准备开始挑战热题一百了,接下来我会将自己的题解以博客的形式同步发到力扣和 c 站,希望在接下来的征程中与大家共勉! 题组一:哈希 题…...
C语言 | qsort()函数使用
目录: 1.qsort介绍 2.使⽤qsort函数 排序 整型数据 3.使⽤qsort函数 排序 结构体数据 4. qsort函数的模拟实现冒泡排序 qsort()函数 是一个 C语言编译器函数库自带的排序函数, 它可以对指定数组(包括字符串,二维数组&#x…...
继承的特点 | java
/*Java中继承的特点:A:Java只支持单继承,不支持多继承。 B:Java支持多层继承(继承体系),间接继承 */class Father(){} class Mother(){}class son extends Father(){} // 正确 class son2 extends Father , Mother {} // 不正确 1. Java只支持单继承…...
6、jenkins项目构建类型-项目类型介绍
文章目录 一、自由风格项目1、拉取代码2、演示改动代码后的持续集成二、Maven项目构建三、Pipeline流水线项目构建(☆☆☆)1、Pipeline简介(1)概念(2)使用Pipeline有以下好处(3)如何创建Jenkins Pipeline呢?2、安装Pipeline插件3、Pipeline语法快速入门(1)Declarati…...
指针函数的应用——找出哪些学生有不及格的科目
下面的代码实现了以下功能: 定义了一个函数 getFailStudent,它接收一个指向整数数组的指针,并遍历该数组,查找是否存在不及格的成绩。如果找到了不及格的成绩,就返回指向不及格学生所在行的指针;否则返回 N…...
【微服务】Gateway
文章目录 1.基本介绍官方文档:https://springdoc.cn/spring-cloud-gateway/#gateway-starter1.引出网关2.使用网关服务架构图3.Gateway网络拓扑图(背下来)4.Gateway特性5.Gateway核心组件1.基本介绍2.断言3.过滤 6.Gateway工作机制 2.搭建Gat…...
王道C语言督学营OJ课后习题(课时14)
#include <stdio.h> #include <stdlib.h>typedef char BiElemType; typedef struct BiTNode{BiElemType c;//c 就是书籍上的 datastruct BiTNode *lchild;struct BiTNode *rchild; }BiTNode,*BiTree;//tag 结构体是辅助队列使用的 typedef struct tag{BiTree p;//树…...
Filter、Listener、AJAX
Filter 概念:Filter 表示过滤器,是JavaWeb三大组件(Servlet、Filter、 Listener)之一。 过滤器可以把对资源的请求拦截下来,从而实现一些特殊的功能。 过滤器一般完成一些通用的操作,比如:权限控制、统一编码处理、敏感…...
FastAPI+React全栈开发04 FastAPI概述
Chapter01 Web Development and the FARM Stack 04 Introducing FastAPI FastAPIReact全栈开发04 FastAPI概述 Now we will look at a brief introducion to the Python REST-API framework of choice - FastAPI. Additionally, we will go over a high-level overview of t…...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
