kafka详解(三)
2.2 Kafka命令行操作

2.2.1 主题命令行操作
1)查看操作主题命令参数
[aa kafka]$ bin/kafka-topics.sh

2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
4)查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103
[aa ~]$
5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!)
[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103Topic: first Partition: 3 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.(kafka.admin.TopicCommand$)
[aa ~]$
6)再次查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
7)删除topic
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list[aa ~]$
2.2.2 生产者命令行操作
1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

2)发送消息
[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>
2.2.3 消费者命令行操作
[aa kafka]$ bin/kafka-console-consumer.sh

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444
Kafka生产者
生产者消息发送流程
3.1.1 发送原理
Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。
- 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
- Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
发送成功:清理网络客户端请求Request
线程共享变量中RecordAccumulator清理数据,因为只有32M。
发送失败:重试次数----int的最大值
Selector是负责决定将数据发送到集群的哪个分区!
注意:
中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!
相关文章:
kafka详解(三)
2.2 Kafka命令行操作 2.2.1 主题命令行操作 1)查看操作主题命令参数 [aahadoop102 kafka]$ bin/kafka-topics.sh2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/) [aahadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop10…...
[Spring] SpringMVC 简介(二)
目录 五、域对象共享数据 1、使用 ServletAPI 向 request 域对象共享数据 2、使用 ModelAndView 向 request 域对象共享数据 3、使用 Model、Map、ModelMap 向 request 域对象共享数据 4、向 session 域和 application 域共享数据 六、SpringMVC 的视图 1、ThymeleafVie…...
idea一些不太常用但是能提升编码效率的快捷键
Ctrl Shift V:从历史选择粘贴 从历史剪粘版中选择要粘贴的内容。Ctrl Shift Z:重做 恢复上一步撤销内容Shift F4:在新窗口中打开 比如要从另一个类中复制代码 可以把这个类在新窗口单独打开 用AltTab 来回切换复制Ctrl PageUpÿ…...
vsto word属性信息 并读取
要通过VSTO (Visual Studio Tools for Office) 读取和操作 Microsoft Word 文档的属性信息,您可以使用 C# 或 VB.NET 等 .NET 编程语言结合 VSTO 来实现。以下是一个示例,演示如何获取 Word 文档的属性信息:首先,确保您已经在 Vis…...
android之TextView自由选择复制
文章目录 前言一、效果图二、实现步骤1.OnSelectListener2.SelectionInfo类3.TextLayoutUtil类4.复制弹框的xml布局5.弹框背景Drawable6.倒三角Drawable7.复制工具类8.调用 总结 前言 根据时代进步,那些干产品的也叼砖起来了,今天就遇到一个需求&#x…...
【mysql】 bash: mysql: command not found
在linux 服务器上安装了mysql 也可以正常运行。 但是执行命令,系统提示:bash: mysql: command not found bash:mysql:找不到命令 执行的命令是: mysql -u root -h 127.0.0.1 -p由于系统默认会查找的 /usr/bin/ 中下的命令,如…...
鲲山科技:引入和鲸 ModelWhale,实现量化策略的高效迭代
量化投资是数据科学在金融行业的应用。 2023 年,量化行业的超额收益开始收敛,量化私募如何形成自身核心竞争力? 和鲸拜访客户鲲山科技(深圳),揭示其“弯道超车”的独家秘诀。 群体作战 年初至今ÿ…...
PFSK152 3BSE018877R1 有源滤波器的定义
PFSK152 3BSE018877R1 有源滤波器的定义 有源滤波器是以晶体管和运算放大器为基本元件设计的滤波电路。除了这些元件,有源滤波器的电路还包含电阻和电容,但不包含电感。 我们知道滤波器具有频率选择性。因此,有源滤波器电路使用晶体管和运算…...
WebDAV之π-Disk派盘 + 恒星播放器
想要拥有一款万能视频播放器,全能解码播放器,无需转码,支持所有格式的视频和音频,直接播放的播放器?那就选恒星播放器。 恒星播放器支持视频投屏,倍速播放,后台播放等功能,还能一键截图和录制gif动图。支持全格式超高清真4K解码,蓝光HDR低占用,支持ISO文件直出的播放…...
亚马逊,速卖通,敦煌产品测评补单攻略:低成本、高安全实操指南
随着电商平台的发展和消费者对产品质量的要求提升,测评补单成为了商家们提升销售和用户口碑的关键环节。然而,如何在保持成本低廉的同时确保操作安全,一直是卖家们面临的挑战。今天林哥分享一些实用的技巧和策略,帮助卖家们产品的…...
常用linux解压命令
1. 超过4g的zip文件在linux下unzip失败。需要用7z压缩,然后用p7zip命令解压。 p7zip -d x.7z 2. gzip解压.gz文件 gzip -d a11.txt.gz 和 gunzip a1.txt.gz gunzip –c filename.gz > filename #解压缩保留源文件, 上述命令非常容易写错,最后导…...
TensorFlow入门(二十二、梯度下降)
梯度下降的定义及作用 梯度下降本身是一个最优化算法,通常也被称为最速下降法。常被用于机器学习和人工智能中递归性地逼近最小偏差模型,也就是使用它找到一个函数的局部极小值。 使用过程中,梯度下降算法以函数上当前点对于梯度(或者是近似梯度)反方向的规定步长距离点进行迭代…...
WPF中的多重绑定
MultiBinding 将会给后端传回一个数组, 其顺序为绑定的顺序. 例如: <DataGridMargin"10"AutoGenerateColumns"False"ItemsSource"{Binding Stu}"><DataGrid.Columns><DataGridTextColumn Binding"{Binding Id}" Header…...
区块链在游戏行业的应用
区块链技术在游戏行业有许多潜在的应用,它可以改变游戏开发、发行和玩家交互的方式。以下是区块链技术在游戏行业的一些主要应用,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.游戏资产…...
uniapp快速入门系列(4)- 微信小程序开发
第四章 微信小程序开发 4.1 微信小程序开发与uniapp的融合4.2 微信小程序API在uniapp中的使用4.3 微信小程序常见问题的解决方法问题1: 如何获取用户信息?问题2: 如何获取当前位置?问题3: 如何发送网络请求? 在本章中,我们将学习如…...
Kafka保证消息幂等以及解决方案
1、幂等的基本概念 幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。 查询操作(天然幂等…...
接口测试用例设计
接口测试...
wireshark抓rtp包,提取出H265裸流数
调试rtsp收发流时,经常会需要抓包以确认是网络问题还是程序问题还是其它问题。通过tcpdump或者wireshark抓到的包通常是rtp流,保存为.pcap格式文件后中,可通过wireshark进行解析,得出h264裸流,并保存为文件。 1.wires…...
Excel往Word复制表格时删除空格
1.背景 在编写文档,经常需要从Excel往Word里复制表格 但是复制过去的表格前面会出现空格(缩进) 再WPS中试了很多方法,终于摆脱了挨个删除的困扰 2. WPS排版中删除 选择表格菜单栏-选在【开始】-【排版】选择【更多段落处理】-【段…...
客户机操作系统已禁用 CPU。请关闭或重置虚拟机(解决)
解决: 关闭虚拟机进入设置点击处理器给虚拟化引擎两个勾上确认后重新即可...
告别重复操作:用快马生成智能浏览器扩展,极速提升前端调试与数据提取效率
作为一名前端开发者,每天都要和网页元素打交道。调试样式、提取数据这些重复性工作,如果全靠手动操作,不仅效率低下还容易出错。最近我发现用InsCode(快马)平台可以快速生成定制化的浏览器扩展,把那些繁琐操作变成一键自动化&…...
别再到处找了!这12个三维点云开源数据集,够你从入门到项目实战
三维点云实战指南:12个精选开源数据集与精准匹配策略 当你第一次打开三维点云处理软件,面对空白的项目界面,最迫切的问题往往是:"我该从哪里获取高质量的训练数据?"这个问题困扰过每一位初学者,…...
告别答辩 PPT 熬夜局!PaperXie AI 一键生成,3 分钟拿捏学术范答辩神器
paperxie-免费查重复率aigc检测/开题报告/毕业论文/智能排版/文献综述/AIPPThttps://www.paperxie.cn/ppt/createhttps://www.paperxie.cn/ppt/create 一、开题答辩人破防瞬间:PPT 做得好,答辩分数高一半 “论文写完了,PPT 才是真正的修罗场…...
MiniCPM-o-4.5-nvidia-FlagOS部署运维:使用Docker Compose管理多服务依赖
MiniCPM-o-4.5-nvidia-FlagOS部署运维:使用Docker Compose管理多服务依赖 你是不是也遇到过这种情况?想部署一个AI模型,发现它依赖一堆东西:模型服务本身、数据库、缓存、可能还有别的辅助工具。一个个手动去装、去配置、去启动&…...
6大维度深度测评:如何挑选最可靠的开源付费墙绕过工具?
6大维度深度测评:如何挑选最可靠的开源付费墙绕过工具? 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在数字阅读时代,优质内容的付费壁垒逐渐形成…...
Shiny框架终极指南:输入控件与输出渲染的完美交互原理
Shiny框架终极指南:输入控件与输出渲染的完美交互原理 【免费下载链接】shiny Easy interactive web applications with R 项目地址: https://gitcode.com/gh_mirrors/sh/shiny Shiny是R语言生态中一款强大的交互式Web应用框架,它让数据科学家和分…...
如何用XHS-Downloader解决内容采集难题?3大维度提升效率90%
如何用XHS-Downloader解决内容采集难题?3大维度提升效率90% 【免费下载链接】XHS-Downloader 小红书(XiaoHongShu、RedNote)链接提取/作品采集工具:提取账号发布、收藏、点赞、专辑作品链接;提取搜索结果作品、用户链接…...
西门子S7-300 PLC实战:从零搭建药品装瓶机控制系统(附组态王6.55配置)
西门子S7-300 PLC实战:从零搭建药品装瓶机控制系统(附组态王6.55配置) 在制药生产线上,药品装瓶环节的效率直接影响整体产能。传统人工装瓶方式不仅速度慢,还容易产生计数误差。而采用PLC控制的自动化装瓶系统&#x…...
Wan2.1视频生成小白必看:避开这些坑,让你的视频生成一次成功
Wan2.1视频生成小白必看:避开这些坑,让你的视频生成一次成功 1. 为什么你的视频生成总是失败? 很多新手第一次使用Wan2.1视频生成模型时,都会遇到各种问题:生成的视频模糊不清、内容与描述不符、甚至直接失败。这通常…...
ClawdBot惊艳效果案例:PaddleOCR识别模糊手写体+LibreTranslate精准输出
ClawdBot惊艳效果案例:PaddleOCR识别模糊手写体LibreTranslate精准输出 1. 项目概述 ClawdBot是一个可以在个人设备上运行的AI助手应用,它使用vllm提供后端模型能力,为用户提供强大的多模态处理功能。这个应用特别适合需要处理文字识别和翻…...
