kafka集群介绍及搭建
介绍
kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。
路由策略
发布消息由key、value组成,真正的消息是value,key是标识路由消息时所要存放的Partition:
1、若已指定partition,消息则直接写入到指定partition;
2、若未指定partition但指定了key,则通过对key的hash值与partition数量取模,结果就是对应的partition;
3、若partition和key均未指定,则使用轮询算法选出一个partition;
写入过程
1、 producer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、producer向broker controller询问指定topic所对应的partition的leader列表地址;
3、broker controller从zk中查找,返回指定topic所对应的partition的leader列表地址;
4、producer根据消息路由策略,查找符合要求的partition leader,然后发送消息;
发送ack机制
1、 acks=0:producer发送的消息到发送端的buffer中就直接返回了,至于这个消息是否真的发送到broker,producer不关心,(类似udp协议);
2、ack=1:producer发送的消息一定要存储到对应的partition的leader副本日志文件中才算成功,若失败,则会retry。在这种模式下,当消息已经存储在leader副本中,但是follower副本还没来得及同步,leader副本的broker节点挂了,消息才会丢失;
3、当acks=-1或者all:producer发送的消息一定要存储到对应的partition的所有副本日志文件中才算消息发送成功,若失败,则会retry。在这种模式下,所有副本的broker节点都挂了,才会丢失;
消费过程
1、consumer向kafka集群提交连接请求,任意broker节点都会返回broker controller的通信URL(配置文件中的listeners地址);
2、consumer指定要消费的topic,向broker controller发送poll请求;
3、broker controller为consumer分配一个或多个partition leader,并将该partitioin的当前offset发送给consumer;
4、consumer消费完后,向broker发送新的offset;
5、broker在相应的consumer_offset中更新offset值;
6、重复1-5,直到consumer停止请求消息。
特点
1、producer发布的所有消息会一致保存在kafka集群中,不管消息是否被消费;
2、可以通过设置保留时间来清理过期的数据;
3、consumer可以重置offset,从而可以灵活消费存储在broker上的消息;
环境
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/configvi /etc/hosts
ip1 node1
ip2 node2
ip3 node3
安装配置
#全节点
tar -xvf kafka_2.12-2.0.0.tgzvi config/server.propertieslog.dirs=/var/log/kafka-logs
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
#1至3节点
broker.id:0
listeners=PLAINTEXT://ip1:9092broker.id:1listeners=PLAINTEXT://ip2:9092broker.id:2
listeners=PLAINTEXT://ip3:9092#启动顺序:先启动zookeeper,后启动kafka
#关闭顺序:先关闭kafka,后关闭zookeeper (可使用kill命令直接关闭)
cd /kafka_2.12-2.0.0
kafka-server-start.sh -daemon config/server.properties &
监控
启用JMX
#全节点
vi /kafka_2.12-2.0.0/bin/kafka-server-start.sh
export JMX_PORT="9999"
可视化eagle
tar -zvxf v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1 && tar -zxvf kafka-eagle-web-3.0.1-bin.tar.gzyum install -y mariadb*mysqladmin -uroot -p password Mdb123#MariaDB [(none)]>create user eagle@localhost identified by 'kafka123#';
MariaDB [(none)]>select user,host from mysql.user;
MariaDB [(none)]>create database ke;
MariaDB [(none)]>exitvi /etc/profile
export KE_HOME=/efak-web-3.0.1
export PATH=$PATH:$KAFKA_HOME/bin:$ZK_HOME/binvi /efak-web-3.0.1/conf/system-config.propertiescluster1.zk.list=ip1:2181,ip2:2181,ip3:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=efak
efak.password=kafka123#ke.sh start
启动
先启动zookeeper服务,后启动kafka服务,broker按照不同topic、partition选举为不同leader、follower,实现消息传递和存储任务的分布式协作。
附
zookeeper: link
相关文章:

kafka集群介绍及搭建
介绍 kafka是一个高性能、低延迟、分布式的消息传递系统,特点在于实时处理数据。集群由多个成员节点broker组成,每个节点都可以独立处理消息传递和存储任务。 路由策略 发布消息由key、value组成,真正的消息是value,key是标识路…...

2024/03/19(网络编程·day5)
一、思维导图 二、selec函数实现TCP并发服务器 #include<myhead.h>#define SER_PORT 8888 //服务器端口号 #define SER_IP "192.168.117.116" //服务器IP int main(int argc, const char *argv[]) {//1、创建一个套接字int sfd -1;sfd socket(AF_INET,SOC…...
LeetCode解法汇总1969. 数组元素的最小非零乘积
目录链接: 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目: https://github.com/September26/java-algorithms 原题链接:. - 力扣(LeetCode) 描述: 给你一个正整数 p 。你有一个下标从 1 开…...

学习vue3第九节(新加指令 v-pre/v-once/v-memo/v-cloak )
1、v-pre 作用:防止编译器解析某个特定的元素及其内容,即v-pre 会跳过当前元素以及其子元素的vue语法解析,并将其保持原样输出; 用于:vue 中一些没有指令和插值表达式的节点的元素,使用 v-pre 可以提高 Vu…...

二开飞机机器人群发,实现自动给多个频道发送消息
频道1 频道2 二开代码部分: const CChatIdListprocess.env.CHANNEL_CHAT_ID_LIST; var channelChatIdArray CChatIdList.split(,);channelChatIdArray.forEach(function(item) {console.log(item); // 这里可以替换为您需要对数组中每个值进行的操作bot.sendM…...

AI如何支持慈善组织
为各种有意义的事业提供支持,无论是努力寻找治愈疾病的方法、研发使生活更轻松的技术,还是为有需要的人提供服务,都是无比崇高的使命。提供捐款或是投入时间支持的捐助者和志愿者往往对他们选择支持的事业的目标、服务和资源分配存有诸多疑虑…...

Git如何清除账户凭证
场景:一般发生在Git用户变更的情况 1.git base 操作 Git会使用凭证助手 credential.helper来储存账户凭证,通过以下命令移除: git config --system --unset credential.helper 除了system系统级外,还有 global、local范围。 查…...

【YUNBEE云贝-PostgreSQL】FDW应用
注: 本文为云贝教育 刘峰 原创,请尊重知识产权,转发请注明出处,不接受任何抄袭、演绎和未经注明出处的转载。 前言 Wrapper(FDW)是一项关键特性,它赋予数据库用户直接通过SQL语句访问存储于外部数据源的能…...

Spring MVC文件上传配置
版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl 文件上传 Spring MVC文件上传基于Servlet 3.0实现;示例代码如下: Overrideprotected void customizeRegistration(ServletRegistration.Dynamic reg…...

JavaScript高级(十八)---进程和线程,宏任务和微任务
进程和线程 进程(process):计算机已经运行的程序,是操作系统管理程序的一种方式,我们可以认为,启动一个应用程序,就会默认启动一个进程(也可能是多个进程)。 线程&…...
How to install mongodb on redhat 7.7
下载rpm: mongodb-enterprise-server-6.0.3-1.el7.x86_64.rpmmongodb-org-server-6.0.4-1.el7.x86_64.rpmmongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm rpm -ivh mongodb-org-server-6.0.4-1.el7.x86_64.rpm rpm -ivh mongodb-mms-6.0.9.100.20230201T2148Z.x86_64.rpm …...

关于继承是怎么样的?那当然是很好理解之
本文描述了关于继承的大部分知识,但是并不全,每篇博客之间的知识都有互串,所以需要把几篇文章合起来看,学会融会贯通! 温馨提示:使用PC端观看,效果更佳! 目录 1.继承是什么 2.什…...

高可用系统有哪些设计原则
1.降级 主动降级:开关推送 被动降级:超时降级 异常降级 失败率 熔断保护 多级降级2.限流 nginx的limit模块 gateway redisLua 业务层限流 本地限流 gua 分布式限流 sentinel 3.弹性计算 弹性伸缩—K8Sdocker 主链路压力过大的时候可以将非主链路的机器给…...
LeetCode-回文数
LeetCode-回文数 解体思路: ①第一种:转换成字符串,使用字符串的现有api方法进行反转 ②第二种:直接使用循环除余乘10方法,进行反转 涉及知识点: 循环判断,StringBuffer,int类型…...

50. 【Linux教程】源码安装软件
本小节介绍如何使用软件的源码包安装软件,以安装 nginx 源码包为例。 1.下载软件源码包 使用如下命令下载 nginx 源码包: wget http://nginx.org/download/nginx-1.18.0.tar.gz执行结果如下图所示: 2.解压源码包 下载好了压缩包之后&#…...
《操作系统实践-基于Linux应用与内核编程》第10章--实验 Qt聊天程序
前言: 内容参考《操作系统实践-基于Linux应用与内核编程》一书的示例代码和教材内容,所做的读书笔记。本文记录再这里按照书中示例做一遍代码编程实践加深对操作系统的理解。 引用: 《操作系统实践-基于Linux应用与内核编程》 作者:房胜、李旭健、黄…...

探究Kafka主题删除失败的根本原因
欢迎来到我的博客,代码的世界里,每一行都是一个故事 探究Kafka主题删除失败的根本原因 前言主题删除的基础主题删除的定义和作用:删除操作的基本流程: 可能存在删除异常的因素数据积压的处理方法Broker状态异常处理方法通用方法 前…...
JavaSE(上)-Day7
JavaSE(上)-Day7 类和对象封装privatethis构造方法标准JavaBean对象的内存图执行Test类main方法生成一个User对象的内存过程 基本数据类型和引用数据类型的区别this的内存原理成员变量和局部变量区别 类和对象 类是设计图纸,对象是真正的实例…...

记录一下在Pycharm中虚拟环境的创建
如果在Pycharm中要新建一个虚拟环境,那你可以在Terminal中选择Command Prompt,在这里面执行相关命令 一、安装了Anaconda,创建虚拟环境 当你使用解释器是Anaconda提供的时,你可以使用conda命令执行,见以下操作&#x…...
Python从入门到精通秘籍九
一、Python中文件编码概念 在Python中,文件编码指的是将文本内容转换为字节序列的过程。不同的编码方式使用不同的字符集和字节表示形式。下面是一个示例代码: # 写入文本到文件 text "你好,世界!" with open("…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...

技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...

Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...

GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...

一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...

相关类相关的可视化图像总结
目录 一、散点图 二、气泡图 三、相关图 四、热力图 五、二维密度图 六、多模态二维密度图 七、雷达图 八、桑基图 九、总结 一、散点图 特点 通过点的位置展示两个连续变量之间的关系,可直观判断线性相关、非线性相关或无相关关系,点的分布密…...