使用maxwell实时同步mysql数据到kafka
一、软件环境:
操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口
二、环境部署
1、安装jdk
export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH
2、安装maven
参考:https://www.cnblogs.com/wcwen1990/p/7227278.html
3、安装zookeeper
1)下载软件:
wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar zxvf zookeeper-3.4.11.tar.gz
mv zookeeper-3.4.11 /usr/local/zookeeper
2)修改环境变量
编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:
## ZooKeeper Envexport ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
运行以下命令使环境变量生效: source /etc/profile
3)重命名配置文件
初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg
mv $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
4)单机模式–修改配置文件
创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
5)启动zookeeper
bin/zkServer.sh startZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
6)验证zukeeper服务
telnet chavin.king 2181Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:/192.168.72.130:44054[0](queued=0,recved=1,sent=0)Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.
4、安装zkui
git clone https://github.com/DeemOpen/zkui.git
cd zkui
mvn clean install
修改配置文件默认值
#vim config.cfgserverPort=9090 #指定端口zkServer=192.168.1.110:2181sessionTimeout=300000
启动程序至后台
2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本
nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
用浏览器访问:
http://chavin.king:9090/
5、安装kafka
wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafkamkdir -p /usr/local/kafka/data-logs
修改配置文件
vim server.propertieslog.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181
启动kafka
bin/kafka-server-start.sh -daemon config/server.properties &
创建topic
bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell
查看所有topic
bin/kafka-topics.sh --list --zookeeper chavin.king:2181
启动producer
bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell
启动consumer
bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning
或者
bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092 --from-beginning --topic maxwell
6、开启mysql binlog
more /etc/my.cnf
[client]
default_character_set = utf8[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.errbinlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES
7、安装maxwell
wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell
启动maxwell
nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &
8、开发kafka消费程序
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class KafkaTest {public static void main(String[] args){String topicName = "maxwell";String groupID = "example-group";Properties props = new Properties();props.put("bootstrap.servers","192.168.72.130:9092");props.put("group.id",groupID);props.put("auto.offset.reset","earliest");props.put("serializer.encoding","utf-8");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String > consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName));try{while(true){ConsumerRecords<String,String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}finally{consumer.close();}}}
ideal启动以上消费程序
9、测试
offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
至此数据同步已经可以正常进行了,是不是很简单。
相关文章:
使用maxwell实时同步mysql数据到kafka
一、软件环境: 操作系统:CentOS release 6.5 (Final) java版本: jdk1.8 zookeeper版本: zookeeper-3.4.11 kafka 版本: kafka_2.11-1.1.0.tgz maxwell版本:maxwell-1.16.0.tar.gz 注意 : 关闭所有机器的防火墙,同时注意…...
知识图谱与大数据:区别、联系与应用
目录 前言1 知识图谱1.1 定义1.2 特点1.3 应用 2 大数据2.1 定义2.2 应用 3. 区别与联系3.1 区别3.2 联系 结语 前言 在当今信息爆炸的时代,数据成为了我们生活和工作中不可或缺的资源。知识图谱和大数据是两个关键概念,它们在人工智能、数据科学和信息…...
Nagios工具
一 nagios 相关概念 Nagios 是一款开源的免费网络监视工具,能有效监控 Windows、Linux 和 Unix 的主机状态,交换机路由器等网络设置,打印机等。在系统或服务状态异常时发出邮件或短信报警第 一时间通知网站运维人员,在状态恢复后…...
微信小程序全局数据共享
文章目录 安装MobX相关的包根目录创建store文件夹,添加store.js文件绑定到页面中绑定到组件 mobx-miniprogram和mobx-miniprogram-bindings实现全局数据共享 mobx-miniprogram用来创建Store实例对象 mobx-miniprogram-bindings用来把Store中的共享数据或方法&…...
算法训练营第24天|回溯算法理论基础 LeetCode 77.组合
终于把二叉树做完了!开始新的篇章,回溯! 回溯算法理论基础 回溯算法题目分类: 1.组合 2.分割 3.子集 4.排列 5.棋盘问题 什么是回溯? 回溯叫做回溯搜索法,是一种搜索方式。回溯是递归的副产品&…...
pip永久修改镜像地址
修改命令: pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/ 效果: 会在C:\Users\PC(用户名)\AppData\Roaming\pip目录下新增或修改文件pip.ini 文件内容: [global] index-url https://pypi.tuna.tsinghua.e…...
RK3588平台开发系列讲解(硬件篇-功能外设2)
USB2.0/USB3.0 电路 RK3588 芯片内置两个USB3.0 OTG控制器(内嵌2个USB2.0 OTG,下图绿色处),1个USB3.0 HOST 控制器,2个USB2.0 HOST控制器。 这些控制器与PHY的内部复用图如下: USB3.0 OTG0 控制器支持SS/H…...
SpringBoot学习记录
SpringBoot是用于加速Spring开发的。 我们先来看看如何使用SpringBoot来创建一个基于Web的程序,可以发现相较于SpringMVC其有巨大改变。 3.开发控制器类 GetMapping("/{id}")public String getById(PathVariable Integer id){System.out.println("…...
财富池指标--通达信顾比均线实战指标免费源码
顾比均线是由两组均线构成,短期组为3、5、8、10、12、15。长期组为:30、35、40、45、50、60。顾比均线由澳大利亚的投资家戴若-顾比先生发明,因此叫顾比线。 顾比均线可以广泛运用于股票、期货和外汇交易中,只要是能运用K线图的投…...
AJAX(一):初识AJAX、http协议、配置环境、发送AJAX请求、请求时的问题
一、什么是AJAX 1.AJAX 就是异步的JS和XML。通过AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组合在一起使用的新方式。 2.XML 可扩展标记语言。XML被设计用来传输和…...
idea常用的快捷键总结:
idea常用的快捷键总结: Ctrl相关的: Ctrl F 在当前文件进行文本查找 (必备) Ctrl R 在当前文件进行文本替换 (必备) Ctrl Z 撤销 (必备) Ctrl Y 删除光标所在行 或 删除选中的…...
LeetCode 热题 100 题解(一):哈希部分
《LeetCode热题 100》 经过了两个多月,终于刷完了代码随想录的题目,现在准备开始挑战热题一百了,接下来我会将自己的题解以博客的形式同步发到力扣和 c 站,希望在接下来的征程中与大家共勉! 题组一:哈希 题…...
C语言 | qsort()函数使用
目录: 1.qsort介绍 2.使⽤qsort函数 排序 整型数据 3.使⽤qsort函数 排序 结构体数据 4. qsort函数的模拟实现冒泡排序 qsort()函数 是一个 C语言编译器函数库自带的排序函数, 它可以对指定数组(包括字符串,二维数组&#x…...
继承的特点 | java
/*Java中继承的特点:A:Java只支持单继承,不支持多继承。 B:Java支持多层继承(继承体系),间接继承 */class Father(){} class Mother(){}class son extends Father(){} // 正确 class son2 extends Father , Mother {} // 不正确 1. Java只支持单继承…...
6、jenkins项目构建类型-项目类型介绍
文章目录 一、自由风格项目1、拉取代码2、演示改动代码后的持续集成二、Maven项目构建三、Pipeline流水线项目构建(☆☆☆)1、Pipeline简介(1)概念(2)使用Pipeline有以下好处(3)如何创建Jenkins Pipeline呢?2、安装Pipeline插件3、Pipeline语法快速入门(1)Declarati…...
指针函数的应用——找出哪些学生有不及格的科目
下面的代码实现了以下功能: 定义了一个函数 getFailStudent,它接收一个指向整数数组的指针,并遍历该数组,查找是否存在不及格的成绩。如果找到了不及格的成绩,就返回指向不及格学生所在行的指针;否则返回 N…...
【微服务】Gateway
文章目录 1.基本介绍官方文档:https://springdoc.cn/spring-cloud-gateway/#gateway-starter1.引出网关2.使用网关服务架构图3.Gateway网络拓扑图(背下来)4.Gateway特性5.Gateway核心组件1.基本介绍2.断言3.过滤 6.Gateway工作机制 2.搭建Gat…...
王道C语言督学营OJ课后习题(课时14)
#include <stdio.h> #include <stdlib.h>typedef char BiElemType; typedef struct BiTNode{BiElemType c;//c 就是书籍上的 datastruct BiTNode *lchild;struct BiTNode *rchild; }BiTNode,*BiTree;//tag 结构体是辅助队列使用的 typedef struct tag{BiTree p;//树…...
Filter、Listener、AJAX
Filter 概念:Filter 表示过滤器,是JavaWeb三大组件(Servlet、Filter、 Listener)之一。 过滤器可以把对资源的请求拦截下来,从而实现一些特殊的功能。 过滤器一般完成一些通用的操作,比如:权限控制、统一编码处理、敏感…...
FastAPI+React全栈开发04 FastAPI概述
Chapter01 Web Development and the FARM Stack 04 Introducing FastAPI FastAPIReact全栈开发04 FastAPI概述 Now we will look at a brief introducion to the Python REST-API framework of choice - FastAPI. Additionally, we will go over a high-level overview of t…...
iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
WEB3全栈开发——面试专业技能点P4数据库
一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库,基于 mysql 库改进而来,具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点: 支持 Promise / async-await…...
何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡
何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡 背景 我们以建设星云智控官网来做AI编程实践,很多人以为AI已经强大到不需要程序员了,其实不是,AI更加需要程序员,普通人…...
