当前位置: 首页 > news >正文

Kafka基本原理及使用

目录

基本概念

单机版

环境准备

基本命令使用

集群版

消息模型

成员组成

1. Topic(主题):

2. Partition(分区):

3. Producer(生产者):

4. Consumer(消费者):

5. Broker(代理服务器):

6. Zookeeper:

 成员关系


基本概念

Kafka 是一个分布式流处理平台,主要用于实时处理和传输大规模数据流。

基本MQ功能:

  • 异步
  • 削峰
  • 解耦

与RocketMq对比:

  • 高吞吐量和低延迟
  • 流式处理
  • 生态环境更好

适合业务场景:

  1. 日志聚合: Kafka 作为一个分布式消息传递系统,非常适合用于收集和存储系统和应用程序产生的大量日志数据。它提供了持久性存储和高吞吐量的写入,是构建日志聚合系统的理想选择。

  2. 实时数据处理: Kafka 可以与流处理框架(如 Apache Flink、Apache Storm、Spark Streaming)集成,用于实时处理和分析数据流。这使得 Kafka 在需要实时数据处理、计算和分析的场景中非常有用。

  3. 事件溯源: 对于需要记录系统每个状态变化的场景,例如金融交易、订单处理等,Kafka 支持事件溯源,帮助构建可追溯、可审计的系统。

  4. 消息队列: Kafka 作为分布式消息队列,可用于解耦生产者和消费者之间的通信。这在微服务架构中尤为重要,帮助构建松耦合的系统。

  5. 数据集成: Kafka 提供 Kafka Connect,一个用于数据集成的工具,用于连接 Kafka 与其他数据存储系统,支持构建端到端的数据流管道。

  6. 大数据管道: Kafka 可以作为大数据管道的核心组件,用于连接和传递大规模数据集,以支持数据湖、数据仓库等大数据处理场景。

单机版

环境准备

1. 从官网下载kafka, 这里选择3.4.0版本,官网:Apache Kafka

2. 解压压缩包

tar -zxvf kafka_2.13-3.4.0.tgz

3. 启动自带的zookeeper, jps检查是否启动成功

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

4. 修改kafka配置文件config/server.properties, 允许外网客户端连接

5. 启动kafka

nohup bin/kafka-server-start.sh config/server.properties &

 6. jps检查是否启动成功

基本命令使用

1. 创建topic

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

2. 查看topic

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

3. 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4. 消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

5. 从起点开始消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

6. 从指定地方消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

 7. 分组消费消息

示例:创建三个消费者A,B,C, 其中A和B属于testGrroup消费者组, C属于testGrroup2消费者组

#开一个终端1, 配置消费者组testGrroup 
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test#开一个终端2, 配置消费者组testGrroup 
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test#开一个终端3, 配置消费者组testGrroup2
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup2 --topic test#结果: 终端1和2会竞争消息, 一条只会被其中一个实例消费; 终端3独享消费群组, 每条消息能消费

集群版

==

消息模型

成员组成

1. Topic(主题):

  • 消息在 Kafka 中通过主题进行分类,每个主题都是一个消息的类别。
  • 生产者将消息发布到一个或多个主题,消费者订阅一个或多个主题。
  • 主题在 Kafka 集群中的分区上进行分布,每个分区可以看作是一个有序的日志文件。

2. Partition(分区):

  • Topic只是一个逻辑概念,而Partition就是实际存储消息的组件。每个Partiton就是一个queue队列结构。所有消息以FIFO先进先出的顺序保存在这些Partition分区中。
  • 每个主题可以划分为一个或多个分区,分区是 Kafka 消息的基本存储单元。
  • 分区允许水平扩展和并行处理,提高了整个系统的吞吐量。
  • 分区内的消息有序存储,保证了分区内的顺序性。

3. Producer(生产者):

  • 生产者负责将消息发布到指定的主题。
  • 生产者可以指定消息的键(key),Kafka 根据键将消息发送到特定的分区。
  • 生产者将消息发送到分区的 Leader 副本,并可以等待确认或异步发送。

4. Consumer(消费者):

  • 消费者订阅一个或多个主题,从中获取消息。
  • 消费者可以以消费者组(Consumer Group)的形式进行组织,每个组内的消费者共享订阅的主题的消息。
  • 每个分区只能由同一消费者组内的一个消费者进行消费,确保了消息在消费时的顺序性。

5. Broker(代理服务器):

  • Broker 是 Kafka 集群中的节点,负责存储和处理消息。
  • 每个分区在集群中有多个副本,其中一个是 Leader 副本,其余是 Follower 副本。Leader 负责处理读写请求,Follower 复制 Leader 的数据。

6. Zookeeper:

  • Kafka 使用 ZooKeeper 来进行集群管理和协调。
  • ZooKeeper 管理 Kafka 集群的节点、分区的分配,以及监视 Broker 的健康状态。

 成员关系

  • Topic 和 Partition:

    • 一个 Topic 包含一个或多个 Partition。
    • 每个 Partition 中的消息是有序的,可以保证 Partition 内的消息顺序性。
    • Partition 的数量和分布影响了 Kafka 集群的并发处理能力和水平扩展性。
  • Partition 和 Broker:

    • Partition 在 Kafka 集群中分布在多个 Broker 上,以实现水平扩展。
    • 每个 Partition 在任意时刻只有一个 Broker 的副本是 Leader,其余的是 Follower。
    • Leader 负责处理读写请求,Follower 负责复制 Leader 的数据,以实现高可用性和容错性。
  • Topic 和 Broker:

    • 一个 Topic 的多个 Partition 可以分布在多个 Broker 上。
    • Topic 的所有 Partition 的所有副本的集合构成了整个 Kafka 集群的数据。

相关文章:

Kafka基本原理及使用

目录 基本概念 单机版 环境准备 基本命令使用 集群版 消息模型 成员组成 1. Topic(主题): 2. Partition(分区): 3. Producer(生产者): 4. Consumer(…...

使用Python爬取GooglePlay并从复杂的自定义数据结构中实现解析

文章目录 【作者主页】:吴秋霖 【作者介绍】:Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作! 【作者推荐】:对JS逆向感兴趣的朋友可以关注《爬虫JS逆向实战》,对分布…...

前后端分离下的鸿鹄电子招投标系统:使用Spring Boot、Mybatis、Redis和Layui实现源码与立项流程

在数字化时代,采购管理也正经历着前所未有的变革。全过程数字化采购管理成为了企业追求高效、透明和规范的关键。该系统通过Spring Cloud、Spring Boot2、Mybatis等先进技术,打造了从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通过…...

ChatGPT 有什么新奇的使用方式?

来看看 OpenAI 内部是如何使用 ChatGPT 的。 目前(4月29日)距离ChatGPT发布了已经半年,这期间大家基本上把能想到的ChatGPT的使用方法都研究遍了——从写作、写代码,到翻译、英语润色,再到角色扮演等等。 所以&#x…...

【计算机四级(网络工程师)笔记】操作系统概论

目录 一、OS的概念 1.1OS的定义 1.2OS的特征 1.2.1并发性 1.2.2共享性 1.2.3随机性 1.3研究OS的观点 1.3.1软件的观点 1.3.2资源管理器的观点 1.3.3进程的观点 1.3.4虚拟机的观点 1.3.5服务提供者的观点 二、OS的分类 2.1批处理操作系统 2.2分时操作系统 2.3实时操作系统 2.4嵌…...

LeetCode算法练习top100:(10)贪心算法

package top100.贪心算法;import java.util.ArrayList; import java.util.List;public class TOP {//121. 买卖股票的最佳时机public int maxProfit(int[] prices) {int res 0, min prices[0];for (int i 1; i < prices.length; i) {if (prices[i] < min) {min price…...

随记-探究 OpenApi 的加密方式

open api 主要参数如下 appKey 接口Key&#xff08;app id&#xff09;appSecret 接口密钥timeStamp 时间戳 毫秒nonceStr 随机字符串signature 加密字符串 客户端 使用 appSecret 按照一定规则将 appKey timeStamp nonceStr 进行加密&#xff0c;得到密文 signature将 appK…...

stm32学习总结:4、Proteus8+STM32CubeMX+MDK仿真串口收发

stm32学习总结&#xff1a;4、Proteus8STM32CubeMXMDK仿真串口收发 文章目录 stm32学习总结&#xff1a;4、Proteus8STM32CubeMXMDK仿真串口收发一、前言二、资料收集三、STM32CubeMX配置串口1、配置开启USART12、设置usart中断优先级3、配置外设独立生成.c和.h 四、MDK串口收发…...

配置paddleocr及paddlepaddle解决报错 GLIBCXX_3.4.30 FreeTypeFont

配置 https://github.com/PaddlePaddle/PaddleOCR/blob/release/2.7/StyleText/README_ch.md#style-text 环境配置 https://www.paddlepaddle.org.cn/ 根据自己的cuda版本选择paddlepaddle-gpu # 新建conda环境 # python version conda create -n paddle python3.8 # 安装p…...

【实战】如何在Docker Image中轻松运行MySQL

定义 使用Docker运行MySQL有许多优势。它允许数据库程序和数据分离&#xff0c;增强了数据的安全性和可靠性。Docker Image的轻便性简化了MySQL的部署和迁移&#xff0c;而Docker的资源隔离功能确保了应用程序之间无冲突。结合中间件和容器化系统&#xff0c;Docker为MySQL提供…...

PLC物联网,实现工厂设备数据采集

随着工业4.0时代的到来&#xff0c;物联网技术在工厂设备管理领域的应用日益普及。作为物联网技术的重要一环&#xff0c;PLC物联网为工厂设备数据采集带来了前所未有的便捷和高效。本文将围绕“PLC物联网&#xff0c;实现工厂设备数据采集”这一主题&#xff0c;探讨PLC物联网…...

npm安装依赖报错ERESOLVE unable to resolve dependency tree(我是在taro项目中)(node、npm 版本问题)

换了电脑之后新电脑安装包出错 &#x1f447;&#x1f447;&#x1f447; npm install 安装包报错 ERESOLVE unable to resolve dependency tree 百度后尝试使用 npm install --force 还是报错 参考 有人说是 node 版本和 npm 版本的问题 参考 新电脑 node版本&#xff1a;16.1…...

Maven仓库上传jar和mvn命令汇总

目录 导入远程仓库 命令结构 命令解释 项目pom 输入执行 本地仓库导入 命令格式 命令解释 Maven命令汇总 mvn 参数 mvn常用命令 web项目相关命令 导入远程仓库 命令结构 mvn deploy:deploy-file -Dfilejar包完整名称 -DgroupIdpom文件中引用的groupId名 -Dartifa…...

Jenkins 执行远程脚本的插件—SSH2 Easy

SSH2 Easy 是什么&#xff1f; SSH2 Easy 是一个 Jenkins 插件&#xff0c;它用于在 Jenkins 构建过程中通过 SSH2 协议与远程服务器进行交互。通过该插件&#xff0c;用户可以在 Jenkins 的构建过程中执行远程命令、上传或下载文件、管理远程服务器等操作。 以下是 SSH2 Eas…...

Starting the Docker Engine...一直转圈

出现的问题&#xff1a; 原因排查&#xff1a; 看了网上的很多篇文章&#xff0c;每个原因都排查了&#xff0c;没有发现问题。 遇到这样的情况应先看自己是否安装成功 打开运行&#xff0c;在空框中输入powershell并点击确定&#xff1a; docker version 显示版本证明安装…...

关于Python里xlwings库对Excel表格的操作(十五)

这篇小笔记主要记录如何【获取单元格数据的对齐方式或更改单元格数据的对齐方式】。 前面的小笔记已整理成目录&#xff0c;可点链接去目录寻找所需更方便。 【目录部分内容如下】【点击此处可进入目录】 &#xff08;1&#xff09;如何安装导入xlwings库&#xff1b; &#xf…...

[Linux] LVS+Keepalived高可用集群部署

一、Keepalived实现原理 1.1 高可用方案 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题。 在一个LVS服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色…...

【版本管理】git stash用法

应用场景 我们在开发过程中可能会遇到这样的情况&#xff1a; 想从A分支切换到B分支&#xff0c;但A分支尚未改完&#xff0c;暂时不想提交代码 此时可以在切换到B分支前&#xff0c;先通过stash指令来缓存本地改动&#xff0c;等切回A分支时&#xff0c;再通过stash还原改动…...

声明式的理解【gpt】

一 MyBatis采用了声明式语法来进行SQL映射配置【mybatis声明式】 MyBatis是一款优秀的持久层框架&#xff0c;支持自定义SQL、存储过程以及高级映射&#xff0c;使得开发人员能够专注于SQL本身而不是数据库访问。MyBatis提供了两种配置方式&#xff1a;XML配置和注解配置&…...

提高Spring Boot技能的9种方法

以下是提高 Spring Boot 技能的 9 种方法&#xff1a; 1. 外部化您的配置&#xff1a; 充分利用 Spring Boot 潜力的另一种方法是尽可能地尝试外部化您的配置&#xff0c;而不是对其进行硬编码。外部化您的配置将使您的应用程序更加灵活且更易于管理。 外部化配置的另一个优点…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

遍历 Map 类型集合的方法汇总

1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

srs linux

下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935&#xff0c;SRS管理页面端口是8080&#xff0c;可…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...

2.3 物理层设备

在这个视频中&#xff0c;我们要学习工作在物理层的两种网络设备&#xff0c;分别是中继器和集线器。首先来看中继器。在计算机网络中两个节点之间&#xff0c;需要通过物理传输媒体或者说物理传输介质进行连接。像同轴电缆、双绞线就是典型的传输介质&#xff0c;假设A节点要给…...