kafka 案例
kafka 案例
- 目录
- 概述
- 需求:
- 设计思路
- 实现思路分析
- 1.kafka案例_API 带回调函数的生产者
- 2.kafka案例_API生产者分区策略测试
- 3.kafka案例_自定义分区的生产者
- 4.kafka案例_API同步发送生产者
- 5.kafka案例_API简单消费者
- 5.kafka案例_API消费者重置offset
- 参考资料和推荐阅读
Survive by day and develop by night.
talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive.
happy for hardess to solve denpendies.
目录
概述
需求:
设计思路
实现思路分析
1.kafka案例_API 带回调函数的生产者
以下是一个使用 Kafka 的 Java 生产者带回调函数的案例:
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息发送成功!");} else {System.out.println("消息发送失败:" + exception.getMessage());}}});producer.close();}
}
请注意替换 bootstrap.servers 的值为 Kafka 服务器的地址和端口,并替换 topic-name、key 和 value 分别为实际使用的主题、键和值。
在这个案例中,我们使用 KafkaProducer 类创建一个 Kafka 生产者。然后,通过创建一个 ProducerRecord 对象,我们指定了要发送到的主题、键和值。
然后,我们调用 producer.send() 方法来发送消息。此方法接受一个 Callback 参数,该参数用于在消息发送完成后执行回调函数。在回调函数中,我们可以检查发送结果并采取相应的操作。
最后,我们调用 producer.close() 方法来关闭生产者。
2.kafka案例_API生产者分区策略测试
在Kafka中,可以使用API生产者分区策略来决定将消息发送到哪个分区。以下是一个展示如何使用API生产者分区策略的示例代码。
首先,创建一个新的Java类,例如ProducerPartitionStrategyTest.java,并导入Kafka相关的依赖项。
import org.apache.kafka.clients.producer.*;import java.util.Properties;
接下来,定义一个自定义的Partitioner类,用于实现分区策略。在这个例子中,我们将根据消息的键来决定分区。如果键为偶数,则将消息发送到分区0;如果键为奇数,则将消息发送到分区1。
class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions = cluster.partitionCountForTopic(topic);int partition = 0;try {int keyValue = Integer.parseInt(key.toString());if (keyValue % 2 == 0) {partition = 0;} else {partition = 1;}} catch (NumberFormatException e) {partition = Math.abs(key.hashCode() % numPartitions);}return partition;}@Overridepublic void close() {// 不做任何操作}@Overridepublic void configure(Map<String, ?> configs) {// 不做任何操作}
}
然后,在主方法中创建一个Producer并设置自定义分区策略。
public class ProducerPartitionStrategyTest {public static void main(String[] args) {// 配置Kafka生产者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("partitioner.class", "com.example.CustomPartitioner");// 创建Kafka生产者Producer<String, String> producer = new KafkaProducer<>(properties);// 发送消息for (int i = 0; i < 10; i++) {String key = String.valueOf(i);String value = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error producing message: " + exception.getMessage());} else {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());}}});}// 关闭生产者producer.close();}
}
在以上示例中,我们首先配置了Kafka生产者的一些属性,例如Kafka服务器的地址、键和值的序列化程序以及分区策略。然后,我们创建了一个Kafka生产者,并使用自定义分区策略将消息发送到Kafka集群中。
在发送消息的循环中,我们创建了一个ProducerRecord对象,它包含要发送的消息的主题、键和值。然后,我们使用send()方法将消息发送到Kafka集群,并使用回调函数处理发送结果。
最后,我们关闭了生产者。
运行以上代码后,你将会看到消息被发送到正确的分区,并打印出消息的分区和偏移量的信息。
这就是一个简单的演示如何使用API生产者分区策略的例子。你可以根据自己的需求来实现自定义的分区策略,并将消息发送到合适的分区中。
3.kafka案例_自定义分区的生产者
自定义分区的生产者示例代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;public class CustomPartitionProducer {public static void main(String[] args) {// Kafka 服务器地址String bootstrapServers = "localhost:9092";// 主题名称String topic = "custom-partition-topic";// 创建生产者配置Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息到自定义分区Random random = new Random();for (int i = 0; i < 10; i++) {String key = "key" + i;String value = "value" + i;producer.send(new ProducerRecord<>(topic, key, value));System.out.println("Sent message: key=" + key + ", value=" + value);}// 关闭生产者producer.close();}public static class CustomPartitioner implements org.apache.kafka.clients.producer.Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 这里自定义分区逻辑// 根据key的尾数来决定消息被发送到哪个分区int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;System.out.println("Custom partitioner: topic=" + topic + ", key=" + key + ", partition=" + partition);return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}
}
上面的代码示例首先创建一个自定义分区器CustomPartitioner,然后在生产者配置中指定该分区器类:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
在自定义分区器中,根据key的尾数来决定消息被发送到哪个分区:
int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;
然后启动生产者,发送消息到指定的主题。每条消息的key都是以数字结尾的字符串,根据key的尾数来选择分区。输出中会打印出消息的详细信息,包括主题、key和分区信息。
4.kafka案例_API同步发送生产者
下面是使用Kafka客户端库进行API同步发送的一个示例:
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092";String topic = "test-topic";String key = "key1";String value = "value1";// 配置Kafka生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 创建消息实例ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 发送消息并等待返回结果RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送成功,Topic: " + metadata.topic() +", Partition: " + metadata.partition() +", Offset: " + metadata.offset());} catch (Exception e) {System.out.println("消息发送失败:" + e.getMessage());} finally {// 关闭Kafka生产者producer.close();}}
}
这个示例使用了Kafka的Java客户端库,并创建了一个Kafka生产者实例。代码中设置了Kafka服务器的地址、要发送的主题、消息的键和值。还配置了键和值的序列化器为StringSerializer。
然后,创建了一个ProducerRecord实例来包装要发送的消息。通过调用producer.send(record).get()方法发送消息并等待返回结果。发送成功后,通过返回的RecordMetadata对象获取到消息的元数据,包括发送到的Topic、Partition和Offset。
最后,关闭Kafka生产者。
5.kafka案例_API简单消费者
以下是一个简单的Kafka案例,使用Kafka的Java API实现一个简单的消费者。
首先,需要安装Kafka并启动Kafka服务。然后,创建一个Kafka消费者来消费指定的主题。
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka broker的地址String bootstrapServers = "localhost:9092";// 消费者组的IDString groupId = "test-group";// 要消费的主题String topic = "test-topic";// 创建Kafka消费者的配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建Kafka消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);// 处理每条消息records.forEach(record -> {System.out.println("Received message: " + record.value());});}}
}
在这个例子中,首先定义了Kafka broker的地址、消费者组的ID和要消费的主题。然后,创建了一个Kafka消费者的配置,并设置了必要的属性。然后,创建了一个Kafka消费者,并使用subscribe()方法订阅了指定的主题。最后,使用poll()方法从Kafka集群拉取消息,并使用forEach()方法对每条消息进行处理。
运行这个消费者应用程序,它将开始消费指定主题的消息并打印出来。
注意:这只是一个简单的Kafka消费者示例,没有处理异常或实现自动提交偏移量。在实际应用中,需要根据具体需求添加更多的处理逻辑。
5.kafka案例_API消费者重置offset
要在Java中重置Kafka消费者的偏移量(offset),您可以使用以下代码片段:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class ConsumerOffsetResetExample {public static void main(String[] args) {String topic = "your-topic";String bootstrapServers = "localhost:9092";String groupId = "your-group-id";// 创建Kafka消费者配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", bootstrapServers);properties.setProperty("group.id", groupId);properties.setProperty("enable.auto.commit", "false"); // 禁用自动提交偏移量// 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic));// 将偏移量重置到最早的可用位置consumer.seekToBeginning(consumer.assignment());// 或者将偏移量重置到最新的可用位置// consumer.seekToEnd(consumer.assignment());// 处理消息try {while (true) {// 拉取消息// ConsumerRecord<String, String> record = consumer.poll(Duration.ofMillis(100)).iterator().next();// 处理消息// ...// 手动提交偏移量// consumer.commitSync();}} finally {// 关闭Kafka消费者consumer.close();}}
}
在上面的代码中,我们使用seekToBeginning方法将偏移量重置为最早的可用位置。您还可以使用seekToEnd方法将偏移量重置为最新的可用位置。请根据您的需求选择适当的方法。
参考资料和推荐阅读
参考资料
官方文档
开源社区
博客文章
书籍推荐
- 暂无
欢迎阅读,各位老铁,如果对你有帮助,点个赞加个关注呗!同时,期望各位大佬的批评指正~,如果有兴趣,可以加文末的交流群,大家一起进步哈
相关文章:
kafka 案例
kafka 案例 目录概述需求: 设计思路实现思路分析1.kafka案例_API 带回调函数的生产者2.kafka案例_API生产者分区策略测试3.kafka案例_自定义分区的生产者4.kafka案例_API同步发送生产者5.kafka案例_API简单消费者5.kafka案例_API消费者重置offset 参考资料和推荐阅读…...
别被“涨价“带跑,性价比才是消费真理
文章来源:全食在线 “再不好好赚钱,连方便面也吃不起了。”这是昨天在热搜下,一位网友的留言。而热搜的内容,正是康师傅方便面即将涨价的消息。 01 传闻初现 昨天上午,朋友圈就有人放出康师傅方便面要涨价的消息&am…...
GEE深度学习——使用Tensorflow进行神经网络DNN土地分类
Tensorflow TensorFlow是一个开源的深度学习框架,由Google开发和维护。它提供了一个灵活的框架来构建和训练各种机器学习模型,尤其是深度神经网络模型。 TensorFlow的主要特点包括: 1. 它具有高度的灵活性,可以用于训练和部署各种类型的机器学习模型,包括分类、回归、聚…...
死锁示例(python、go)
Thread 1首先获取了资源A,然后尝试获取资源B,但此时资源B已经被Thread 2获取,因此Thread 1会一直等待。而Thread 2也类似,首先获取资源B,然后尝试获取资源A,但此时资源A已经被Thread 1获取,因此…...
Spring Cloud 面试题(五)
1. Eureka的自我保护模式是什么? Eureka的自我保护模式是一种应对网络异常的安全保护措施,旨在防止因网络分区或其他异常情况导致服务实例被错误地注销。当Eureka Server在短时间内丢失过多的客户端心跳时,会触发自我保护机制。以下是自我保…...
源码编译安装LAMP
1.LAMP介绍 LAMP架构是目前成熟的企业网站应用模式之一,指的是协同工作的一整套系统和相关软件,能够提供动态Web站点服务及其应用开发环境。LAMP是一个缩写词,具体包括Linux操作系统、Apache网站服务器、MySQL数据库服务器、PHP(…...
html5网页-浏览器中实现高德地图定位功能
介绍 HTML5是当前Web开发中最常用的技术之一,而地图应用又是其中一个非常常见的需求。高德地图是国内最受欢迎的地图服务提供商之一,他们提供了一系列的API,方便开发者在自己的网站或应用中集成地图功能。 接下来我们如何使用HTML5浏览器和高…...
C从零开始实现贪吃蛇大作战
个人主页:星纭-CSDN博客 系列文章专栏 : C语言 踏上取经路,比抵达灵山更重要!一起努力一起进步! 有关Win32API的知识点在上一篇文章: 目录 一.地图 1.控制台基本介绍 2.宽字符 1.本地化 2.类项 3.setlocale函…...
国内外相机在LabVIEW图像处理的对比
概述 本文对比国内外相机在LabVIEW进行图像处理的区别,探讨各自的特点。国内相机如大恒和海康威视,具有较高性价比和本地化支持;国外品牌如Basler和FLIR则以高性能和稳定性著称。两者在驱动兼容性、图像质量和技术支持方面各有优势。 详细对…...
第四十五天 | 322.零钱兑换
题目:322.零钱兑换 尝试解答: 1.确定dp[j]含义:装满容量为j的背包所需要放的硬币个数为dp[j]; 2.动态转移方程:dp[j] dp[j - coins[i]] 1; 3.遍历顺序:本题应该为组合类题目,不考虑装入的顺序&#x…...
3D 生成重建011-LucidDreamer 优化SDS过平滑结果的一种探索
3D 生成重建011-LucidDreamer 优化SDS过平滑结果的一种探索 文章目录 0论文工作1论文方法2 效果 0论文工作 文本到3D生成的最新进展标志着生成模型的一个重要里程碑,为在各种现实场景中创建富有想象力的3D资产打开了新的可能性。虽然最近在文本到3D生成方面的进展…...
ES6 笔记04
01 异步函数的使用 es6推出了一种按照顺序执行的异步函数的方法 async 异步函数 async异步函数可以解决promise封装异步代码,调用时一直then链式编程时比较麻烦的问题 定义异步函数: async function 函数名(){ await 表达式1或者函数的调用1 await 表达式2或者函数的调用2 ...…...
中间件-------RabbitMQ
同步和异步 异步调用 MQ MQ优势:①服务解耦 ②异步调用 ③流量削峰 结构 消息模型 RabbitMQ入门案例,实现消息发送和消息接收 生产者: public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutExce…...
flink Data Source数据源
flink Data Source数据源 Source 并行度 非并行:并行度只能为1 并行 基于集合的Source fromElements package com.pxj.sx.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.ap…...
网络七层模型与云计算中的网络服务
网络七层模型,也称为OSI(Open System Interconnection)模型,是由国际标准化组织(ISO)制定的一个概念性框架,用于描述网络通信过程中信息是如何被封装、传输和解封装的。这一模型将复杂的网络通信…...
word一按空格就换行怎么办?word文本之间添加空格就换行怎么办?
如上图,无法在Connection和con之间添加空格,一按空格就会自动换行。 第一步:选中文本,打开段落。 第二步:点击中文版式,勾选允许西文在单词中间换行。 确定之后就解决一按空格就自动换行啦!...
Python 遍历字典的方法,你都掌握了吗
Python中的字典是一种非常灵活的数据结构,它允许通过键来存储和访问值。在处理字典时,经常需要遍历字典中的元素,以下是几种常见的遍历字典的方法。 1. 使用 for 循环直接遍历字典的键 字典的键是唯一的,可以直接通过 for 循环来…...
MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端
↑ 关注“少安事务所”公众号,欢迎⭐收藏,不错过精彩内容~ MySQL 8.4.0 LTS 已经发布 ,作为发版模型变更后的第一个长期支持版本,注定要承担未来生产环境的重任,那么这个版本都有哪些新特性、变更,接下来少…...
LeetCode 124 —— 二叉树中的最大路径和
阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 二叉树的问题首先我们要想想是否能用递归来解决,本题也不例外,而递归的关键是找到子问题。 我们首先来看看一棵最简单的树,也就是示例 1。这样的一棵树总共有六条路径…...
美甲店会员预约系统管理小程序的作用是什么
女性爱美体现在方方面面,美丽好看的指甲也不能少,市场中美甲店、小摊不少,也跑出了不少连锁品牌,70后到00后,每个层级都有不少潜在客户,商家需要获取和完善转化路径,不断提高品牌影响力与自身内…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...
Linux-进程间的通信
1、IPC: Inter Process Communication(进程间通信): 由于每个进程在操作系统中有独立的地址空间,它们不能像线程那样直接访问彼此的内存,所以必须通过某种方式进行通信。 常见的 IPC 方式包括&#…...
Vuex:Vue.js 应用程序的状态管理模式
什么是Vuex? Vuex 是专门为 Vue.js 应用程序开发的状态管理模式 库。它采用集中式存储管理应用的所有组件的状态,并以相应的规则保证状态以一种可预测的方式发生变化。 在大型单页应用中,当多个组件共享状态时,简单的单向数据流…...
MAZANOKE结合内网穿透技术实现跨地域图像优化服务的远程访问过程
文章目录 前言1. 关于MAZANOKE2. Docker部署3. 简单使用MAZANOKE4. 安装cpolar内网穿透5. 配置公网地址6. 配置固定公网地址总结 前言 在数字世界高速发展的今天,您是否察觉到那些静默增长的视觉数据正在悄然蚕食存储空间?随着影像记录成为日常习惯&…...
