当前位置: 首页 > news >正文

Kafka Java API

1、增加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version>
</dependency>

2、三个案例

案例1:生产数据

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class Demo1KafkaProducer {public static void main(String[] args) {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//生产数据producer.send(new ProducerRecord<>("words","java"));producer.flush();//关闭连接producer.close();}
}

案例2: 文件生产数据到kafka(读取)

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;public class Demo2FileToKaFka {public static void main(String[] args) throws Exception{Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件BufferedReader bs = new BufferedReader(new FileReader("flink/data/student.csv"));String line;while ((line=bs.readLine())!=null){//生产数据    如果指定分区默认为轮循添加数据producer.send(new ProducerRecord<>("students",line));producer.flush();}//关闭连接bs.close();producer.close();}
}

创建控制台消费者消费数据 

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

使用hash分区的方式改写该案例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;public class Dem3FileToKafkaWithHash {public static void main(String[] args) throws Exception {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件FileReader fileReader = new FileReader("flink/data/student.csv");BufferedReader bufferedReader = new BufferedReader(fileReader);String line;while ((line = bufferedReader.readLine()) != null) {String clazz = line.split(",")[4];//hash分区int partition = Math.abs(clazz.hashCode()) % 3;//生产数据//kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash_partition//指定分区生产数据producer.send(new ProducerRecord<>("students_hash_partition", partition, null, line));producer.flush();}//关闭连接fileReader.close();bufferedReader.close();producer.close();}
}

案例3:消费kafka中的数据

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.ArrayList;
import java.util.Properties;public class Demo4Consumer {public static void main(String[] args) {Properties properties = new Properties();//指定kafka集群列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/** earliest* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest  默认* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据* none* topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常**/properties.setProperty("auto.offset.reset", "earliest");//指定消费者组,一条数据在一个组内只消费一次properties.setProperty("group.id", "java_kafka_group1");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅topicArrayList<String> topics = new ArrayList<>();topics.add("hash_students");consumer.subscribe(topics);//死循环拉取数据,使数据全部拉取完毕while (true) {//拉取数据  默认只会拉取500条数据ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);//解析数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String topic = consumerRecord.topic();         //主题long offset = consumerRecord.offset();         //偏移量int partition = consumerRecord.partition();    //分区String value = consumerRecord.value();         //数据long timestamp = consumerRecord.timestamp();   //处理时间System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);}}}
}

相关文章:

Kafka Java API

1、增加依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version> </dependency>2、三个案例 案例1&#xff1a;生产数据 import org.apache.kafka.clients.p…...

pushd: not found

解决方法&#xff1a; pushd 比 cd 命令更高效的切换命令&#xff0c;非默认&#xff0c;可在脚本开头添加&#xff1a; #! /bin/bash ubuntu 编译时出现/bin/sh: 1: pushd: not found的问题-CSDN博客...

【第十三节】C++控制台版本坦克大战小游戏

目录 一、游戏简介 1.1 游戏概述 1.2 知识点应用 1.3 实现功能 1.4 开发环境 二、项目设计 2.1 类的设计 2.2 各类功能 三、程序运行截图 3.1 游戏主菜单 3.2 游戏进行中 3.3 双人作战 3.4 编辑地图 一、游戏简介 1.1 游戏概述 本项目是一款基于C语言开发的控制台…...

酷得单片机方案 2.4G儿童遥控漂移车

电子方案开发定制&#xff0c;我们是专业的 东莞酷得智能单片机方案之2.4G遥控玩具童车具有以下比较有特色的特点&#xff1a; 1、内置充电电池&#xff1a;这款小车配备了可充电的电池&#xff0c;无需频繁更换电池&#xff0c;既环保又方便。充电方式可能为USB充电或者专用…...

【为什么 Google Chrome 打开网页有时极慢?尤其是国内网站,如知网等】

要通过知网搜一点资料&#xff0c;发现怎么都打不开。而且B站&#xff0c;知乎这些速度也变慢了&#xff01;已经检查过确定不是网络的问题。 清空了记录&#xff0c;清空了已接受Cookie&#xff0c;清空了缓存内容……没用&#xff01;&#xff01;&#xff01; 不断搜索&am…...

FastAPI - 数据库操作5

先安装mysql驱动程序 pipenv install pymysql安装数据库ORM库SQLAlchemy pipenv install SQLAlchemy修改文件main.py文件内容 设置数据库连接 # -*- coding:utf-8 –*- from fastapi import FastAPIfrom sqlalchemy import create_engineHOST 192.168.123.228 PORT 3306 …...

HTML静态网页成品作业(HTML+CSS)—— 冶金工程专业展望与介绍介绍网页(2个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有2个页面。 二、作品演示 三、代…...

Flutter基础 -- Dart 语言 -- 注释函数表达式

目录 1. 注释 1.1 单行注释 1.2 多行注释 1.3 文档注释 2. 函数 2.1 定义 2.2 可选参数 2.3 可选参数 默认值 2.4 命名参数 默认值 2.5 函数内定义 2.6 Funcation 返回函数对象 2.7 匿名函数 2.8 作用域 3. 操作符 3.1 操作符表 3.2 算术操作符 3.3 相等相关的…...

“仿RabbitMQ实现消息队列”---整体架构与模块说明

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、概念性框架理解 我们主要实现的内容&#xff1a; 1.Broker服务器&#xff1a;消息队列服务器&#xff08;服务端&…...

springboot如何快速接入minio对象存储

1.在项目中添加 Minio 的依赖&#xff0c;在使用 Minio 之前&#xff0c;需要在项目中添加 Minio 的依赖。可以在 Maven 的 pom.xml 文件中添加以下依赖&#xff1a; <dependency><groupId>io.minio</groupId><artifactId>minio</artifactId>&l…...

第六届“智能设计+运维”国产工业软件研讨会暨2024年天洑软件用户大会圆满召开

2024年5月23-24日&#xff0c;第六届“智能设计运维”国产工业软件研讨会暨2024年天洑软件用户大会在南京举办。来自国产工业软件研发企业、制造业企业、高校、科研院所的业内大咖&#xff0c;能源动力、船舶海事、车辆运载、航空航天、新能源汽车、动力电池、消费电子、石油石…...

05.k8s弹性伸缩

5.k8s弹性伸缩 k8s弹性伸缩,需要附加插件heapster监控 弹性伸缩&#xff1a;随着业务访问量的大小&#xff0c;k8s系统中的pod比较弹性&#xff0c;会自动增加或者减少pod数量&#xff1b; 5.1 安装heapster监控 1:上传并导入镜像,打标签 ls *.tar.gz for n in ls *.tar.gz…...

【数据结构】详解二叉树

文章目录 1.树的结构及概念1.1树的概念1.2树的相关结构概念1.3树的表示1.4树在实际中的应用 2.二叉树的结构及概念2.1二叉树的概念2.2特殊的二叉树2.2.1满二叉树2.2.2完全二叉树 2.3 二叉树的性质2.4二叉树的存储结构2.4.1顺序结构2.4.2链表结构 1.树的结构及概念 1.1树的概念…...

MapDB:轻量级、高性能的Java嵌入式数据库引擎

MapDB&#xff1a;轻量级、高性能的Java嵌入式数据库引擎 在今天的软件开发中&#xff0c;嵌入式数据库因其轻便、高效和易于集成而备受欢迎。对于Java开发者来说&#xff0c;MapDB无疑是一个值得关注的选项。MapDB是一个纯Java编写的嵌入式数据库引擎&#xff0c;它提供了高性…...

Rye: 一个革新的Python包管理工具

文章目录 Rye: 一个革新的Python包管理工具Rye的诞生背景Rye的核心特性Rye的安装与使用Rye的优势与挑战Rye的未来展望结语 Rye: 一个革新的Python包管理工具 在Python生态系统中&#xff0c;包管理一直是一个复杂且令人头疼的问题。随着Python社区的不断发展&#xff0c;出现了…...

如何在C#代码中判断当前C#的版本和dotnet版本

代码如下&#xff1a; using System.Reflection; using System.Runtime.InteropServices;var csharpVersion typeof(string).Assembly.GetCustomAttributes(typeof(AssemblyFileVersionAttribute), false).OfType<AssemblyFileVersionAttribute>().FirstOrDefault()?.…...

Linux 36.3@Jetson Orin Nano之系统安装

Linux 36.3Jetson Orin Nano之系统安装 1. 源由2. 命令行烧录Step 1&#xff1a;下载Linux 36.3安装程序Step 2&#xff1a;下载Linux 36.3根文件系统Step 3&#xff1a;解压Linux 36.3安装程序Step 4&#xff1a;解压Linux 36.3根文件系统Step 5&#xff1a;安装应用程序Step …...

案例实践 | 基于长安链的首钢供应链金融科技服务平台

案例名称-首钢供应链金融科技服务平台 ■ 建设单位 首惠产业金融服务集团有限公司 ■ 用户群体 核心企业、资金方&#xff08;多为银行&#xff09;等合作方 ■ 应用成效 三大业务场景&#xff0c;共计关联29个业务节点&#xff0c;覆盖京票项目全部关键业务 案例背景…...

Vue3实战笔记(55)—Vue3.4新特性揭秘:defineModel重塑v-model,拥抱高效双向数据流!

文章目录 前言defineModel() 基本用法总结 前言 v-model 可以在组件上使用以实现双向绑定。 从 Vue 3.4 开始&#xff0c;推荐的实现方式是使用 defineModel() 宏 defineModel() 基本用法 定义defineModel()&#xff1a; <!-- Child.vue --> <script setup> con…...

C++ | Leetcode C++题解之第123题买卖股票的最佳时机III

题目&#xff1a; 题解&#xff1a; class Solution { public:int maxProfit(vector<int>& prices) {int n prices.size();int buy1 -prices[0], sell1 0;int buy2 -prices[0], sell2 0;for (int i 1; i < n; i) {buy1 max(buy1, -prices[i]);sell1 max(…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发&#xff0c;实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构&#xff0c;服务器端使用Java Servlet处理请求&#xff0c;数据库采用MySQL存储信息&#xff0…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

WEB3全栈开发——面试专业技能点P7前端与链上集成

一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染&#xff08;SSR&#xff09;与静态网站生成&#xff08;SSG&#xff09; 框架&#xff0c;由 Vercel 开发。它简化了构建生产级 React 应用的过程&#xff0c;并内置了很多特性&#xff1a; ✅ 文件系…...

Vue3中的computer和watch

computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...

uni-app学习笔记三十五--扩展组件的安装和使用

由于内置组件不能满足日常开发需要&#xff0c;uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件&#xff0c;需要安装才能使用。 一、安装扩展插件 安装方法&#xff1a; 1.访问uniapp官方文档组件部分&#xff1a;组件使用的入门教程 | uni-app官网 点击左侧…...

如何做好一份技术文档?从规划到实践的完整指南

如何做好一份技术文档&#xff1f;从规划到实践的完整指南 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 总有一行代码&#xff0c;能点亮万千星辰。 &#x1f50d; 在技术的宇宙中&#xff0c;我愿做永不停歇的探索者。 ✨ 用代码丈量世界&…...

Netty自定义协议解析

目录 自定义协议设计 实现消息解码器 实现消息编码器 自定义消息对象 配置ChannelPipeline Netty提供了强大的编解码器抽象基类,这些基类能够帮助开发者快速实现自定义协议的解析。 自定义协议设计 在实现自定义协议解析之前,需要明确协议的具体格式。例如,一个简单的…...

旋量理论:刚体运动的几何描述与机器人应用

旋量理论为描述刚体在三维空间中的运动提供了强大而优雅的数学框架。与传统的欧拉角或方向余弦矩阵相比&#xff0c;旋量理论通过螺旋运动的概念统一了旋转和平移&#xff0c;在机器人学、计算机图形学和多体动力学领域具有显著优势。这种描述不仅几何直观&#xff0c;而且计算…...