当前位置: 首页 > news >正文

maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2

一: 添加zk的pox文件

<!-- customize HA -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.4.0</version>
</dependency>

二: 创建zk工具类

在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用

package com.zendesk.maxwell.util;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorUtil {private final String zookeeperServers;private final int sessionTimeoutMs;private final int connectionTimeoutMs;private final int baseSleepTimeMs;private final int maxRetries;private CuratorFramework client;public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,int maxRetries) {this.zookeeperServers = zookeeperServers;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.baseSleepTimeMs = baseSleepTimeMs;this.maxRetries = maxRetries;}/** 构造 zookeeper 客户端,并连接 zookeeper 集群*/public void start() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);client = CuratorFrameworkFactory.newClient(this.zookeeperServers,this.sessionTimeoutMs,this.connectionTimeoutMs,retryPolicy);client.start();}/** 实现分布式锁*/public void highAvailable() {// 1.连接 Zookeeper 客户端this.start();// 2.向 zookeeper 注册自己String lockPath = "/maxwell/ha/lock";InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {// 3.获取锁lock.acquire();// 4.将自己信息注册到 leader 路径String leaderPath = "/maxwell/ha/leader";client.create().withMode(CreateMode.EPHEMERAL).forPath(leaderPath);} catch (Exception e) {e.printStackTrace();}}
}

三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类

3.1 添加属性

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

3.2 buildOptionParser 方法添加代码

parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" ).withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" ).withRequiredArg();
parser.accepts( "max_retries", "max retry times" ).withRequiredArg();

3.3 setup 方法添加代码

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}

四:修改 com.zendesk.maxwell.Maxwell 的main函数

将代码段

if ( config.haMode ) {new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {maxwell.start();
}

全部注释掉,修改为

if ( config.haMode ) {CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);curatorUtil.highAvailable();
}
maxwell.start();

然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误

源码下载地址

五: 启动脚本

5.1 创建配置文件 config.properties

log_level=info

# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

producer=kafka

#       *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600

kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

5.2 启动脚本编写 startup.sh

#!/bin/bash

single(){
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
  echo -e "\033[32m单机版启动成功\n\033[0m"
}

ha(){
  ## zookeeper 多个用,分割
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
  echo -e "\033[32m高可用版启动成功\n\033[0m"
}

case "$1" in
  'ha')
     ha
     ;;
  *)
     single
     ;;
esac

5.2.1 高可用版本启动命令

./startup.sh ha

5.2.2 单机版启动命令

./startup.sh


 

相关文章:

maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2 一&#xff1a; 添加zk的pox文件 <!-- customize HA --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version> </dependency>&…...

【JavaScript】match用法 | 正则匹配

match正则匹配 var e "www.apple.com:baidu.com" var match e.match(/com/g) console.log("match: "match);> "match: com,com"match返回值问题 match的返回值是一个数组 数组的第0个元素是与整个正则表达式匹配的结果 数组的第1个元素是…...

前端css + js +vue +element-ui 实现响应式布局,根据浏览器窗体大小自动响应

前端css js vue element-ui 实现响应式布局&#xff0c;根据浏览器窗体大小自动响应 1、环境2、js代码3、代码解释1、定义对象2、定义方法3、监听窗口变化&#xff0c;计算比例值&#xff0c;并赋值给transform 属性4、实现监听 3、html 代码4、特别注意 1、环境 我的环境是e…...

小程序生成App:轻量低门槛的开发方式

小程序生成App可以成为一种轻量低门槛的开发App的方式&#xff0c;但是需要根据具体情况进行选择。如果应用需要处理大量数据或需要进行复杂计算&#xff0c;或者需要实现原生特有的功能或交互效果&#xff0c;可能需要选择其他开发方式。 在文章开始之前&#xff0c;我们看看目…...

Linux命名管道进程通信

文章目录 前言一、什么是命名管道通信二、创建方式三、代码示例四、文件进程通信总结 前言 命名管道 是实现进程间通信的强大工具&#xff0c;它提供了一种简单而有效的方式&#xff0c;允许不同进程之间进行可靠的数据交换。不仅可以在同一主机上的不相关进程间进行通信&…...

如何将苹果彻底删除视频找回?试试这3种方法

如今是短视频时代&#xff0c;大家通常会使用苹果手机来拍摄视频&#xff0c;以此记录生活中的美好日常。但是大家都知道视频是十分占空间的&#xff0c;这也经常会出现iPhone内存不足&#xff0c;磁盘崩溃的问题。 当遇到iPhone内存不足的情况时&#xff0c;大家往往会选择清…...

【音视频、chatGpt】h5页面最小化后,再激活后视频停住问题的解决

目录 现象 观察 解决 现象 页面有时候要切换&#xff0c;要最小化&#xff1b;短时间或者几个小时内切换回来&#xff0c;视频可以正常续上&#xff1b;而放置较长时间&#xff0c;几个小时或者一晚上&#xff0c;切换回来后&#xff0c;视频可能卡死 观察 切换页面&#x…...

[CSS] 图片九宫格

效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"/><meta http-equiv"X-UA-Compatible" content"IEedge"/><meta name"viewport" content"widthdevice-…...

MChat-Gpt V1.0.0 (将ChatGpt机器人接入内网供全体使用)

Github>https://github.com/MartinxMax/MChat-Gpt 首页 MChat-Gpt V1.0.0将ChatGpt机器人接入内网供全体使用 你需要一个ChatGpt账户如果您在中国则需要使用代理访问,设置TUN代理模式 安装依赖 选择你的系统进行安装 服务端配置 #python3 ChatGpt_Server.py -h 使用&a…...

日常开发中Git命令指北

Git基本操作 创建化仓库 mkdir 目录 cd 目录 git init配置本地仓库 # 配置用户名&#xff0c;邮箱 git config user.name "cxf" git config user.email "1969612859qq.com" # 查看本地配置&#xff08;小写的 L&#xff09; git config -l # 重置配置&a…...

API 测试 | 了解 API 接口概念|电商平台 API 接口测试指南

什么是 API&#xff1f; API 是一个缩写&#xff0c;它代表了一个 pplication P AGC 软件覆盖整个房间。API 是用于构建软件应用程序的一组例程&#xff0c;协议和工具。API 指定一个软件程序应如何与其他软件程序进行交互。 例行程序&#xff1a;执行特定任务的程序。例程也称…...

【计算机组成原理】24王道考研笔记——第三章 存储系统

第三章 存储系统 一、存储系统概述 现代计算机的结构&#xff1a; 1.存储器的层次结构 2.存储器的分类 按层次&#xff1a; 按介质&#xff1a; 按存储方式&#xff1a; 按信息的可更改性&#xff1a; 按信息的可保存性&#xff1a; 3.存储器的性能指标 二、主存储器 1.基本…...

学习C语言的好处:

基础编程语言&#xff1a;C语言是其他编程语言的基础&#xff0c;学习C语言可为后续学习打下坚实基础&#xff0c;广泛应用于嵌入式系统、操作系统、网络协议等。 简单易学&#xff1a;C语言语法简单易懂&#xff0c;适合初学者。只需文本编辑器和编译器&#xff0c;即可开始编…...

基于k8s的devOps自动化运维平台架构设计(中英文版本)

▲ 点击上方"DevOps和k8s全栈技术"关注公众号 In the rapidly evolving landscape of software development and IT operations, DevOps has emerged as a transformative approach to bridge the gap between development and operations teams. One of the key ena…...

P450进阶款无人机室内定位功能研测

在以往的Prometheus 450&#xff08;P450&#xff09;无人机上&#xff0c;我们搭载的是Intel Realsense T265定位模块&#xff0c;使用USB连接方式挂载到机载计算机allspark上&#xff0c;通过机载上SDK驱动T265运行并输出SLAM信息&#xff0c;以此来实现室内定位功能。 为进…...

深度学习,计算机视觉任务

目录 计算机视觉任务 1.K近邻算法 2.得分函数 3.损失函数的作用 4.向前传播整体流程 5.反向传播计算方法 计算机视觉任务 机器学习的流程&#xff1a; 数据获取 特征工程 建立模型 评估与应用 计算机视觉&#xff1a; 图像表示&#xff1a;计算机眼中的图像&#…...

使用 Docker 部署 canal 服务实现MySQL和ES实时同步

文章目录 0. 环境介绍0. 前置步骤1. 安装Kibana和Elasticsearch2. 安装Canal和Canal Adapter2.1 修改数据库配置2.1.1 修改配置2.1.2 验证mysql binlog配置2.1.3 查看日志文件2.1.4 用JDBC代码插入数据库 2.2 安装Canal Server2.3 安装Canal Adapter修改两处配置文件配置文件取…...

const易错详解

const对比 常量指针 int b; (1)const int *a &b;//常量指针(2)int const *a &b; //常量指针常量指针&#xff1a;指向的变量值不能被修改 ![常量指针](https://img-blog.csdnimg.cn/9d795b11eb6d484297ea7cbead28463f.png 指针常量 int b; int* const a&b;…...

网络安全—黑客技术【自学】

一、黑客是什么 原是指热心于计算机技术&#xff0c;水平高超的电脑专家&#xff0c;尤其是程序设计人员。但后来&#xff0c;黑客一词已被用于泛指那些专门利用电脑网络搞破坏或者恶作剧的家伙。 二、学习黑客技术的原因 其实&#xff0c;网络信息空间安全已经成为海陆空之…...

作为数据产品经理的一天

数据产品经理作为这两年大数据行业的热门职业&#xff0c;经常有小伙伴会问我数据产品经理是做什么的&#xff0c;给大家简单讲下作为数据产品经理的一天是怎么度过得&#xff0c;算是一篇记录文吧&#xff0c;看完或许大家对这个职业的了解会更深入一些。 01 早上10点&#…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)

目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 &#xff08;1&#xff09;输入单引号 &#xff08;2&#xff09;万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

9-Oracle 23 ai Vector Search 特性 知识准备

很多小伙伴是不是参加了 免费认证课程&#xff08;限时至2025/5/15&#xff09; Oracle AI Vector Search 1Z0-184-25考试&#xff0c;都顺利拿到certified了没。 各行各业的AI 大模型的到来&#xff0c;传统的数据库中的SQL还能不能打&#xff0c;结构化和非结构的话数据如何和…...