当前位置: 首页 > news >正文

Kafka

这里写目录标题

  • 1.Kafka
    • 1.1 Kafka概述
    • 1.2 kafka安装和配置
    • 1.3 入门案例
    • 1.4 kafka生产者详解
      • 1.4.1 生产者的参数

1.Kafka

1.1 Kafka概述

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
请添加图片描述

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

1.2 kafka安装和配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

一、Docker安装zookeeper
下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

一、Docker安装kafka
下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器, 此处需要改为自己虚拟机的ip地址

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

1.3 入门案例

实现一个简单的生产 > 消费过程
请添加图片描述
一、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

二、生产者发送消息

  1. 连接kafka
  2. 创建生产者对象
  3. 发送信息
  4. 关闭消息通道(必须关闭, 否则消息发送不成功)
package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道,必须关闭,否则消息发送不成功producer.close();}}

三、消费者接收消息

package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

1.4 kafka生产者详解

一、同步发送
  使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());

二、异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//异步消息发送
producer.send(kvProducerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}
});

1.4.1 生产者的参数

一、ack
请添加图片描述
代码的配置方式:

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");

在这里插入图片描述


二、retries

相关文章:

Kafka

这里写目录标题1.Kafka1.1 Kafka概述1.2 kafka安装和配置1.3 入门案例1.4 kafka生产者详解1.4.1 生产者的参数1.Kafka 1.1 Kafka概述 Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。 producer&#xff1a;发布消息的对象称之为主题生产者&#xff08;Ka…...

数据结构——第三章 栈与队列(2)

栈的运用1.括号匹配2.表达式求值2.1.算术表示式的形式2.2.后缀表达式求值2.3.将算术表达式转换为后缀表达式2.4.算术表达式直接求值3.栈与递归3.1.递归算法3.2.栈与函数调用3.3.递归工作与递归函数3.4.递归到非递归的转换1.括号匹配 void matching(char str[]) {//创建空栈Lin…...

【Linux学习】基础IO——理解缓冲区 | 理解文件系统

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《Linux学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 基础IO☕理解缓冲区&#x1f9c3;缓冲区的共识&#x1f9c3;缓冲区的位置&#x1f9c3;缓冲区的刷…...

RHCSA-重置root密码(3.3)

方法1&#xff1a;rd.break &#xff08;1&#xff09;首先重启系统&#xff0c;在此页面按e键&#xff0c;在屏幕上显示内核启动参数 &#xff08;2&#xff09;知道linux这行&#xff0c;末尾空格后输入rd.break&#xff0c;然后按ctrlx &#xff08;3&#xff09;查看&#…...

无公网IP快解析实现U+随时随地访问

现阶段商品从生产到消费者手中要经过多个环节&#xff0c;为实现对每一个环节进行管理&#xff0c;越来越多的企业选择通过信息化手段来实现。供应链管理系统配合供应链中各实体的业务需求&#xff0c;使操作流程和信息系统紧密配合&#xff0c;做到各环节无缝链接&#xff0c;…...

UVa 307 Sticks 木棍拼接 ID 迭代加深搜

题目链接&#xff1a;Sticks 题目描述&#xff1a; 小明一开始有一些长度相等的木棍&#xff0c;小明现在将木棍砍成了一些长度为整数的木棍&#xff0c;他现在忘记了最开始木棍的长度&#xff0c;你需要找到最短的可能木棍长度&#xff0c;例如给定5,2,1,5,2,1,5,2,15,2,1,5,2…...

阿里云(CentOS)中MySQL8忘记密码的解决方法

阿里云(CentOS)中MySQL8忘记密码的解决方法 方法 在 skip-grant-tables 模式下启动 MySQL&#xff0c;该模式下启动 MySQL 时不启动授权表功能&#xff0c;可以直接免密码登录 实现 编辑 /etc/my.cnf 文件 vim /etc/my.cnf在 [mysqld] 区域末尾添加配置&#xff0c;设置免密…...

三、Spring的入门程序

第一个Spring程序 创建新的空工程spring6 设置JDK版本17&#xff0c;编译器版本17 设置IDEA的Maven&#xff1a;关联自己的maven 在空的工程spring6中创建第一个maven模块&#xff1a;spring6-001-first 在pom.xml添加spring context依赖和junit依赖&#xff0c; <?x…...

摘录一下Python列表和元组的学习笔记

1 基础概念 列表一个值&#xff0c;列表值指的是列表本身&#xff0c;而不是列表中的内容 列表用[]表示 列表中的内容称为 表项 len()函数可以显示列表中表项的个数&#xff0c;比如下面这个例子 spam [cat, bat, dog, rat]print(len(spam))列表的范围选取中&#xff0c;比…...

【量化金融】收益率、对数收益率、年华收益、波动率、夏普比率、索提诺比率、阿尔法和贝塔、最大回撤

【量化金融】收益率、对数收益率、年华收益、波动率、夏普比率、索提诺比率、阿尔法和贝塔、最大回撤 1 收益率 在学术界&#xff0c;建模一般不直接使用资产价格&#xff0c;而是使用资产收益率(Returns)。因为收益率比价格具有更好的统计特性&#xff0c;更便于建模。下经典…...

1_机器学习概述—全流程

文章目录1 机器学习定义2 机器学习常见应用框架&#xff08;重点&#xff09;3 机器学习分类3.1 监督学习&#xff08;Supervised learning&#xff09;3.2 无监督学习&#xff08;Unsupervised learning&#xff09;3.3 半监督学习&#xff08;Semi-Supervised Learning&#…...

VUE中给对象添加新属性时,界面不刷新怎么办

一、直接添加属性的问题 举例&#xff1a; 定义一个p标签&#xff0c;通过v-for指令进行遍历 然后给botton标签绑定点击事件&#xff0c;我们预期点击按钮时&#xff0c;数据新增一个属性&#xff0c;界面也 新增一行。 <p v-for"(value,key) in item" :key&qu…...

视频号频出10w+,近期爆红的账号有哪些?

回顾2月&#xff0c;视频号持续放出大动作&#xff0c;不仅进行了16小时不间断的NBA全明星直播&#xff0c;还邀请国际奥委会入驻&#xff0c;分享奥运的最新资讯。视频号成为越来越多官方机构宣传推广的有效渠道。官方积极入驻&#xff0c;内容创作生态也在同步繁荣发展&#…...

企业寄件现代化管理教程

现代化企业为了跟上时代发展的步伐&#xff0c;在不断完善着管理制度&#xff0c;其中公司寄件管理&#xff0c;也是重要的一个模块。为了提高公司快递的寄件效率&#xff0c;以及节约寄件成本&#xff0c;实现快递寄件的规范化&#xff0c;越来越多的现代化企业&#xff0c;开…...

django 在网页显示后台进度

1、定义函数打开网页 def PeformanceIndex(request): citys{‘wuhu’: ‘芜湖’, ‘xuancheng’: ‘宣城’, ‘tongling’: ‘铜陵’, ‘suzhou’: ‘宿州’, ‘maanshan’: ‘马鞍山’, ‘liuan’: ‘六安’, ‘huainan’: ‘淮南’, ‘huabei’: ‘淮北’, ‘hefei’: ‘合肥…...

机器学习库(Numpy, Scikit-learn)

Numpy 创建数组 import numpy as npa np.array([1,2,3]) b np.array([(1.5,2,3), (4,5,6)], dtype float) c np.array([[(1.5,2,3), (4,5,6)], [(3,2,1), (4,5,6)]],dtype float)创建占位符 z1np.zeros((3,4)) z2np.ones((2,3,4),dtypenp.int16) z3d np.arange(10,25,5)…...

Linux操作系统学习(进程替换)

文章目录进程替换进程替换是什么&#xff1f;替换的方法进程替换简易shell模拟进程替换 进程替换是什么&#xff1f; 如下图所示&#xff1a; ​ 进程替换就是&#xff0c;把进程B的代码和数据&#xff0c;替换正在执行的进程A的代码和数据在内存中的位置&#xff08;若代码…...

【C++从入门到放弃】类和对象(中)———类的六大默认成员函数

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《C从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; 类和对…...

白盒测试重点复习内容

白盒测试白盒测试之逻辑覆盖法逻辑覆盖用例设计方法1.语句覆盖2.判定覆盖(分支覆盖)3.条件覆盖4.判定条件覆盖5.条件组合覆盖6.路径覆盖白盒测试之基本路径测试法基本路径测试方法的步骤1.根据程序流程图画控制流图2.计算圈复杂度3.导出测试用例4.准备测试用例5.例题白盒测试总…...

【13】linux命令每日分享——groupadd建立组

大家好&#xff0c;这里是sdust-vrlab&#xff0c;Linux是一种免费使用和自由传播的类UNIX操作系统&#xff0c;Linux的基本思想有两点&#xff1a;一切都是文件&#xff1b;每个文件都有确定的用途&#xff1b;linux涉及到IT行业的方方面面&#xff0c;在我们日常的学习中&…...

从供热管道泄漏模拟出发,聊聊Fluent中那些容易被忽略的‘粘性模型’选择细节

从供热管道泄漏模拟看Fluent粘性模型选择的工程智慧 供热管道泄漏事故的数值模拟一直是市政工程中的难点——当高温高压流体从破损处喷涌而出时&#xff0c;流动形态会经历从管道内湍流到自由射流的复杂转变。这种多尺度流动对湍流模型的选择提出了严苛考验&#xff0c;而大多数…...

MusePublic Art Studio实际效果:UI设计稿生成中组件一致性保障

MusePublic Art Studio实际效果&#xff1a;UI设计稿生成中组件一致性保障 1. 引言&#xff1a;当AI成为你的UI设计搭档 想象一下这个场景&#xff1a;你正在为一个新的移动应用设计UI界面。你已经画好了登录页的草图&#xff0c;上面有圆角按钮、卡片式布局和一套清爽的配色…...

Qwen-Image-2512实现Python爬虫数据自动化处理:电商图片批量生成方案

Qwen-Image-2512实现Python爬虫数据自动化处理&#xff1a;电商图片批量生成方案 1. 引言 如果你是做电商的&#xff0c;或者负责过电商运营&#xff0c;肯定遇到过这个头疼的问题&#xff1a;上架一个新商品&#xff0c;或者给一批老商品换季上新&#xff0c;需要准备大量的…...

4个让OneNote效率倍增的开源效率工具:Markdown全功能增强方案

4个让OneNote效率倍增的开源效率工具&#xff1a;Markdown全功能增强方案 【免费下载链接】NoteWidget Markdown add-in for Microsoft Office OneNote 项目地址: https://gitcode.com/gh_mirrors/no/NoteWidget 一、问题发现&#xff1a;OneNote的专业创作短板与解决方…...

Drizzle ORM性能优化终极指南:查询优化与缓存策略详解

Drizzle ORM性能优化终极指南&#xff1a;查询优化与缓存策略详解 【免费下载链接】drizzle-orm drizzle-team/drizzle-orm: 是一个基于 C 的 ORM&#xff08;对象关系映射&#xff09;库&#xff0c;支持 MySQL 和 SQLite 数据库。适合对 C、数据库开发以及想要使用轻量级 ORM…...

墨语灵犀开源模型生态:对接LangChain/RAG构建专属翻译知识库

墨语灵犀开源模型生态&#xff1a;对接LangChain/RAG构建专属翻译知识库 1. 引言&#xff1a;当古典美学遇见现代AI架构 在人工智能技术快速发展的今天&#xff0c;翻译工具已经从简单的词汇转换演变为理解文化语境和语义深度的智能系统。「墨语灵犀」作为基于腾讯混元大模型…...

5年java开发经验总结面试题-内含完整答案

1、讲讲IO里面的常见类&#xff0c;字节流、字符流、接口、实现类、方法阻塞。 文件字节输入输出流 FileInputStream/FileOutputStream&#xff0c; 文件字符流 FileReader/FileWriter 包装流PrintStream/PrintWriter/Scanner 字符串输入输出流StringReader/StringWriter 转换流…...

Ludusavi:你的游戏进度守护神,三分钟搞定跨平台存档备份

Ludusavi&#xff1a;你的游戏进度守护神&#xff0c;三分钟搞定跨平台存档备份 【免费下载链接】ludusavi Backup tool for PC game saves 项目地址: https://gitcode.com/gh_mirrors/lu/ludusavi 你是否曾在电脑崩溃后&#xff0c;发现数百小时的游戏进度瞬间归零&…...

RAG深度解析一:从参数化知识到检索增强的范式重构

【内容定位】深度技术原理【文章日期】2026-03-27【场景引入】进入2026年3月&#xff0c;一场围绕大语言模型“可信性”的讨论在技术社区再度升温。开发者们早已不再争论模型参数量&#xff0c;而是转向一个更实际的问题&#xff1a;如何让动辄千亿参数的大模型&#xff0c;在回…...

轻量级AI写作工坊:OpenClaw+nanobot内容创作流

轻量级AI写作工坊&#xff1a;OpenClawnanobot内容创作流 1. 为什么需要自动化写作助手 作为一名技术博主兼自媒体运营者&#xff0c;我每天都要面对内容创作的"三重压力"&#xff1a;选题焦虑、写作耗时、发布繁琐。最痛苦的是&#xff0c;当我花两小时写完一篇技…...