SparkStreaming入门
概述
实时/离线
- 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
- 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。
SparkStreaming简介
- 支持的输入源:
Kafka, Flume, HDFS等 - 数据输入后,可以用RDD处理数据
- 结果可以保存在很多地方,比如HDFS,数据库等
SparkStreaming架构
DStream
SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream
每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。
架构图

由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。
DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。
Hello World案例
- 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version>
</dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
- 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import java.util.ArrayList;
import java.util.HashMap;public class Test01_HelloWorld {public static void main(String[] args) throws InterruptedException {// 创建流环境JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));// 创建配置参数HashMap<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 需要消费的主题ArrayList<String> strings = new ArrayList<>();strings.add("topic_db");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {@Overridepublic Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {String[] words = consumerRecord.value().split(" ");return Arrays.stream(words).iterator();}});flatMap .print();// 执行流的任务javaStreamingContext.start();javaStreamingContext.awaitTermination();//线程阻塞}
}
window算子窗口操作
由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。
窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算
//4 添加窗口 窗口大小12s 滑动步长6sJavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));//5 对加过窗口的数据流进行计算JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);
相关文章:
SparkStreaming入门
概述 实时/离线 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。离线:数据是落盘后再处理,一般处理的数据是昨天的数据&…...
设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
简介: 模板模式,它是一种行为型设计模式,它定义了一个操作中的算法的框架,将一些步骤延迟到子类中实现,使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说,模板模式就是将某一行…...
基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
PHP 基础
PHP 基础 概述 在PHP 文件中,可以与HTML 和JavaScript 混编。 开始标记<?php 表示进入PHP 模式,结束标记?>,标识退出PHP 模式。 PHP 模式之外的内容会被作为字符输出到浏览器中。 PHP 在服务端执行,HTML 和 JS 在浏览…...
Java RestTemplate使用TLS1.0(关闭SSL验证)
1. 问题 使用RestTemplate调用Http API时,服务器是TLS1.0,但是客户端Java默认禁止TLS1.0,会报错:org.springframework.web.client.ResourceAccessException: I/O error on POST request for “https://10.255.200.114/health”: …...
【进阶C语言】C语言文件操作
1. 为什么使用文件 2. 什么是文件 3. 文件的打开和关闭 4. 文件的顺序读写 5. 文件的随机读写 6. 文本文件和二进制文件 7. 文件读取结束的判定 8. 文件缓冲区 一、文件与文件的意义 1.文件的意义 文件的意义,无非就是为什么要使用文件? (1&…...
Django实现音乐网站 (21)
使用Python Django框架做一个音乐网站, 本篇音乐播放器功能完善及原有功能修改。 目录 播放列表修改 视图修改 删除、清空播放器 设置路由 视图处理 修改加载播放器脚本 模板修改 脚本设置 清空功能实现 删除列表音乐 播放列表无数据处理 视图修改 播放…...
LeetCode 面试题 10.11. 峰与谷
文章目录 一、题目二、C# 题解 一、题目 在一个整数数组中,“峰”是大于或等于相邻整数的元素,相应地,“谷”是小于或等于相邻整数的元素。例如,在数组{5, 8, 4, 2, 3, 4, 6}中,{8, 6}是峰, {5, 2}是谷。现…...
【专题】测试人员为什么需要学会做业务总结?
背景 如何回答以下这个问题的知识支撑:系统的测试重点在哪,难点是什么,怎么攻克,为什么要这样设计?项目交接效率? 同样是做业务测试,为什么有的人是A有的人只能C 二、框架 2.1 测试场景 重点…...
uni-app:实现当前时间的获取,并且根据当前时间判断所在时间段为早上,下午还是晚上
效果图 核心代码 获取当前时间 toString()方法将数字转换为字符串 padStart(2, 0):padStart()方法用于在字符串头部填充指定的字符,使其达到指定的长度。该方法接受两个参数:第一个参数为期望得到的字符串长度,第二个参数为要填充…...
C# .Net6 指定WSDL, 生成Webservice,调用该接口服务
C# .Net6 指定WSDL, 调用该接口服务。 IDE: Microsoft Visual Studio Community 2022 (64 位)平台:.Net6协议:Soap协议 Xml格式 功能 需要开发一个前置机程序, 用于和硬件程序交互,已知条件是:嵌入式同事…...
JS基本小知识:函数
目录 函数的基本概念 函数的定义和调用 函数的定义 函数的调用 函数的参数和返回值 参数的作用域和生命周期 返回值的作用和使用场景 匿名函数和箭头函数 匿名函数 本文将介绍 JavaScript 中的一个知识点:函数。函数是 JavaScript 中非常重要的一个概念&am…...
在Windows下Edge浏览器OA发起流程问题
在Edge浏览器中发起流程 如上图所示,不能正常打开Excel,自动将Excel表格转为了PDF 怎么处理?还得使用IE浏览器来访问,但打开IE后又自动跳转到Edge,根本就不给使用,在Edge下使用IE模式也解决不了这个问题。…...
2020年亚太杯APMCM数学建模大赛A题激光标记舱口轮廓生成求解全过程文档及程序
2020年亚太杯APMCM数学建模大赛 A题 激光标记舱口轮廓生成 原题再现: 激光是20中的一项重要发明世纪,它被称为“最锋利的刀”、“最精确的尺子”和“最不寻常的光”。 激光已越来越多地应用于工业加工, 其中可以是就业在各种加工业务例如作…...
【单元测试】--工具与环境
一、单元测试工具概览 1.1 JUnit JUnit 是一个广泛用于 Java 程序开发的开源测试框架。它是单元测试的标准工具之一,用于编写和运行测试用例,以确保 Java 程序的各个组件按预期工作。以下是一些关键特点和概念,来介绍 JUnit: 注…...
基于Java的汽车维修预约管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
vscode调试container(进行rocksdb调试)+vscode比较git项目不同分支和fork的哪个分支
vscode调试container(进行rocksdb调试) 参考链接: https://blog.csdn.net/qq_29809823/article/details/128445308#t5 https://blog.csdn.net/qq_29809823/article/details/121978762#t7 使用vscode中的插件dev containners->点击左侧的…...
[python-大语言模型]从浅到深一系列学习笔记记录
整体学习路径参照:点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接: github地址:https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…...
Android 指定有线网或Wifi进行网络请求
Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言:二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图:2、实际测试结果说明&a…...
消除过期的对象引用
Java虽然有自己的垃圾回收机制,但是并没有那么的智能,对于被引用的对象,就算我们已经不在使用它了,但是Java的回收机制是不会回收它们的,人们称之为“内存泄漏”。 以下为三种不同的内存泄漏场景,极其优化方案 1、只要类自己管理内存,就该警惕内存泄漏问题 例如Stack…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
Kafka主题运维全指南:从基础配置到故障处理
#作者:张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1:主题删除失败。常见错误2:__consumer_offsets占用太多的磁盘。 主题日常管理 …...
[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG
TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码:HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
32位寻址与64位寻址
32位寻址与64位寻址 32位寻址是什么? 32位寻址是指计算机的CPU、内存或总线系统使用32位二进制数来标识和访问内存中的存储单元(地址),其核心含义与能力如下: 1. 核心定义 地址位宽:CPU或内存控制器用32位…...
虚幻基础:角色旋转
能帮到你的话,就给个赞吧 😘 文章目录 移动组件使用控制器所需旋转:组件 使用 控制器旋转将旋转朝向运动:组件 使用 移动方向旋转 控制器旋转和移动旋转 缺点移动旋转:必须移动才能旋转,不移动不旋转控制器…...
