当前位置: 首页 > news >正文

SparkStreaming与Kafka整合

1.3 SparkStreaming与Kafka整合

1.3.1 整合简述
kafka是做消息的缓存,数据和业务隔离操作的消息队列,而sparkstreaming是一款准实时流式计算框架,所以二者的整合,是大势所趋。
​
二者的整合,有主要的两大版本。

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则:

  • 0.10.0>kafka版本>=0.8.2.1,选择 08 接口

  • kafka版本>=0.10.0,选择 010 接口

sparkStreaming和Kafka整合一般两种方式:Receiver方式和Direct方式

Receiver方式(介绍)

Receiver方式基于kafka的高级消费者API实现(高级优点:高级API写起来简单;不需要去自行去管理offset,系统通过zookeeper自行管理;不需要管理分区,副本等情况,系统自动管理;消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据;高级缺点:不能自行控制 offset;不能细化控制如分区、副本、zk 等)。Receiver从kafka接收数据,存储在Executor中,Spark Streaming 定时生成任务来处理数据。

默认配置的情况,Receiver失败时有可能丢失数据。如果要保证数据的可靠性,需要开启预写式日志,简称WAL(Write Ahead Logs,Spark1.2引入),只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储,如果期间出现故障,这些信息被用来从错误中恢复,并继续处理数据。

还有几个需要注意的点:

  • 在Receiver的方式中,Spark中的 partition 和 kafka 中的 partition 并不是相关的,如果加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度;

  • 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream;

  • 如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • WAL将接收的数据备份到HDFS上,保证了数据的安全性。但写HDFS比较消耗性能,另外要在备份完数据之后还要写相关的元数据信息,这样总体上增加job的执行时间,增加了任务执行时间;

  • 总体上看 Receiver 方式,不适于生产环境;

1.3.2  Direct的方式
Direct方式从Spark1.3开始引入的,通过 KafkaUtils.createDirectStream 方法创建一个DStream对象,Direct方式的结构如下图所示。

Direct 方式特点如下:

  • 对应Kafka的版本 0.8.2.1+

  • Direct 方式

  • Offset 可自定义

  • 使用kafka低阶API

  • 底层实现为KafkaRDD

该方式中Kafka的一个分区与Spark RDD对应,通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比,Direct方式不需要维护一份WAL数据,由Spark Streaming程序自己控制位移的处理,通常通过检查点机制处理消费位移,这样可以保证Kafka中的数据只会被Spark拉取一次

  • 引入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.1.2</version>
</dependency>
  • 模拟kafka生产数据

package com.qianfeng.sparkstreaming
​
import java.util.{Properties, Random}
​
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
​
/*** 向kafka中test主题模拟生产数据;;;也可以使用命令行生产:kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test*/
object Demo02_DataLoad2Kafka {def main(args: Array[String]): Unit = {val prop = new Properties()//提供Kafka服务器信息prop.put("bootstrap.servers","qianfeng01:9092")//指定响应的方式prop.put("acks","all")//请求失败重试的次数prop.put("retries","3")//指定key的序列化方式,key是用于存放数据对应的offsetprop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")//指定value的序列化方式prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")//创建producer对象val producer = new KafkaProducer[String,String](prop)//提供一个数组,数组中数据val arr = Array("hello tom","hello jerry","hello dabao","hello zhangsan","hello lisi","hello wangwu",)//提供一个随机数,随机获取数组中数据向kafka中进行发送存储val r = new Random()while(true){val message = arr(r.nextInt(arr.length))producer.send(new ProducerRecord[String,String]("test",message))Thread.sleep(r.nextInt(1000))   //休眠1s以内}}
}
  • 实时消费kafka数据

package com.qianfeng.sparkstreaming
​
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
​
/*** sparkStreaming消费Kafka中的数据*/
object Demo03_SparkStreamingWithKafka {def main(args: Array[String]): Unit = {//1.创建SparkConf对象val conf = new SparkConf().setAppName("SparkStreamingToKafka").setMaster("local[*]")//2.提供批次时间val time = Seconds(5)//3.提供StreamingContext对象val sc = new StreamingContext(conf, time)//4.提供Kafka配置参数val kafkaConfig = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "qianfeng01:9092",ConsumerConfig.GROUP_ID_CONFIG -> "qianfeng","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",)//5.读取Kafka中数据信息生成DStreamval value = KafkaUtils.createDirectStream(sc,//本地化策略:将Kafka的分区数据均匀的分配到各个执行Executor中LocationStrategies.PreferConsistent,//表示要从使用kafka进行消费【offset谁来管理,从那个位置开始消费数据】ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaConfig))//6.将每条消息kv获取出来val line: DStream[String] = value.map(record => record.value())//7.开始计算操作line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//line.count().print()   //每隔5s的数据条数//8.开始任务sc.start()sc.awaitTermination()}
}
  • 说明

    1. 简化的并行性:不需要创建多个输入Kafka流并将其合并。 使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。

    2. 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次:一次是Kafka,另一次是由预先写入日志(WriteAhead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要Kafka数据保留时间足够长。

    3. 正好一次(Exactly-once)的语义:第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

相关文章:

SparkStreaming与Kafka整合

1.3 SparkStreaming与Kafka整合 1.3.1 整合简述 kafka是做消息的缓存&#xff0c;数据和业务隔离操作的消息队列&#xff0c;而sparkstreaming是一款准实时流式计算框架&#xff0c;所以二者的整合&#xff0c;是大势所趋。 ​ 二者的整合&#xff0c;有主要的两大版本。 kaf…...

openwrt源码编译

下载openwrt源码 git clone https://github.com/openwrt/chaos_calmer.git // 官方下载地址 当前我们基于15.05版本开发&#xff0c;如果开发者想用最新的OpenWRT系统&#xff0c;可以下载 https://github.com/openwrt/openwrt.git git clone https://github.com/Ying-Yun/o…...

【Leetcode Sheet】Weekly Practice 22

Leetcode Test 1349 参加考试的最大学生数(12.26) 给你一个 m * n 的矩阵 seats 表示教室中的座位分布。如果座位是坏的&#xff08;不可用&#xff09;&#xff0c;就用 # 表示&#xff1b;否则&#xff0c;用 . 表示。 学生可以看到左侧、右侧、左上、右上这四个方向上紧邻…...

ROS TF坐标变换 - 静态坐标变换

目录 一、静态坐标变换&#xff08;C实现&#xff09;二、静态坐标变换&#xff08;Python实现&#xff09; 如前文所属&#xff0c;ROS通过广播的形式告知各模块的位姿关系&#xff0c;接下来详述这一机制的代码实现。 模块间的位置关系有两种类型&#xff0c;一种是相对固定…...

香橙派5plus从ssd启动Ubuntu

官方接口图 我实际会用到的就几个接口&#xff0c;背面的话就一个M.2固态的位置&#xff1a; 其中WIFI模块的接口应该也可以插2230的固态&#xff0c;不过是pcie2.0的速度&#xff0c;背面的接口则是pcie3.0*4的速度&#xff0c;差距还是挺大的。 开始安装系统 准备工作 一张…...

JWT+Redis 实现接口 Token 校验

1、业务逻辑 有一些接口&#xff0c;需要用户登录以后才能访问&#xff0c;用户没有登录则无法访问。 因此&#xff0c;对于一些限制用户访问的接口&#xff0c;可以在请求头中增加一个校验参数&#xff0c;用于判断接口对应的用户是否登录。 而对于一些不需要登录即可访问的接…...

C语言 linux文件操作(二)

文章目录 一、获取文件长度二、追加写入三、覆盖写入四、文件创建函数creat 一、获取文件长度 通过lseek函数&#xff0c;除了操作定位文件指针&#xff0c;还可以获取到文件大小&#xff0c;注意这里是文件大小&#xff0c;单位是字节。例如在file1文件中事先写入"你好世…...

机器学习分类

1. 监督学习 监督学习指的是人们给机器一大堆标记好的数据&#xff0c;比如&#xff1a; 一大堆照片&#xff0c;标记出哪些是猫的照片&#xff0c;哪些是狗的照片 让机器自己学习归纳出算法或模型 使用该算法或模型判断出其他没有标记的照片是否是猫或狗 上述流程如下图所…...

CSS之元素转换

我想大家在写代码时有一个疑问&#xff0c;块级元素可以转换成其他元素吗&#xff1f; 让我为大家介绍一下元素转换 1.display:block(转换成块元素) display&#xff1a;block可以把我们的行内元素或者行内块元素转换成块元素 接下来让我为大家演示一下&#xff1a; <!DO…...

自激振荡电路笔记 电弧打火机

三极管相关 三极管的形象描述 二极管 简单求解&#xff08;理想&#xff09; 优先导通&#xff08;理想&#xff09; 恒压降 稳压管&#xff08;二极管plus&#xff09; 基础工作模块 理想稳压管的工作特性 晶体管之三极管(“两个二极管的组合” ) 电弧打火机电路 1.闭合开…...

Linux su 命令

Linux su&#xff08;英文全拼&#xff1a;switch user&#xff09;命令用于变更为其他使用者的身份&#xff0c;除 root 外&#xff0c;需要键入该使用者的密码。 使用权限&#xff1a;所有使用者。 语法 su [-fmp] [-c command] [-s shell] [--help] [--version] [-] [USE…...

论文阅读: AAAI 2022行人重识别方向论文-PFD_Net

本篇博客用于记录一篇行人重识别方向的论文所提出的优化方法《Pose-Guided Feature Disentangling for Occluded Person Re-identification Based on Transformer》&#xff0c;论文中提出的PDF_Net模型的backbone是采用《TransReID: Transformer-based Object Re-Identificati…...

蓝牙物联网灯控设计方案

蓝牙技术是当前应用最广泛的无线通信技术之一&#xff0c;工作在全球通用的 2.4GHZ 的ISM 频段。蓝牙的工作距离约为 100 米&#xff0c;具有一定的穿透性&#xff0c;没有方向限制。具有低成本、抗干扰能力强、传输质量高、低功耗等特点。蓝牙技术组网比较简单&#xff0c;无需…...

Codeforces Round 900 (Div. 3)(A-F)

比赛链接 : Dashboard - Codeforces Round 900 (Div. 3) - Codeforces A. How Much Does Daytona Cost? 题面 : 思路 : 在序列中只要找到k&#xff0c;就返回true ; 代码 : #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0)…...

vue大屏-列表自动滚动vue-seamless-scroll

vue大屏-列表自动滚动vue-seamless-scroll vue-seamless-scroll的官方文档地址&#xff1a;https://chenxuan0000.github.io/vue-seamless-scroll/zh/guide/ 具体效果可到官方文档那里查看。 1、下载依赖 npm install vue-seamless-scroll --save2、使用例子 <template…...

easyx的窗口函数

文章目录 前言一、EasyX的颜色二、EasyX的坐标和设备1&#xff0c;EasyX的坐标2&#xff0c;EasyX的设备 三、窗口函数1&#xff0c;初始化窗口函数2&#xff0c;关闭绘图窗口3&#xff0c;设置窗口背景板颜色4&#xff0c;清空绘图设备 前言 easyx是针对c的图形库&#xff0c;…...

【记录】开始学习网络安全

本文持续更新学习进度 背景 在私企干了5年虚拟化、云原生相关的运维&#xff0c;学到了很多&#xff0c;但不成体系。老板是清华毕业法国留学在德勤干过&#xff0c;最后回国创业的野路子。我工作是为了更好的生活&#xff0c;我挺担心老板因为家庭变故或者炒个原油宝&#x…...

【Java EE初阶三 】线程的状态与安全(下)

3. 线程安全 线程安全&#xff1a;某个代码&#xff0c;不管它是单个线程执行&#xff0c;还是多个线程执行&#xff0c;都不会产生bug&#xff0c;这个情况就成为“线程安全”。 线程不安全&#xff1a;某个代码&#xff0c;它单个线程执行&#xff0c;不会产生bug&#xff0c…...

MD5算法

一、引言 MD5&#xff08;Message-Digest Algorithm 5&#xff09;是一种广泛应用的密码散列算法&#xff0c;由Ronald L. Rivest于1991年提出。MD5算法主要用于对任意长度的消息进行加密&#xff0c;将消息压缩成固定长度的摘要&#xff08;通常为128位&#xff09;。在密码学…...

Postman使用

Postman使用 Pre-request Script 参考&#xff1a; Scripting in Postman 可以请求、集合或文件夹中添加Pre-request Script&#xff0c;在请求运行之前执行JavaScript 如设置变量值、参数、Header和正文数据&#xff0c;也可以使用Pre-request Script来调试代码&#xff0…...

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?

Golang 面试经典题&#xff1a;map 的 key 可以是什么类型&#xff1f;哪些不可以&#xff1f; 在 Golang 的面试中&#xff0c;map 类型的使用是一个常见的考点&#xff0c;其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...