当前位置: 首页 > news >正文

Kafka:安装和配置

 producer:发布消息的对象,称为消息产生者 (Kafka topic producer)

topic:Kafka将消息分门别类,每一个消息称为一个主题(topic)

consumer:订阅消息并处理发布消息的对象称为消费者(consumer)

broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker),消费者(consumer)可以订阅一个或者多个主题(topic),并从broker中拉取数据,从而消费这些已发布的信息。

1、Kafka对zookeeper是一个强依赖,保存Kafka相关的节点数据,所以安装kafka之前要先安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入门案例

①创建kafka-demo工程并引入依赖

        <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

②创建ProducerQuickStart生产者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生产者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生产对象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送消息的对象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、发送消息*/producer.send(record);/*4、关闭通道,负责消息发送不成功*/producer.close();}
}

③创建ConsumerQuickStart消费者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消费者对象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、订阅主题*/consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程处于一直监听状态while (true){//4、获取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}

④运行测试

        成功接收到消息

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

下一篇: springboot集成kafka收发消息

相关文章:

Kafka:安装和配置

producer&#xff1a;发布消息的对象&#xff0c;称为消息产生者 &#xff08;Kafka topic producer&#xff09; topic&#xff1a;Kafka将消息分门别类&#xff0c;每一个消息称为一个主题&#xff08;topic&#xff09; consumer&#xff1a;订阅消息并处理发布消息的对象…...

786. 第k个数

文章目录 QuestionIdeasCode Question 给定一个长度为 n 的整数数列&#xff0c;以及一个整数 k &#xff0c;请用快速选择算法求出数列从小到大排序后的第 k 个数。 输入格式 第一行包含两个整数 n 和 k 。 第二行包含 n 个整数&#xff08;所有整数均在 1∼109 范围内&…...

用友-NC-Cloud远程代码执行漏洞[2023-HW]

用友-NC-Cloud远程代码执行漏洞[2023-HW] 一、漏洞介绍二、资产搜索三、漏洞复现PoC小龙POC检测脚本: 四、修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#…...

Transformer(二)(VIT,TNT)(基于视觉CV)

目录 1.视觉中的Attention 2.VIT框架&#xff08;图像分类&#xff0c;不需要decoder&#xff09; 2.1整体框架 2.2.CNN和Transformer遇到的问题 2.3.1CNN 2.3.2Transformer 2.3.3二者对比 2.4.公式理解 3TNT 参考文献 1.视觉中的Attention 对于人类而言看到一幅图可以立…...

Scratch 详解 之 线性→代数之——求两线段交点坐标

可能有人要问&#xff1a;求交点坐标有什么用呢&#xff1f;而且为啥要用线代来求&#xff1f;直线方程不行吗&#xff1f;&#xff1f;&#xff1f; 这个问题&#xff0c;我只能说&#xff0c;直线方程计算的次数过多了&#xff0c;而且动不动就要考虑线的方向&#xff0c;90的…...

Python-组合数据类型

今天要介绍的是Python的组合数据类型 整理不易&#xff0c;希望得到大家的支持&#xff0c;欢迎各位读者评论点赞收藏 感谢&#xff01; 目录 知识点知识导图1、组合数据类型的基本概念1.1 组合数据类型1.2 集合类型概述1.3 序列类型概述1.4 映射类型概述 2、列表类型2.1 列表的…...

vue3+vue-simple-uploader实现大文件上传

vue-simple-uploader本身是基于vue2实现,如果要使用vue3会报错。如何在vue3中使用,可参考我的另一篇文章:解决vue3中不能使用vue-simple-uploader__Jyann_的博客-CSDN博客 一.实现思路 使用vue-simple-uploader组件的uploader组件,设置自动上传为false,即可开启手动上传。…...

自适应变异麻雀搜索算法及其Matlab实现

麻雀搜索算法( sparrow search algorithm&#xff0c;SSA) 是2020 年新提出的一种元启发式算法[1]&#xff0c;它是受麻雀种群的觅食和反捕食行为启发&#xff0c;将搜索群体分为发现者、加入者和侦察者 3 部分&#xff0c;其相互分工寻找最优值&#xff0c;通过 19 个标准测试…...

ETL技术入门之ETLCloud初认识

首先ETL是什么&#xff1f; ETL代表“Extract, Transform, Load”&#xff0c;是一种用于数据集成和转换的过程。它在数据管理和分析中扮演着重要的角色。下面我们将分解每个步骤&#xff1a; Extract&#xff08;抽取&#xff09;&#xff1a; 这一步骤涉及从多个不同的数据源…...

uniapp项目如何运行在微信小程序模拟器上

在HbuilderX中的小程序写完后自己一定要保存&#xff0c;否则会出不来效果 那么怎么让uniapp项目运行在微信小程序开发工具中呢 1 在hbuilderx中点击运行到小程序模拟器 2 然后在项目目录中会生成一个文件夹 在微信小程序开发软件中的工具>安全设置>打开端口 或者在微…...

数据挖掘全流程解析

数据挖掘全流程解析 数据指标选择 在这一阶段&#xff0c;使用直方图和柱状图的方式对数据进行分析&#xff0c;观察什么数据属性对于因变量会产生更加明显的结果。 如何绘制直方图和条形统计图 数据清洗 观察数据是否存在数据缺失或者离群点的情况。 数据异常的两种情况…...

详细介绍如何对音乐信息进行检索和音频节拍跟踪

在本文中,我们将了解节拍的概念,以及我们在尝试跟踪节拍时面临的挑战。然后我们将介绍解决问题的方法以及业界最先进的解决方案。 介绍 音乐就在我们身边。每当我们听到任何与我们的心灵和思想相关的音乐时,我们就会迷失其中。我们下意识地随着听到的节拍而敲击。您一定已…...

Java课题笔记~ HTTP协议(请求和响应)

Servlet最主要的作用就是处理客户端请求&#xff0c;并向客户端做出响应。为此&#xff0c;针对Servlet的每次请求&#xff0c;Web服务器在调用service()方法之前&#xff0c;都会创建两个对象 分别是HttpServletRequest和HttpServletResponse。 其中HttpServletRequest用于封…...

在x86下运行的Ubuntu系统上部署QEMU用于模拟RISC-V硬件环境

1.配置工作环境 sudo apt install gcc bison flex libncurses-dev ninja-build \pkg-config build-essential zlib1g-dev pkg-config libglib2.0-dev \binutils-dev libboost-all-dev autoconf libtool libssl-dev \libpixman-1-dev python-capstone virtualenv software-prop…...

网络爬虫选择代理IP的标准

Hey&#xff0c;小伙伴们&#xff01;作为一家http代理产品供应商&#xff0c;我知道网络爬虫在选择代理IP时可能会遇到些问题&#xff0c;毕竟市面上有很多选择。别担心&#xff01;今天我要给大家分享一些实用的建议&#xff0c;帮助你们选择适合网络爬虫的代理IP。一起来看看…...

RxJava 复刻简版之三,map 多次中转数据

案例代码&#xff1a;https://gitee.com/bobidali/lite-rx-java/commit/292e9227a5491f7ec6a07f395292ef8e6ff69290 RxJava 的调用第一步是封装了观察者接受了数据的处理&#xff0c;进一步就是使用 map 将数据操作传递给上下游 1、类似Observer.create 创建一个简单的观察者…...

06 Word2Vec模型(第一个专门做词向量的模型,CBOW和Skip-gram)

博客配套视频链接: https://space.bilibili.com/383551518?spm_id_from=333.1007.0.0 b 站直接看 配套 github 链接:https://github.com/nickchen121/Pre-training-language-model 配套博客链接:https://www.cnblogs.com/nickchen121/p/15105048.html 神经网络语言模型(NNL…...

Axure RP9小白安装教程

第一步&#xff1a; 打开&#xff1a;Axure中文学习网 第二步&#xff1a; 鼠标移动软件下载&#xff0c;点击Axure RP 9下载既可 第三步&#xff1a; 注意&#xff1a;Axure RP 9 MAC正式版为苹果版本&#xff0c;Axure RP 9 WIN正式版为Windows版本 中文汉化包&#xff…...

腾讯云CVM服务器2核2g1m带宽支持多少人访问?

腾讯云2核2g1m的服务器支持多少人同时访问&#xff1f;2核2g1m云服务器短板是在1M公网带宽上&#xff0c;腾讯云服务器网以网站应用为例&#xff0c;当大规模用户同时访问网站时&#xff0c;很大概率会卡在公网带宽上&#xff0c;所以压根就谈不上2核2G的CPU内存计算性能是否够…...

8.12学习笔记

在PyTorch中&#xff0c;Dataset和DataLoader是用于处理数据的两个重要类。Dataset类是一个抽象类&#xff0c;用于表示数据集。它的主要作用是将数据加载到内存中&#xff0c;并提供一种统一的方式来访问数据。为了使用Dataset类&#xff0c;你需要继承它并实现两个方法&#…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...

协议转换利器,profinet转ethercat网关的两大派系,各有千秋

随着工业以太网的发展&#xff0c;其高效、便捷、协议开放、易于冗余等诸多优点&#xff0c;被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口&#xff0c;具有实时性、开放性&#xff0c;使用TCP/IP和IT标准&#xff0c;符合基于工业以太网的…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存&#xff0c;但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程&#xff0c;可以参考这篇文章&#xff0c;我觉得写的非常…...

【笔记】结合 Conda任意创建和配置不同 Python 版本的双轨隔离的 Poetry 虚拟环境

如何结合 Conda 任意创建和配置不同 Python 版本的双轨隔离的Poetry 虚拟环境&#xff1f; 在 Python 开发中&#xff0c;为不同项目配置独立且适配的虚拟环境至关重要。结合 Conda 和 Poetry 工具&#xff0c;能高效创建不同 Python 版本的 Poetry 虚拟环境&#xff0c;接下来…...

后端下载限速(redis记录实时并发,bucket4j动态限速)

✅ 使用 Redis 记录 所有用户的实时并发下载数✅ 使用 Bucket4j 实现 全局下载速率限制&#xff08;动态&#xff09;✅ 支持 动态调整限速策略✅ 下载接口安全、稳定、可监控 &#x1f9e9; 整体架构概览 模块功能Redis存储全局并发数和带宽令牌桶状态Bucket4j Redis分布式限…...

【见合八方平面波导外腔激光器专题系列】用于干涉光纤传感的低噪声平面波导外腔激光器2

----翻译自Mazin Alalus等人的文章 摘要 1550 nm DWDM 平面波导外腔激光器具有低相位/频率噪声、窄线宽和低 RIN 等特点。该腔体包括一个半导体增益芯片和一个带布拉格光栅的平面光波电路波导&#xff0c;采用 14 引脚蝶形封装。这种平面波导外腔激光器设计用于在振动和恶劣的…...