kafka集群介绍及搭建
介绍
kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。

路由策略
发布消息由key、value组成,真正的消息是value,key是标识路由消息时所要存放的Partition:
1、若已指定partition,消息则直接写入到指定partition;
2、若未指定partition但指定了key,则通过对key的hash值与partition数量取模,结果就是对应的partition;
3、若partition和key均未指定,则使用轮询算法选出一个partition;
写入过程
1、 producer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、producer向broker controller询问指定topic所对应的partition的leader列表地址;
3、broker controller从zk中查找,返回指定topic所对应的partition的leader列表地址;
4、producer根据消息路由策略,查找符合要求的partition leader,然后发送消息;
发送ack机制
1、 acks=0:producer发送的消息到发送端的buffer中就直接返回了,至于这个消息是否真的发送到broker,producer不关心,(类似udp协议);
2、ack=1:producer发送的消息一定要存储到对应的partition的leader副本日志文件中才算成功,若失败,则会retry。在这种模式下,当消息已经存储在leader副本中,但是follower副本还没来得及同步,leader副本的broker节点挂了,消息才会丢失;
3、当acks=-1或者all:producer发送的消息一定要存储到对应的partition的所有副本日志文件中才算消息发送成功,若失败,则会retry。在这种模式下,所有副本的broker节点都挂了,才会丢失;
消费过程
1、consumer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、consumer指定要消费的topic,向broker controller发送poll请求;
3、broker controller为consumer分配一个或多个partition leader,并将该partitioin的当前offset发送给consumer;
4、consumer消费完后,向broker发送新的offset;
5、broker在相应的consumer_offset中更新offset值;
6、重复1-5,直到consumer停止请求消息。
特点
1、producer发布的所有消息会一致保存在kafka集群中,不管消息是否被消费;
2、可以通过设置保留时间来清理过期的数据;
3、consumer可以重置offset,从而可以灵活消费存储在broker上的消息;
环境
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/configvi /etc/hosts
ip1 node1
ip2 node2
ip3 node3
安装配置
#全节点
tar -xvf kafka_2.12-2.0.0.tgzvi config/server.propertieslog.dirs=/var/log/kafka-logs
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
#1至3节点
broker.id:0
listeners=PLAINTEXT://ip1:9092broker.id:1listeners=PLAINTEXT://ip2:9092broker.id:2
listeners=PLAINTEXT://ip3:9092#启动顺序:先启动zookeeper,后启动kafka
#关闭顺序:先关闭kafka,后关闭zookeeper (可使用kill命令直接关闭)
cd /kafka_2.12-2.0.0
kafka-server-start.sh -daemon config/server.properties &
监控
启用JMX
#全节点
vi /kafka_2.12-2.0.0/bin/kafka-server-start.sh
export JMX_PORT="9999"

可视化eagle
tar -zvxf v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1 && tar -zxvf kafka-eagle-web-3.0.1-bin.tar.gzyum install -y mariadb*mysqladmin -uroot -p password Mdb123#MariaDB [(none)]>create user eagle@localhost identified by 'kafka123#';
MariaDB [(none)]>select user,host from mysql.user;
MariaDB [(none)]>create database ke;
MariaDB [(none)]>exitvi /etc/profile
export KE_HOME=/efak-web-3.0.1
export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/binvi /efak-web-3.0.1/conf/system-config.propertiescluster1.zk.list=ip1:2181,ip2:2181,ip3:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=efak
efak.password=kafka123#ke.sh start

启动
先启动zookeeper服务,后启动kafka服务,broker按照不同topic、partition选举为不同leader、follower,实现消息传递和存储任务的分布式协作。
附
zookeeper: link
相关文章:
kafka集群介绍及搭建
介绍 kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。 路由策略 发布消息由key、value组成,真正的消息是value,key是标识路…...
2024/03/19(网络编程·day5)
一、思维导图 二、selec函数实现TCP并发服务器 #include<myhead.h>#define SER_PORT 8888 //服务器端口号 #define SER_IP "192.168.117.116" //服务器IP int main(int argc, const char *argv[]) {//1、创建一个套接字int sfd -1;sfd socket(AF_INET,SOC…...
LeetCode解法汇总1969. 数组元素的最小非零乘积
目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:. - 力扣(LeetCode) 描述: 给你一个正整数 p 。你有一个下标从 1 开…...
学习vue3第九节(新加指令 v-pre/v-once/v-memo/v-cloak )
1、v-pre 作用:防止编译器解析某个特定的元素及其内容,即v-pre 会跳过当前元素以及其子元素的vue语法解析,并将其保持原样输出; 用于:vue 中一些没有指令和插值表达式的节点的元素,使用 v-pre 可以提高 Vu…...
二开飞机机器人群发,实现自动给多个频道发送消息
频道1 频道2 二开代码部分: const CChatIdListprocess.env.CHANNEL_CHAT_ID_LIST; var channelChatIdArray CChatIdList.split(,);channelChatIdArray.forEach(function(item) {console.log(item); // 这里可以替换为您需要对数组中每个值进行的操作bot.sendM…...
AI如何支持慈善组织
为各种有意义的事业提供支持,无论是努力寻找治愈疾病的方法、研发使生活更轻松的技术,还是为有需要的人提供服务,都是无比崇高的使命。提供捐款或是投入时间支持的捐助者和志愿者往往对他们选择支持的事业的目标、服务和资源分配存有诸多疑虑…...
Git如何清除账户凭证
场景:一般发生在Git用户变更的情况 1.git base 操作 Git会使用凭证助手 credential.helper来储存账户凭证,通过以下命令移除: git config --system --unset credential.helper 除了system系统级外,还有 global、local范围。 查…...
【YUNBEE云贝-PostgreSQL】FDW应用
注: 本文为云贝教育 刘峰 原创,请尊重知识产权,转发请注明出处,不接受任何抄袭、演绎和未经注明出处的转载。 前言 Wrapper(FDW)是一项关键特性,它赋予数据库用户直接通过SQL语句访问存储于外部数据源的能…...
Spring MVC文件上传配置
版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl 文件上传 Spring MVC文件上传基于Servlet 3.0实现;示例代码如下: Overrideprotected void customizeRegistration(ServletRegistration.Dynamic reg…...
JavaScript高级(十八)---进程和线程,宏任务和微任务
进程和线程 进程(process):计算机已经运行的程序,是操作系统管理程序的一种方式,我们可以认为,启动一个应用程序,就会默认启动一个进程(也可能是多个进程)。 线程&…...
How to install mongodb on redhat 7.7
下载rpm: mongodb-enterprise-server-6.0.3-1.el7.x86_64.rpmmongodb-org-server-6.0.4-1.el7.x86_64.rpmmongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm rpm -ivh mongodb-org-server-6.0.4-1.el7.x86_64.rpm rpm -ivh mongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm …...
关于继承是怎么样的?那当然是很好理解之
本文描述了关于继承的大部分知识,但是并不全,每篇博客之间的知识都有互串,所以需要把几篇文章合起来看,学会融会贯通! 温馨提示:使用PC端观看,效果更佳! 目录 1.继承是什么 2.什…...
高可用系统有哪些设计原则
1.降级 主动降级:开关推送 被动降级:超时降级 异常降级 失败率 熔断保护 多级降级2.限流 nginx的limit模块 gateway redisLua 业务层限流 本地限流 gua 分布式限流 sentinel 3.弹性计算 弹性伸缩—K8Sdocker 主链路压力过大的时候可以将非主链路的机器给…...
LeetCode-回文数
LeetCode-回文数 解体思路: ①第一种:转换成字符串,使用字符串的现有api方法进行反转 ②第二种:直接使用循环除余乘10方法,进行反转 涉及知识点: 循环判断,StringBuffer,int类型…...
50. 【Linux教程】源码安装软件
本小节介绍如何使用软件的源码包安装软件,以安装 nginx 源码包为例。 1.下载软件源码包 使用如下命令下载 nginx 源码包: wget http://nginx.org/download/nginx-1.18.0.tar.gz执行结果如下图所示: 2.解压源码包 下载好了压缩包之后&#…...
《操作系统实践-基于Linux应用与内核编程》第10章--实验 Qt聊天程序
前言: 内容参考《操作系统实践-基于Linux应用与内核编程》一书的示例代码和教材内容,所做的读书笔记。本文记录再这里按照书中示例做一遍代码编程实践加深对操作系统的理解。 引用: 《操作系统实践-基于Linux应用与内核编程》 作者:房胜、李旭健、黄…...
探究Kafka主题删除失败的根本原因
欢迎来到我的博客,代码的世界里,每一行都是一个故事 探究Kafka主题删除失败的根本原因 前言主题删除的基础主题删除的定义和作用:删除操作的基本流程: 可能存在删除异常的因素数据积压的处理方法Broker状态异常处理方法通用方法 前…...
JavaSE(上)-Day7
JavaSE(上)-Day7 类和对象封装privatethis构造方法标准JavaBean对象的内存图执行Test类main方法生成一个User对象的内存过程 基本数据类型和引用数据类型的区别this的内存原理成员变量和局部变量区别 类和对象 类是设计图纸,对象是真正的实例…...
记录一下在Pycharm中虚拟环境的创建
如果在Pycharm中要新建一个虚拟环境,那你可以在Terminal中选择Command Prompt,在这里面执行相关命令 一、安装了Anaconda,创建虚拟环境 当你使用解释器是Anaconda提供的时,你可以使用conda命令执行,见以下操作&#x…...
Python从入门到精通秘籍九
一、Python中文件编码概念 在Python中,文件编码指的是将文本内容转换为字节序列的过程。不同的编码方式使用不同的字符集和字节表示形式。下面是一个示例代码: # 写入文本到文件 text "你好,世界!" with open("…...
如何快速配置英雄联盟:ChampR智能助手的完整使用指南
如何快速配置英雄联盟:ChampR智能助手的完整使用指南 【免费下载链接】champr 🐶 Yet another League of Legends helper 项目地址: https://gitcode.com/gh_mirrors/ch/champr 想要在英雄联盟中轻松获取最优出装和符文搭配吗?ChampR智…...
当网盘遇见你的浏览器:重新定义文件下载体验
当网盘遇见你的浏览器:重新定义文件下载体验 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼云盘 / 迅…...
GitHub第一次开源贡献怎么做?以 First Contributions 项目为例,把 Fork、Clone、PR 一次讲明白
🤵♂️ 个人主页:小李同学_LSH的主页 ✍🏻 作者简介:LLM学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
ahk2_lib架构设计解析:构建AutoHotkey V2原生扩展生态的技术实现
ahk2_lib架构设计解析:构建AutoHotkey V2原生扩展生态的技术实现 【免费下载链接】ahk2_lib 项目地址: https://gitcode.com/gh_mirrors/ah/ahk2_lib ahk2_lib是专为AutoHotkey V2设计的原生扩展库集合,通过系统级API封装、跨语言调用机制和模块…...
你的老Mac还能再战十年吗?OpenCore Legacy Patcher让旧设备焕发新生
你的老Mac还能再战十年吗?OpenCore Legacy Patcher让旧设备焕发新生 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 你是否还在为老款Mac无法升级…...
抖音无水印下载工具:从零开始构建你的专属视频素材库
抖音无水印下载工具:从零开始构建你的专属视频素材库 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback suppor…...
EasyAnimateV5-7b-zh-InP开源大模型实战:对接OSS对象存储自动归档生成视频
EasyAnimateV5-7b-zh-InP开源大模型实战:对接OSS对象存储自动归档生成视频 1. 从图片到视频:EasyAnimateV5-7b-zh-InP模型初探 你有没有想过,一张静态的照片,能在几秒钟内“活”过来,变成一段生动的短视频࿱…...
零基础玩转DeOldify:快速搭建图像上色服务,修复珍贵记忆
零基础玩转DeOldify:快速搭建图像上色服务,修复珍贵记忆 1. 项目介绍与核心价值 老照片承载着无数珍贵记忆,但随着时间的推移,这些照片往往会褪色或变成黑白。现在,借助AI技术,我们可以轻松为这些老照片恢…...
第 29 课:任务页筛选方案预设与快捷视图
第 29 课:任务页筛选方案预设与快捷视图 这一课,我们继续沿着任务管理页主线往下走,把它从“能筛选”推进到“能复用工作台”: 让用户把当前筛选条件保存成方案,以后可以一键切回。 这一步很像真实后台系统里常见的&am…...
为什么OpenAI、DeepMind、中科院脑智卓越中心同时缺席2026奇点大会主论坛?(意识伦理红线白皮书内部版泄露)
第一章:2026奇点智能技术大会:AGI与意识问题 2026奇点智能技术大会(https://ml-summit.org) AGI系统中的现象学建模挑战 本届大会首次设立“人工现象学”专题轨道,聚焦AGI是否可能具备第一人称体验(qualia)的实证路径…...
