SparkStreaming入门
概述
实时/离线
- 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
- 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。
SparkStreaming简介
- 支持的输入源:
Kafka, Flume, HDFS等 - 数据输入后,可以用RDD处理数据
- 结果可以保存在很多地方,比如HDFS,数据库等
SparkStreaming架构
DStream
SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream
每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。
架构图

由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。
DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。
Hello World案例
- 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version>
</dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
- 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import java.util.ArrayList;
import java.util.HashMap;public class Test01_HelloWorld {public static void main(String[] args) throws InterruptedException {// 创建流环境JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));// 创建配置参数HashMap<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 需要消费的主题ArrayList<String> strings = new ArrayList<>();strings.add("topic_db");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {@Overridepublic Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {String[] words = consumerRecord.value().split(" ");return Arrays.stream(words).iterator();}});flatMap .print();// 执行流的任务javaStreamingContext.start();javaStreamingContext.awaitTermination();//线程阻塞}
}
window算子窗口操作
由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。
窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算
//4 添加窗口 窗口大小12s 滑动步长6sJavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));//5 对加过窗口的数据流进行计算JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);
相关文章:
SparkStreaming入门
概述 实时/离线 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。离线:数据是落盘后再处理,一般处理的数据是昨天的数据&…...
设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
简介: 模板模式,它是一种行为型设计模式,它定义了一个操作中的算法的框架,将一些步骤延迟到子类中实现,使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说,模板模式就是将某一行…...
基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
PHP 基础
PHP 基础 概述 在PHP 文件中,可以与HTML 和JavaScript 混编。 开始标记<?php 表示进入PHP 模式,结束标记?>,标识退出PHP 模式。 PHP 模式之外的内容会被作为字符输出到浏览器中。 PHP 在服务端执行,HTML 和 JS 在浏览…...
Java RestTemplate使用TLS1.0(关闭SSL验证)
1. 问题 使用RestTemplate调用Http API时,服务器是TLS1.0,但是客户端Java默认禁止TLS1.0,会报错:org.springframework.web.client.ResourceAccessException: I/O error on POST request for “https://10.255.200.114/health”: …...
【进阶C语言】C语言文件操作
1. 为什么使用文件 2. 什么是文件 3. 文件的打开和关闭 4. 文件的顺序读写 5. 文件的随机读写 6. 文本文件和二进制文件 7. 文件读取结束的判定 8. 文件缓冲区 一、文件与文件的意义 1.文件的意义 文件的意义,无非就是为什么要使用文件? (1&…...
Django实现音乐网站 (21)
使用Python Django框架做一个音乐网站, 本篇音乐播放器功能完善及原有功能修改。 目录 播放列表修改 视图修改 删除、清空播放器 设置路由 视图处理 修改加载播放器脚本 模板修改 脚本设置 清空功能实现 删除列表音乐 播放列表无数据处理 视图修改 播放…...
LeetCode 面试题 10.11. 峰与谷
文章目录 一、题目二、C# 题解 一、题目 在一个整数数组中,“峰”是大于或等于相邻整数的元素,相应地,“谷”是小于或等于相邻整数的元素。例如,在数组{5, 8, 4, 2, 3, 4, 6}中,{8, 6}是峰, {5, 2}是谷。现…...
【专题】测试人员为什么需要学会做业务总结?
背景 如何回答以下这个问题的知识支撑:系统的测试重点在哪,难点是什么,怎么攻克,为什么要这样设计?项目交接效率? 同样是做业务测试,为什么有的人是A有的人只能C 二、框架 2.1 测试场景 重点…...
uni-app:实现当前时间的获取,并且根据当前时间判断所在时间段为早上,下午还是晚上
效果图 核心代码 获取当前时间 toString()方法将数字转换为字符串 padStart(2, 0):padStart()方法用于在字符串头部填充指定的字符,使其达到指定的长度。该方法接受两个参数:第一个参数为期望得到的字符串长度,第二个参数为要填充…...
C# .Net6 指定WSDL, 生成Webservice,调用该接口服务
C# .Net6 指定WSDL, 调用该接口服务。 IDE: Microsoft Visual Studio Community 2022 (64 位)平台:.Net6协议:Soap协议 Xml格式 功能 需要开发一个前置机程序, 用于和硬件程序交互,已知条件是:嵌入式同事…...
JS基本小知识:函数
目录 函数的基本概念 函数的定义和调用 函数的定义 函数的调用 函数的参数和返回值 参数的作用域和生命周期 返回值的作用和使用场景 匿名函数和箭头函数 匿名函数 本文将介绍 JavaScript 中的一个知识点:函数。函数是 JavaScript 中非常重要的一个概念&am…...
在Windows下Edge浏览器OA发起流程问题
在Edge浏览器中发起流程 如上图所示,不能正常打开Excel,自动将Excel表格转为了PDF 怎么处理?还得使用IE浏览器来访问,但打开IE后又自动跳转到Edge,根本就不给使用,在Edge下使用IE模式也解决不了这个问题。…...
2020年亚太杯APMCM数学建模大赛A题激光标记舱口轮廓生成求解全过程文档及程序
2020年亚太杯APMCM数学建模大赛 A题 激光标记舱口轮廓生成 原题再现: 激光是20中的一项重要发明世纪,它被称为“最锋利的刀”、“最精确的尺子”和“最不寻常的光”。 激光已越来越多地应用于工业加工, 其中可以是就业在各种加工业务例如作…...
【单元测试】--工具与环境
一、单元测试工具概览 1.1 JUnit JUnit 是一个广泛用于 Java 程序开发的开源测试框架。它是单元测试的标准工具之一,用于编写和运行测试用例,以确保 Java 程序的各个组件按预期工作。以下是一些关键特点和概念,来介绍 JUnit: 注…...
基于Java的汽车维修预约管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
vscode调试container(进行rocksdb调试)+vscode比较git项目不同分支和fork的哪个分支
vscode调试container(进行rocksdb调试) 参考链接: https://blog.csdn.net/qq_29809823/article/details/128445308#t5 https://blog.csdn.net/qq_29809823/article/details/121978762#t7 使用vscode中的插件dev containners->点击左侧的…...
[python-大语言模型]从浅到深一系列学习笔记记录
整体学习路径参照:点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接: github地址:https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…...
Android 指定有线网或Wifi进行网络请求
Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言:二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图:2、实际测试结果说明&a…...
消除过期的对象引用
Java虽然有自己的垃圾回收机制,但是并没有那么的智能,对于被引用的对象,就算我们已经不在使用它了,但是Java的回收机制是不会回收它们的,人们称之为“内存泄漏”。 以下为三种不同的内存泄漏场景,极其优化方案 1、只要类自己管理内存,就该警惕内存泄漏问题 例如Stack…...
H8SX单片机USB大容量存储设备开发实战指南
1. H8SX单片机USB大容量存储设备开发概述在嵌入式系统开发中,实现USB大容量存储设备(Mass Storage Class,简称MSC)功能是一项常见需求。H8SX系列单片机作为瑞萨电子推出的高性能微控制器,其内置的USB模块为开发者提供了…...
Metso Valmet A413052电路板模块
Metso Valmet A413052 电路板模块产品概述A413052是Valmet DNA分布式控制系统的模拟量输出模块,专为造纸、制浆、化工等连续过程工业设计,将数字控制信号转换为高精度模拟量输出,驱动现场执行机构。核心特点4通道独立输出:每通道均…...
边缘重构智慧城市:FPGA SoM如何破解视频系统 “重而慢”
智慧城市这几年有一个挺明显的悖论:摄像头越装越多,平台越做越“智能”,但真正能在现场把问题解决掉的系统,并没有按比例变多。更现实的情况是——城市里“看见”的能力已经很强,但“看懂并立刻行动”的能力࿰…...
Kioxia推出面向PC OEM的全新主流KIOXIA BG8系列固态硬盘
新一代客户端固态硬盘产品组合将PCIe 5.0速度带入主流应用 Kioxia Corporation今日宣布推出KIOXIA BG8系列固态硬盘(SSD),这是其面向PC OEM客户的客户端固态硬盘产品线的最新力作。KIOXIA BG8系列将PCIe 5.0的高速性能引入主流市场,兼具新一代技术能力、…...
LVM(逻辑卷管理器)核心概念与完整操作笔记
LVM(逻辑卷管理器)通过在物理磁盘与文件系统之间增加抽象层,实现了灵活的磁盘空间管理,彻底解决了传统分区方案的刚性限制问题。一、核心抽象层次与类比LVM采用分层架构设计,将物理存储资源抽象为可动态调整的逻辑卷&a…...
CCC vs. FiRa:数字车钥匙UWB MAC层时间网格设计的差异与选择
CCC与FiRa标准下的UWB MAC层时间网格设计:数字车钥匙技术选型指南 当你的手机在靠近车门时自动解锁,或是停车场精准引导你找到空位,背后很可能是UWB(超宽带)技术在发挥作用。作为数字车钥匙的核心技术,UWB的…...
赋能核心力量,共建全球共识 | Alpha大学精英领导人内训营(第二期)即将启幕
随着 AlphaAI 全球战略的深入推进,人才与领导力成为了推动生态进化的核心动能。2026年5月5日至6日,备受瞩目的Alpha大学精英领导人内训营(第二期)将正式拉开帷幕。一、战略对齐,点亮“万家灯火”在 AlphaAI 的全球蓝图…...
别急着重装!盘点搭建DNF服务端时最容易被误判的‘异常’(附数据库检查清单)
别急着重装!盘点搭建DNF服务端时最容易被误判的‘异常’(附数据库检查清单) 在搭建DNF服务端的过程中,许多开发者遇到报错的第一反应往往是"重装系统"或"换版本重来"。这种条件反射式的操作不仅浪费时间&…...
【T5模型架构】从Transformer到T5:架构演进与核心模块拆解
1. Transformer基础回顾:从Attention到Encoder-Decoder 要理解T5模型的创新点,我们得先回到2017年那个改变NLP格局的经典架构——Transformer。当时谷歌大脑团队发表的《Attention is All You Need》论文,彻底抛弃了传统的RNN和CNN结构&#…...
如何用roop-unleashed轻松制作专业级AI换脸视频:从入门到精通的完整指南
如何用roop-unleashed轻松制作专业级AI换脸视频:从入门到精通的完整指南 【免费下载链接】roop-unleashed Evolved Fork of roop with Web Server and lots of additions 项目地址: https://gitcode.com/gh_mirrors/ro/roop-unleashed 在AI技术飞速发展的今天…...
