当前位置: 首页 > news >正文

Spark-Streaming集成Kafka

 Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。

一、创建一个Direct Stream

导入相关maven依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.5.3</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeclass KafkaDriectStream {def main(args: Array[String]): Unit = {// 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")val ssc = new StreamingContext(conf, Seconds(1))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("topicA", "topicB")val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))inputDStream.map(record => (record.key, record.value))}
}

如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。

二、executor选择适合分区处理

新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。

在大多数情况下,应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。

Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。

如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false

缓存由topic分区和group.id控制,因此对createDirectStream的每次调用使用单独的 group.id

三、根据topic、partition、offset创建RDD

// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Streamval offsetRanges = Array(// topic, partition, 包含起始offset, 不包含结束offsetOffsetRange("test", 0, 0, 100),OffsetRange("test", 1, 0, 100)
)//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。

四、获取offset

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}
}

请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。

五、存储offset

在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。

offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 一段时间后,在输出完成之后,提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。

// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMapval stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval results = yourCalculation(rdd)// 开启事务// 更新结果// 更新offset// 结束事务
}

六、官方例子

object DirectKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <groupId> <topics>|  <brokers> is a list of one or more Kafka brokers|  <groupId> is a consumer group name to consume from topics|  <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(brokers, groupId, topics) = args// 以2秒的批处理间隔创建上下文val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(2))//指定kafka、topic信息创建direct kafka streamval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))// 获取一行数据并进行分割、统计、打印val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()//启动计算ssc.start()ssc.awaitTermination()}
}

该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表

1、创建2个topic

kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2

3、向topic推送数据

kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、查看结果


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

相关文章:

Spark-Streaming集成Kafka

Spark Streaming集成Kafka是生产上最多的方式&#xff0c;其中集成Kafka 0.10是较为简单的&#xff0c;即&#xff1a;Kafka分区和Spark分区之间是1:1的对应关系&#xff0c;以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整&#xff0c;下面我们…...

移植 OLLVM 到 Android NDK,Android Studio 中使用 OLLVM

版权归作者所有&#xff0c;如有转发&#xff0c;请注明文章出处&#xff1a;https://cyrus-studio.github.io/blog/ OLLVM、LLVM 与 Android NDK 在 Android NDK 中&#xff0c;LLVM/Clang 是默认的编译器。自 Android NDK r18 开始&#xff0c;Google 弃用了 GCC&#xff0c…...

DAY36|动态规划Part04|LeetCode:1049. 最后一块石头的重量 II、494. 目标和、474.一和零

目录 LeetCode:1049. 最后一块石头的重量 II 基本思路 C代码 LeetCode:494. 目标和 基本思路 C代码 LeetCode:474.一和零 基本思路 C代码 LeetCode:1049. 最后一块石头的重量 II 力扣代码链接 文字讲解&#xff1a;LeetCode:1049. 最后一块石头的重量 II 视频讲解&…...

Linux 下SVN新手操作手册

下面来介绍Linux 下 SVN操作方法&#xff1a; 1、SVN的安装 Centos 7 安装Subversion sudo yum -y install subversion Ubuntu 安装Subversion sudo apt-get install subversion 自定义安装&#xff0c;官方地址&#xff1a;https://subversion.apache.org/ 2、SVN的使用…...

障碍感知 | 基于KD树的障碍物快速处理(附案例分析与ROS C++仿真)

目录 1 障碍处理与KD树2 KD树核心原理2.1 KD树的构造2.2 KD树的查找 3 仿真实现3.1 KD树基本算法3.2 ROS C仿真 1 障碍处理与KD树 在机器人感知系统中&#xff0c;传感器&#xff08;如激光雷达、摄像头等&#xff09;会采集周围的环境数据&#xff0c;例如代价地图、八叉树地…...

Electron -- Electron Fiddle(一)

Electron Fiddle 是一个由 Electron 团队开发的开源工具&#xff0c;它允许开发者快速创建、运行和调试 Electron 应用。这个工具提供了一个简洁的界面&#xff0c;使用户无需配置复杂的开发环境&#xff0c;就能快速体验和学习 Electron。强烈建议将其安装为学习工具。 学习它…...

详解Redis的常用命令

目录 KEYS 语法 EXISTS 语法 DEL 语法 EXPIRE 语法 TTL 语法 TYPE 语法 Redis数据结构和内部编码 KEYS 返回所有满⾜样式&#xff08;pattern&#xff09;的 key。 返回值&#xff1a;匹配 pattern 的所有 key。 语法 ⽀持如下统配样式: h?llo matches hello, ha…...

elasticache备份

Elasticsearch 本地快照操作流程 配置快照存储路径 在 elasticsearch.yml 文件中配置以下字段以指定数据、日志和快照存储路径&#xff1a;path:data: /data/data # 数据存储路径logs: /data/log # 日志存储路径repo: /data/snapshot # 快照存储路径确保路径 /dat…...

Tomcat负载均衡全解析

一、Java项目概述 (一)Java语言特点 Java是一种计算机应用语言,在开发王者和管理系统等方面有着广泛的应用。它具有开源免费的特性,不过需要注意的是,虽然语言本身开源,但是后期开发工具可能会收取费用。 (二)、JDK和Tomcat 1,JDK:作为Java语言的开发工具,在Linu…...

[LeetCode-Python版] 定长滑动窗口8——2461. 长度为 K 子数组中的最大和

题目 给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且 子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0c;返回 0 。…...

springboot476基于vue篮球联盟管理系统(论文+源码)_kaic

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统篮球联盟管理系统信息管理难度大&#xff0c;容错率低&am…...

预约参观华为基地,见证行业巅峰

✨ 大家好呀&#xff01;今天要跟大家分享一个超酷的体验&#xff0c;关于华为的参观学习之旅&#xff01;&#x1f680; 华为成立于1987年&#xff0c;位于深圳&#xff0c;是全球领先的信息与通信技术&#xff08;ICT&#xff09;解决方案供应商哦&#xff01;他们专注于科技…...

【Flink-scala】DataSet编程模型介绍及数据源

DataStream 学习 1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍 Flink把批处理看成是一个流处理的特例&#xff0c;因此可以在底层统一的流处理引擎上&#xff0c;同…...

Odrive源码分析(四) 位置爬坡算法

Odrive中自带一个简单的梯形速度爬坡算法&#xff0c;本文分析下这部分代码。 代码如下&#xff1a; #include <cmath> #include "odrive_main.h" #include "utils.hpp"// A sign function where input 0 has positive sign (not 0) float sign_ha…...

[Unity Shader][图形渲染] Shader数学基础11 - 复合变换详解

在图形学与Shader编程中,复合变换是将平移、旋转和缩放等基本几何变换组合在一起,从而实现更复杂的物体变换效果。复合变换的本质是通过矩阵的串联操作,依次应用多个变换。 本文将介绍复合变换的数学原理、矩阵计算方法及注意事项,并结合实际编程中的实现细节帮助你掌握其…...

使用Python实现智能家居控制系统:开启智慧生活的钥匙

友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…...

使用 HTML5 Canvas 实现动态蜈蚣动画

使用 HTML5 Canvas 实现动态蜈蚣动画 1. 项目概述 我们将通过 HTML 和 JavaScript 创建一个动态蜈蚣。蜈蚣由多个节段组成&#xff0c;每个节段看起来像一个小圆形&#xff0c;并且每个节段上都附带有“脚”。蜈蚣的头部会在画布上随机移动。 完整代码在底部&#xff01;&…...

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers) 文章目录 计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)摘要Abstract一、DETR算法1. 摘要&#xff08;Abstract&#xff09;2. 引言&#xff08;Introduction&#…...

uniapp .gitignore

打开HBuilderX&#xff0c;在项目根目录下新建文件 .gitignore复制下面内容 #忽略unpackge目录下除了res目录的所有目录 unpackage/* !unpackage/res/#忽略.hbuilderx目录 .hbuilderx# 忽略node_modules目录下的所有文件 node_modules/# 忽略锁文件 package-lock.json yarn.l…...

JavaWeb Servlet的反射优化、Dispatcher优化、视图(重定向)优化、方法参数值获取优化

目录 1. 背景2. 实现2.1 pom.xml2.2 FruitController.java2.3 DispatcherServlet.java2.4 applicationContext.xml 3. 测试 1. 背景 前面我们做了Servlet的一个案例。但是存在很多问题&#xff0c;现在我们要做优化&#xff0c;优化的步骤如下&#xff1a; 每个Fruit请求都需…...

备忘一个FDBatchMove数据转存的问题

使用FDBatchMove的SQL导入excel表到sql表&#xff0c;设置条件时一头雾水&#xff0c;函数不遵守sql的规则。 比如替换字段的TAB键值为空&#xff0c;replace(字段名,char(9),)竟然提示错误&#xff0c;百思不得其解。 试遍了几乎所有的函数&#xff0c;竟然是chr(9)。 这个…...

CEF127 编译指南 MacOS 篇 - 编译 CEF(六)

1. 引言 经过前面的准备工作&#xff0c;我们已经完成了所有必要的环境配置。本文将详细介绍如何在 macOS 系统上编译 CEF127。通过正确的编译命令和参数配置&#xff0c;我们将完成 CEF 的构建工作&#xff0c;最终生成可用的二进制文件。 2. 编译前准备 2.1 确认环境变量 …...

【更新】LLM Interview

课程链接&#xff1a;BV1o217YeELo 文章目录 LLM基础相关1. LLMs概述2. 大语言模型尺寸3. LLMs的优势与劣势4. 常见的大模型分类5. 目前主流的LLMs开源模型体系有哪些&#xff08;Prefix Decoder&#xff0c;Causal Decoder&#xff0c;Encoder-Decoder的区别是什么&#xff09…...

Django 视图中使用 Redis 缓存优化查询性能

在 Web 应用程序开发中,查询数据库是一个常见的操作,但如果查询过于频繁或耗时,就会影响应用程序的性能。为了解决这个问题,我们可以使用缓存技术,将查询结果暂时存储在内存中,从而减少对数据库的访问。本文将介绍如何在 Django 视图中使用 Redis 缓存来优化查询性能。 © …...

正则表达式解析与功能说明

正则表达式解析与功能说明 表达式说明 String regex "\\#\\{TOASRTRINNG\\((.*?)((.*?))\\)(\\})";该正则表达式的作用是匹配形如 #{TOASRTRINNG(...)} 的字符串格式。以下是正则表达式的详细解析&#xff1a; 拆解与解析 1. \\# 匹配&#xff1a;# 字符。说明…...

STUN服务器实现NAT穿透

NAT穿透的问题 在现代网络环境中&#xff0c;大多数设备都位于NAT(网络地址转换)设备后面。这给点对点(P2P)通信带来了挑战&#xff0c;因为NAT会阻止外部网络直接访问内部设备。STUN(Session Traversal Utilities for NAT)服务器就是为了解决这个问题而设计的。 STUN是什么?…...

音视频入门基础:MPEG2-TS专题(19)——FFmpeg源码中,解析TS流中的PES流的实现

一、引言 FFmpeg源码在解析完PMT表后&#xff0c;会得到该节目包含的视频和音频信息&#xff0c;从而找到音视频流。TS流的音视频流包含在PES流中。FFmpeg源码通过调用函数指针tss->u.pes_filter.pes_cb指向的回调函数解析PES流的PES packet&#xff1a; /* handle one TS…...

tomcat的安装以及配置(基于linuxOS)

目录 安装jdk环境 yum安装 验证JDK环境 安装tomcat应用 yum安装 ​编辑 使用yum工具进行安装 配置tomcat应用 关闭防火墙和selinux 查看端口开启情况 ​编辑 访问tomcat服务 安装扩展包 重启服务 查看服务 源码安装 进入tomcat官网进行下载 查找自己要用的to…...

因子分解(递归)

1.素分解式(简单版) 任务描述 编写函数&#xff0c;输出一个正整数的素数分解式。主函数的功能为输入若干正整数&#xff08;大于1&#xff09;&#xff0c;输出每一个数的素分解式。素数分解式是指将整数写成若干素数(从小到大)乘积的形式。例如&#xff1a; 202*2*5 362*2*…...

【Python】pandas库---数据分析

大学毕业那年&#xff0c;你成了社会底层群众里&#xff0c;受教育程度最高的一批人。 前言 这是我自己学习Python的第四篇博客总结。后期我会继续把Python学习笔记开源至博客上。 上一期笔记有关Python的NumPy数据分析&#xff0c;没看过的同学可以去看看&#xff1a;【Pyt…...