Kafka Java API
1、增加依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version> </dependency>
2、三个案例
案例1:生产数据
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class Demo1KafkaProducer {public static void main(String[] args) {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//生产数据producer.send(new ProducerRecord<>("words","java"));producer.flush();//关闭连接producer.close();}
}
案例2: 文件生产数据到kafka(读取)
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;public class Demo2FileToKaFka {public static void main(String[] args) throws Exception{Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件BufferedReader bs = new BufferedReader(new FileReader("flink/data/student.csv"));String line;while ((line=bs.readLine())!=null){//生产数据 如果指定分区默认为轮循添加数据producer.send(new ProducerRecord<>("students",line));producer.flush();}//关闭连接bs.close();producer.close();}
}
创建控制台消费者消费数据
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

使用hash分区的方式改写该案例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;public class Dem3FileToKafkaWithHash {public static void main(String[] args) throws Exception {Properties properties = new Properties();//指定broker列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//读取文件FileReader fileReader = new FileReader("flink/data/student.csv");BufferedReader bufferedReader = new BufferedReader(fileReader);String line;while ((line = bufferedReader.readLine()) != null) {String clazz = line.split(",")[4];//hash分区int partition = Math.abs(clazz.hashCode()) % 3;//生产数据//kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash_partition//指定分区生产数据producer.send(new ProducerRecord<>("students_hash_partition", partition, null, line));producer.flush();}//关闭连接fileReader.close();bufferedReader.close();producer.close();}
}
案例3:消费kafka中的数据
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.ArrayList;
import java.util.Properties;public class Demo4Consumer {public static void main(String[] args) {Properties properties = new Properties();//指定kafka集群列表properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");//指定key和value的数据格式properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/** earliest* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费* latest 默认* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据* none* topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常**/properties.setProperty("auto.offset.reset", "earliest");//指定消费者组,一条数据在一个组内只消费一次properties.setProperty("group.id", "java_kafka_group1");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅topicArrayList<String> topics = new ArrayList<>();topics.add("hash_students");consumer.subscribe(topics);//死循环拉取数据,使数据全部拉取完毕while (true) {//拉取数据 默认只会拉取500条数据ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);//解析数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String topic = consumerRecord.topic(); //主题long offset = consumerRecord.offset(); //偏移量int partition = consumerRecord.partition(); //分区String value = consumerRecord.value(); //数据long timestamp = consumerRecord.timestamp(); //处理时间System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);}}}
}

相关文章:
Kafka Java API
1、增加依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version> </dependency>2、三个案例 案例1:生产数据 import org.apache.kafka.clients.p…...
pushd: not found
解决方法: pushd 比 cd 命令更高效的切换命令,非默认,可在脚本开头添加: #! /bin/bash ubuntu 编译时出现/bin/sh: 1: pushd: not found的问题-CSDN博客...
【第十三节】C++控制台版本坦克大战小游戏
目录 一、游戏简介 1.1 游戏概述 1.2 知识点应用 1.3 实现功能 1.4 开发环境 二、项目设计 2.1 类的设计 2.2 各类功能 三、程序运行截图 3.1 游戏主菜单 3.2 游戏进行中 3.3 双人作战 3.4 编辑地图 一、游戏简介 1.1 游戏概述 本项目是一款基于C语言开发的控制台…...
酷得单片机方案 2.4G儿童遥控漂移车
电子方案开发定制,我们是专业的 东莞酷得智能单片机方案之2.4G遥控玩具童车具有以下比较有特色的特点: 1、内置充电电池:这款小车配备了可充电的电池,无需频繁更换电池,既环保又方便。充电方式可能为USB充电或者专用…...
【为什么 Google Chrome 打开网页有时极慢?尤其是国内网站,如知网等】
要通过知网搜一点资料,发现怎么都打不开。而且B站,知乎这些速度也变慢了!已经检查过确定不是网络的问题。 清空了记录,清空了已接受Cookie,清空了缓存内容……没用!!! 不断搜索&am…...
FastAPI - 数据库操作5
先安装mysql驱动程序 pipenv install pymysql安装数据库ORM库SQLAlchemy pipenv install SQLAlchemy修改文件main.py文件内容 设置数据库连接 # -*- coding:utf-8 –*- from fastapi import FastAPIfrom sqlalchemy import create_engineHOST 192.168.123.228 PORT 3306 …...
HTML静态网页成品作业(HTML+CSS)—— 冶金工程专业展望与介绍介绍网页(2个页面)
🎉不定期分享源码,关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 🏷️本套采用HTMLCSS,未使用Javacsript代码,共有2个页面。 二、作品演示 三、代…...
Flutter基础 -- Dart 语言 -- 注释函数表达式
目录 1. 注释 1.1 单行注释 1.2 多行注释 1.3 文档注释 2. 函数 2.1 定义 2.2 可选参数 2.3 可选参数 默认值 2.4 命名参数 默认值 2.5 函数内定义 2.6 Funcation 返回函数对象 2.7 匿名函数 2.8 作用域 3. 操作符 3.1 操作符表 3.2 算术操作符 3.3 相等相关的…...
“仿RabbitMQ实现消息队列”---整体架构与模块说明
顾得泉:个人主页 个人专栏:《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂,年薪百万! 一、概念性框架理解 我们主要实现的内容: 1.Broker服务器:消息队列服务器(服务端&…...
springboot如何快速接入minio对象存储
1.在项目中添加 Minio 的依赖,在使用 Minio 之前,需要在项目中添加 Minio 的依赖。可以在 Maven 的 pom.xml 文件中添加以下依赖: <dependency><groupId>io.minio</groupId><artifactId>minio</artifactId>&l…...
第六届“智能设计+运维”国产工业软件研讨会暨2024年天洑软件用户大会圆满召开
2024年5月23-24日,第六届“智能设计运维”国产工业软件研讨会暨2024年天洑软件用户大会在南京举办。来自国产工业软件研发企业、制造业企业、高校、科研院所的业内大咖,能源动力、船舶海事、车辆运载、航空航天、新能源汽车、动力电池、消费电子、石油石…...
05.k8s弹性伸缩
5.k8s弹性伸缩 k8s弹性伸缩,需要附加插件heapster监控 弹性伸缩:随着业务访问量的大小,k8s系统中的pod比较弹性,会自动增加或者减少pod数量; 5.1 安装heapster监控 1:上传并导入镜像,打标签 ls *.tar.gz for n in ls *.tar.gz…...
【数据结构】详解二叉树
文章目录 1.树的结构及概念1.1树的概念1.2树的相关结构概念1.3树的表示1.4树在实际中的应用 2.二叉树的结构及概念2.1二叉树的概念2.2特殊的二叉树2.2.1满二叉树2.2.2完全二叉树 2.3 二叉树的性质2.4二叉树的存储结构2.4.1顺序结构2.4.2链表结构 1.树的结构及概念 1.1树的概念…...
MapDB:轻量级、高性能的Java嵌入式数据库引擎
MapDB:轻量级、高性能的Java嵌入式数据库引擎 在今天的软件开发中,嵌入式数据库因其轻便、高效和易于集成而备受欢迎。对于Java开发者来说,MapDB无疑是一个值得关注的选项。MapDB是一个纯Java编写的嵌入式数据库引擎,它提供了高性…...
Rye: 一个革新的Python包管理工具
文章目录 Rye: 一个革新的Python包管理工具Rye的诞生背景Rye的核心特性Rye的安装与使用Rye的优势与挑战Rye的未来展望结语 Rye: 一个革新的Python包管理工具 在Python生态系统中,包管理一直是一个复杂且令人头疼的问题。随着Python社区的不断发展,出现了…...
如何在C#代码中判断当前C#的版本和dotnet版本
代码如下: using System.Reflection; using System.Runtime.InteropServices;var csharpVersion typeof(string).Assembly.GetCustomAttributes(typeof(AssemblyFileVersionAttribute), false).OfType<AssemblyFileVersionAttribute>().FirstOrDefault()?.…...
Linux 36.3@Jetson Orin Nano之系统安装
Linux 36.3Jetson Orin Nano之系统安装 1. 源由2. 命令行烧录Step 1:下载Linux 36.3安装程序Step 2:下载Linux 36.3根文件系统Step 3:解压Linux 36.3安装程序Step 4:解压Linux 36.3根文件系统Step 5:安装应用程序Step …...
案例实践 | 基于长安链的首钢供应链金融科技服务平台
案例名称-首钢供应链金融科技服务平台 ■ 建设单位 首惠产业金融服务集团有限公司 ■ 用户群体 核心企业、资金方(多为银行)等合作方 ■ 应用成效 三大业务场景,共计关联29个业务节点,覆盖京票项目全部关键业务 案例背景…...
Vue3实战笔记(55)—Vue3.4新特性揭秘:defineModel重塑v-model,拥抱高效双向数据流!
文章目录 前言defineModel() 基本用法总结 前言 v-model 可以在组件上使用以实现双向绑定。 从 Vue 3.4 开始,推荐的实现方式是使用 defineModel() 宏 defineModel() 基本用法 定义defineModel(): <!-- Child.vue --> <script setup> con…...
C++ | Leetcode C++题解之第123题买卖股票的最佳时机III
题目: 题解: class Solution { public:int maxProfit(vector<int>& prices) {int n prices.size();int buy1 -prices[0], sell1 0;int buy2 -prices[0], sell2 0;for (int i 1; i < n; i) {buy1 max(buy1, -prices[i]);sell1 max(…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
