kafka入门(一):kafka消息发送与消费
kafka的基础概念
-
Producer (消息生产者)
向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 -
Consumer (消息消费者)
订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 -
Consumer Group (消费者组)
每个消费者属于一个特定的消费者群组(可为每个消费者指定group name,若不指定group name则属于默认的group)。
每个消费者群组都有一个唯一的GroupId。
- Brokers(kafka服务器)
一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。
一个broker可以容纳多个topic。每个broker都有各自的broker.id。
- Topics(主题)
消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。
- Partition(分区)
主题(Topic)可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,
由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。
安装kafka,创建 topic:
Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851
Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353
添加依赖包:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.10.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>
kafka生产者示例:
按以下步骤发送消息:
- 设置 broker服务器的ip和端口
- 生产者初始化
- 发送消息
public class KafkaDemoProducer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static void main(String[] args) {//属性配置Properties properties = getProperties(BROKER_LIST);//生产者初始化KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");//发送消息try {producer.send(record);} catch (Exception e) {System.out.println("send error." + e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties = new Properties();properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);return properties;}}
kafka消费者示例:
主要按照以下步骤:
-
设置 broker服务器的ip和端口, 设置 消费者群组id
-
初始化消费者
-
消费者订阅主题
-
消费者批量拉取消息
public class KafkaDemoConsumer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {consumerRecord();}public static void consumerRecord() {//属性配置Properties properties = getProperties(BROKER_LIST, GROUP_ID);//消费者初始化KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));//循环while (true) {//每次拉取 1千条消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("=============> 消费kafka消息:"+ record.value());}}}public static Properties getProperties(String brokerList, String groupId) {Properties properties = new Properties();//序列化properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//broker服务器的ip和端口,多个用逗号隔开properties.put("bootstrap.servers", brokerList);//消费者群组idproperties.put("group.id", groupId);return properties;}}
观察idea 控制台,可以看到 成功消费了消息:
=============> 消费kafka消息:hello kafka
参考资料:
《深入理解kafka 核心设计与实践原理》
kafka的简单理解
相关文章:
kafka入门(一):kafka消息发送与消费
kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Co…...
CMap数据库筛选化学药物
数据库clue.io 文献链接:连接图谱:使用基因表达特征连接小分子、基因和疾病 |科学 (science.org) 基本模式:利用CMap将差异基因列表与数据库参考数据集比对;根据差异表达基因在参考基因表达谱富集情况得到一个相关性分数&#…...
mysql命令行(mysql-client)连接数据库
有时项目连接不上数据库,报错鉴权失败,先用mysql工具连接下,容易发现问题。 直接输入mysql看是否已安装,如果没有就安装下。 yum -y install mysql-client; 这个名称一直记不准,有时记为mysql-cli,结果发现…...
sklearn中的TfidfTransformer和gensim中的TfidfModel的区别
sklearn.feature_extraction.text.TfidfTransformer 和 gensim.models.TfidfModel 都是用于计算文本数据的 TF-IDF 值的工具。它们的主要区别在于实现方式和输入数据的格式。 1、实现方式和输入数据格式: TfidfTransformer 是 scikit-learn 中的一个类,…...
spring注解
spring注解 Configuration 用于标注配置类Bean 结合Configuration(full mode)使用或结合Component(light mode)使用。可以导入第三方组件,入方法有参数默认从IOC容器中获取,可以指定initMethod和destroyMethod 指定初…...
SpringCloud实用篇02
SpringCloud实用篇02 0.学习目标 1.Nacos配置管理 Nacos除了可以做注册中心,同样可以做配置管理来使用。 1.1.统一配置管理 当微服务部署的实例越来越多,达到数十、数百时,逐个修改微服务配置就会让人抓狂,而且很容易出错。我…...
Nginx快速入门教程,域名转发、负载均衡
1.Nginx简介 Nginx是⽬前最流⾏的Web服务器, 最开始是由⼀个叫做igor的俄罗斯的程序员开发的, 2019年3⽉11⽇被美国的F5公司以6.7亿美元的价格收购, 现在Nginx是F5公司旗下的⼀款产品了。 2.Nginx的版本 Nginx开源版本主要分为两种&#x…...
ElasticSearch之健康状态
参考Cluster health API。 命令样例,如下: curl -X GET "https://localhost:9200/_cluster/health?wait_for_statusyellow&timeout50s&pretty" --cacert $ES_HOME/config/certs/http_ca.crt -u "elastic:ohCxPHQBEs5*lo7F9&qu…...
java io流中为什么使用缓冲流就能加快文件读写速度
FileInputStream的read方法底层确实是通过调用JDK层面的read方法,并且这个JDK层面的read方法底层是使用C语言编写的,以实现高效的文件读取功能。但是它会涉及多次内核态与操作系统交互。当我们使用FileInputStream的read方法读取文件时,首先会…...
【鸿蒙最新全套教程】<HarmonyOS第一课>1、运行Hello World
下载与安装DevEco Studio 在HarmonyOS应用开发学习之前,需要进行一些准备工作,首先需要完成开发工具DevEco Studio的下载与安装以及环境配置。 进入DevEco Studio下载官网,单击“立即下载”进入下载页面。 DevEco Studio提供了Windows版本和…...
求二叉树中指定节点所在的层数(可运行)
运行环境.cpp 我这里设置的是查字符e的层数,大家可以在main函数里改成自己想查的字符。(输入的字符一定是自己树里有的)。 如果没有输出结果,一定是建树错误!!!!!&…...
Ubuntu18 Opencv3.4.12 viz 3D显示安装、编译、移植
Opencv3.*主模块默认包括两个3D库 calib3d用于相机校准和三维重建 ,viz用于三维图像显示,其中viz是cmake选配。 参考: https://docs.opencv.org/3.4.12/index.html 下载linux版本的源码 sources。 查看cmake apt list --installed | grep…...
EPSon打印机更换色带
1、打印机色带拆装视频 打印机色带更换 2、色带盒四周有多个卡扣,需从右到左依次轻微用力掰开,使盖板与盒体脱离,注意不要掰断卡扣。 3、如何将色带放入打印机色带盒? A、色带放入盒体时不可打乱打结,以免卡带&#x…...
电脑游戏录屏软件,记录游戏高光时刻
电脑游戏录制是游戏爱好者分享游戏乐趣、技巧和成就的绝佳方式,此时,一款好用的录屏软件就显得尤为重要。本文将为大家介绍三款电脑游戏录屏软件,通过对这三款软件的分步骤详细介绍,让大家更加了解它们的特点及使用方法。 电脑游戏…...
Hadoop性能调优建议
一、服务器配置 1. BIOS配置: 关闭smmu/关闭cpu预取/performance策略 2. 硬盘优化 raid0 打卡cache /jbod scheduler/sector_size/read_ahead_kb 3. 网卡优化 rx_buff/ring_buffer/lro/中断绑核/驱动升级 4. 内存插法:要用均衡插法…...
算法的奥秘:常见的六种算法(算法导论笔记2)
算法的奥秘:种类、特性及应用详解(算法导论笔记1) 上期总结算法的种类和大致介绍,这一期主要讲常见的六种算法详解以及演示。 排序算法: 排序算法是一类用于对一组数据元素进行排序的算法。根据不同的排序方式和时间复…...
Python算法——树的路径和算法
Python算法——树的路径和算法 树的路径和算法是一种在树结构中寻找从根节点到叶节点的所有路径,其路径上的节点值之和等于给定目标值的算法。这种算法可以用Python语言实现,本文将介绍如何使用Python编写树的路径和算法,并给出一些示例代码…...
数据结构之链表练习与习题详细解析
个人主页:点我进入主页 专栏分类:C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 欢迎大家点赞,评论,收藏。 一起努力,一起奔赴大厂。 目录 1.前言 2.习题解…...
QT中使用unity
qt把unity发入widget中 头文件showunitywindowsinqt #ifndef SHOWUNITYWINDOWSINQT_H #define SHOWUNITYWINDOWSINQT_H #include <QObject> #include <QProcess> #include <windows.h> #include <winuser.h> #include <QDebug> class ShowUnity…...
QTableView/QTableWidget设置单元格字体颜色及背景色
1.QTableView设置单元格字体颜色及背景色 QStandardItem * pItem new QStandardItem("AAA"); pItem->setBackground(QBrush(Qt::blue)); // 设置背景色 pItem->setForeground(QBrush(Qt::red)); // 设置字体颜色 2.QTableWidget设置单元格字…...
深入eMios时钟树:从160MHz CORE_CLK到通道定时,搞懂S32K3xx系列性能调优基础
深入eMios时钟树:从160MHz CORE_CLK到通道定时,搞懂S32K3xx系列性能调优基础 在汽车电子和工业控制领域,定时精度往往直接决定系统性能的上限。当工程师面对S32K3xx系列MCU时,eMios模块的时钟配置就像一把双刃剑——用得好可以精准…...
Scroll Reverser终极指南:5分钟解决macOS多设备滚动混乱难题
Scroll Reverser终极指南:5分钟解决macOS多设备滚动混乱难题 【免费下载链接】Scroll-Reverser Per-device scrolling prefs on macOS. 项目地址: https://gitcode.com/gh_mirrors/sc/Scroll-Reverser 你是否曾在MacBook上同时使用触控板和外接鼠标时&#x…...
AISMM模型与投资回报分析,深度拆解头部金融机构私有化调参逻辑及动态敏感性阈值矩阵
更多请点击: https://intelliparadigm.com 第一章:AISMM模型与投资回报分析 AISMM(Artificial Intelligence Strategy Maturity Model)是一种面向企业AI战略落地的五阶成熟度评估框架,涵盖意识层、数据层、算法层、系…...
wechatbot云端微信SAAS框架使用教程,轻松实现微信登录,微信消息调度,微信群管理,微信联系人管理,定时任务!
1. 登录模块(首次使用) 这是使用系统的第一步,核心流程如下: 获取二维码:调用 POST /getLoginQrCode 接口。您需要传入 AUTHORIZATION(从官网获取)、设备类型(type,推荐 …...
游戏服务器容器化部署:基于Docker的Archon镜像实战指南
1. 项目概述:一个为游戏服务器量身定制的容器化部署方案如果你和我一样,曾经被游戏服务器的部署、迁移和运维搞得焦头烂额,那么看到SufficientDaikon/archon这个项目,你可能会和我当初一样眼前一亮。这本质上是一个为特定游戏&…...
自动驾驶仿真训练平台SIMSCALE的技术解析与应用实践
1. 项目背景与核心价值去年参与某自动驾驶研发项目时,我们团队遇到了真实路测成本高、极端场景覆盖难的问题。当时每天要花费数万元进行车队路测,但遇到暴雨天气或特殊交通状况时,数据采集效率直线下降。正是这种困境让我开始关注仿真技术在自…...
基于RAG的本地代码知识库构建:CodeQAI部署与实战指南
1. 项目概述:当AI代码助手遇见本地知识库最近在折腾一个挺有意思的项目,叫fynnfluegge/codeqai。简单来说,它不是一个传统的代码生成工具,而是一个能让你用自然语言“盘问”自己代码库的智能助手。想象一下,你接手了一…...
SmartOnmyoji:阴阳师全自动代肝脚本的终极解决方案
SmartOnmyoji:阴阳师全自动代肝脚本的终极解决方案 【免费下载链接】SmartOnmyoji 阴阳师后台代肝脚本,支持所有类似阴阳师的卡牌游戏(点点点游戏)自动找图-点击…(支持后台运行、支持多开、支持模拟器) …...
pocketclaw:轻量级网页抓取工具,配置驱动与无头浏览器实战
1. 项目概述:一个轻量级、高可用的网页内容抓取工具最近在做一个需要批量获取网页结构化数据的项目,找了一圈现成的爬虫框架,要么太重,要么配置太复杂,要么对动态渲染页面的支持不够友好。直到我发现了PYXXXX/pocketcl…...
基于skill-mcp-builder快速构建生产级MCP服务器:从协议到实践
1. 项目概述与核心价值如果你正在为AI助手(比如Claude Code、Cursor、或是Gemini CLI)开发工具,并且厌倦了为每个平台重复编写适配代码,那么你很可能已经听说过Model Context Protocol。MCP,你可以把它理解为AI工具领域…...
