当前位置: 首页 > news >正文

最全Kafka知识宝典之Kafka的基本使用

一、基本概念

传统上定义是一个分布式的基于发布/订阅模式的消息队列,主要应用在大数据实时处理场景,现在Kafka已经定义为一个分布式流平台,用于数据通道处理,数据流分析,数据集成和关键任务应用

必须了解的四个特性:

1、Kafak可以无缝的支持多个生产者,也支持多个消费者从一个单独的消息流中读取数据,且各个消费者之间互不影响

2、Kafka还允许消费者非实时读取消息,因为Kafka将消息按一定顺序持久化到磁盘,保证了数据不会丢失,顺序写磁盘的效率比随机写内存还要高,而且以时间复杂度为O(1)的方式提供消息持久化能力,对TB级以上的数据也能保证常数时间的访问性能。

3、由于Kafka可横向扩展生产者、消费者、broker,使得集群可以轻松处理巨大的消息流,在处理大量数据的同时,还能保证亚秒级的消息延迟,实时性极高。

4、Kafka消息都会在集群中进行备份,每个分区都有一台server作为leader,其他server作为follwers,当leader宕机了,follower中的一台server会自动成为新的leader,继续有条不紊的工作,所以容错性很高且集群的负载是平衡的。

一句话总结Kafka:

Kafka 适合高吞吐量、低延迟和大规模数据流处理的场景,尤其是需要实时数据传输和处理的大数据应用,而其他 MQ 在这些方面可能无法满足性能要求。

二、Kafka的架构

从上图就可以看出,Kafka的几个核心角色:生产者、消费者、Broker、zookeeper

product

生产者可以向消息队列发送各种类型的消息,由消费者消费。

这里注意,这里的生产者是可以有多个生产者服务的实例的

consumer

消费者从broker中消费消息。

这里需要注意,消费者也可以包含多个消费者实例,去消费同一个topic中的消息

总结:生产者与消费者都可以有多个,他们之间由topic来绑定关系

zookeeper

ZooKeeper 负责跟踪 Kafka 集群中的所有 Broker 节点,确保每个节点的状态(在线或离线)都被准确记录。其主要目的就是对所有broker做管理。比如:当某个主题的分区需要选举一个新的领导者时,ZooKeeper 负责协调这一过程,确保选举过程的公平性和一致性。

Broker

Broker就是Kafka消息服务的实例,可以部署多个broker形成kafka集群,在broker当中主要由下面这么几个角色来组成,我们以数据库的形式来类比介绍:

topic

这个相当于数据库中的数据表名,一个业务有一张表,也就是有一个topic,生产者往这个topic写消息,消费者从这个topic读消息

partition

这个相当于分表的概念,一张数据表太大了,查询影响性能,所以将一张大表水平拆分成多张表。所以partition就是将一个大的topic数据拆分成多个分区

Replication

这个是对每个partition做个备份,指定了一个partition需要多少个备份,那么我就会备份几份均匀分配到集群中所有的broker上

Leader&Follower

每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上,我们正常使用kafka是感觉不到leader、follower的存在的。

但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader。

  • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
  • 如果leader出现故障,其他follower会被重新选举为leader
  • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中

三、消息模型

消息由生产者发送到kafka集群后,会被消费者消费,一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)

首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。

默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。

推模式

推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送

优点:

  • 消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer
  • 对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。

缺点:

  • 推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间增长就会把消费者撑死,因为根本消费不过来啊
  • 并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。

拉模式

拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer

优点:

  • 拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求,假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。
  • 拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。
  • 拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息,而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。

缺点:

  • 消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了,因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。
  • 消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

Kafka是什么模式

RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ

Kafka采用了一种长轮询模式,这是基于拉模式的

Kafka 在拉请求中有参数,可以使得消费者请求在 “长轮询” 中阻塞等待。

简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去请求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息请求。注意:阻塞是发生在broker端的。

并且 Broker 也得配合,如果消费者请求过来,有消息肯定马上返回,没有消息那就建立一个延迟操作,等条件满足了再返回

四、Kafka的安装使用

1、安装

我们以三点Kafka节点的形式对Kafka进行安装部署

创建数据目录

 mkdir -p /tmp/kafka/broker{1..3}/{data,logs}

创建配置文件

vi docker-compose.yaml
version: '2'
services:zookeeper:container_name: zookeeperimage: wurstmeister/zookeeperrestart: unless-stoppedhostname: zoo1ports:- "2181:2181"networks:- kafkakafka1:container_name: kafka1image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253                   ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9092  ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9092KAFKA_BROKER_ID: 1KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker1/logs:/opt/kafka/logs- /tmp/kafka/broker1/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka2:container_name: kafka2image: wurstmeister/kafkaports:- "9093:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253                   ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9093  ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9093KAFKA_BROKER_ID: 2KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker2/logs:/opt/kafka/logs- /tmp/kafka/broker2/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka3:container_name: kafka3image: wurstmeister/kafkaports:- "9094:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.245.253                   ## 修改:宿主机IPKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.245.253:9094  ## 修改:宿主机IPKAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"KAFKA_ADVERTISED_PORT: 9094KAFKA_BROKER_ID: 3KAFKA_LOG_DIRS: /kafka/datavolumes:- /tmp/kafka/broker3/logs:/opt/kafka/logs- /tmp/kafka/broker3/data:/kafka/datadepends_on:- zookeepernetworks:- kafkakafka-manager:image: sheepkiller/kafka-manager              ## 镜像:开源的web管理kafka集群的界面environment:ZK_HOSTS: 192.168.245.253                 ## 修改:宿主机IPports:- "9000:9000"                               ## 暴露端口networks:- kafka
networks:kafka:driver: bridge

启动 

docker-compose up -d

2、命令行使用

首先进入kafka容器内部:

docker exec -it kafka1 /bin/bash

进入到Kafka安装目录 

cd /opt/kafka

执行脚本

创建主题

sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test

查看所有主题

sh bin/kafka-topics.sh --zookeeper zookeeper:2181 -list

查看分区情况

sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

 创建分区以及副本

sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test --partitions 3 --replication-factor 3

扩容分区

sh bin/kafka-topics.sh --zookeeper zookeeper:2181 --partitions 4 --alter --topic test

测试生产者发送消息

sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

测试消费者接收消息

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

五、SpringBoot整合Kafka

1、添加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2、配置Kafka

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、创建生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Sent message: " + message);}
}

4、创建消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}

相关文章:

最全Kafka知识宝典之Kafka的基本使用

一、基本概念 传统上定义是一个分布式的基于发布/订阅模式的消息队列&#xff0c;主要应用在大数据实时处理场景&#xff0c;现在Kafka已经定义为一个分布式流平台&#xff0c;用于数据通道处理&#xff0c;数据流分析&#xff0c;数据集成和关键任务应用 必须了解的四个特性…...

机器学习中的数据可视化:常用库、单变量图与多变量图绘制方法

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…...

CodeQL学习笔记(3)-QL语法(模块、变量、表达式、公式和注解)

最近在学习CodeQL&#xff0c;对于CodeQL就不介绍了&#xff0c;目前网上一搜一大把。本系列是学习CodeQL的个人学习笔记&#xff0c;根据个人知识库笔记修改整理而来的&#xff0c;分享出来共同学习。个人觉得QL的语法比较反人类&#xff0c;至少与目前主流的这些OOP语言相比&…...

代码随想录训练营Day11 | 226.翻转二叉树 - 101. 对称二叉树 - 104.二叉树的最大深度 - 111.二叉树的最小深度

226.翻转二叉树 题目链接&#xff1a;226.翻转二叉树思路&#xff1a;遍历二叉树&#xff0c;遍历的时候交换左右节点即可代码&#xff1a; TreeNode* invertTree(TreeNode* root) {reverse(root);return root;}// 迭代法&#xff0c;层序遍历void f2(TreeNode* root) {queue…...

“死鱼眼”,不存在的,一个提词小技巧,拯救的眼神——将内容说给用户,而非读给用户!

视频录制时&#xff0c;死鱼眼问题常见 即便内容再好&#xff0c;眼神死板也会减分 痛点真痛&#xff1a;拍视频时容易紧张 面对镜头&#xff0c;许多人难免紧张 神情僵硬&#xff0c;眼神无光&#xff0c;甚至忘词 这不仅影响表现&#xff0c;还让人难以专注 忘我场景&#x…...

深度学习在复杂系统中的应用

引言 复杂系统由多个相互作用的组成部分构成&#xff0c;这些部分之间的关系往往是非线性的&#xff0c;整体行为难以通过简单的线性组合来预测。这类系统广泛存在于生态学、气象学、经济学和社会科学等多个领域&#xff0c;具有动态演变、自组织、涌现现象以及多尺度与异质性…...

vue3图片懒加载

背景 界面很长&#xff0c;屏幕不能一下装下所有内容&#xff0c;如果以进入首页就把所有内容都加载完的话所需时间较长&#xff0c;会影响用户体验&#xff0c;所以可以当用户浏览到时再去加载。 代码 新建index.ts文件 src下新建directives文件夹&#xff0c;并新建Index…...

总结一些高级的SQL技巧

1. 窗口函数 窗函数允许在查询结果的每一行上进行计算&#xff0c;而不需要将数据分组。这使得我们可以计算累积总和、排名等。 SELECT employee_id,salary,RANK() OVER (ORDER BY salary DESC) AS salary_rank FROM employees;2. 公用表表达式 (CTE) CTE 提供了一种更清晰的…...

无人机飞手考证热,装调检修技术详解

随着无人机技术的飞速发展和广泛应用&#xff0c;无人机飞手考证热正在持续升温。无人机飞手不仅需要掌握飞行技能&#xff0c;还需要具备装调检修技术&#xff0c;以确保无人机的安全、稳定和高效运行。以下是对无人机飞手考证及装调检修技术的详细解析&#xff1a; 一、无人机…...

AI资讯快报(2024.10.27-11.01)

1.<国家超级计算济南中心发布系列大模型> 10月28日&#xff0c;以“人才引领创新 开放赋能发展”为主题的第三届山东人才创新发展大会暨第十三届“海洽会”集中展示大会在山东济南举行。本次大会发布了国家超级计算济南中心大模型&#xff0c;包括“智匠工业大模型、知风…...

范式的简单理解

第二范式 消除非键属性对键的部分依赖 第三范式 消除一个非键属性对另一个非键属性的依赖 表中的每个非键属性都应该依赖于键&#xff0c;整个键&#xff0c;而且只有键&#xff08;键可能为两个属性&#xff09; 第四范式 多值依赖于主键...

活着就好20241103

&#x1f31e; 早晨问候&#xff1a;亲爱的朋友们&#xff0c;大家早上好&#xff01;今天是2024年11月3日&#xff0c;第44周的第七天&#xff0c;也是本周的最后一天&#xff0c;农历甲辰[龙]年十月初三。在这金秋十一月的第三天&#xff0c;愿清晨的第一缕阳光如同活力的源泉…...

《华为工作法》读书摘记

无论做什么事情&#xff0c;首先要明确的就是做事的目标。目标是引导行动的关键&#xff0c;也是证明行动所具备的价值的前提&#xff0c;所以目标管理成了企业与个人管理的重要组成部分。 很多时候&#xff0c;勤奋、努力并不意味着就一定能把工作做好&#xff0c;也并不意味…...

【Unity基础】初识UI Toolkit - 运行时UI

Unity中的UI工具包&#xff08;UI Toolkit&#xff09;不但可以用于创建编辑器UI&#xff0c;同样可以来创建运行时UI。 关于Unity中的UI系统以及使用UI工具包创建编辑器UI可以参见&#xff1a; 1. Unity中的UI系统 2. 初识UI Toolkit - 编辑器UI 本文将通过一个简单示例来…...

20.体育馆使用预约系统(基于springboot和vue的Java项目)

目录 1.系统的受众说明 2.开发环境与技术 2.1 Java语言 2.2 MYSQL数据库 2.3 IDEA开发工具 2.4 Spring Boot框架 3.需求分析 3.1 可行性分析 3.1.1 技术可行性 3.1.2 经济可行性 3.1.3 操作可行性 3.2 系统流程分析 3.3 系统性能需求 3.4 系统功能需求 4.系…...

unity3d————三角函数练习题

先上代码&#xff1a; public class SinCos : MonoBehaviour {public float moveSpeed 10f; //前进的速度public float changValue 5f; //左右的速度public float changeSize 5f; //左右的幅度float time 0;void Update(){this.transform.Translate(Vector3.forwa…...

如何在Linux系统中使用Git进行版本控制

如何在Linux系统中使用Git进行版本控制 Git简介 安装Git 在Debian/Ubuntu系统中安装 在CentOS/RHEL系统中安装 初始化Git仓库 配置全局用户信息 基本的Git命令 添加文件到暂存区 查看状态 提交更改 查看提交历史 工作流 分支管理 切换分支 合并分支 远程仓库 添加远程仓库 推…...

Ubuntu编译linux内核指南(适用阿里云、腾讯云等远程服务器;包括添加Android支持)

在 Ubuntu 上编译内核的步骤如下: 1、安装必要的依赖包: 这里和你chatgpt的略有不同 sudo apt-get update sudo apt-get install build-essential libncurses-dev bison flex libssl-dev libelf-dev dwarves 后续如果遇到“FAILED: load BTF from vmlinux: Invalid argum…...

[MySQL]DQL语句(一)

查询语句是数据库操作中最为重要的一系列语法。查询关键字有 select、where、group、having、order by、imit。其中imit是MySQL的方言&#xff0c;只在MySQL适用。 数据库查询又分单表查询和多表查询&#xff0c;这里讲一下单表查询。 基础查询 # 查询指定列 SELECT * FROM …...

GPT原理;ChatGPT 等类似的问答系统工作流程如下;当用户向 ChatGPT 输入一个问题后:举例说明;ChatGPT不是通过索引搜索的传统知识库

目录 GPT原理 GPT架构 GPT 主要基于 Transformer 的解码器部分 ChatGPT 等类似的问答系统工作流程如下: 用户输入 文本预处理 模型处理 答案生成 输出回答 当用户向 ChatGPT 输入一个问题后:举例说明 文本预处理: ChatGPT不是通过索引搜索的传统知识库 GPT GPT…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...