Spark-Streaming集成Kafka
Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。
一、创建一个Direct Stream
导入相关maven依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.5.3</version> </dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeclass KafkaDriectStream {def main(args: Array[String]): Unit = {// 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")val ssc = new StreamingContext(conf, Seconds(1))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("topicA", "topicB")val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))inputDStream.map(record => (record.key, record.value))}
}
如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。
二、executor选择适合分区处理
新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。
在大多数情况下,应该使用LocationStrategies.PreferConsistent
。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers
,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed
。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。
Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity
设置。
如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false
。
缓存由topic分区和group.id控制,因此对createDirectStream
的每次调用使用单独的 group.id
三、根据topic、partition、offset创建RDD
// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Streamval offsetRanges = Array(// topic, partition, 包含起始offset, 不包含结束offsetOffsetRange("test", 0, 0, 100),OffsetRange("test", 1, 0, 100)
)//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。
四、获取offset
stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}
}
请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。
五、存储offset
在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。
offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。
stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 一段时间后,在输出完成之后,提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。
// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMapval stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval results = yourCalculation(rdd)// 开启事务// 更新结果// 更新offset// 结束事务
}
六、官方例子
object DirectKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <groupId> <topics>| <brokers> is a list of one or more Kafka brokers| <groupId> is a consumer group name to consume from topics| <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(brokers, groupId, topics) = args// 以2秒的批处理间隔创建上下文val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(2))//指定kafka、topic信息创建direct kafka streamval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))// 获取一行数据并进行分割、统计、打印val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()//启动计算ssc.start()ssc.awaitTermination()}
}
该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表
1、创建2个topic
kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
2、启动程序
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2
3、向topic推送数据
kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
4、查看结果
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)
- 广州
- https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
- 青岛
- https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议(ICBDIE 2025)
- 苏州
- https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2
相关文章:

Spark-Streaming集成Kafka
Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们…...

移植 OLLVM 到 Android NDK,Android Studio 中使用 OLLVM
版权归作者所有,如有转发,请注明文章出处:https://cyrus-studio.github.io/blog/ OLLVM、LLVM 与 Android NDK 在 Android NDK 中,LLVM/Clang 是默认的编译器。自 Android NDK r18 开始,Google 弃用了 GCC,…...

DAY36|动态规划Part04|LeetCode:1049. 最后一块石头的重量 II、494. 目标和、474.一和零
目录 LeetCode:1049. 最后一块石头的重量 II 基本思路 C代码 LeetCode:494. 目标和 基本思路 C代码 LeetCode:474.一和零 基本思路 C代码 LeetCode:1049. 最后一块石头的重量 II 力扣代码链接 文字讲解:LeetCode:1049. 最后一块石头的重量 II 视频讲解&…...

Linux 下SVN新手操作手册
下面来介绍Linux 下 SVN操作方法: 1、SVN的安装 Centos 7 安装Subversion sudo yum -y install subversion Ubuntu 安装Subversion sudo apt-get install subversion 自定义安装,官方地址:https://subversion.apache.org/ 2、SVN的使用…...

障碍感知 | 基于KD树的障碍物快速处理(附案例分析与ROS C++仿真)
目录 1 障碍处理与KD树2 KD树核心原理2.1 KD树的构造2.2 KD树的查找 3 仿真实现3.1 KD树基本算法3.2 ROS C仿真 1 障碍处理与KD树 在机器人感知系统中,传感器(如激光雷达、摄像头等)会采集周围的环境数据,例如代价地图、八叉树地…...
Electron -- Electron Fiddle(一)
Electron Fiddle 是一个由 Electron 团队开发的开源工具,它允许开发者快速创建、运行和调试 Electron 应用。这个工具提供了一个简洁的界面,使用户无需配置复杂的开发环境,就能快速体验和学习 Electron。强烈建议将其安装为学习工具。 学习它…...

详解Redis的常用命令
目录 KEYS 语法 EXISTS 语法 DEL 语法 EXPIRE 语法 TTL 语法 TYPE 语法 Redis数据结构和内部编码 KEYS 返回所有满⾜样式(pattern)的 key。 返回值:匹配 pattern 的所有 key。 语法 ⽀持如下统配样式: h?llo matches hello, ha…...
elasticache备份
Elasticsearch 本地快照操作流程 配置快照存储路径 在 elasticsearch.yml 文件中配置以下字段以指定数据、日志和快照存储路径:path:data: /data/data # 数据存储路径logs: /data/log # 日志存储路径repo: /data/snapshot # 快照存储路径确保路径 /dat…...
Tomcat负载均衡全解析
一、Java项目概述 (一)Java语言特点 Java是一种计算机应用语言,在开发王者和管理系统等方面有着广泛的应用。它具有开源免费的特性,不过需要注意的是,虽然语言本身开源,但是后期开发工具可能会收取费用。 (二)、JDK和Tomcat 1,JDK:作为Java语言的开发工具,在Linu…...
[LeetCode-Python版] 定长滑动窗口8——2461. 长度为 K 子数组中的最大和
题目 给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和: 子数组的长度是 k,且 子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件,返回 0 。…...

springboot476基于vue篮球联盟管理系统(论文+源码)_kaic
摘 要 如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统篮球联盟管理系统信息管理难度大,容错率低&am…...

预约参观华为基地,见证行业巅峰
✨ 大家好呀!今天要跟大家分享一个超酷的体验,关于华为的参观学习之旅!🚀 华为成立于1987年,位于深圳,是全球领先的信息与通信技术(ICT)解决方案供应商哦!他们专注于科技…...

【Flink-scala】DataSet编程模型介绍及数据源
DataStream 学习 1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍 Flink把批处理看成是一个流处理的特例,因此可以在底层统一的流处理引擎上,同…...

Odrive源码分析(四) 位置爬坡算法
Odrive中自带一个简单的梯形速度爬坡算法,本文分析下这部分代码。 代码如下: #include <cmath> #include "odrive_main.h" #include "utils.hpp"// A sign function where input 0 has positive sign (not 0) float sign_ha…...
[Unity Shader][图形渲染] Shader数学基础11 - 复合变换详解
在图形学与Shader编程中,复合变换是将平移、旋转和缩放等基本几何变换组合在一起,从而实现更复杂的物体变换效果。复合变换的本质是通过矩阵的串联操作,依次应用多个变换。 本文将介绍复合变换的数学原理、矩阵计算方法及注意事项,并结合实际编程中的实现细节帮助你掌握其…...
使用Python实现智能家居控制系统:开启智慧生活的钥匙
友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…...

使用 HTML5 Canvas 实现动态蜈蚣动画
使用 HTML5 Canvas 实现动态蜈蚣动画 1. 项目概述 我们将通过 HTML 和 JavaScript 创建一个动态蜈蚣。蜈蚣由多个节段组成,每个节段看起来像一个小圆形,并且每个节段上都附带有“脚”。蜈蚣的头部会在画布上随机移动。 完整代码在底部!&…...

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)
计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers) 文章目录 计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)摘要Abstract一、DETR算法1. 摘要(Abstract)2. 引言(Introduction&#…...

uniapp .gitignore
打开HBuilderX,在项目根目录下新建文件 .gitignore复制下面内容 #忽略unpackge目录下除了res目录的所有目录 unpackage/* !unpackage/res/#忽略.hbuilderx目录 .hbuilderx# 忽略node_modules目录下的所有文件 node_modules/# 忽略锁文件 package-lock.json yarn.l…...

JavaWeb Servlet的反射优化、Dispatcher优化、视图(重定向)优化、方法参数值获取优化
目录 1. 背景2. 实现2.1 pom.xml2.2 FruitController.java2.3 DispatcherServlet.java2.4 applicationContext.xml 3. 测试 1. 背景 前面我们做了Servlet的一个案例。但是存在很多问题,现在我们要做优化,优化的步骤如下: 每个Fruit请求都需…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...

自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
机器学习的数学基础:线性模型
线性模型 线性模型的基本形式为: f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法,得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...
手动给中文分词和 直接用神经网络RNN做有什么区别
手动分词和基于神经网络(如 RNN)的自动分词在原理、实现方式和效果上有显著差异,以下是核心对比: 1. 实现原理对比 对比维度手动分词(规则 / 词典驱动)神经网络 RNN 分词(数据驱动)…...

Python异步编程:深入理解协程的原理与实践指南
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 持续学习,不断…...
IP选择注意事项
IP选择注意事项 MTP、FTP、EFUSE、EMEMORY选择时,需要考虑以下参数,然后确定后选择IP。 容量工作电压范围温度范围擦除、烧写速度/耗时读取所有bit的时间待机功耗擦写、烧写功耗面积所需要的mask layer...

Java多线程从入门到精通
一、基础概念 1.1 进程与线程 进程是指运行中的程序。 比如我们使用浏览器,需要启动这个程序,操作系统会给这个程序分配一定的资源(占用内存资源)。 线程是CPU调度的基本单位,每个线程执行的都是某一个进程的代码的某…...