当前位置: 首页 > news >正文

Spark-Streaming集成Kafka

 Spark Streaming集成Kafka是生产上最多的方式,其中集成Kafka 0.10是较为简单的,即:Kafka分区和Spark分区之间是1:1的对应关系,以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整,下面我们一起来看看吧。

一、创建一个Direct Stream

导入相关maven依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.5.3</version>
</dependency>

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeclass KafkaDriectStream {def main(args: Array[String]): Unit = {// 创建一个具有2个线程和1秒批处理间隔的本地StreamingContext。val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDriectStream")val ssc = new StreamingContext(conf, Seconds(1))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("topicA", "topicB")val inputDStream :InputDStream[ConsumerRecord[String, String]]= KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))inputDStream.map(record => (record.key, record.value))}
}

如果Spark批处理持续时间大于默认的Kafka心跳会话超时时间(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批处理,这将需要更改代理上的group.max.session.timeout.ms。

二、executor选择适合分区处理

新的Kafka Consumer API会将消息预取到缓冲区中。因此,出于性能原因,Spark集成Kafka时最好将缓存的Consumer 保留在executor上(而不是为每个批次重新创建它们)。

在大多数情况下,应该使用LocationStrategies.PreferConsistent。这将在可用的executor之间均匀地分配分区。如果executor与Kafka 的broker位于相同的主机上,则使用PreferBrokers,这将在该分区的Kafka leader上安排分区。最后,如果分区之间的负载严重偏差,请使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。

Consumer 缓存的默认最大大小为64。如果处理超过(64个executor数量)的Kafka分区,可以通过更改spark.streaming.kafka.consumer.cache.maxCapacity设置。

如果想禁用Consumer 的缓存,可以将spark.streaming.kafka.consumer.cache.enabled 设置成false

缓存由topic分区和group.id控制,因此对createDirectStream的每次调用使用单独的 group.id

三、根据topic、partition、offset创建RDD

// 导入依赖关系并创建kafka-params,例如第一步:创建Direct Streamval offsetRanges = Array(// topic, partition, 包含起始offset, 不包含结束offsetOffsetRange("test", 0, 0, 100),OffsetRange("test", 1, 0, 100)
)//根据kafka TopicPartition 中的一段数据来创建一个RDD,这是不是为了实现微批来提供支持呢
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,这里不能指定broker来消费,因为spark streaming的Driver Consumer 可以自动查找broker的元数据。如果要指定broker,需要将其与元数据绑定到一起。

四、获取offset

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}
}

请注意,HasOffsetRanges的类型转换只有在createDirectStream结果调用的第一个方法中完成时才会成功,而不是在后面的方法链中完成。因为一旦发生shuffle和重分区,RDD分区和Kafka分区之间的一对一关系就会遭到破坏。

五、存储offset

在kafka中为了实现精确一次的语义,必须把结果处理和offset放到一个事务中去处理,在与spark streaming集成时也不例外。必须在幂等输出之后存储offset,或者将offset与输出一起存储在原子事务中。

offset可以存储在spark的checkpoint中,也可以存储在kafka自身的内部topic中。将offset存储到kafka的好处是,无论应用程序代码发生什么变化,Kafka都是一个持久的存储。但是,Kafka不是事务性的,程序的输出必须仍然是幂等的。注意,在流式计算中我们一般会将enable.auto.commit置为false。采用手动提交的方式。

stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 一段时间后,在输出完成之后,提交offsetstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与HasOffsetRanges一样,只有在createDirectStream的结果上调用时,才能成功得到CanCommitOffsets ,而不是在转换之后。获取到CanCommitOffsets 一般要等这批数据处理完再进行提交。

// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMapval stream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)stream.foreachRDD { rdd =>val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval results = yourCalculation(rdd)// 开启事务// 更新结果// 更新offset// 结束事务
}

六、官方例子

object DirectKafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 3) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <groupId> <topics>|  <brokers> is a list of one or more Kafka brokers|  <groupId> is a consumer group name to consume from topics|  <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(brokers, groupId, topics) = args// 以2秒的批处理间隔创建上下文val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(2))//指定kafka、topic信息创建direct kafka streamval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))// 获取一行数据并进行分割、统计、打印val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()//启动计算ssc.start()ssc.awaitTermination()}
}

该例子消费Kafka中一个或多个topic的消息并进行单词统计,需要三个参数:1、Kafka broker的列表,2、消费者组,3、以逗号分隔的topic列表

1、创建2个topic

kafka-topics --create --topic spark-streaming-wc1 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
kafka-topics --create --topic spark-streaming-wc2 --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2

2、启动程序

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.DirectKafkaWordCount cdh1:9092,cdh2:9092 direct-kafka-wc-group spark-streaming-wc1,spark-streaming-wc2

3、向topic推送数据

kafka-console-producer --topic spark-streaming-wc1 --broker-list cdh1:9092,cdh2:9092,cdh3:9092
kafka-console-producer --topic spark-streaming-wc2 --broker-list cdh1:9092,cdh2:9092,cdh3:9092

4、查看结果


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)

  • 广州
  • https://ais.cn/u/fi2yym

第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)

  • 青岛
  • https://ais.cn/u/nuQr6f

第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

  • 苏州
  • https://ais.cn/u/eYnmQr

第三届通信网络与机器学习国际学术会议(CNML 2025)

  • 南京
  • https://ais.cn/u/vUNva2

相关文章:

Spark-Streaming集成Kafka

Spark Streaming集成Kafka是生产上最多的方式&#xff0c;其中集成Kafka 0.10是较为简单的&#xff0c;即&#xff1a;Kafka分区和Spark分区之间是1:1的对应关系&#xff0c;以及对偏移量和元数据的访问。与高版本的Kafka Consumer API 集成时做了一些调整&#xff0c;下面我们…...

移植 OLLVM 到 Android NDK,Android Studio 中使用 OLLVM

版权归作者所有&#xff0c;如有转发&#xff0c;请注明文章出处&#xff1a;https://cyrus-studio.github.io/blog/ OLLVM、LLVM 与 Android NDK 在 Android NDK 中&#xff0c;LLVM/Clang 是默认的编译器。自 Android NDK r18 开始&#xff0c;Google 弃用了 GCC&#xff0c…...

DAY36|动态规划Part04|LeetCode:1049. 最后一块石头的重量 II、494. 目标和、474.一和零

目录 LeetCode:1049. 最后一块石头的重量 II 基本思路 C代码 LeetCode:494. 目标和 基本思路 C代码 LeetCode:474.一和零 基本思路 C代码 LeetCode:1049. 最后一块石头的重量 II 力扣代码链接 文字讲解&#xff1a;LeetCode:1049. 最后一块石头的重量 II 视频讲解&…...

Linux 下SVN新手操作手册

下面来介绍Linux 下 SVN操作方法&#xff1a; 1、SVN的安装 Centos 7 安装Subversion sudo yum -y install subversion Ubuntu 安装Subversion sudo apt-get install subversion 自定义安装&#xff0c;官方地址&#xff1a;https://subversion.apache.org/ 2、SVN的使用…...

障碍感知 | 基于KD树的障碍物快速处理(附案例分析与ROS C++仿真)

目录 1 障碍处理与KD树2 KD树核心原理2.1 KD树的构造2.2 KD树的查找 3 仿真实现3.1 KD树基本算法3.2 ROS C仿真 1 障碍处理与KD树 在机器人感知系统中&#xff0c;传感器&#xff08;如激光雷达、摄像头等&#xff09;会采集周围的环境数据&#xff0c;例如代价地图、八叉树地…...

Electron -- Electron Fiddle(一)

Electron Fiddle 是一个由 Electron 团队开发的开源工具&#xff0c;它允许开发者快速创建、运行和调试 Electron 应用。这个工具提供了一个简洁的界面&#xff0c;使用户无需配置复杂的开发环境&#xff0c;就能快速体验和学习 Electron。强烈建议将其安装为学习工具。 学习它…...

详解Redis的常用命令

目录 KEYS 语法 EXISTS 语法 DEL 语法 EXPIRE 语法 TTL 语法 TYPE 语法 Redis数据结构和内部编码 KEYS 返回所有满⾜样式&#xff08;pattern&#xff09;的 key。 返回值&#xff1a;匹配 pattern 的所有 key。 语法 ⽀持如下统配样式: h?llo matches hello, ha…...

elasticache备份

Elasticsearch 本地快照操作流程 配置快照存储路径 在 elasticsearch.yml 文件中配置以下字段以指定数据、日志和快照存储路径&#xff1a;path:data: /data/data # 数据存储路径logs: /data/log # 日志存储路径repo: /data/snapshot # 快照存储路径确保路径 /dat…...

Tomcat负载均衡全解析

一、Java项目概述 (一)Java语言特点 Java是一种计算机应用语言,在开发王者和管理系统等方面有着广泛的应用。它具有开源免费的特性,不过需要注意的是,虽然语言本身开源,但是后期开发工具可能会收取费用。 (二)、JDK和Tomcat 1,JDK:作为Java语言的开发工具,在Linu…...

[LeetCode-Python版] 定长滑动窗口8——2461. 长度为 K 子数组中的最大和

题目 给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且 子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0c;返回 0 。…...

springboot476基于vue篮球联盟管理系统(论文+源码)_kaic

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统篮球联盟管理系统信息管理难度大&#xff0c;容错率低&am…...

预约参观华为基地,见证行业巅峰

✨ 大家好呀&#xff01;今天要跟大家分享一个超酷的体验&#xff0c;关于华为的参观学习之旅&#xff01;&#x1f680; 华为成立于1987年&#xff0c;位于深圳&#xff0c;是全球领先的信息与通信技术&#xff08;ICT&#xff09;解决方案供应商哦&#xff01;他们专注于科技…...

【Flink-scala】DataSet编程模型介绍及数据源

DataStream 学习 1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍 Flink把批处理看成是一个流处理的特例&#xff0c;因此可以在底层统一的流处理引擎上&#xff0c;同…...

Odrive源码分析(四) 位置爬坡算法

Odrive中自带一个简单的梯形速度爬坡算法&#xff0c;本文分析下这部分代码。 代码如下&#xff1a; #include <cmath> #include "odrive_main.h" #include "utils.hpp"// A sign function where input 0 has positive sign (not 0) float sign_ha…...

[Unity Shader][图形渲染] Shader数学基础11 - 复合变换详解

在图形学与Shader编程中,复合变换是将平移、旋转和缩放等基本几何变换组合在一起,从而实现更复杂的物体变换效果。复合变换的本质是通过矩阵的串联操作,依次应用多个变换。 本文将介绍复合变换的数学原理、矩阵计算方法及注意事项,并结合实际编程中的实现细节帮助你掌握其…...

使用Python实现智能家居控制系统:开启智慧生活的钥匙

友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…...

使用 HTML5 Canvas 实现动态蜈蚣动画

使用 HTML5 Canvas 实现动态蜈蚣动画 1. 项目概述 我们将通过 HTML 和 JavaScript 创建一个动态蜈蚣。蜈蚣由多个节段组成&#xff0c;每个节段看起来像一个小圆形&#xff0c;并且每个节段上都附带有“脚”。蜈蚣的头部会在画布上随机移动。 完整代码在底部&#xff01;&…...

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers) 文章目录 计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)摘要Abstract一、DETR算法1. 摘要&#xff08;Abstract&#xff09;2. 引言&#xff08;Introduction&#…...

uniapp .gitignore

打开HBuilderX&#xff0c;在项目根目录下新建文件 .gitignore复制下面内容 #忽略unpackge目录下除了res目录的所有目录 unpackage/* !unpackage/res/#忽略.hbuilderx目录 .hbuilderx# 忽略node_modules目录下的所有文件 node_modules/# 忽略锁文件 package-lock.json yarn.l…...

JavaWeb Servlet的反射优化、Dispatcher优化、视图(重定向)优化、方法参数值获取优化

目录 1. 背景2. 实现2.1 pom.xml2.2 FruitController.java2.3 DispatcherServlet.java2.4 applicationContext.xml 3. 测试 1. 背景 前面我们做了Servlet的一个案例。但是存在很多问题&#xff0c;现在我们要做优化&#xff0c;优化的步骤如下&#xff1a; 每个Fruit请求都需…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...