SparkStreaming入门
概述
实时/离线
- 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
- 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。
SparkStreaming简介
- 支持的输入源:
Kafka, Flume, HDFS等 - 数据输入后,可以用RDD处理数据
- 结果可以保存在很多地方,比如HDFS,数据库等
SparkStreaming架构
DStream
SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream
每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。
架构图

由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。
DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。
Hello World案例
- 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version>
</dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
- 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import java.util.ArrayList;
import java.util.HashMap;public class Test01_HelloWorld {public static void main(String[] args) throws InterruptedException {// 创建流环境JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));// 创建配置参数HashMap<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 需要消费的主题ArrayList<String> strings = new ArrayList<>();strings.add("topic_db");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {@Overridepublic Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {String[] words = consumerRecord.value().split(" ");return Arrays.stream(words).iterator();}});flatMap .print();// 执行流的任务javaStreamingContext.start();javaStreamingContext.awaitTermination();//线程阻塞}
}
window算子窗口操作
由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。
窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算
//4 添加窗口 窗口大小12s 滑动步长6sJavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));//5 对加过窗口的数据流进行计算JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);
相关文章:
SparkStreaming入门
概述 实时/离线 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。离线:数据是落盘后再处理,一般处理的数据是昨天的数据&…...
设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
简介: 模板模式,它是一种行为型设计模式,它定义了一个操作中的算法的框架,将一些步骤延迟到子类中实现,使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说,模板模式就是将某一行…...
基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
PHP 基础
PHP 基础 概述 在PHP 文件中,可以与HTML 和JavaScript 混编。 开始标记<?php 表示进入PHP 模式,结束标记?>,标识退出PHP 模式。 PHP 模式之外的内容会被作为字符输出到浏览器中。 PHP 在服务端执行,HTML 和 JS 在浏览…...
Java RestTemplate使用TLS1.0(关闭SSL验证)
1. 问题 使用RestTemplate调用Http API时,服务器是TLS1.0,但是客户端Java默认禁止TLS1.0,会报错:org.springframework.web.client.ResourceAccessException: I/O error on POST request for “https://10.255.200.114/health”: …...
【进阶C语言】C语言文件操作
1. 为什么使用文件 2. 什么是文件 3. 文件的打开和关闭 4. 文件的顺序读写 5. 文件的随机读写 6. 文本文件和二进制文件 7. 文件读取结束的判定 8. 文件缓冲区 一、文件与文件的意义 1.文件的意义 文件的意义,无非就是为什么要使用文件? (1&…...
Django实现音乐网站 (21)
使用Python Django框架做一个音乐网站, 本篇音乐播放器功能完善及原有功能修改。 目录 播放列表修改 视图修改 删除、清空播放器 设置路由 视图处理 修改加载播放器脚本 模板修改 脚本设置 清空功能实现 删除列表音乐 播放列表无数据处理 视图修改 播放…...
LeetCode 面试题 10.11. 峰与谷
文章目录 一、题目二、C# 题解 一、题目 在一个整数数组中,“峰”是大于或等于相邻整数的元素,相应地,“谷”是小于或等于相邻整数的元素。例如,在数组{5, 8, 4, 2, 3, 4, 6}中,{8, 6}是峰, {5, 2}是谷。现…...
【专题】测试人员为什么需要学会做业务总结?
背景 如何回答以下这个问题的知识支撑:系统的测试重点在哪,难点是什么,怎么攻克,为什么要这样设计?项目交接效率? 同样是做业务测试,为什么有的人是A有的人只能C 二、框架 2.1 测试场景 重点…...
uni-app:实现当前时间的获取,并且根据当前时间判断所在时间段为早上,下午还是晚上
效果图 核心代码 获取当前时间 toString()方法将数字转换为字符串 padStart(2, 0):padStart()方法用于在字符串头部填充指定的字符,使其达到指定的长度。该方法接受两个参数:第一个参数为期望得到的字符串长度,第二个参数为要填充…...
C# .Net6 指定WSDL, 生成Webservice,调用该接口服务
C# .Net6 指定WSDL, 调用该接口服务。 IDE: Microsoft Visual Studio Community 2022 (64 位)平台:.Net6协议:Soap协议 Xml格式 功能 需要开发一个前置机程序, 用于和硬件程序交互,已知条件是:嵌入式同事…...
JS基本小知识:函数
目录 函数的基本概念 函数的定义和调用 函数的定义 函数的调用 函数的参数和返回值 参数的作用域和生命周期 返回值的作用和使用场景 匿名函数和箭头函数 匿名函数 本文将介绍 JavaScript 中的一个知识点:函数。函数是 JavaScript 中非常重要的一个概念&am…...
在Windows下Edge浏览器OA发起流程问题
在Edge浏览器中发起流程 如上图所示,不能正常打开Excel,自动将Excel表格转为了PDF 怎么处理?还得使用IE浏览器来访问,但打开IE后又自动跳转到Edge,根本就不给使用,在Edge下使用IE模式也解决不了这个问题。…...
2020年亚太杯APMCM数学建模大赛A题激光标记舱口轮廓生成求解全过程文档及程序
2020年亚太杯APMCM数学建模大赛 A题 激光标记舱口轮廓生成 原题再现: 激光是20中的一项重要发明世纪,它被称为“最锋利的刀”、“最精确的尺子”和“最不寻常的光”。 激光已越来越多地应用于工业加工, 其中可以是就业在各种加工业务例如作…...
【单元测试】--工具与环境
一、单元测试工具概览 1.1 JUnit JUnit 是一个广泛用于 Java 程序开发的开源测试框架。它是单元测试的标准工具之一,用于编写和运行测试用例,以确保 Java 程序的各个组件按预期工作。以下是一些关键特点和概念,来介绍 JUnit: 注…...
基于Java的汽车维修预约管理系统设计与实现(源码+lw+部署文档+讲解等)
文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序(小蔡coding) 代码参考数据库参考源码获取 前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...
vscode调试container(进行rocksdb调试)+vscode比较git项目不同分支和fork的哪个分支
vscode调试container(进行rocksdb调试) 参考链接: https://blog.csdn.net/qq_29809823/article/details/128445308#t5 https://blog.csdn.net/qq_29809823/article/details/121978762#t7 使用vscode中的插件dev containners->点击左侧的…...
[python-大语言模型]从浅到深一系列学习笔记记录
整体学习路径参照:点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接: github地址:https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…...
Android 指定有线网或Wifi进行网络请求
Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言:二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图:2、实际测试结果说明&a…...
消除过期的对象引用
Java虽然有自己的垃圾回收机制,但是并没有那么的智能,对于被引用的对象,就算我们已经不在使用它了,但是Java的回收机制是不会回收它们的,人们称之为“内存泄漏”。 以下为三种不同的内存泄漏场景,极其优化方案 1、只要类自己管理内存,就该警惕内存泄漏问题 例如Stack…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
sshd代码修改banner
sshd服务连接之后会收到字符串: SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢? 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头,…...
