当前位置: 首页 > news >正文

kafka入门(一):kafka消息发送与消费

kafka的基础概念

  • Producer (消息生产者)
    向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

  • Consumer (消息消费者)
    订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

  • Consumer Group (消费者组)

每个消费者属于一个特定的消费者群组(可为每个消费者指定group name,若不指定group name则属于默认的group)。

每个消费者群组都有一个唯一的GroupId。

  • Brokers(kafka服务器)

一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

一个broker可以容纳多个topic。每个broker都有各自的broker.id。

  • Topics(主题)

消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。

  • Partition(分区)

主题(Topic)可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,

由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.10.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>

kafka生产者示例:

按以下步骤发送消息:

  • 设置 broker服务器的ip和端口
  • 生产者初始化
  • 发送消息
public class KafkaDemoProducer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static void main(String[] args) {//属性配置Properties properties = getProperties(BROKER_LIST);//生产者初始化KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");//发送消息try {producer.send(record);} catch (Exception e) {System.out.println("send error." + e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties = new Properties();properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);return properties;}}

kafka消费者示例:

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemoConsumer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {consumerRecord();}public static void consumerRecord() {//属性配置Properties properties = getProperties(BROKER_LIST, GROUP_ID);//消费者初始化KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));//循环while (true) {//每次拉取 1千条消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("=============> 消费kafka消息:"+ record.value());}}}public static Properties getProperties(String brokerList, String groupId) {Properties properties = new Properties();//序列化properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//broker服务器的ip和端口,多个用逗号隔开properties.put("bootstrap.servers", brokerList);//消费者群组idproperties.put("group.id", groupId);return properties;}}

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》
kafka的简单理解

相关文章:

kafka入门(一):kafka消息发送与消费

kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer)&#xff0c;生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer)&#xff0c;消费者用于处理生产者产生的消息。 Co…...

CMap数据库筛选化学药物

数据库clue.io 文献链接&#xff1a;连接图谱&#xff1a;使用基因表达特征连接小分子、基因和疾病 |科学 (science.org) 基本模式&#xff1a;利用CMap将差异基因列表与数据库参考数据集比对&#xff1b;根据差异表达基因在参考基因表达谱富集情况得到一个相关性分数&#…...

mysql命令行(mysql-client)连接数据库

有时项目连接不上数据库&#xff0c;报错鉴权失败&#xff0c;先用mysql工具连接下&#xff0c;容易发现问题。 直接输入mysql看是否已安装&#xff0c;如果没有就安装下。 yum -y install mysql-client; 这个名称一直记不准&#xff0c;有时记为mysql-cli&#xff0c;结果发现…...

sklearn中的TfidfTransformer和gensim中的TfidfModel的区别

sklearn.feature_extraction.text.TfidfTransformer 和 gensim.models.TfidfModel 都是用于计算文本数据的 TF-IDF 值的工具。它们的主要区别在于实现方式和输入数据的格式。 1、实现方式和输入数据格式&#xff1a; TfidfTransformer 是 scikit-learn 中的一个类&#xff0c;…...

spring注解

spring注解 Configuration 用于标注配置类Bean 结合Configuration&#xff08;full mode&#xff09;使用或结合Component&#xff08;light mode&#xff09;使用。可以导入第三方组件,入方法有参数默认从IOC容器中获取&#xff0c;可以指定initMethod和destroyMethod 指定初…...

SpringCloud实用篇02

SpringCloud实用篇02 0.学习目标 1.Nacos配置管理 Nacos除了可以做注册中心&#xff0c;同样可以做配置管理来使用。 1.1.统一配置管理 当微服务部署的实例越来越多&#xff0c;达到数十、数百时&#xff0c;逐个修改微服务配置就会让人抓狂&#xff0c;而且很容易出错。我…...

Nginx快速入门教程,域名转发、负载均衡

1.Nginx简介 Nginx是⽬前最流⾏的Web服务器&#xff0c; 最开始是由⼀个叫做igor的俄罗斯的程序员开发的&#xff0c; 2019年3⽉11⽇被美国的F5公司以6.7亿美元的价格收购&#xff0c; 现在Nginx是F5公司旗下的⼀款产品了。 2.Nginx的版本 Nginx开源版本主要分为两种&#x…...

ElasticSearch之健康状态

参考Cluster health API。 命令样例&#xff0c;如下&#xff1a; curl -X GET "https://localhost:9200/_cluster/health?wait_for_statusyellow&timeout50s&pretty" --cacert $ES_HOME/config/certs/http_ca.crt -u "elastic:ohCxPHQBEs5*lo7F9&qu…...

java io流中为什么使用缓冲流就能加快文件读写速度

FileInputStream的read方法底层确实是通过调用JDK层面的read方法&#xff0c;并且这个JDK层面的read方法底层是使用C语言编写的&#xff0c;以实现高效的文件读取功能。但是它会涉及多次内核态与操作系统交互。当我们使用FileInputStream的read方法读取文件时&#xff0c;首先会…...

【鸿蒙最新全套教程】<HarmonyOS第一课>1、运行Hello World

下载与安装DevEco Studio 在HarmonyOS应用开发学习之前&#xff0c;需要进行一些准备工作&#xff0c;首先需要完成开发工具DevEco Studio的下载与安装以及环境配置。 进入DevEco Studio下载官网&#xff0c;单击“立即下载”进入下载页面。 DevEco Studio提供了Windows版本和…...

求二叉树中指定节点所在的层数(可运行)

运行环境.cpp 我这里设置的是查字符e的层数&#xff0c;大家可以在main函数里改成自己想查的字符。&#xff08;输入的字符一定是自己树里有的&#xff09;。 如果没有输出结果&#xff0c;一定是建树错误&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&…...

Ubuntu18 Opencv3.4.12 viz 3D显示安装、编译、移植

Opencv3.*主模块默认包括两个3D库 calib3d用于相机校准和三维重建 &#xff0c;viz用于三维图像显示&#xff0c;其中viz是cmake选配。 参考&#xff1a; https://docs.opencv.org/3.4.12/index.html 下载linux版本的源码 sources。 查看cmake apt list --installed | grep…...

EPSon打印机更换色带

1、打印机色带拆装视频 打印机色带更换 2、色带盒四周有多个卡扣&#xff0c;需从右到左依次轻微用力掰开&#xff0c;使盖板与盒体脱离&#xff0c;注意不要掰断卡扣。 3、如何将色带放入打印机色带盒&#xff1f; A、色带放入盒体时不可打乱打结&#xff0c;以免卡带&#x…...

电脑游戏录屏软件,记录游戏高光时刻

电脑游戏录制是游戏爱好者分享游戏乐趣、技巧和成就的绝佳方式&#xff0c;此时&#xff0c;一款好用的录屏软件就显得尤为重要。本文将为大家介绍三款电脑游戏录屏软件&#xff0c;通过对这三款软件的分步骤详细介绍&#xff0c;让大家更加了解它们的特点及使用方法。 电脑游戏…...

Hadoop性能调优建议

一、服务器配置 1. BIOS配置&#xff1a; 关闭smmu/关闭cpu预取/performance策略 2. 硬盘优化 raid0 打卡cache /jbod scheduler/sector_size/read_ahead_kb 3. 网卡优化 rx_buff/ring_buffer/lro/中断绑核/驱动升级 4. 内存插法&#xff1a;要用均衡插法…...

算法的奥秘:常见的六种算法(算法导论笔记2)

算法的奥秘&#xff1a;种类、特性及应用详解&#xff08;算法导论笔记1&#xff09; 上期总结算法的种类和大致介绍&#xff0c;这一期主要讲常见的六种算法详解以及演示。 排序算法&#xff1a; 排序算法是一类用于对一组数据元素进行排序的算法。根据不同的排序方式和时间复…...

Python算法——树的路径和算法

Python算法——树的路径和算法 树的路径和算法是一种在树结构中寻找从根节点到叶节点的所有路径&#xff0c;其路径上的节点值之和等于给定目标值的算法。这种算法可以用Python语言实现&#xff0c;本文将介绍如何使用Python编写树的路径和算法&#xff0c;并给出一些示例代码…...

数据结构之链表练习与习题详细解析

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.前言 2.习题解…...

QT中使用unity

qt把unity发入widget中 头文件showunitywindowsinqt #ifndef SHOWUNITYWINDOWSINQT_H #define SHOWUNITYWINDOWSINQT_H #include <QObject> #include <QProcess> #include <windows.h> #include <winuser.h> #include <QDebug> class ShowUnity…...

QTableView/QTableWidget设置单元格字体颜色及背景色

1.QTableView设置单元格字体颜色及背景色 QStandardItem * pItem new QStandardItem("AAA"); pItem->setBackground(QBrush(Qt::blue)); // 设置背景色 pItem->setForeground(QBrush(Qt::red)); // 设置字体颜色 2.QTableWidget设置单元格字…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

Chrome 浏览器前端与客户端双向通信实战

Chrome 前端&#xff08;即页面 JS / Web UI&#xff09;与客户端&#xff08;C 后端&#xff09;的交互机制&#xff0c;是 Chromium 架构中非常核心的一环。下面我将按常见场景&#xff0c;从通道、流程、技术栈几个角度做一套完整的分析&#xff0c;特别适合你这种在分析和改…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用

前言&#xff1a;我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM&#xff08;Java Virtual Machine&#xff09;让"一次编写&#xff0c;到处运行"成为可能。这个软件层面的虚拟化让我着迷&#xff0c;但直到后来接触VMware和Doc…...

【java面试】微服务篇

【java面试】微服务篇 一、总体框架二、Springcloud&#xff08;一&#xff09;Springcloud五大组件&#xff08;二&#xff09;服务注册和发现1、Eureka2、Nacos &#xff08;三&#xff09;负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...