kafka详解(三)
2.2 Kafka命令行操作

2.2.1 主题命令行操作
1)查看操作主题命令参数
[aa kafka]$ bin/kafka-topics.sh

2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
4)查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103
[aa ~]$
5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!)
[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103Topic: first Partition: 3 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.(kafka.admin.TopicCommand$)
[aa ~]$
6)再次查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
7)删除topic
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list[aa ~]$
2.2.2 生产者命令行操作
1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

2)发送消息
[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>
2.2.3 消费者命令行操作
[aa kafka]$ bin/kafka-console-consumer.sh

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444
Kafka生产者
生产者消息发送流程
3.1.1 发送原理
Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。
- 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
- Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
发送成功:清理网络客户端请求Request
线程共享变量中RecordAccumulator清理数据,因为只有32M。
发送失败:重试次数----int的最大值
Selector是负责决定将数据发送到集群的哪个分区!
注意:
中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!
相关文章:
kafka详解(三)
2.2 Kafka命令行操作 2.2.1 主题命令行操作 1)查看操作主题命令参数 [aahadoop102 kafka]$ bin/kafka-topics.sh2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/) [aahadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop10…...
[Spring] SpringMVC 简介(二)
目录 五、域对象共享数据 1、使用 ServletAPI 向 request 域对象共享数据 2、使用 ModelAndView 向 request 域对象共享数据 3、使用 Model、Map、ModelMap 向 request 域对象共享数据 4、向 session 域和 application 域共享数据 六、SpringMVC 的视图 1、ThymeleafVie…...
idea一些不太常用但是能提升编码效率的快捷键
Ctrl Shift V:从历史选择粘贴 从历史剪粘版中选择要粘贴的内容。Ctrl Shift Z:重做 恢复上一步撤销内容Shift F4:在新窗口中打开 比如要从另一个类中复制代码 可以把这个类在新窗口单独打开 用AltTab 来回切换复制Ctrl PageUpÿ…...
vsto word属性信息 并读取
要通过VSTO (Visual Studio Tools for Office) 读取和操作 Microsoft Word 文档的属性信息,您可以使用 C# 或 VB.NET 等 .NET 编程语言结合 VSTO 来实现。以下是一个示例,演示如何获取 Word 文档的属性信息:首先,确保您已经在 Vis…...
android之TextView自由选择复制
文章目录 前言一、效果图二、实现步骤1.OnSelectListener2.SelectionInfo类3.TextLayoutUtil类4.复制弹框的xml布局5.弹框背景Drawable6.倒三角Drawable7.复制工具类8.调用 总结 前言 根据时代进步,那些干产品的也叼砖起来了,今天就遇到一个需求&#x…...
【mysql】 bash: mysql: command not found
在linux 服务器上安装了mysql 也可以正常运行。 但是执行命令,系统提示:bash: mysql: command not found bash:mysql:找不到命令 执行的命令是: mysql -u root -h 127.0.0.1 -p由于系统默认会查找的 /usr/bin/ 中下的命令,如…...
鲲山科技:引入和鲸 ModelWhale,实现量化策略的高效迭代
量化投资是数据科学在金融行业的应用。 2023 年,量化行业的超额收益开始收敛,量化私募如何形成自身核心竞争力? 和鲸拜访客户鲲山科技(深圳),揭示其“弯道超车”的独家秘诀。 群体作战 年初至今ÿ…...
PFSK152 3BSE018877R1 有源滤波器的定义
PFSK152 3BSE018877R1 有源滤波器的定义 有源滤波器是以晶体管和运算放大器为基本元件设计的滤波电路。除了这些元件,有源滤波器的电路还包含电阻和电容,但不包含电感。 我们知道滤波器具有频率选择性。因此,有源滤波器电路使用晶体管和运算…...
WebDAV之π-Disk派盘 + 恒星播放器
想要拥有一款万能视频播放器,全能解码播放器,无需转码,支持所有格式的视频和音频,直接播放的播放器?那就选恒星播放器。 恒星播放器支持视频投屏,倍速播放,后台播放等功能,还能一键截图和录制gif动图。支持全格式超高清真4K解码,蓝光HDR低占用,支持ISO文件直出的播放…...
亚马逊,速卖通,敦煌产品测评补单攻略:低成本、高安全实操指南
随着电商平台的发展和消费者对产品质量的要求提升,测评补单成为了商家们提升销售和用户口碑的关键环节。然而,如何在保持成本低廉的同时确保操作安全,一直是卖家们面临的挑战。今天林哥分享一些实用的技巧和策略,帮助卖家们产品的…...
常用linux解压命令
1. 超过4g的zip文件在linux下unzip失败。需要用7z压缩,然后用p7zip命令解压。 p7zip -d x.7z 2. gzip解压.gz文件 gzip -d a11.txt.gz 和 gunzip a1.txt.gz gunzip –c filename.gz > filename #解压缩保留源文件, 上述命令非常容易写错,最后导…...
TensorFlow入门(二十二、梯度下降)
梯度下降的定义及作用 梯度下降本身是一个最优化算法,通常也被称为最速下降法。常被用于机器学习和人工智能中递归性地逼近最小偏差模型,也就是使用它找到一个函数的局部极小值。 使用过程中,梯度下降算法以函数上当前点对于梯度(或者是近似梯度)反方向的规定步长距离点进行迭代…...
WPF中的多重绑定
MultiBinding 将会给后端传回一个数组, 其顺序为绑定的顺序. 例如: <DataGridMargin"10"AutoGenerateColumns"False"ItemsSource"{Binding Stu}"><DataGrid.Columns><DataGridTextColumn Binding"{Binding Id}" Header…...
区块链在游戏行业的应用
区块链技术在游戏行业有许多潜在的应用,它可以改变游戏开发、发行和玩家交互的方式。以下是区块链技术在游戏行业的一些主要应用,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.游戏资产…...
uniapp快速入门系列(4)- 微信小程序开发
第四章 微信小程序开发 4.1 微信小程序开发与uniapp的融合4.2 微信小程序API在uniapp中的使用4.3 微信小程序常见问题的解决方法问题1: 如何获取用户信息?问题2: 如何获取当前位置?问题3: 如何发送网络请求? 在本章中,我们将学习如…...
Kafka保证消息幂等以及解决方案
1、幂等的基本概念 幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。 查询操作(天然幂等…...
接口测试用例设计
接口测试...
wireshark抓rtp包,提取出H265裸流数
调试rtsp收发流时,经常会需要抓包以确认是网络问题还是程序问题还是其它问题。通过tcpdump或者wireshark抓到的包通常是rtp流,保存为.pcap格式文件后中,可通过wireshark进行解析,得出h264裸流,并保存为文件。 1.wires…...
Excel往Word复制表格时删除空格
1.背景 在编写文档,经常需要从Excel往Word里复制表格 但是复制过去的表格前面会出现空格(缩进) 再WPS中试了很多方法,终于摆脱了挨个删除的困扰 2. WPS排版中删除 选择表格菜单栏-选在【开始】-【排版】选择【更多段落处理】-【段…...
客户机操作系统已禁用 CPU。请关闭或重置虚拟机(解决)
解决: 关闭虚拟机进入设置点击处理器给虚拟化引擎两个勾上确认后重新即可...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...
从零开始了解数据采集(二十八)——制造业数字孪生
近年来,我国的工业领域正经历一场前所未有的数字化变革,从“双碳目标”到工业互联网平台的推广,国家政策和市场需求共同推动了制造业的升级。在这场变革中,数字孪生技术成为备受关注的关键工具,它不仅让企业“看见”设…...
基于单片机的宠物屋智能系统设计与实现(论文+源码)
本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢,连接红外测温传感器,可实时精准捕捉宠物体温变化,以便及时发现健康异常;水位检测传感器时刻监测饮用水余量,防止宠物…...
