当前位置: 首页 > news >正文

kafka使用心得(一)

kafka入门

一种分布式的、基于发布/订阅的消息系统,scala编写,具备快速、可扩展、可持久化的特点。

基本概念

topic
主题

partition
分区,一个topic下可以有多个partition,消息是分散到多个partition里存储的,partition支持水平扩展。
一个partition内的消息是有序的,partition间的消息则是无序的
每个partition会有若干副本。

broker
一个kafka节点

consumer
消费者,从topic里取得消息。
每个consumer维护自己的offset。
consumer数量小于分区数,会有一个消费者处理多个分区;反之,会有空闲的消费者,造成浪费。

producer
生产者,负责将消息写入topic

特点

基于硬盘的消息保存,避免在producer上累积消息或者消息丢失。
同一个消息可以由多个consumer消费。

可扩展性:随着数据的增加,可扩展为数十台,上百台规模的大集群。扩展可以在集群正常运行的时候进行,对于整个系统的运作没有影响;这也就意味着,对于很多台broker 的集群,如果一台broker 有故障,不影响为client 提供服务.集群如果要同时容忍更多的故障的话, 可以配置更高的replication
factors。

高性能:上面的这些特性使得Apache Kafka 成为一个能够在高负载的情况下表现出优越性能的发布-订阅消息系统。Producer, consumer 和broker 都能在大数据流的情况下轻松的扩展.

kafka的版本

比如kafka_2.10-0.10.2.0
这里,2.10指的是编译kafka的scala版本,真正的kafka版本号是后面的:0.10.2.0

配置kafka

config/server.properties文件里的配置项说明:
broker.id
每个kafka 的broker 都需要有一个整型的唯一标识,这个标识通过broker.id 来设置。默认的情况下,这个数字是0,但是它可以设置成任何值。需要注意的
是,需要保证集群中这个id 是唯一的。这个值是可以任意填写的,并且可以在必要的时候从broker 集群中删除。比较好的做法是使用主机名相关的标识来做
为id,比如,你的主机名当中有数字相关的信息,如hosts1.example.com,host2.example.com,那么这个数字就可以用来作为broker.id 的值。

port
默认启动kafka 时,监听的是TCP 的9092 端口,端口号可以被任意修改。如果端口号设置为小于1024,那么kafka 需要以root 身份启动。但是并不推荐以root 身份启动。

zookeeper.connect
这个参数指定了Zookeeper 所在的地址,它存储了broker 的元信息。默认是运行在本机的2181 端口上,因此这个值被设置成
localhost:2181。这个值可以通过分号设置多个值,每个值的格式都是hostname:port/path

log.dirs
这个参数用于配置Kafka 保存数据的位置,Kafka 中所有的消息都会存在这个目录下。可以通过逗号来指定多个目录,kafka 会根据最少被使用的原则选择目录分配新的partition。注意kafka 在分配partition 的时候选择的规则不是按照磁盘的空间大小来定的,而是分配的parition 的个数大小。

num.recovery.thread.per.data.dir
kafka 可以配置一个线程池,线程池的使用场景如下:

  • 当正常启动的时候,开启每个parition 的文档块
  • 当失败后重启时,检查parition 的文档块
  • 当关闭kafka 的时候,清除关闭文档块
    默认,每个目录只有一个线程。最好是设置多个线程数,这样在服务器启动或者关闭的时候,都可以并行的进行操作。尤其是当非正常停机后,重启时,如果有大量的分区数,那么启动broker 将会花费大量的时间。
    【注意】这个参数是针对每个目录的。比如,num.recovery.threads.per.data.dir 设置为8,如果有3个log.dirs 路径,那么一共会有24 个线程。

num.partitions
这个参数用于配置新创建的topic 有多少个分区,默认是1 个。注意partition 的个数只可以被增加,不能被减少。这就意味着如果想要减少主题的分区数,那
么就需要重新创建topic

在第一章中介绍过,kafka 通过分区来对topic 进行扩展,因此需要使用分区的个数来做负载均衡,如果新增了broker,那么就会引发重新负载分配。这并不意味着所有的主题的分区数都需要大于broker 的数量,因为kafka 是支持多个主题的,其他的主题会使用其余的broker。需要注意的是,如果消息的吞吐量很高,那么可以通过设置一个比较大的分区数,来分摊压力。

log.retention.ms
这个参数用于配置kafka 中消息保存的时间,也可以使用log.retention.hours,默认这个参数是168 个小时,即一周。另外,还支持log.retention.minutes 和log.retention.ms。这三个参数都会控制删除过期数据的时间,推荐还是使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。

log.retention.bytes
这个参数也是用来配置消息过期的,它会应用到每个分区,比如,你有一个主题,有8 个分区,并且设置了log.retention.bytes 为1G,那么这个主题总共可以保留8G 的数据。注意,所有的过期配置都会应用到patition 粒度,而不是主题粒度。这也意味着,如果增加了主题的分区数,那么主题所能保留的数据也就随之增加了。
如果设置了log.retention.bytes 和log.retention.ms(或者其他过期时间的配置),只要满足其中一个条件,消息就会被删除。

log.segment.bytes
这个参数用来控制log 段文件的大小,而不是消息的大小。在kafka 中,所有的消息都会进入broker,然后以追加的方式追加到分区当前最新的segment 段文件中。一旦这个段文件到达log.segment.bytes 设置的大小,比如默认的1G,这个段文件就会被关闭,然后创建一个新的。一旦这个文件被关闭,就可以理解成这个文件已经过期了。这个参数设置的越小,那么关闭文件创建文件的操作就会越频繁,这样也会造成大量的磁盘读写的开销。
通过生产者发送过来的消息的情况可以判断这个值的大小。比如,主题每天接收100M 的消息,并且log.segment.bytes 为默认设置,那么10 天后,这个段文件才会被填满。由于段文件在没有关闭的时候,是不能删除的,log.retention.ms 又是默认的设置,那么这个消息将会在17 天后,才过期删除。因为10 天后,段文件才关闭。再过7 天,这个文件才算真正过期,才能被清除。

message.max.bytes
这个参数用于限制生产者消息的大小,默认是1000000,也就是1M。生产者在发送消息给broker 的时候,如果出错,会尝试重发;但是如果是因为大小的原因,那生产者是不会重发的。另外,broker上的消息可以进行压缩,这个参数是指压缩后的大小,这样能多存储很多消息。
需要注意的是,允许发送更大的消息会对性能有很大影响。更大的消息,就意味着broker 在处理网络连接的时候需要更长的时间,它也会增加磁盘的写操作压力,影响IO 吞吐量。

启动kafka

进入kafka/bin目录。

启动zk

nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

测试zk是否启动

telnet localhost 2181
输入srvr,应该会返回:

[2017-12-01 15:59:18,829] INFO Processing srvr command from /0:0:0:0:0:0:0:1:40194 (org.apache.zookeeper.server.NIOServerCnxn)
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0xdf
Mode: standalone
Node count: 127
[2017-12-01 15:59:18,833] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:40194 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
Connection closed by foreign host.

不建议zookeeper运行在多于7个的节点上,因为集群性能会因一致性的特征而降低。

启动kafka

nohup  ./kafka-server-start.sh -daemon ../config/server.properties

以守护进程方式执行。

topic管理命令

创建具有指定数量分区(或复制因子)的topic
./kafka-topics.sh --create --zookeeper xx.xx.xx.xx:2181 --topic test1 --replication-factor 1 --partitions 6

查看topic的元信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1

查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list

修改已有topic的分区

./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 4./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.ms=5000./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.messages=20000

删除topic

./kafka-topics.sh --delete --zookeeper xx.xx.xx.xx:2181 --topic test1

删除topic并不是真的删除,只是打个标记,要真正删除,需要同时修改server.properties:

delete.topic.enable=true

并重启kafka才能生效。

向topic发布消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
输入:
msg1
msg2
按^D停止发送。

若kafka的server.properties配置了host.name,则localhost必须改成host.name的值,例如:
./kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic test1

查看topic里的消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
输出:
msg1
msg2
按^C停止接收。

相关文章:

kafka使用心得(一)

kafka入门 一种分布式的、基于发布/订阅的消息系统,scala编写,具备快速、可扩展、可持久化的特点。 基本概念 topic 主题 partition 分区,一个topic下可以有多个partition,消息是分散到多个partition里存储的,part…...

Django图书商城系统实战开发-实现商品管理

Django图书商城系统实战开发 - 实现商品管理 在本教程中,我们将使用Django框架来实现一个简单的图书商城系统,并重点讨论如何实现商品管理功能。此外,我们还将介绍如何使用Markdown格式来写博客,并将其集成到我们的图书商城系统中…...

走出象牙塔:李郓梁的区块链实践之路丨对话MVP

如何从科研走向实践?李郓梁在社区找到了答案。 作为西安工业大学的硕士研究生,李郓梁从学校的实验室接触区块链技术。通过研读大量论文,李郓梁为区块链多中心化、不可篡改等前沿理论深深着迷,并选择将区块链作为主要研究方向&…...

【hive】hive分桶表的学习

hive分桶表的学习 前言: 每一个表或者分区,hive都可以进一步组织成桶,桶是更细粒度的数据划分,他本质不会改变表或分区的目录组织方式,他会改变数据在文件中的分布方式。 分桶规则: 对分桶字段值进行哈…...

ReactDOM模块react-dom/client没有默认导出报错解决办法

import ReactDOM 模块“"E:/Dpandata/Shbank/rt-pro/node_modules/.pnpm/registry.npmmirror.comtypesreact-dom18.2.7/node_modules/types/react-dom/client"”没有默认导出。 解决办法 只需要在tsconfig.json里面添加配置 "esModuleInterop": true 即…...

TiDB数据库的安装配置

一、 TiDB 软件和硬件环境建议配置 Linux 操作系统版本要求 Linux 操作系统 版本 Red Hat Enterprise Linux 7.3 及以上的 7.x 版本 CentOS 7.3 及以上的 7.x 版本 Oracle Enterprise Linux 7.3 及以上的 7.x 版本 Amazon Linux 2 Ubuntu LTS 16.04 及以上的版本 …...

Unity智慧园区夜景制作

近期使用Unity做了一个智慧园区场景的demo,初步了解了3D开发的一些步骤和知识,以下为制作的步骤,比较简略,备忘: 1. 制作前的设计分析: 1. 分析日光角度,阴影长度,效果 2. 分析冷暖…...

Linux MQTT智能家居项目(LED界面的布局设置)

文章目录 前言一、LED界面布局准备工作二、LED界面布局三、逻辑实现总结 前言 上篇文章我们完成了主界面的布局设置那么这篇文章我们就来完成各个界面的布局设置吧。 一、LED界面布局准备工作 首先添加LED灯光控制的图标。 将选择好的LED图标添加进来: 图标可以…...

LeetCode 160.相交链表

文章目录 💡题目分析💡解题思路🚩步骤一:找尾节点🚩步骤二:判断尾节点是否相等🚩步骤三:找交点🍄思路1🍄思路2 🔔接口源码 题目链接👉…...

【深度学习_TensorFlow】调用keras高层API重写手写数字识别项目

写在前面 上一阶段我们完成了手写数字识别项目的构建,了解了网络构建、训练、测试的基本流程,但是对于一些常见的操作,因其使用过于频繁,实际上并无必要手动实现,而早已被封装为函数了。 这篇文章我们将了解keras高层…...

柔性数组(C语言)

也许你从来没有听说过柔性数组( flexible array )这个概念,但是它确实是存在的。 C99 中,结构中的最后一个元素允许是未知大小的数组,这就叫做柔性数组成员,但结 构中的柔性数组成员前面必须至少一个其他…...

判断推理 -- 图形推理 -- 属性规律

中心对称:取一个点,穿过中心能找到另一个对称点。把轴对称 中心对称标出来。五角星不是中心对称。 BD对称轴方向相同,但135自带对称轴,24没带,所以6应该不带对称轴。 百分号不是轴对称。 白色对称轴 平行 或者 夹角…...

【注解使用】使用@Autowired后提示:Field injection is not recommended(Spring团队不推荐使用Field注入)

问题发生场景: 在使用 IDEA 开发 SpringBoot 项目时,在 Controller 类中使用注解 Autowired 注入一个依赖出现了警告提示,查看其他使用该注解的地方同样出现了警告提示。这是怎么回事?由于先去使用了SpringBoot并没有对Spring进行…...

Rust语法: 枚举,泛型,trait

这是我学习Rust的笔记,本文适合于有一定高级语言基础的开发者看不适合刚入门编程的人,对于一些概念像枚举,泛型等,不会再做解释,只写在Rust中怎么用。 文章目录 枚举枚举的定义与赋值枚举绑定方法和函数match匹配枚举…...

hivesql-dayofweek 函数

返回日期或时间戳的星期几。 此函数是 extract(DAYOFWEEK FROM expr) 的同义函数。 语法 dayofweek(expr) 参数 expr:一个 DATE 或 TIMESTAMP 表达式。 返回 一个 INTEGER,其中 1 Sunday 和 7 Saturday。 示例 > SELECT dayofweek(2009-07-30)…...

DIP:《Deep Image Prior》经典文献阅读总结与实现

文章目录 Deep Image Prior1. 方法原理1.1 研究动机1.2 方法 2. 实验验证2.1 去噪2.2 超分辨率2.3 图像修复2.4 消融实验 3. 总结 Deep Image Prior 1. 方法原理 1.1 研究动机 动机 深度神经网络在图像复原和生成领域有非常好的表现一般归功于神经网络学习到了图像的先验信息…...

LAXCUS如何通过技术创新管理数千台服务器

随着互联网技术的不断发展,服务器已经成为企业和个人获取信息、进行计算和存储的重要工具。然而,随着服务器数量的不断增加,传统的服务器管理和运维方式已经无法满足现代企业的需求。LAXCUS做为专注服务器集群的【数存算管】一体化平台&#…...

【Java】BF算法(串模式匹配算法)

☀️ 什么是BF算法 BF算法,即暴力算法,是普通的模式匹配算法,BF算法的思想就是将目标串S的第一个与模式串T的第一个字符串进行匹配,若相等,则继续比较S的第二个字符和T的第二个字符;若不相等,则…...

Vue:使用Promise.all()方法并行执行多个请求

在Vue中,可以使用Promise.all()方法来并行执行多个请求。当需要同时执行多个异步请求时,可以将这些请求封装为Promise对象并使用Promise.all()方法来执行它们。 示例1: 以下是一个示例代码,展示了如何通过Promise.all()方法并行…...

21.0 CSS 介绍

1. CSS层叠样式表 1.1 CSS简介 CSS(层叠样式表): 是一种用于描述网页上元素外观和布局的样式标记语言. 它可以与HTML结合使用, 通过为HTML元素添加样式来改变其外观. CSS使用选择器来选择需要应用样式的元素, 并使用属性-值对来定义这些样式.1.2 CSS版本 CSS有多个版本, 每个…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求&#xff…...

iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈

在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机,点进去 点进去 查看URL,有 ?fileflag.php说明存在文件包含,原理是php://filter 协议 当它与包含函数结合时,php://filter流会被当作php文件执行。 用php://filter加编码,能让PHP把文件内容…...

uniapp 实现腾讯云IM群文件上传下载功能

UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...