Kafka 安装教程和基本操作
一、简介
Kafka
是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper
协调的分布式日志系统(也可以当做 MQ
系统),常见可以用于 web/nginx
日志、访问日志,消息服务等等,Linkedin于2010年12月贡献给了 Apache基金会 并成为顶级开源项目。
应用特性
- 分布式存储:数据被自动分区并分布在集群的节点中。
- 消息有序性:
Kafka
能确保从生产者传到消费者的记录都是有序的。 - 高容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
- 高吞吐量:
Kafka
支持单机每秒至少处理10万以上消息,通常可以达到数百万条消息。 - 易扩展性:支持集群热扩展。
- 高并发:支持数千个客户端同时读写。
- 持久性:支持消息数据持久化到本地磁盘 并支持数据备份和灵活配置数据的持久化时间。
- 实时处理/低延迟:在数据写入的同时对进行处理,消息延迟最低只有几毫秒。
应用场景
Kafka
本质是 支持分布式的消息系统/消息中间件
。分析 Kafka
的应用场景等同于分析 消息中级件
的应用场景。通常,使用 消息系统
的 发布/订阅模型 功能来连接 生产者
和 消费者
。实现以下三大功能:
- 生产者和消费者的解耦
- 消息持久化 / 消息冗余
- 消息缓冲 / 流量消峰
具体应用场景有:
- 日志收集或数据管道:作为日志收集系统或数据处理管道的一部分,以处理大量的日志数据或实时数据流。
- 负载均衡:如果系统收到大量请求或数据流,可以使用消息队列把这些任务平均分配给多个处理器或服务,从而实现负载均衡。
- 系统解耦:消息队列经常用作不同服务间的通信机制,以解耦系统的不同部分。
- 分布式事务:如果一个事务需要跨多个服务进行,可以使用消息队列来协调不同服务之间的通信,确保事务的原子性。
- 实时流数据处理:比如实时日志分析或者实时数据报警。Kafka 能接收实时数据流并保证它的可靠性和持久性,这样就可以在上游源源不断生产数据的同时,下游可以实时地进行分析。
- 通知和实时更新:消息队列可以用作通知的中介,比如告知用户完成某个任务,或者在后端数据更新时实时通知前端。
设计目标
- 高性能:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 消息系统:支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 横向扩展:支持在线水平热扩展
二、kafka安装和配置
1. zookeeper安装配置
需要说明一下, 为了支持 Kafka
的集群功能, Zookeeper
必须使用集群模式部署。
本文以部署 3 个Zookeeper 实例的伪集群为例。具体安装步骤参阅之前的文章:Zookeeper 安装教程和使用指南
2. kafka安装配置
下载链接:Kafka Downloads
下载页面中包含两种下载方式
- : kafka-[version]-src.tgz:包含 Kafka 源码和API源码,需要自己编译
a) 安装
[root@Ali ~]# wget https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz
[root@Ali ~]# tar xzvf kafka_2.12-3.6.2.tgz
[root@Ali ~]# mv /usr/local/kafka_2.12-3.6.2 /usr/local/kafka
b) 配置实例
配置第一个 Kafka
实例
# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9091端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9091
# 相当于listeners=PLAINTEXT://0.0.0.0:9091
# 消息日志存放地址
log.dirs=/usr/local/kafka/logs
# ZooKeeper 地址,多个用,分隔 /kafka指定在zk上的目录
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
配置第二个 Kafka
实例
# broker 编号,集群内必须唯一
broker.id=1
# 监听所有ip的9092端口,PLAINTEXT表示明文传输
listeners=PLAINTEXT://:9092
# 消息日志存放地址
log.dirs=/opt/kafka/logs
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=localhost:12181/kafka,localhost:22181/kafka
注:两个客户端的listeners中的port不能一样
4) 服务管理
# 启动服务 -daemon 表示后台启动
$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties# 查看服务
jps -l43330 org.apache.zookeeper.server.quorum.QuorumPeerMain14356 org.elasticsearch.bootstrap.Elasticsearch14583 org.logstash.Logstash45976 kafka.Kafka # kafka服务进程netstat -anlpt | grep 9091tcp6 0 0 :::9091 :::* LISTEN 45976/javatcp6 0 0 192.168.18.128:9091 192.168.18.128:49356 TIME_WAIT -# 关闭服务
$KAFKA_HOME/bin/kafka-server-stop.sh
3. 常用操作
1) 创建topic
#两条命令效果一样
bin/kafka-topics.sh --create --bootstrap-server localhost:9091 --partitions 2 --replication-factor 2 --topic yumu
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --partitions 2 --replication-factor 2 --topic yumu
在kafka1上创建一个topic,会自动同步到其他客户端
--create
表示创建操作--zookeeper
指定了 Kafka 连接的 ZooKeeper--partitions
表示每个主题4个分区--replication-factor
表示创建每个分区创建2个副本(副本因子)--topic
表示主题名称。
注:副本因子不能超过存活的broker数量,否则报错:Replication factor: 20 larger than available brokers: xxx.
2) 查看topic
# 查看topic列表 #两条命令效果一样
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka __consumer_offsetstopic-demoyumu# 查看topic详细信息 #两条命令效果一样
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic yumu
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic yumu Topic: yumu PartitionCount: 2 ReplicationFactor: 2 Configs:Topic: yumu Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: yumu Partition: 1 Leader: 1 Replicas: 2,1 Isr: 1,2
3) 测试通信
# 窗口1,启动生产者,向yumu主题发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu# 窗口2,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu# 窗口3,启动消费者,订阅yumu主题
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu=====结果=====
# 生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic yumu
>hello, kafka!
>once again.
>
# 消费者1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic yumu
hello, kafka!
once again.# 消费者2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu
hello, kafka!
once again.# 查看所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yumu --from-beginning# 删除topic
bin/kafka-topics.sh --delete --bootstrap-server localhost:9091 --topic yumu
三、遇到的问题
1. 第一次启动kafka成功后,关闭kafka并修改配置,再次启动失败,报错如下:
[2020-11-07 20:43:00,866] INFO Cluster ID = MChFWWMBT9GJClVEriND5A (kafka.server.KafkaServer)
[2020-11-07 20:43:00,873] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID MChFWWMBT9GJClVEriND5A doesn't match stored clusterId Some(c6QPfvqlS6C3gtsYZptQ8Q) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.at kafka.server.KafkaServer.startup(KafkaServer.scala:235)at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)at kafka.Kafka$.main(Kafka.scala:82)at kafka.Kafka.main(Kafka.scala)
[2020-11-07 20:43:00,875] INFO shutting down (kafka.server.KafkaServer)
[2020-11-07 20:43:00,877] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,986] INFO Session: 0x1000da0dde2000c closed (org.apache.zookeeper.ZooKeeper)
[2020-11-07 20:43:00,986] INFO EventThread shut down for session: 0x1000da0dde2000c (org.apache.zookeeper.ClientCnxn)
[2020-11-07 20:43:00,987] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-11-07 20:43:00,992] INFO shut down completed (kafka.server.KafkaServer)[2020-11-07 20:43:00,992] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-11-07 20:43:00,993] INFO shutting down (kafka.server.KafkaServer)
原因:
kafka启动之后会生成一些日志和配置,导致这个问题的原因是第一次启动之后生成了log/meta.properties文件
cat meta.properties
#
#Sat Nov 07 21:43:51 CST 2020
broker.id=1
version=0
cluster.id=MChFWWMBT9GJClVEriND5A
第二次改完配置后再去启动的时候生成应该会生成一个新的id,新的id和旧的ID不一致导致无法启动,删除log/meta.properties文件后重新启动即可(疑问:是不是我关闭的方法不对呢?)
推荐阅读:
- Kafka介绍
- ELK介绍
- Kafka安装
- C语言操作kafka以及安装librdkafka库
下一篇:Kafka消息系统原理
相关文章:

Kafka 安装教程和基本操作
一、简介 Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等…...

Java 五种内部类演示及底层原理详解
内部类 什么是内部类 在A类的内部定义B类,B类就被称为内部类 发动机类单独存在没有意义 发动机为独立个体 可以在外部其他类里创建内部类的对象去调用方法 类的五大成员 属性 方法 构造方法 代码块 内部类 内部类的访问特点 内部类可以直接访问外部类的成员&a…...

【UnityShader入门精要学习笔记】第十五章 使用噪声
本系列为作者学习UnityShader入门精要而作的笔记,内容将包括: 书本中句子照抄 个人批注项目源码一堆新手会犯的错误潜在的太监断更,有始无终 我的GitHub仓库 总之适用于同样开始学习Shader的同学们进行有取舍的参考。 文章目录 使用噪声上…...

C++ ─── string的完整模拟实现
本博客实现了string的常见接口实现 下面是用到的一些函数,供大家回顾复习 string.h #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include<iostream> #include<assert.h> using namespace std;namespace bit {class string{public:typedef char*…...

安卓中的图片压缩
安卓中如何进行图片压缩? 在安卓中进行图片压缩通常有以下几种方法: 质量压缩: 通过降低图片的质量来减小文件大小。这可以通过Bitmap的compress()方法实现,其中可以设置压缩质量(0-100)。 ByteArrayOutputStream baos…...

centOS7.9 DNS配置
1.DNS规划 dns.sohu.com192.168.110.111Awww.sohucom192.168.110.112Aoa.sohu.com 192.168.110.113A 2.安装 bind yum install -y bind bind-utils 3. 编辑主配置文件 vim /etc/named.conflisten- on port 53 { any; }; allow- query { any; }; 4.配置区域文件 …...

设计模式20——职责链模式
写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用,主要是下面的UML图可以起到大作用,在你学习过一遍以后可能会遗忘,忘记了不要紧,只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式(Chain …...

android13 差分包制作命令
./out/host/linux-x86/bin/ota_from_target_files -v -iCode/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-38.zip --block -p ./out/host/linux-x86 Code/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-39.zip update_ud.zip 脚本命令行参数 命令…...

Flink-cdc更好的流式数据集成工具
What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够…...

C++|设计模式(三)|抽象工厂模式
抽象工厂模式仍然属于创建型模式,我们在【简单工厂和工厂方法模式】这篇文章中,描述了简单工厂和工厂方法模式,并在文末,简单介绍了工厂方法模式的局限性。 本文将通过汽车工厂的例子继续来阐述使用抽象工厂模式相比较于工厂方法…...

AVB协议分析(一) FQTSS协议介绍
FQTSS协议介绍 一、AVB整体架构二、概述三、协议作用及作用对象四、协议的实现五、参考文献: 一、AVB整体架构 可见FQTSS位于MAC层的上面,代码看不懂,咱们就从最底层开始,逐层分析协议,逐个击破,慢就是快。…...

一个程序员的牢狱生涯(44)询问
星期一 询 问 在号子里开始了下午坐班的时候,过道内的大铁栅栏被管教打开,我听到开锁的声音后,心里变得激动起来。盼望着脚步声能停在我们的号子门口,然后打开铁门,喊一声“眼镜,出来!”。 通道内这次进来的是秦所,但他并没有在我们号子门口停留,只是在走过的时候,低…...

刷爆leetcode第六期
题目一 用队列实现栈 请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop 和 empty)。 实现 MyStack 类: void push(int x) 将元素 x 压入栈顶。 int pop() 移除…...

汇舟问卷:国外问卷调一天900
大家好,我是汇舟问卷,专注于国外问卷调查互联网项目。夏天已经来临,您是否在三伏天顶着大太阳上班,汗水浸湿了衣襟,却依然要面对繁琐的工作和无尽的压力? 在这个炎热的季节里,我们都渴望找到一…...

openresty完美替代nginx
OpenResty相较于Nginx,其优势主要体现在以下几个方面: 1、Lua脚本支持:OpenResty内置了LuaJIT(Lua的即时编译器),使得用户可以直接在Nginx配置文件中使用Lua脚本,这样可以实现更复杂的业务逻辑…...

深入解析:Element Plus 与 Vite、Nuxt、Laravel 的结合使用
在现代前端开发中,选择合适的工具和框架来提高开发效率和应用性能是至关重要的。 Element-Plus 是一个基于 Vue.js 3.0 的流行 UI组件库,它可以与多种前端和后端框架结合使用,如 Vite、Nuxt 和 Laravel。本文将深入探讨这三者与 Element Plus…...

使ssh连接Linux服务器一直不掉线
怎么可以使ssh连接Linux服务器一直不掉线 解决方法: vim /etc/profile在/etc/profile中的TMOUT改为0 export TMOUT0最后 source /etc/profile就可以了...

2024-05-29 blue-VH-driver-对外接口的并行调用-设计与思考
摘要: VH的driver的对外接口, 要做到可以并行,也就是两个不同的线程,分别调用,不能互相阻塞。 本文记录对其的思考和设计。 上下文: 2024-05-28 blue-VH-driver-需求分析及问题分析-CSDN博客 2024-05-27 blue-vh-问题点-CSDN博客 2024-05…...

ubuntu安装
1.下载镜像文件 2.打开VMware并新建虚拟机 版本选择Ubuntu 64位 磁盘容量改为40GB 点击自定义硬件,点击新CD/DVD(SATA),连接选择ISO映像文件,找到之前下载的Ubuntu镜像文件,然后关闭选项卡。 3.开启虚拟机…...

Rosetta PyRosetta 源码包 安装包 下载
--- pyrosetta_src.zip包含以下包: | --- PyRosetta4.Debug.python27.ubuntu.release-185.tar.bz2 | --- PyRosetta4.Release.python27.linux.release-215.tar.bz2 | --- PyRosetta4.Release.python38.ubuntu.release-349.tar.bz2 --- pyrosetta_whl.zip包含…...

C++ 进阶(3)虚函数表解析
个人主页:仍有未知等待探索-CSDN博客 专题分栏:C 请多多指教! 目录 一、虚函数表 二、单继承(无虚函数覆盖) 继承关系表: 对于实例:derive d 的虚函数表: 对于实例:b…...

2024年新算法-秘书鸟优化算法(SBOA)优化BP神经网络回归预测
2024年新算法-秘书鸟优化算法(SBOA)优化BP神经网络回归预测 亮点: 输出多个评价指标:R2,RMSE,MSE,MAPE和MAE 满足需求,分开运行和对比的都有对应的主函数:main_BP, main_SBOA, main_BPvsBP_SB…...

kafka-主题创建(主题操作的命令)
文章目录 1、topic主题操作的命令1.1、创建一个3分区1副本的主题1.1.1、获取 kafka-topics.sh 的帮助信息1.1.2、副本因子设置不能超过集群中broker的数量1.1.3、创建一个3分区1副本的主题1.1.4、查看所有主题1.1.5、查看主题详细描述 1、topic主题操作的命令 kafka发送消息会存…...

[日常开发] 数据库主从延迟问题
MySQL数据库主从延迟问题 无论是学习还是工作中,MySQL数据库的使用都十分地广泛。在业务中,数据库也会以集群的形式使用,所以会涉及到主从问题。 问题描述 在使用MySQL数据库的时候,在service的方法中首先向A数据表批量插入了数…...

Python高层解雇和客户活跃度量化不确定性模型
🎯要点 🎯量化不确定性模型:🖊模型检测短信编写者行为变化 | 🖊确定(商业领域中)竞争性替代方案 | 🖊确定作弊供词真实比例 | 🖊学生考试作弊 | 🖊确定零部件…...

【IOT】OrangePi+HomeAssistant+Yolov5智能家居融合
前言 本文将以OrangePi AIpro为基础,在此基础构建HomeAssistant、YOLO目标检测实现智能家居更加灵活智能的场景实现。 表头表头设备OrangePi AIpro(8T)系统版本Ubuntu 22.04.4 LTSCPU4核64位处理器 AI处理器AI算力AI算力 8TOPS算力接口HDMI2、GPIO接口、Type-C、M.2…...

Python 点云裁剪
点云裁剪 一、介绍1.1 概念1.2 函数讲解二、代码示例2.1 代码实现2.2 代码讲解三、结果示例一、介绍 1.1 概念 点云裁剪 :根据待裁剪对象的多边形体积(json文件)实现点云的裁剪。 1.2 函数讲解 下面代码示例中主要用到了两个函数。 读取待裁剪对象的多边形体积信息(json文…...

Presto 从提交SQL到获取结果 源码详解(2)
逻辑执行计划: //进入逻辑执行计划阶段 doAnalyzeQuery().new LogicalPlanner().plan(analysis);//createAnalyzePlan createAnalyzePlan(analysis, (Analyze) statement);//返回RelationPlan,(返回root根节点,逻辑树上包含输出字…...

Python的类全面系统学习
文章目录 1. 基本概念1.1 类(Class)1.2 对象(Object) 2. 类的属性和方法3. 类的继承3.1 继承的概念3.2 单继承3.3 多重继承 4. 方法重写与多态4.1 方法重写4.2 多态 5. 特殊方法与运算符重载5.1 特殊方法(魔法方法&…...

信号处理中简单实用的方法
最小二乘法拟合消除趋势项 消除趋势项函数 在MATLAB的工具箱中已有消除线性趋势项的detrend函数;再介绍以最小二乘法拟合消除趋势项的polydetrend 函数。 函数:detrend功能:消除线性趋势项 调用格式:ydetrend(x) 说明:输入参数x是带有线性趋势项的信号序列,输出…...