kafka集群介绍及搭建
介绍
kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。

路由策略
发布消息由key、value组成,真正的消息是value,key是标识路由消息时所要存放的Partition:
1、若已指定partition,消息则直接写入到指定partition;
2、若未指定partition但指定了key,则通过对key的hash值与partition数量取模,结果就是对应的partition;
3、若partition和key均未指定,则使用轮询算法选出一个partition;
写入过程
1、 producer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、producer向broker controller询问指定topic所对应的partition的leader列表地址;
3、broker controller从zk中查找,返回指定topic所对应的partition的leader列表地址;
4、producer根据消息路由策略,查找符合要求的partition leader,然后发送消息;
发送ack机制
1、 acks=0:producer发送的消息到发送端的buffer中就直接返回了,至于这个消息是否真的发送到broker,producer不关心,(类似udp协议);
2、ack=1:producer发送的消息一定要存储到对应的partition的leader副本日志文件中才算成功,若失败,则会retry。在这种模式下,当消息已经存储在leader副本中,但是follower副本还没来得及同步,leader副本的broker节点挂了,消息才会丢失;
3、当acks=-1或者all:producer发送的消息一定要存储到对应的partition的所有副本日志文件中才算消息发送成功,若失败,则会retry。在这种模式下,所有副本的broker节点都挂了,才会丢失;
消费过程
1、consumer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、consumer指定要消费的topic,向broker controller发送poll请求;
3、broker controller为consumer分配一个或多个partition leader,并将该partitioin的当前offset发送给consumer;
4、consumer消费完后,向broker发送新的offset;
5、broker在相应的consumer_offset中更新offset值;
6、重复1-5,直到consumer停止请求消息。
特点
1、producer发布的所有消息会一致保存在kafka集群中,不管消息是否被消费;
2、可以通过设置保留时间来清理过期的数据;
3、consumer可以重置offset,从而可以灵活消费存储在broker上的消息;
环境
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/configvi /etc/hosts
ip1 node1
ip2 node2
ip3 node3
安装配置
#全节点
tar -xvf kafka_2.12-2.0.0.tgzvi config/server.propertieslog.dirs=/var/log/kafka-logs
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
#1至3节点
broker.id:0
listeners=PLAINTEXT://ip1:9092broker.id:1listeners=PLAINTEXT://ip2:9092broker.id:2
listeners=PLAINTEXT://ip3:9092#启动顺序:先启动zookeeper,后启动kafka
#关闭顺序:先关闭kafka,后关闭zookeeper (可使用kill命令直接关闭)
cd /kafka_2.12-2.0.0
kafka-server-start.sh -daemon config/server.properties &
监控
启用JMX
#全节点
vi /kafka_2.12-2.0.0/bin/kafka-server-start.sh
export JMX_PORT="9999"

可视化eagle
tar -zvxf v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1 && tar -zxvf kafka-eagle-web-3.0.1-bin.tar.gzyum install -y mariadb*mysqladmin -uroot -p password Mdb123#MariaDB [(none)]>create user eagle@localhost identified by 'kafka123#';
MariaDB [(none)]>select user,host from mysql.user;
MariaDB [(none)]>create database ke;
MariaDB [(none)]>exitvi /etc/profile
export KE_HOME=/efak-web-3.0.1
export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/binvi /efak-web-3.0.1/conf/system-config.propertiescluster1.zk.list=ip1:2181,ip2:2181,ip3:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=efak
efak.password=kafka123#ke.sh start

启动
先启动zookeeper服务,后启动kafka服务,broker按照不同topic、partition选举为不同leader、follower,实现消息传递和存储任务的分布式协作。
附
zookeeper: link
相关文章:
kafka集群介绍及搭建
介绍 kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。 路由策略 发布消息由key、value组成,真正的消息是value,key是标识路…...
2024/03/19(网络编程·day5)
一、思维导图 二、selec函数实现TCP并发服务器 #include<myhead.h>#define SER_PORT 8888 //服务器端口号 #define SER_IP "192.168.117.116" //服务器IP int main(int argc, const char *argv[]) {//1、创建一个套接字int sfd -1;sfd socket(AF_INET,SOC…...
LeetCode解法汇总1969. 数组元素的最小非零乘积
目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:. - 力扣(LeetCode) 描述: 给你一个正整数 p 。你有一个下标从 1 开…...
学习vue3第九节(新加指令 v-pre/v-once/v-memo/v-cloak )
1、v-pre 作用:防止编译器解析某个特定的元素及其内容,即v-pre 会跳过当前元素以及其子元素的vue语法解析,并将其保持原样输出; 用于:vue 中一些没有指令和插值表达式的节点的元素,使用 v-pre 可以提高 Vu…...
二开飞机机器人群发,实现自动给多个频道发送消息
频道1 频道2 二开代码部分: const CChatIdListprocess.env.CHANNEL_CHAT_ID_LIST; var channelChatIdArray CChatIdList.split(,);channelChatIdArray.forEach(function(item) {console.log(item); // 这里可以替换为您需要对数组中每个值进行的操作bot.sendM…...
AI如何支持慈善组织
为各种有意义的事业提供支持,无论是努力寻找治愈疾病的方法、研发使生活更轻松的技术,还是为有需要的人提供服务,都是无比崇高的使命。提供捐款或是投入时间支持的捐助者和志愿者往往对他们选择支持的事业的目标、服务和资源分配存有诸多疑虑…...
Git如何清除账户凭证
场景:一般发生在Git用户变更的情况 1.git base 操作 Git会使用凭证助手 credential.helper来储存账户凭证,通过以下命令移除: git config --system --unset credential.helper 除了system系统级外,还有 global、local范围。 查…...
【YUNBEE云贝-PostgreSQL】FDW应用
注: 本文为云贝教育 刘峰 原创,请尊重知识产权,转发请注明出处,不接受任何抄袭、演绎和未经注明出处的转载。 前言 Wrapper(FDW)是一项关键特性,它赋予数据库用户直接通过SQL语句访问存储于外部数据源的能…...
Spring MVC文件上传配置
版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl 文件上传 Spring MVC文件上传基于Servlet 3.0实现;示例代码如下: Overrideprotected void customizeRegistration(ServletRegistration.Dynamic reg…...
JavaScript高级(十八)---进程和线程,宏任务和微任务
进程和线程 进程(process):计算机已经运行的程序,是操作系统管理程序的一种方式,我们可以认为,启动一个应用程序,就会默认启动一个进程(也可能是多个进程)。 线程&…...
How to install mongodb on redhat 7.7
下载rpm: mongodb-enterprise-server-6.0.3-1.el7.x86_64.rpmmongodb-org-server-6.0.4-1.el7.x86_64.rpmmongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm rpm -ivh mongodb-org-server-6.0.4-1.el7.x86_64.rpm rpm -ivh mongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm …...
关于继承是怎么样的?那当然是很好理解之
本文描述了关于继承的大部分知识,但是并不全,每篇博客之间的知识都有互串,所以需要把几篇文章合起来看,学会融会贯通! 温馨提示:使用PC端观看,效果更佳! 目录 1.继承是什么 2.什…...
高可用系统有哪些设计原则
1.降级 主动降级:开关推送 被动降级:超时降级 异常降级 失败率 熔断保护 多级降级2.限流 nginx的limit模块 gateway redisLua 业务层限流 本地限流 gua 分布式限流 sentinel 3.弹性计算 弹性伸缩—K8Sdocker 主链路压力过大的时候可以将非主链路的机器给…...
LeetCode-回文数
LeetCode-回文数 解体思路: ①第一种:转换成字符串,使用字符串的现有api方法进行反转 ②第二种:直接使用循环除余乘10方法,进行反转 涉及知识点: 循环判断,StringBuffer,int类型…...
50. 【Linux教程】源码安装软件
本小节介绍如何使用软件的源码包安装软件,以安装 nginx 源码包为例。 1.下载软件源码包 使用如下命令下载 nginx 源码包: wget http://nginx.org/download/nginx-1.18.0.tar.gz执行结果如下图所示: 2.解压源码包 下载好了压缩包之后&#…...
《操作系统实践-基于Linux应用与内核编程》第10章--实验 Qt聊天程序
前言: 内容参考《操作系统实践-基于Linux应用与内核编程》一书的示例代码和教材内容,所做的读书笔记。本文记录再这里按照书中示例做一遍代码编程实践加深对操作系统的理解。 引用: 《操作系统实践-基于Linux应用与内核编程》 作者:房胜、李旭健、黄…...
探究Kafka主题删除失败的根本原因
欢迎来到我的博客,代码的世界里,每一行都是一个故事 探究Kafka主题删除失败的根本原因 前言主题删除的基础主题删除的定义和作用:删除操作的基本流程: 可能存在删除异常的因素数据积压的处理方法Broker状态异常处理方法通用方法 前…...
JavaSE(上)-Day7
JavaSE(上)-Day7 类和对象封装privatethis构造方法标准JavaBean对象的内存图执行Test类main方法生成一个User对象的内存过程 基本数据类型和引用数据类型的区别this的内存原理成员变量和局部变量区别 类和对象 类是设计图纸,对象是真正的实例…...
记录一下在Pycharm中虚拟环境的创建
如果在Pycharm中要新建一个虚拟环境,那你可以在Terminal中选择Command Prompt,在这里面执行相关命令 一、安装了Anaconda,创建虚拟环境 当你使用解释器是Anaconda提供的时,你可以使用conda命令执行,见以下操作&#x…...
Python从入门到精通秘籍九
一、Python中文件编码概念 在Python中,文件编码指的是将文本内容转换为字节序列的过程。不同的编码方式使用不同的字符集和字节表示形式。下面是一个示例代码: # 写入文本到文件 text "你好,世界!" with open("…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
