当前位置: 首页 > news >正文

Kafka3.1部署和Topic主题数据生产与消费

文章目录

  • 前言
  • 一、Kafka3.1X版本在Windows11主机部署
  • 二、Kafk生产Topic主题数据
    • 1.kafka生产数据
    • 2.JAVA kafka客户端消费数据
  • 总结


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
在这里插入图片描述
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

在这里插入图片描述
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、Kafk生产Topic主题数据

1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

在这里插入图片描述
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

在这里插入图片描述

2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.1</version></dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka服务器操作与数据读取*/
public class KafkaUtilDemo {public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);public static void init(String kafakservers) {// 配置Kafka消费者属性props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");}/*** 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作* @param kafaktopic* @return*/public static String poll(String kafaktopic) {String msg = "";try {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(kafaktopic));log.info("Kafka消费者订阅指定主题,持续监听并处理消息");while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));for (ConsumerRecord<String, String> record : records) {log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());msg = record.value();if (!StringUtils.isBlank(record.value())) {JSONObject jsonObject = JSONObject.parseObject(record.value());String mobilePhone = jsonObject.getString("mobilePhone");if (StringUtils.isBlank(mobilePhone)) {log.error("Kafka消费者手机号mobilePhone为空");} else {KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();kafkaUtil.syncSystemInfoTask(jsonObject);}}}}} catch (Exception e) {log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());}return msg;}public boolean syncSystemInfoTask(JSONObject jsonObject) {boolean repsBln = true;try {String mobilePhone = jsonObject.getString("mobilePhone");String roleType = jsonObject.getString("roleType");String roleCode = jsonObject.getString("roleCode");log.info("业务数据同步操作................");} catch (Exception e) {repsBln = false;log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());}return repsBln;}public static void main(String[] args) {try {String kafakservers = "localhost:9092";String kafaktopic = "heima";init(kafakservers);poll(kafaktopic);} catch (Exception e) {log.error("error msg=" + e.getMessage());}}}

3 执行KafkaUtilDemo 文件,查看消费数据。
在这里插入图片描述

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.8.RELEASE</version></dependency>

相关文章:

Kafka3.1部署和Topic主题数据生产与消费

文章目录 前言一、Kafka3.1X版本在Windows11主机部署二、Kafk生产Topic主题数据1.kafka生产数据2.JAVA kafka客户端消费数据 总结 前言 本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用&#xff1a; 一、Kafka3.1X版本在Windows11主机部署 1.安装JDK配…...

ICIF2023化工展首亮相,宏工科技解决方案助力制造升级

ICIF China 2023中国国际化工展览会于9月4日-6日在上海新国际博览中心举办。宏工科技携化工物料处理一站式解决方案首次亮相&#xff0c;同化工行业全产业链共叙物料处理自动化未来。 宏工科技是一家提供物料处理自动化设备、系统与服务的国家级高新技术企业&#xff0c;业务覆…...

本地部署kubesphere集群

本地部署kubesphere集群 本文采用一主两从结构 1.前置硬件准备 准备最少3台机器&#xff0c;本人分配如下 IP&#xff1a;192.168.58.10 &#xff08;主&#xff09; 192.168.58.11 &#xff08;节点1&#xff09; 192.168.58.12 &#xff08;节点2&#xff09; 系统镜像…...

HNU小学期工训-STC15单片机模型大作业实验报告

STC15单片机模型大作业实验报告 全称&#xff1a;基于STC15单片机与OLED显示模块&PC端演示的多功能声光温振时钟智能手表模型 计科210X 甘晴void 202108010XXX 【请注意&#xff1a;本作业入选优秀范例&#xff0c;直接照抄源码有很大风险】 【建议理解原理之后作改动】 …...

【计算机网络】 TCP协议头相关知识点

文章目录 TCP协议头 TCP协议头 我们来看一下TCP协议头里都有什么东西&#xff0c;研究一下为什么TCP协议是可靠的呢 TCP协议可靠是因为在协议头里带着一些校验的数据 首先是源端口和目的端口&#xff0c;这两个是UDP中也有的&#xff0c;但是UDP中只有这两个&#xff0c;没有…...

深度学习相关VO梳理

相关论文 基于学习的VO 相关&#xff1a; DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks&#xff08;ICRA&#xff0c;2017&#xff09; TartanVO: A Generalizable Learning-based VO(CoRL2021) SimVODIS: Simultaneous Vis…...

SpringMVC---CRUD实现

思路分析 搭建环境逆向生层对应的类&#xff08;model、mapper.xml、mapper.java&#xff09;编写业务逻辑层编写web层&#xff08;控制器&#xff09;前端页面 一、环境搭建 1.1、导入项目所需依赖(pom.xml) <project xmlns"http://maven.apache.org/POM/4.0.0"…...

vue+elementUI el-select 自定义搜索逻辑(filter-method)

下拉列表的默认搜索是搜索label显示label,我司要求输入id显示label名称 <el-form-item label"部门&#xff1a;"><el-select v-model"form.region1" placeholder"请选择部门" filterable clearable:filter-method"dataFilter&qu…...

数据库——事务

事务是指作为一个整体被执行的一系列操作。在数据库管理系统中&#xff0c;事务是指一组数据库操作&#xff08;如插入、更新、删除等&#xff09;的逻辑单元&#xff0c;也就是说事务的本质是把多个操作打包成一个操作&#xff0c;并且它要么完全执行&#xff0c;要么完全不执…...

echarts折线图每段显示不同的颜色

效果图 配置项&#xff1a; zqChartFour: {title: {text: "一天用电量分布",subtext: "纯属虚构",},tooltip: {trigger: "axis",axisPointer: {type: "cross",},},toolbox: {show: true,feature: {saveAsImage: {},},},xAxis: {type:…...

设计模式-单例模式(Singleton)

文章目录 前言一、单例模式的概念二、单例模式的实现三、单例模式的应用场景四、单例模式优缺点优点&#xff1a;缺点&#xff1a;总结 前言 单例模式&#xff08;Singleton Pattern&#xff09;是一种创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一…...

优漫动游 常见的AI视频生成网站的官方网站:

1、Lumen5 Lumen5是一款在线视频制作工具&#xff0c;利用人工智能技术能够迅速将文本、和音乐转换为视频。它可以帮助你把博客文章、社交媒体内容等转化为吸引人的视频&#xff0c;从而提高你的品牌曝光率和社交媒体的参与度。 2.Animoto Animoto是一个视频制作平台&…...

Vue中数据可视化关系图展示与关系图分析

Vue中数据可视化关系图展示与关系图分析 数据可视化是现代Web应用程序的重要组成部分之一&#xff0c;它可以帮助我们以图形的方式呈现和分析复杂的数据关系。Vue.js是一个流行的JavaScript框架&#xff0c;它提供了强大的工具来构建数据可视化应用。本文将介绍如何使用Vue.js…...

【启扬方案】基于启扬安卓屏一体机的医疗手推车解决方案

医疗手推车作为医院基础设施的一部分&#xff0c;被广泛应用于医院内部&#xff0c;包括急诊室、手术室、病房和其他临床部门。伴随着互联网技术的发展和行业的渗透&#xff0c;智慧医疗受到越来越多的青睐&#xff0c;这也使得很多医疗设施得到了改进&#xff0c;医疗手推车也…...

JavaScript实现MD5加密的6种方式

关于MD5&#xff1a; MD5.js是通过前台js加密的方式对用户信息&#xff0c;密码等私密信息进行加密处理的工具&#xff0c;也可称为插件。 在本案例中 可以看到MD5共有6种加密方法&#xff1a; 1&#xff0c; hex_md5(value) 2&#xff0c; b64_md5(value) 3&#xff0c; …...

腾讯云和阿里云2核2G服务器租用价格表对比

2核2G云服务器可以选择阿里云服务器或腾讯云服务器&#xff0c;腾讯云轻量2核2G3M带宽服务器95元一年&#xff0c;阿里云轻量2核2G3M带宽优惠价108元一年&#xff0c;不只是轻量应用服务器&#xff0c;阿里云还可以选择ECS云服务器u1&#xff0c;腾讯云也可以选择CVM标准型S5云…...

抖音无需API开发连接Stable Diffusion,实现自动根据评论区的指令生成图像并返回

抖音用户使用场景&#xff1a; 随着AI绘图的热度不断升高&#xff0c;许多抖音达人通过录制视频介绍不同的AI工具&#xff0c;包括产品背景、使用方法以及价格等&#xff0c;以吸引更多的用户。其中&#xff0c;Stable Diffusion这款产品受到了许多博主达人的青睐。在介绍这款产…...

MySQL(三)

DDL&#xff08;数据定义语言&#xff09; 库 /* 创建数据库testone */ create database testone; /* 查询数据库testone */ show databases; /* 选择数据库testone */ use testone; /* 删除数据库testone */ drop database testone; 表 创建表 create table table_name (…...

汽车级肖特基二极管DSS220-Q 200V 2A

DSS220-Q是什么二极管&#xff1f;贵司有生产吗&#xff1f; 肖特基二极管DSS220-Q符合汽车级AEC Q101标准吗&#xff1f; DSS220-Q贴片肖特基二极管参数是什么封装&#xff1f;正向电流和反向电压是多大&#xff1f; DSS220-Q肖特基二极管需要100KK&#xff0c;有现货吗&#…...

maven jetty post 上传长度设置

maven jetty post 上传长度设置 <plugin><groupId>org.eclipse.jetty</groupId><artifactId>jetty-maven-plugin</artifactId><version>9.4.8.v20171121</version><configuration><scanIntervalSeconds>1</scanInter…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

【WebSocket】SpringBoot项目中使用WebSocket

1. 导入坐标 如果springboot父工程没有加入websocket的起步依赖&#xff0c;添加它的坐标的时候需要带上版本号。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dep…...

基于Uniapp的HarmonyOS 5.0体育应用开发攻略

一、技术架构设计 1.混合开发框架选型 &#xff08;1&#xff09;使用Uniapp 3.8版本支持ArkTS编译 &#xff08;2&#xff09;通过uni-harmony插件调用原生能力 &#xff08;3&#xff09;分层架构设计&#xff1a; graph TDA[UI层] -->|Vue语法| B(Uniapp框架)B --&g…...