当前位置: 首页 > news >正文

Kafka3.1部署和Topic主题数据生产与消费

文章目录

  • 前言
  • 一、Kafka3.1X版本在Windows11主机部署
  • 二、Kafk生产Topic主题数据
    • 1.kafka生产数据
    • 2.JAVA kafka客户端消费数据
  • 总结


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
在这里插入图片描述
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

在这里插入图片描述
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、Kafk生产Topic主题数据

1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

在这里插入图片描述
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

在这里插入图片描述

2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.1</version></dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka服务器操作与数据读取*/
public class KafkaUtilDemo {public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);public static void init(String kafakservers) {// 配置Kafka消费者属性props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");}/*** 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作* @param kafaktopic* @return*/public static String poll(String kafaktopic) {String msg = "";try {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(kafaktopic));log.info("Kafka消费者订阅指定主题,持续监听并处理消息");while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));for (ConsumerRecord<String, String> record : records) {log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());msg = record.value();if (!StringUtils.isBlank(record.value())) {JSONObject jsonObject = JSONObject.parseObject(record.value());String mobilePhone = jsonObject.getString("mobilePhone");if (StringUtils.isBlank(mobilePhone)) {log.error("Kafka消费者手机号mobilePhone为空");} else {KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();kafkaUtil.syncSystemInfoTask(jsonObject);}}}}} catch (Exception e) {log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());}return msg;}public boolean syncSystemInfoTask(JSONObject jsonObject) {boolean repsBln = true;try {String mobilePhone = jsonObject.getString("mobilePhone");String roleType = jsonObject.getString("roleType");String roleCode = jsonObject.getString("roleCode");log.info("业务数据同步操作................");} catch (Exception e) {repsBln = false;log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());}return repsBln;}public static void main(String[] args) {try {String kafakservers = "localhost:9092";String kafaktopic = "heima";init(kafakservers);poll(kafaktopic);} catch (Exception e) {log.error("error msg=" + e.getMessage());}}}

3 执行KafkaUtilDemo 文件,查看消费数据。
在这里插入图片描述

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.8.RELEASE</version></dependency>

相关文章:

Kafka3.1部署和Topic主题数据生产与消费

文章目录 前言一、Kafka3.1X版本在Windows11主机部署二、Kafk生产Topic主题数据1.kafka生产数据2.JAVA kafka客户端消费数据 总结 前言 本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用&#xff1a; 一、Kafka3.1X版本在Windows11主机部署 1.安装JDK配…...

ICIF2023化工展首亮相,宏工科技解决方案助力制造升级

ICIF China 2023中国国际化工展览会于9月4日-6日在上海新国际博览中心举办。宏工科技携化工物料处理一站式解决方案首次亮相&#xff0c;同化工行业全产业链共叙物料处理自动化未来。 宏工科技是一家提供物料处理自动化设备、系统与服务的国家级高新技术企业&#xff0c;业务覆…...

本地部署kubesphere集群

本地部署kubesphere集群 本文采用一主两从结构 1.前置硬件准备 准备最少3台机器&#xff0c;本人分配如下 IP&#xff1a;192.168.58.10 &#xff08;主&#xff09; 192.168.58.11 &#xff08;节点1&#xff09; 192.168.58.12 &#xff08;节点2&#xff09; 系统镜像…...

HNU小学期工训-STC15单片机模型大作业实验报告

STC15单片机模型大作业实验报告 全称&#xff1a;基于STC15单片机与OLED显示模块&PC端演示的多功能声光温振时钟智能手表模型 计科210X 甘晴void 202108010XXX 【请注意&#xff1a;本作业入选优秀范例&#xff0c;直接照抄源码有很大风险】 【建议理解原理之后作改动】 …...

【计算机网络】 TCP协议头相关知识点

文章目录 TCP协议头 TCP协议头 我们来看一下TCP协议头里都有什么东西&#xff0c;研究一下为什么TCP协议是可靠的呢 TCP协议可靠是因为在协议头里带着一些校验的数据 首先是源端口和目的端口&#xff0c;这两个是UDP中也有的&#xff0c;但是UDP中只有这两个&#xff0c;没有…...

深度学习相关VO梳理

相关论文 基于学习的VO 相关&#xff1a; DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks&#xff08;ICRA&#xff0c;2017&#xff09; TartanVO: A Generalizable Learning-based VO(CoRL2021) SimVODIS: Simultaneous Vis…...

SpringMVC---CRUD实现

思路分析 搭建环境逆向生层对应的类&#xff08;model、mapper.xml、mapper.java&#xff09;编写业务逻辑层编写web层&#xff08;控制器&#xff09;前端页面 一、环境搭建 1.1、导入项目所需依赖(pom.xml) <project xmlns"http://maven.apache.org/POM/4.0.0"…...

vue+elementUI el-select 自定义搜索逻辑(filter-method)

下拉列表的默认搜索是搜索label显示label,我司要求输入id显示label名称 <el-form-item label"部门&#xff1a;"><el-select v-model"form.region1" placeholder"请选择部门" filterable clearable:filter-method"dataFilter&qu…...

数据库——事务

事务是指作为一个整体被执行的一系列操作。在数据库管理系统中&#xff0c;事务是指一组数据库操作&#xff08;如插入、更新、删除等&#xff09;的逻辑单元&#xff0c;也就是说事务的本质是把多个操作打包成一个操作&#xff0c;并且它要么完全执行&#xff0c;要么完全不执…...

echarts折线图每段显示不同的颜色

效果图 配置项&#xff1a; zqChartFour: {title: {text: "一天用电量分布",subtext: "纯属虚构",},tooltip: {trigger: "axis",axisPointer: {type: "cross",},},toolbox: {show: true,feature: {saveAsImage: {},},},xAxis: {type:…...

设计模式-单例模式(Singleton)

文章目录 前言一、单例模式的概念二、单例模式的实现三、单例模式的应用场景四、单例模式优缺点优点&#xff1a;缺点&#xff1a;总结 前言 单例模式&#xff08;Singleton Pattern&#xff09;是一种创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一…...

优漫动游 常见的AI视频生成网站的官方网站:

1、Lumen5 Lumen5是一款在线视频制作工具&#xff0c;利用人工智能技术能够迅速将文本、和音乐转换为视频。它可以帮助你把博客文章、社交媒体内容等转化为吸引人的视频&#xff0c;从而提高你的品牌曝光率和社交媒体的参与度。 2.Animoto Animoto是一个视频制作平台&…...

Vue中数据可视化关系图展示与关系图分析

Vue中数据可视化关系图展示与关系图分析 数据可视化是现代Web应用程序的重要组成部分之一&#xff0c;它可以帮助我们以图形的方式呈现和分析复杂的数据关系。Vue.js是一个流行的JavaScript框架&#xff0c;它提供了强大的工具来构建数据可视化应用。本文将介绍如何使用Vue.js…...

【启扬方案】基于启扬安卓屏一体机的医疗手推车解决方案

医疗手推车作为医院基础设施的一部分&#xff0c;被广泛应用于医院内部&#xff0c;包括急诊室、手术室、病房和其他临床部门。伴随着互联网技术的发展和行业的渗透&#xff0c;智慧医疗受到越来越多的青睐&#xff0c;这也使得很多医疗设施得到了改进&#xff0c;医疗手推车也…...

JavaScript实现MD5加密的6种方式

关于MD5&#xff1a; MD5.js是通过前台js加密的方式对用户信息&#xff0c;密码等私密信息进行加密处理的工具&#xff0c;也可称为插件。 在本案例中 可以看到MD5共有6种加密方法&#xff1a; 1&#xff0c; hex_md5(value) 2&#xff0c; b64_md5(value) 3&#xff0c; …...

腾讯云和阿里云2核2G服务器租用价格表对比

2核2G云服务器可以选择阿里云服务器或腾讯云服务器&#xff0c;腾讯云轻量2核2G3M带宽服务器95元一年&#xff0c;阿里云轻量2核2G3M带宽优惠价108元一年&#xff0c;不只是轻量应用服务器&#xff0c;阿里云还可以选择ECS云服务器u1&#xff0c;腾讯云也可以选择CVM标准型S5云…...

抖音无需API开发连接Stable Diffusion,实现自动根据评论区的指令生成图像并返回

抖音用户使用场景&#xff1a; 随着AI绘图的热度不断升高&#xff0c;许多抖音达人通过录制视频介绍不同的AI工具&#xff0c;包括产品背景、使用方法以及价格等&#xff0c;以吸引更多的用户。其中&#xff0c;Stable Diffusion这款产品受到了许多博主达人的青睐。在介绍这款产…...

MySQL(三)

DDL&#xff08;数据定义语言&#xff09; 库 /* 创建数据库testone */ create database testone; /* 查询数据库testone */ show databases; /* 选择数据库testone */ use testone; /* 删除数据库testone */ drop database testone; 表 创建表 create table table_name (…...

汽车级肖特基二极管DSS220-Q 200V 2A

DSS220-Q是什么二极管&#xff1f;贵司有生产吗&#xff1f; 肖特基二极管DSS220-Q符合汽车级AEC Q101标准吗&#xff1f; DSS220-Q贴片肖特基二极管参数是什么封装&#xff1f;正向电流和反向电压是多大&#xff1f; DSS220-Q肖特基二极管需要100KK&#xff0c;有现货吗&#…...

maven jetty post 上传长度设置

maven jetty post 上传长度设置 <plugin><groupId>org.eclipse.jetty</groupId><artifactId>jetty-maven-plugin</artifactId><version>9.4.8.v20171121</version><configuration><scanIntervalSeconds>1</scanInter…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...