【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式
集成方式
Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。
方式一:Kafka 0.8.x版本
- 老的Old Kafka Consumer API
- 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
- 老的Old消费者API,有两种方式:
- 第一种:高级消费API(Consumer High Level API),Receiver接收器接收数据
- 第二种:简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据
方式二:Kafka 0.10.x版本
- 新的 New Kafka Consumer API
- 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html
- 核心API:KafkaConsumer、ConsumerRecorder
两种方式区别
使用Kafka Old Consumer API集成两种方式,虽然实际生产环境使用Direct方式获取数据,但是在面试的时候常常问到两者区别。
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
- Receiver-based Approach:
- 基于接收器方式,消费Kafka Topic数据,但是企业中基本上不再使用;
- Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数据(union),再进行处理,很麻烦;
- Receiver那台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低;
- Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护;
- Spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据不一致;
- Direct Approach (No Receivers):
- 直接方式,Streaming中每批次的每个job直接调用Simple Consumer API获取对应Topic数据,此种方式使用最多,面试时被问的最多;
- Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高并行能力
- Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况;
- 当然也可以自己手动维护,把offset存在MySQL、Redis和Zookeeper中;
上述两种方式区别,如下图所示:
4.2 Direct 方式集成
使用Kafka Old Consumer API方式集成Streaming,采用Direct方式,调用Old Simple Consumer API,
文档:
http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers
编码实现
Direct方式会定期地从Kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围,在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。
Direct方式没有receiver层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,再根据设定的maxRatePerPartition来处理每个batch。较于Receiver方式的优势:
- 其一、简化的并行:Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据;
- 其二、高效:no need for Write Ahead Logs;
- 其三、精确一次:直接使用simple Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录。
添加相关MAVEN依赖:
<!-- Spark Streaming 集成Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>
提供工具类KafkaUtils专门从Kafka消费数据,其中函数【createDirectStream】消费数据:
创建Topic及Console控制台发送数据,命令如下:
# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic wc-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic wc-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic wc-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning
具体演示代码如下:
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 集成Kafka,采用Direct式读取数据,对每批次(时间为1秒)数据进行词频统计,将统计结果输出到控制台
*/
object StreamingKafkaDirect {
def main(args: Array[String]): Unit = {
// 1、构建流式上下文实例对象StreamingContext,用于读取流式的数据和调度Batch Job执行
val ssc: StreamingContext = {
// a. 创建SparkConf实例对象,设置Application相关信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
//TODO: 设置每秒钟读取Kafka中Topic最大数据量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 创建StreamingContext实例,传递Batch Interval(时间间隔:划分流式数据)
val context: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
// c. 返回上下文对象
context
}
// TODO: 2、从流式数据源读取数据
/*
def createDirectStream[
K: ClassTag, // Topic中Key数据类型
V: ClassTag,
KD <: Decoder[K]: ClassTag, // 表示Topic中Key数据解码(从文件中读取,反序列化)
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)]
*/
// 表示从Kafka Topic读取数据时相关参数设置
val kafkaParams: Map[String, String] = Map(
"bootstrap.servers" -> "node1.itcast.cn:9092",
// 表示从Topic的各个分区的哪个偏移量开始消费数据,设置为最大的偏移量开始消费数据
"auto.offset.reset" -> "largest"
)
// 从哪些Topic中读取数据
val topics: Set[String] = Set("wc-topic")
// 采用Direct方式从Kafka 的Topic中读取数据
val kafkaDStream: DStream[(String, String)] = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, //
kafkaParams, //
topics
)
// 3、对接收每批次流式数据,进行词频统计WordCount
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(rdd => {
rdd
// 获取从Kafka读取数据的Message
.map(tuple => tuple._2)
// 过滤“脏数据”
.filter(line => null != line && line.trim.length > 0)
// 分割为单词
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 聚合统计
.reduceByKey((a, b) => a + b)
})
// 4、将分析每批次结果数据输出,此处答应控制台
resultDStream.foreachRDD{ (rdd, time) =>
// 使用lang3包下FastDateFormat日期格式类,属于线程安全的
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出哦
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
}
// 5、对于流式应用来说,需要启动应用,正常情况下启动以后一直运行,直到程序异常终止或者人为干涉
ssc.start() // 启动接收器Receivers,作为Long Running Task(线程) 运行在Executor
ssc.awaitTermination()
// 结束Streaming应用执行
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
从WEB UI监控页面可以看出如下信息:
相关文章:

【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式
集成方式 Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套, 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。 方式一:Kafka 0.8.x版本 老的Old Kafka Consum…...
如何引入elementUI
elementUI的引入完整引入按需引入完整引入 在 main.js 中写入以下内容: import Vue from ‘vue’; import ElementUI from ‘element-ui’; import ‘element-ui/lib/theme-chalk/index.css’; import App from ‘./App.vue’; Vue.use(ElementUI); new Vue({ el: ‘…...

vue3+rust个人博客建站日记4-Vditor搞定MarkDown
即然是个人博客,那么绝对不能丢给自己一个大大的输入框敷衍了事。如果真是这样,现在就可以宣布项目到此结束了。如今没人享受用输入框写博客。作为一个有追求的程序员,作品就要紧跟潮流。 后来,Markdown 的崛起逐步改变了大家的排…...

KDZD-JC软化击穿试验仪
一、概 述 KDZD-JC智能软化击穿试验仪是根据GB/T4074.6-2008和idtIEC60851-6:2004标准而设计的一种新型漆包圆线检测仪器。主要适用于固体绝缘材料(如:塑料、橡胶、层压材料、薄膜、树脂、云母、陶瓷、玻璃、绝缘漆等绝缘材料及绝缘件)在工…...

【数据结构】单链表的C语言实现--万字详解介绍
📝个人主页:Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:数据结构 🎯长路漫漫浩浩,万事皆有期待 文章目录1.链表1.1 链表的概念…...

电子科技大学软件工程期末复习笔记(七):测试策略
目录 前言 重点一览 V模型 回归测试 单元测试 集成测试 重要概念 自顶向下的集成方法 自底向上的集成方法 SMOKE方法 系统测试 验收测试 α测试 β测试 本章小结 前言 本复习笔记基于王玉林老师的课堂PPT与复习大纲,供自己期末复习与学弟学妹参考用…...

逆向-还原代码之除法 (Interl 64)
除法和32位差不多,毕竟背后的数学公式是一样的。区别只是32位的乘法需要两个寄存器来存放大数相乘的结果,而64位的不需要,一个寄存器就能存下。所以在64位的环境下,多了右移32位这条指令,其他指令一样。 //code #incl…...

Python WebDriver自动化测试
Webdriver Selenium 是 ThroughtWorks 一个强大的基于浏览器的开源自动化测试工具,它通常用来编写 Web 应用的自动化测试。 Selenium 2,又名 WebDriver,它的主要新功能是集成了 Selenium 1.0 以及 WebDriver(WebDriver 曾经是…...

2023年微信小程序获取手机号授权登录注册详细教程,包含服务端教程
前言 小程序中有很多地方都会用到用户的手机号,比如登陆注册,填写收货地址等等。有了这个组件可以快速获取微信绑定手机号码,无须用户填写。网上大多数教程还是往年的,而微信官方的api已做了修改。本篇文章将使用最新的方法获取手…...

YOLOv8模型学习笔记
在前面的章节中博主学习了YOLOv5的相关知识,从YOLOv5的数据增强处理到模型设计,从正负样本匹配策略到LOSS设计,今天博主学习的是YOLOv8,同为ultralytics公司的产品,两者无论是思想层面还是具体的设计方面都有着异曲同工…...
Java SE知识点1
一、continue、break、和return的区别是什么? 在循环结构中,当循环条件不满足或者循环次数达到要求时,循环会正常结束。但是,有时候可能需要 在循环的过程中,当发生了某种条件之后 ,提前终止循环,这就需要用到下面几个关键词: 1. continue :指跳出当前的这一次循环,…...

华为OD机试模拟题 用 C++ 实现 - 端口合并(2023.Q1)
最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明端口合并题目输入输出示例一输入输出说明示例二输入输出说明示例三输入输出说明...

C++ Primer Plus 第6版 读书笔记(3) 第3章 处理数据
目录 3.1 简单变量 3.1.1 变量名 *位与字节 3.1.4 无符号类型 3.1.7 C如何确定常量的类型 C是在 C 语言基础上开发的一种集面向对象编程、泛型编程和过程化编程于一体的编程语言,是C语言的超集。本书是根据2003年的ISO/ANSI C标准编写的,通过大量短…...
ArrayList源码解读
参数 //默认初始容量private static final int DEFAULT_CAPACITY 10;//空数组(用于空实例)private static final Object[] EMPTY_ELEMENTDATA {};//用于默认大小空实例的共享空数组private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA {};//保存数据的数组tra…...
python实战应用讲解-【语法高级篇】时间与日期(附python示例代码)
目录 保持时间、计划任务和启动程序 time 模块 time.time() 函数 time.sleep() 函数 Python3 日期和时间...

D. Moscow Gorillas(双指针 + 区间分析)
Problem - D - Codeforces 在冬天,莫斯科动物园的居民非常无聊,尤其是大猩猩。你决定娱乐他们,带了一个长度为n的排列p到动物园。长度为n的排列是由n个从1到n的不同整数以任意顺序组成的数组。例如,[2,3,1,5,4]是一个排列…...

华为OD机试题,用 Java 解【相同数字的积木游戏 1】问题
最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...

Python实现GWO智能灰狼优化算法优化BP神经网络分类模型(BP神经网络分类算法)项目实战
说明:这是一个机器学习实战项目(附带数据代码文档视频讲解),如需数据代码文档视频讲解可以直接到文章最后获取。1.项目背景灰狼优化算法(GWO),由澳大利亚格里菲斯大学学者 Mirjalili 等人于2014年提出来的一种群智能优…...

无线蓝牙耳机哪个牌子好?2023质量好的无线蓝牙耳机推荐
近几年,随着蓝牙技术的不断进步,使用蓝牙耳机的人也越来越多。蓝牙耳机的出现,不仅能让我们摆脱线带来的约束,还能提升我们学习和工作的效率。最近看到很多人问,无线蓝牙耳机哪个牌子好?下面,我…...

Qt之QTableView自定义排序/过滤(QSortFilterProxyModel实现,含源码+注释)
一、效果示例图 1.1 自定义表格排序示例图 本文过滤条件为行索引取余2等于0时返回true,且从下图中可以看到,奇偶行是各自挨在一起的。 1.2 自定义表格过滤示例图 下图添加两列条件(当前数据大于当前列条件才返回true,且多个列…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...

接口测试中缓存处理策略
在接口测试中,缓存处理策略是一个关键环节,直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性,避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明: 一、缓存处理的核…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...