kafka详解(三)
2.2 Kafka命令行操作

2.2.1 主题命令行操作
1)查看操作主题命令参数
[aa kafka]$ bin/kafka-topics.sh

2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/)
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
3)创建first topic
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 3 --replication-factor 3
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
4)查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
first
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103
[aa ~]$
5)修改分区数( 注意:分区数只能增加,不能减少,如果减少会报错!)
[a kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 4
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
Topic: first TopicId: 3pIfoppvRmq84FjACWzAgw PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: first Partition: 0 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102Topic: first Partition: 1 Leader: 103 Replicas: 103,102,104 Isr: 103,102,104Topic: first Partition: 2 Leader: 102 Replicas: 102,104,103 Isr: 102,104,103Topic: first Partition: 3 Leader: 104 Replicas: 104,103,102 Isr: 104,103,102
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --alter --partitions 2
Error while executing topic command : Topic currently has 4 partitions, which is higher than the requested 2.
[2023-09-13 19:22:16,891] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.(kafka.admin.TopicCommand$)
[aa ~]$
6)再次查看first主题的详情
[aa kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
7)删除topic
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --delete
[aa ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list[aa ~]$
2.2.2 生产者命令行操作
1)查看操作生产者命令参数
[aa kafka]$ bin/kafka-console-producer.sh

2)发送消息
[aa kafka]$ kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
>111
>222
>333
>
2.2.3 消费者命令行操作
[aa kafka]$ bin/kafka-console-consumer.sh

2)消费消息
[aa kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test --from-beginning
111
222
333
还可以动态的生产和消费,比如102机器上输入
>444
103机器就会自动在结尾弹出
111
222
333
444
Kafka生产者
生产者消息发送流程
3.1.1 发送原理
Kafka的producer发送消息采用的是异步发送的方式
在消息发送的过程中,涉及到了两个线程——main线程和Sender线程(两个线程是异步!),以及一个线程共享变量:RecordAccumulator。
- 在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator。
- Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main线程将外部数据包装成kafka要求的格式ProducerRecord,类似于Flume中的Event.网络中进行数据传输都会序列化(kryo框架)。
分区策略:涉及到生产者和消费者。生产者分区针对的是数据,消费者分区针对是分区怎么消费数据
。RecordAccumulator(一种堆内存缓冲区)达到两种标准之后就唤醒Sender进行发送!bachsize(同一个队列中,两个时间非常紧密的数据可以形成一个bachsize)一般就是数据洪峰的时候;linger.ms就是在数据量非常小的时候;默认值0代表来一条发一条;
sender发送也是异步发送,sender将RecordAccumulator中的数据包装成Request(一个批次包装成一个Request),sender发送Request1之后不等待响应就发送Request2,然后不等待响应就发送Request2,…Request5,Request6必须排队了。
sender发送过去的数据在Leader中应该是先存在线程对应的内存中,还没等到磁盘中存储数据落盘的一个时间点决定是不是回复ack为0,此时就是不安全,时延最低!。为1的时候就是数据落盘之后再发送ack,此时数据安全性有所提高,稍慢!注意此时的fllower还没有数据!完全保证数据安全,Leader和follwer都罗盘,回复-1
发送成功:清理网络客户端请求Request
线程共享变量中RecordAccumulator清理数据,因为只有32M。
发送失败:重试次数----int的最大值
Selector是负责决定将数据发送到集群的哪个分区!
注意:
中间涉及到数据的发送和拉取都是异步的!main线程放数据和sender拉取数据并发送两个过程异步!
一个队列只能发送到最右边的集群中的一个分区,假如有两个toptic,5个分区,就需要创建5个双端队列,队列内部才能形成批次(bachsize),所以只能发到一个分区!
相关文章:
kafka详解(三)
2.2 Kafka命令行操作 2.2.1 主题命令行操作 1)查看操作主题命令参数 [aahadoop102 kafka]$ bin/kafka-topics.sh2)查看当前服务器中的所有topic (配置了环境变量不需要写bin/) [aahadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop10…...
[Spring] SpringMVC 简介(二)
目录 五、域对象共享数据 1、使用 ServletAPI 向 request 域对象共享数据 2、使用 ModelAndView 向 request 域对象共享数据 3、使用 Model、Map、ModelMap 向 request 域对象共享数据 4、向 session 域和 application 域共享数据 六、SpringMVC 的视图 1、ThymeleafVie…...
idea一些不太常用但是能提升编码效率的快捷键
Ctrl Shift V:从历史选择粘贴 从历史剪粘版中选择要粘贴的内容。Ctrl Shift Z:重做 恢复上一步撤销内容Shift F4:在新窗口中打开 比如要从另一个类中复制代码 可以把这个类在新窗口单独打开 用AltTab 来回切换复制Ctrl PageUpÿ…...
vsto word属性信息 并读取
要通过VSTO (Visual Studio Tools for Office) 读取和操作 Microsoft Word 文档的属性信息,您可以使用 C# 或 VB.NET 等 .NET 编程语言结合 VSTO 来实现。以下是一个示例,演示如何获取 Word 文档的属性信息:首先,确保您已经在 Vis…...
android之TextView自由选择复制
文章目录 前言一、效果图二、实现步骤1.OnSelectListener2.SelectionInfo类3.TextLayoutUtil类4.复制弹框的xml布局5.弹框背景Drawable6.倒三角Drawable7.复制工具类8.调用 总结 前言 根据时代进步,那些干产品的也叼砖起来了,今天就遇到一个需求&#x…...
【mysql】 bash: mysql: command not found
在linux 服务器上安装了mysql 也可以正常运行。 但是执行命令,系统提示:bash: mysql: command not found bash:mysql:找不到命令 执行的命令是: mysql -u root -h 127.0.0.1 -p由于系统默认会查找的 /usr/bin/ 中下的命令,如…...
鲲山科技:引入和鲸 ModelWhale,实现量化策略的高效迭代
量化投资是数据科学在金融行业的应用。 2023 年,量化行业的超额收益开始收敛,量化私募如何形成自身核心竞争力? 和鲸拜访客户鲲山科技(深圳),揭示其“弯道超车”的独家秘诀。 群体作战 年初至今ÿ…...
PFSK152 3BSE018877R1 有源滤波器的定义
PFSK152 3BSE018877R1 有源滤波器的定义 有源滤波器是以晶体管和运算放大器为基本元件设计的滤波电路。除了这些元件,有源滤波器的电路还包含电阻和电容,但不包含电感。 我们知道滤波器具有频率选择性。因此,有源滤波器电路使用晶体管和运算…...
WebDAV之π-Disk派盘 + 恒星播放器
想要拥有一款万能视频播放器,全能解码播放器,无需转码,支持所有格式的视频和音频,直接播放的播放器?那就选恒星播放器。 恒星播放器支持视频投屏,倍速播放,后台播放等功能,还能一键截图和录制gif动图。支持全格式超高清真4K解码,蓝光HDR低占用,支持ISO文件直出的播放…...
亚马逊,速卖通,敦煌产品测评补单攻略:低成本、高安全实操指南
随着电商平台的发展和消费者对产品质量的要求提升,测评补单成为了商家们提升销售和用户口碑的关键环节。然而,如何在保持成本低廉的同时确保操作安全,一直是卖家们面临的挑战。今天林哥分享一些实用的技巧和策略,帮助卖家们产品的…...
常用linux解压命令
1. 超过4g的zip文件在linux下unzip失败。需要用7z压缩,然后用p7zip命令解压。 p7zip -d x.7z 2. gzip解压.gz文件 gzip -d a11.txt.gz 和 gunzip a1.txt.gz gunzip –c filename.gz > filename #解压缩保留源文件, 上述命令非常容易写错,最后导…...
TensorFlow入门(二十二、梯度下降)
梯度下降的定义及作用 梯度下降本身是一个最优化算法,通常也被称为最速下降法。常被用于机器学习和人工智能中递归性地逼近最小偏差模型,也就是使用它找到一个函数的局部极小值。 使用过程中,梯度下降算法以函数上当前点对于梯度(或者是近似梯度)反方向的规定步长距离点进行迭代…...
WPF中的多重绑定
MultiBinding 将会给后端传回一个数组, 其顺序为绑定的顺序. 例如: <DataGridMargin"10"AutoGenerateColumns"False"ItemsSource"{Binding Stu}"><DataGrid.Columns><DataGridTextColumn Binding"{Binding Id}" Header…...
区块链在游戏行业的应用
区块链技术在游戏行业有许多潜在的应用,它可以改变游戏开发、发行和玩家交互的方式。以下是区块链技术在游戏行业的一些主要应用,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开发公司,欢迎交流合作。 1.游戏资产…...
uniapp快速入门系列(4)- 微信小程序开发
第四章 微信小程序开发 4.1 微信小程序开发与uniapp的融合4.2 微信小程序API在uniapp中的使用4.3 微信小程序常见问题的解决方法问题1: 如何获取用户信息?问题2: 如何获取当前位置?问题3: 如何发送网络请求? 在本章中,我们将学习如…...
Kafka保证消息幂等以及解决方案
1、幂等的基本概念 幂等简单点讲,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会产生任何副作用。幂等分很多种,比如接口的幂等、消息的幂等,它是分布式系统设计时必须要考虑的一个方面。 查询操作(天然幂等…...
接口测试用例设计
接口测试...
wireshark抓rtp包,提取出H265裸流数
调试rtsp收发流时,经常会需要抓包以确认是网络问题还是程序问题还是其它问题。通过tcpdump或者wireshark抓到的包通常是rtp流,保存为.pcap格式文件后中,可通过wireshark进行解析,得出h264裸流,并保存为文件。 1.wires…...
Excel往Word复制表格时删除空格
1.背景 在编写文档,经常需要从Excel往Word里复制表格 但是复制过去的表格前面会出现空格(缩进) 再WPS中试了很多方法,终于摆脱了挨个删除的困扰 2. WPS排版中删除 选择表格菜单栏-选在【开始】-【排版】选择【更多段落处理】-【段…...
客户机操作系统已禁用 CPU。请关闭或重置虚拟机(解决)
解决: 关闭虚拟机进入设置点击处理器给虚拟化引擎两个勾上确认后重新即可...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...
LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...
Android写一个捕获全局异常的工具类
项目开发和实际运行过程中难免会遇到异常发生,系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler,它是Thread的子类(就是package java.lang;里线程的Thread)。本文将利用它将设备信息、报错信息以及错误的发生时间都…...
