当前位置: 首页 > news >正文

SparkStreaming入门

概述

实时/离线

  • 实时:Spark是每个3秒或者5秒更新一下处理后的数据,这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算,处理精度达到ms级别。
  • 离线:数据是落盘后再处理,一般处理的数据是昨天的数据,处理精度是天。

SparkStreaming简介

  1. 支持的输入源:Kafka, Flume, HDFS等
  2. 数据输入后,可以用RDD处理数据
  3. 结果可以保存在很多地方,比如HDFS,数据库等

SparkStreaming架构

DStream

SparkCore的基本单位RDD
SparkSQL的基本单位是DataFreme, DataSet
Spark Streaming的基本单位是Dstream

每个时间区间内收到的RDD组成的序列就是DStream.因此每个时间段的数据之间是独立的,如果需要汇总,需要指定相应的时间间隔。

架构图

在这里插入图片描述
由于接收方和计算方是两个节点,如果接收方和计算方的速度不一致,会存在数据挤压或者计算方空闲等待数据的问题。

DirectAPI : 为了解决该问题,后续新版本增加了Direct, 通过Executor计算方来控制数据的消费速度。

Hello World案例

  1. 添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version>
</dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
  1. 编写代码,入口为javaStreamingContext, 必须设置时间间隔。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import java.util.ArrayList;
import java.util.HashMap;public class Test01_HelloWorld {public static void main(String[] args) throws InterruptedException {// 创建流环境JavaStreamingContext javaStreamingContext = new JavaStreamingContext("local[*]", "HelloWorld", Duration.apply(3000));// 创建配置参数HashMap<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 需要消费的主题ArrayList<String> strings = new ArrayList<>();strings.add("topic_db");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));JavaDStream<String> flatMap = directStream.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {@Overridepublic Iterator<String> call(ConsumerRecord<String, String> consumerRecord) throws Exception {String[] words = consumerRecord.value().split(" ");return Arrays.stream(words).iterator();}});flatMap .print();// 执行流的任务javaStreamingContext.start();javaStreamingContext.awaitTermination();//线程阻塞}
}

window算子窗口操作

由于不同的DStream之间是独立,如果相同统计比DStream时间间隔更大的时间范围内的数据,可以使用窗口操作。

窗口时长:计算内容的时间范围
滑动步长:隔多久触发一次计算

//4 添加窗口 窗口大小12s 滑动步长6sJavaPairDStream<String, Long> word2oneDStreamBywindow = word2oneDStream.window(Duration.apply(12000L), Duration.apply(6000L));//5 对加过窗口的数据流进行计算JavaPairDStream<String, Long> resultDStream = word2oneDStreamBywindow.reduceByKey((v1, v2) -> v1 + v2);

相关文章:

SparkStreaming入门

概述 实时/离线 实时&#xff1a;Spark是每个3秒或者5秒更新一下处理后的数据&#xff0c;这个是按照时间切分的伪实时。真正的实时是根据事件触发的数据计算&#xff0c;处理精度达到ms级别。离线&#xff1a;数据是落盘后再处理&#xff0c;一般处理的数据是昨天的数据&…...

设计模式:模板模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

简介&#xff1a; 模板模式&#xff0c;它是一种行为型设计模式&#xff0c;它定义了一个操作中的算法的框架&#xff0c;将一些步骤延迟到子类中实现&#xff0c;使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。 通俗地说&#xff0c;模板模式就是将某一行…...

基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...

PHP 基础

PHP 基础 概述 在PHP 文件中&#xff0c;可以与HTML 和JavaScript 混编。 开始标记<?php 表示进入PHP 模式&#xff0c;结束标记?>&#xff0c;标识退出PHP 模式。 PHP 模式之外的内容会被作为字符输出到浏览器中。 PHP 在服务端执行&#xff0c;HTML 和 JS 在浏览…...

Java RestTemplate使用TLS1.0(关闭SSL验证)

1. 问题 使用RestTemplate调用Http API时&#xff0c;服务器是TLS1.0&#xff0c;但是客户端Java默认禁止TLS1.0&#xff0c;会报错&#xff1a;org.springframework.web.client.ResourceAccessException: I/O error on POST request for “https://10.255.200.114/health”: …...

【进阶C语言】C语言文件操作

1. 为什么使用文件 2. 什么是文件 3. 文件的打开和关闭 4. 文件的顺序读写 5. 文件的随机读写 6. 文本文件和二进制文件 7. 文件读取结束的判定 8. 文件缓冲区 一、文件与文件的意义 1.文件的意义 文件的意义&#xff0c;无非就是为什么要使用文件&#xff1f; &#xff08;1&…...

Django实现音乐网站 (21)

使用Python Django框架做一个音乐网站&#xff0c; 本篇音乐播放器功能完善及原有功能修改。 目录 播放列表修改 视图修改 删除、清空播放器 设置路由 视图处理 修改加载播放器脚本 模板修改 脚本设置 清空功能实现 删除列表音乐 播放列表无数据处理 视图修改 播放…...

LeetCode 面试题 10.11. 峰与谷

文章目录 一、题目二、C# 题解 一、题目 在一个整数数组中&#xff0c;“峰”是大于或等于相邻整数的元素&#xff0c;相应地&#xff0c;“谷”是小于或等于相邻整数的元素。例如&#xff0c;在数组{5, 8, 4, 2, 3, 4, 6}中&#xff0c;{8, 6}是峰&#xff0c; {5, 2}是谷。现…...

【专题】测试人员为什么需要学会做业务总结?

背景 如何回答以下这个问题的知识支撑&#xff1a;系统的测试重点在哪&#xff0c;难点是什么&#xff0c;怎么攻克&#xff0c;为什么要这样设计&#xff1f;项目交接效率&#xff1f; 同样是做业务测试&#xff0c;为什么有的人是A有的人只能C 二、框架 2.1 测试场景 重点…...

uni-app:实现当前时间的获取,并且根据当前时间判断所在时间段为早上,下午还是晚上

效果图 核心代码 获取当前时间 toString()方法将数字转换为字符串 padStart(2, 0)&#xff1a;padStart()方法用于在字符串头部填充指定的字符&#xff0c;使其达到指定的长度。该方法接受两个参数&#xff1a;第一个参数为期望得到的字符串长度&#xff0c;第二个参数为要填充…...

C# .Net6 指定WSDL, 生成Webservice,调用该接口服务

C# .Net6 指定WSDL, 调用该接口服务。 IDE&#xff1a; Microsoft Visual Studio Community 2022 (64 位)平台&#xff1a;.Net6协议&#xff1a;Soap协议 Xml格式 功能 需要开发一个前置机程序&#xff0c; 用于和硬件程序交互&#xff0c;已知条件是&#xff1a;嵌入式同事…...

JS基本小知识:函数

目录 函数的基本概念 函数的定义和调用 函数的定义 函数的调用 函数的参数和返回值 参数的作用域和生命周期 返回值的作用和使用场景 匿名函数和箭头函数 匿名函数 本文将介绍 JavaScript 中的一个知识点&#xff1a;函数。函数是 JavaScript 中非常重要的一个概念&am…...

在Windows下Edge浏览器OA发起流程问题

在Edge浏览器中发起流程 如上图所示&#xff0c;不能正常打开Excel&#xff0c;自动将Excel表格转为了PDF 怎么处理&#xff1f;还得使用IE浏览器来访问&#xff0c;但打开IE后又自动跳转到Edge&#xff0c;根本就不给使用&#xff0c;在Edge下使用IE模式也解决不了这个问题。…...

2020年亚太杯APMCM数学建模大赛A题激光标记舱口轮廓生成求解全过程文档及程序

2020年亚太杯APMCM数学建模大赛 A题 激光标记舱口轮廓生成 原题再现&#xff1a; 激光是20中的一项重要发明世纪&#xff0c;它被称为“最锋利的刀”、“最精确的尺子”和“最不寻常的光”。 激光已越来越多地应用于工业加工&#xff0c; 其中可以是就业在各种加工业务例如作…...

【单元测试】--工具与环境

一、单元测试工具概览 1.1 JUnit JUnit 是一个广泛用于 Java 程序开发的开源测试框架。它是单元测试的标准工具之一&#xff0c;用于编写和运行测试用例&#xff0c;以确保 Java 程序的各个组件按预期工作。以下是一些关键特点和概念&#xff0c;来介绍 JUnit&#xff1a; 注…...

基于Java的汽车维修预约管理系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…...

vscode调试container(进行rocksdb调试)+vscode比较git项目不同分支和fork的哪个分支

vscode调试container&#xff08;进行rocksdb调试&#xff09; 参考链接&#xff1a; https://blog.csdn.net/qq_29809823/article/details/128445308#t5 https://blog.csdn.net/qq_29809823/article/details/121978762#t7 使用vscode中的插件dev containners->点击左侧的…...

[python-大语言模型]从浅到深一系列学习笔记记录

整体学习路径参照&#xff1a;点这里 python-机器学习-深度学习-大语言模型-数据开发 面向开发者的LLM入门提示原则 面向开发者的LLM入门 学习链接&#xff1a; github地址&#xff1a;https://github.com/datawhalechina/prompt-engineering-for-developers 在线阅读地址&…...

Android 指定有线网或Wifi进行网络请求

Android 指定有线网或Wifi进行网络请求 文章目录 Android 指定有线网或Wifi进行网络请求一、前言&#xff1a;二、指定网络通讯测试1、 窗口命令 ping -I 网络节点 IP2、Java 代码指定特定网络通讯 三、指定特定网络的demo app 开发1、效果图&#xff1a;2、实际测试结果说明&a…...

消除过期的对象引用

Java虽然有自己的垃圾回收机制,但是并没有那么的智能,对于被引用的对象,就算我们已经不在使用它了,但是Java的回收机制是不会回收它们的,人们称之为“内存泄漏”。 以下为三种不同的内存泄漏场景,极其优化方案 1、只要类自己管理内存,就该警惕内存泄漏问题 例如Stack…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

均衡后的SNRSINR

本文主要摘自参考文献中的前两篇&#xff0c;相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程&#xff0c;其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt​ 根发送天线&#xff0c; n r n_r nr​ 根接收天线的 MIMO 系…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...