SparkStreaming入门
概述
实时/离线
- 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
- 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。
SparkStreaming简介
- 支持的输入源:
Kafka, Flume, HDFS等 - 数据输入后,可以用RDD处理数据
- 结果可以保存在很多地方,比如HDFS,数据库等
SparkStreaming架构
DStream
SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream
每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。
架构图

由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。
DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。
Hello World案例
- 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version>
</dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
- 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import java.util.ArrayList;
import java.util.HashMap;public class Test01_HelloWorld {public static void main(String[] args) throws InterruptedException {// 创建流环境JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));// 创建配置参数HashMap<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 需要消费的主题ArrayList<String> strings = new ArrayList<>();strings.add("topic_db");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {@Overridepublic Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {String[] words = consumerRecord.value().split(" ");return Arrays.stream(words).iterator();}});flatMap .print();// 执行流的任务javaStreamingContext.start();javaStreamingContext.awaitTermination();//线程阻塞}
}
window算子窗口操作
由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。
窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算
//4 添加窗口 窗口大小12s 滑动步长6sJavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));//5 对加过窗口的数据流进行计算JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);
相关文章:
SparkStreaming入门
概述 实时/离线 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。离线:数据是落盘后再处理,一般处理的数据是昨天的数据&…...
设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
简介: 模板模式,它是一种行为型设计模式,它定义了一个操作中的算法的框架,将一些步骤延迟到子类中实现,使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说,模板模式就是将某一行…...
基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
PHP 基础
PHP 基础 概述 在PHP 文件中,可以与HTML 和JavaScript 混编。 开始标记<?php 表示进入PHP 模式,结束标记?>,标识退出PHP 模式。 PHP 模式之外的内容会被作为字符输出到浏览器中。 PHP 在服务端执行,HTML 和 JS 在浏览…...
Java RestTemplate使用TLS1.0(关闭SSL验证)
1. 问题 使用RestTemplate调用Http API时,服务器是TLS1.0,但是客户端Java默认禁止TLS1.0,会报错:org.springframework.web.client.ResourceAccessException: I/O error on POST request for “https://10.255.200.114/health”: …...
【进阶C语言】C语言文件操作
1. 为什么使用文件 2. 什么是文件 3. 文件的打开和关闭 4. 文件的顺序读写 5. 文件的随机读写 6. 文本文件和二进制文件 7. 文件读取结束的判定 8. 文件缓冲区 一、文件与文件的意义 1.文件的意义 文件的意义,无非就是为什么要使用文件? (1&…...
Django实现音乐网站 (21)
使用Python Django框架做一个音乐网站, 本篇音乐播放器功能完善及原有功能修改。 目录 播放列表修改 视图修改 删除、清空播放器 设置路由 视图处理 修改加载播放器脚本 模板修改 脚本设置 清空功能实现 删除列表音乐 播放列表无数据处理 视图修改 播放…...
LeetCode 面试题 10.11. 峰与谷
文章目录 一、题目二、C# 题解 一、题目 在一个整数数组中,“峰”是大于或等于相邻整数的元素,相应地,“谷”是小于或等于相邻整数的元素。例如,在数组{5, 8, 4, 2, 3, 4, 6}中,{8, 6}是峰, {5, 2}是谷。现…...
【专题】测试人员为什么需要学会做业务总结?
背景 如何回答以下这个问题的知识支撑:系统的测试重点在哪,难点是什么,怎么攻克,为什么要这样设计?项目交接效率? 同样是做业务测试,为什么有的人是A有的人只能C 二、框架 2.1 测试场景 重点…...
uni-app:实现当前时间的获取,并且根据当前时间判断所在时间段为早上,下午还是晚上
效果图 核心代码 获取当前时间 toString()方法将数字转换为字符串 padStart(2, 0):padStart()方法用于在字符串头部填充指定的字符,使其达到指定的长度。该方法接受两个参数:第一个参数为期望得到的字符串长度,第二个参数为要填充…...
C# .Net6 指定WSDL, 生成Webservice,调用该接口服务
C# .Net6 指定WSDL, 调用该接口服务。 IDE: Microsoft Visual Studio Community 2022 (64 位)平台:.Net6协议:Soap协议 Xml格式 功能 需要开发一个前置机程序, 用于和硬件程序交互,已知条件是:嵌入式同事…...
JS基本小知识:函数
目录 函数的基本概念 函数的定义和调用 函数的定义 函数的调用 函数的参数和返回值 参数的作用域和生命周期 返回值的作用和使用场景 匿名函数和箭头函数 匿名函数 本文将介绍 JavaScript 中的一个知识点:函数。函数是 JavaScript 中非常重要的一个概念&am…...
在Windows下Edge浏览器OA发起流程问题
在Edge浏览器中发起流程 如上图所示,不能正常打开Excel,自动将Excel表格转为了PDF 怎么处理?还得使用IE浏览器来访问,但打开IE后又自动跳转到Edge,根本就不给使用,在Edge下使用IE模式也解决不了这个问题。…...
2020年亚太杯APMCM数学建模大赛A题激光标记舱口轮廓生成求解全过程文档及程序
2020年亚太杯APMCM数学建模大赛 A题 激光标记舱口轮廓生成 原题再现: 激光是20中的一项重要发明世纪,它被称为“最锋利的刀”、“最精确的尺子”和“最不寻常的光”。 激光已越来越多地应用于工业加工, 其中可以是就业在各种加工业务例如作…...
【单元测试】--工具与环境
一、单元测试工具概览 1.1 JUnit JUnit 是一个广泛用于 Java 程序开发的开源测试框架。它是单元测试的标准工具之一,用于编写和运行测试用例,以确保 Java 程序的各个组件按预期工作。以下是一些关键特点和概念,来介绍 JUnit: 注…...
基于Java的汽车维修预约管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
vscode调试container(进行rocksdb调试)+vscode比较git项目不同分支和fork的哪个分支
vscode调试container(进行rocksdb调试) 参考链接: https://blog.csdn.net/qq_29809823/article/details/128445308#t5 https://blog.csdn.net/qq_29809823/article/details/121978762#t7 使用vscode中的插件dev containners->点击左侧的…...
[python-大语言模型]从浅到深一系列学习笔记记录
整体学习路径参照:点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接: github地址:https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…...
Android 指定有线网或Wifi进行网络请求
Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言:二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图:2、实际测试结果说明&a…...
消除过期的对象引用
Java虽然有自己的垃圾回收机制,但是并没有那么的智能,对于被引用的对象,就算我们已经不在使用它了,但是Java的回收机制是不会回收它们的,人们称之为“内存泄漏”。 以下为三种不同的内存泄漏场景,极其优化方案 1、只要类自己管理内存,就该警惕内存泄漏问题 例如Stack…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
为什么要创建 Vue 实例
核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...
