当前位置: 首页 > news >正文

【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式

集成方式

Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。

方式一:Kafka 0.8.x版本

  • 老的Old Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
  • 老的Old消费者API,有两种方式:
    • 第一种:高级消费API(Consumer High Level API),Receiver接收器接收数据
    • 第二种:简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据

方式二:Kafka 0.10.x版本

  • 新的 New Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html
  • 核心API:KafkaConsumer、ConsumerRecorder
    在这里插入图片描述

两种方式区别

使用Kafka Old Consumer API集成两种方式,虽然实际生产环境使用Direct方式获取数据,但是在面试的时候常常问到两者区别。
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html

  • Receiver-based Approach:
    • 基于接收器方式,消费Kafka Topic数据,但是企业中基本上不再使用;
    • Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数据(union),再进行处理,很麻烦;
    • Receiver那台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低;
    • Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护;
    • Spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据不一致;
  • Direct Approach (No Receivers):
    • 直接方式,Streaming中每批次的每个job直接调用Simple Consumer API获取对应Topic数据,此种方式使用最多,面试时被问的最多;
    • Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高并行能力
    • Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况;
    • 当然也可以自己手动维护,把offset存在MySQL、Redis和Zookeeper中;

上述两种方式区别,如下图所示:
在这里插入图片描述

4.2 Direct 方式集成

使用Kafka Old Consumer API方式集成Streaming,采用Direct方式,调用Old Simple Consumer API,
文档:
http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers

编码实现
Direct方式会定期地从Kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围,在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。
在这里插入图片描述
Direct方式没有receiver层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,再根据设定的maxRatePerPartition来处理每个batch。较于Receiver方式的优势:

  • 其一、简化的并行:Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据;
  • 其二、高效:no need for Write Ahead Logs;
  • 其三、精确一次:直接使用simple Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录。

添加相关MAVEN依赖:

<!-- Spark Streaming 集成Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>

提供工具类KafkaUtils专门从Kafka消费数据,其中函数【createDirectStream】消费数据:
在这里插入图片描述
创建Topic及Console控制台发送数据,命令如下:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic wc-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic wc-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic wc-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

具体演示代码如下:

import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 集成Kafka,采用Direct式读取数据,对每批次(时间为1秒)数据进行词频统计,将统计结果输出到控制台
*/
object StreamingKafkaDirect {
def main(args: Array[String]): Unit = {
// 1、构建流式上下文实例对象StreamingContext,用于读取流式的数据和调度Batch Job执行
val ssc: StreamingContext = {
// a. 创建SparkConf实例对象,设置Application相关信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
//TODO: 设置每秒钟读取Kafka中Topic最大数据量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 创建StreamingContext实例,传递Batch Interval(时间间隔:划分流式数据)
val context: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
// c. 返回上下文对象
context
}
// TODO: 2、从流式数据源读取数据
/*
def createDirectStream[
K: ClassTag, // Topic中Key数据类型
V: ClassTag,
KD <: Decoder[K]: ClassTag, // 表示Topic中Key数据解码(从文件中读取,反序列化)
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)]
*/
// 表示从Kafka Topic读取数据时相关参数设置
val kafkaParams: Map[String, String] = Map(
"bootstrap.servers" -> "node1.itcast.cn:9092",
// 表示从Topic的各个分区的哪个偏移量开始消费数据,设置为最大的偏移量开始消费数据
"auto.offset.reset" -> "largest"
)
// 从哪些Topic中读取数据
val topics: Set[String] = Set("wc-topic")
// 采用Direct方式从Kafka 的Topic中读取数据
val kafkaDStream: DStream[(String, String)] = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, //
kafkaParams, //
topics
)
// 3、对接收每批次流式数据,进行词频统计WordCount
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(rdd => {
rdd
// 获取从Kafka读取数据的Message
.map(tuple => tuple._2)
// 过滤“脏数据”
.filter(line => null != line && line.trim.length > 0)
// 分割为单词
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 聚合统计
.reduceByKey((a, b) => a + b)
})
// 4、将分析每批次结果数据输出,此处答应控制台
resultDStream.foreachRDD{ (rdd, time) =>
// 使用lang3包下FastDateFormat日期格式类,属于线程安全的
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出哦
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
}
// 5、对于流式应用来说,需要启动应用,正常情况下启动以后一直运行,直到程序异常终止或者人为干涉
ssc.start() // 启动接收器Receivers,作为Long Running Task(线程) 运行在Executor
ssc.awaitTermination()
// 结束Streaming应用执行
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

从WEB UI监控页面可以看出如下信息:
在这里插入图片描述

相关文章:

【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式

集成方式 Spark Streaming与Kafka集成&#xff0c;有两套API&#xff0c;原因在于Kafka Consumer API有两套&#xff0c; 文档&#xff1a;http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。 方式一&#xff1a;Kafka 0.8.x版本 老的Old Kafka Consum…...

如何引入elementUI

elementUI的引入完整引入按需引入完整引入 在 main.js 中写入以下内容&#xff1a; import Vue from ‘vue’; import ElementUI from ‘element-ui’; import ‘element-ui/lib/theme-chalk/index.css’; import App from ‘./App.vue’; Vue.use(ElementUI); new Vue({ el: ‘…...

vue3+rust个人博客建站日记4-Vditor搞定MarkDown

即然是个人博客&#xff0c;那么绝对不能丢给自己一个大大的输入框敷衍了事。如果真是这样&#xff0c;现在就可以宣布项目到此结束了。如今没人享受用输入框写博客。作为一个有追求的程序员&#xff0c;作品就要紧跟潮流。 后来&#xff0c;Markdown 的崛起逐步改变了大家的排…...

KDZD-JC软化击穿试验仪

一、概 述 KDZD-JC智能软化击穿试验仪是根据GB/T4074.6-2008和idtIEC60851-6:2004标准而设计的一种新型漆包圆线检测仪器。主要适用于固体绝缘材料&#xff08;如&#xff1a;塑料、橡胶、层压材料、薄膜、树脂、云母、陶瓷、玻璃、绝缘漆等绝缘材料及绝缘件&#xff09;在工…...

【数据结构】单链表的C语言实现--万字详解介绍

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录1.链表1.1 链表的概念…...

电子科技大学软件工程期末复习笔记(七):测试策略

目录 前言 重点一览 V模型 回归测试 单元测试 集成测试 重要概念 自顶向下的集成方法 自底向上的集成方法 SMOKE方法 系统测试 验收测试 α测试 β测试 本章小结 前言 本复习笔记基于王玉林老师的课堂PPT与复习大纲&#xff0c;供自己期末复习与学弟学妹参考用…...

逆向-还原代码之除法 (Interl 64)

除法和32位差不多&#xff0c;毕竟背后的数学公式是一样的。区别只是32位的乘法需要两个寄存器来存放大数相乘的结果&#xff0c;而64位的不需要&#xff0c;一个寄存器就能存下。所以在64位的环境下&#xff0c;多了右移32位这条指令&#xff0c;其他指令一样。 //code #incl…...

Python WebDriver自动化测试

Webdriver Selenium 是 ThroughtWorks 一个强大的基于浏览器的开源自动化测试工具&#xff0c;它通常用来编写 Web 应用的自动化测试。 Selenium 2&#xff0c;又名 WebDriver&#xff0c;它的主要新功能是集成了 Selenium 1.0 以及 WebDriver​&#xff08;WebDriver 曾经是…...

2023年微信小程序获取手机号授权登录注册详细教程,包含服务端教程

前言 小程序中有很多地方都会用到用户的手机号&#xff0c;比如登陆注册&#xff0c;填写收货地址等等。有了这个组件可以快速获取微信绑定手机号码&#xff0c;无须用户填写。网上大多数教程还是往年的&#xff0c;而微信官方的api已做了修改。本篇文章将使用最新的方法获取手…...

YOLOv8模型学习笔记

在前面的章节中博主学习了YOLOv5的相关知识&#xff0c;从YOLOv5的数据增强处理到模型设计&#xff0c;从正负样本匹配策略到LOSS设计&#xff0c;今天博主学习的是YOLOv8&#xff0c;同为ultralytics公司的产品&#xff0c;两者无论是思想层面还是具体的设计方面都有着异曲同工…...

Java SE知识点1

一、continue、break、和return的区别是什么? 在循环结构中,当循环条件不满足或者循环次数达到要求时,循环会正常结束。但是,有时候可能需要 在循环的过程中,当发生了某种条件之后 ,提前终止循环,这就需要用到下面几个关键词: 1. continue :指跳出当前的这一次循环,…...

华为OD机试模拟题 用 C++ 实现 - 端口合并(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明端口合并题目输入输出示例一输入输出说明示例二输入输出说明示例三输入输出说明...

C++ Primer Plus 第6版 读书笔记(3) 第3章 处理数据

目录 3.1 简单变量 3.1.1 变量名 *位与字节 3.1.4 无符号类型 3.1.7 C如何确定常量的类型 C是在 C 语言基础上开发的一种集面向对象编程、泛型编程和过程化编程于一体的编程语言&#xff0c;是C语言的超集。本书是根据2003年的ISO/ANSI C标准编写的&#xff0c;通过大量短…...

ArrayList源码解读

参数 //默认初始容量private static final int DEFAULT_CAPACITY 10;//空数组(用于空实例)private static final Object[] EMPTY_ELEMENTDATA {};//用于默认大小空实例的共享空数组private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA {};//保存数据的数组tra…...

python实战应用讲解-【语法高级篇】时间与日期(附python示例代码)

目录 保持时间、计划任务和启动程序 time 模块 time.time() 函数 time.sleep() 函数 Python3 日期和时间...

D. Moscow Gorillas(双指针 + 区间分析)

Problem - D - Codeforces 在冬天&#xff0c;莫斯科动物园的居民非常无聊&#xff0c;尤其是大猩猩。你决定娱乐他们&#xff0c;带了一个长度为n的排列p到动物园。长度为n的排列是由n个从1到n的不同整数以任意顺序组成的数组。例如&#xff0c;[2,3,1,5,4]是一个排列&#xf…...

华为OD机试题,用 Java 解【相同数字的积木游戏 1】问题

最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...

Python实现GWO智能灰狼优化算法优化BP神经网络分类模型(BP神经网络分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。1.项目背景灰狼优化算法(GWO)&#xff0c;由澳大利亚格里菲斯大学学者 Mirjalili 等人于2014年提出来的一种群智能优…...

无线蓝牙耳机哪个牌子好?2023质量好的无线蓝牙耳机推荐

近几年&#xff0c;随着蓝牙技术的不断进步&#xff0c;使用蓝牙耳机的人也越来越多。蓝牙耳机的出现&#xff0c;不仅能让我们摆脱线带来的约束&#xff0c;还能提升我们学习和工作的效率。最近看到很多人问&#xff0c;无线蓝牙耳机哪个牌子好&#xff1f;下面&#xff0c;我…...

Qt之QTableView自定义排序/过滤(QSortFilterProxyModel实现,含源码+注释)

一、效果示例图 1.1 自定义表格排序示例图 本文过滤条件为行索引取余2等于0时返回true&#xff0c;且从下图中可以看到&#xff0c;奇偶行是各自挨在一起的。 1.2 自定义表格过滤示例图 下图添加两列条件&#xff08;当前数据大于当前列条件才返回true&#xff0c;且多个列…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...