当前位置: 首页 > news >正文

Spark Streaming 数据流处理

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

相关文章:

Spark Streaming 数据流处理

一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…...

高效规划神器 markmap:一键将 Markdown 变思维导图!

❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 微信公众号|搜一搜&…...

微服务基础架构(图)

微服务基础架构是一种现代化的软件架构模式,旨在将大型复杂的应用程序拆分为多个小型、独立的服务。每个微服务专注于特定的业务功能,可独立开发、部署和扩展。 在微服务基础架构中,通常会使用轻量级的通信机制,如 RESTful API 或…...

中电金信:大模型时代 金融机构企业架构转型如何更智能化?

随着人工智能技术的不断进步,AI大模型在金融行业已经广泛应用,推动金融机构实现更高效、智能化的服务,同时也为金融科技领域的发展带来新的挑战。中电金信基于业务建模的企业架构转型解决方案也顺势而动,关注大模型在具体场景上的…...

基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】

基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 背景意义 多位手写数字识别,即计算机从纸张文档、照片、触摸屏等来源接收并解释可理解的手写数字输入的能力。 随着…...

windows中命令行批处理脚本学习

目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…...

版本工具报错:Error Unity Version Control

NotConfiguredClientException: Unity VCS client is not correctly configured for the current user:Client config file....

ECharts饼图-饼图标签对齐,附视频讲解与代码下载

一、图表效果预览 引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外&#…...

Python实现基于WebSocket的stomp协议调试助手工具分享

stomp协议很简单,但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧!可是即便这样,假如有一可视化的工具,将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多&#xf…...

《语音识别方案选型研究》

《语音识别方案选型研究》 一、引言二、语音识别技术概述(一)语音识别的基本原理(二)语音识别技术的发展历程 三、语音识别方案的分类(一)基于云端的语音识别方案(二)基于本地的语音…...

解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题

1、前后端不分离情况 在处理session过程中,如果前后端项目在一个容器中,session是可以被获取的。例如如下项目结构: 结构 后端的代码是基本的设置值、获取值、销毁值的内容: 运行结果 由此可见,在前后统一的项目中&a…...

ECharts饼图-饼图34,附视频讲解与代码下载

引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…...

如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯

在工业自动化中,实现不同品牌、不同型号设备之间的通讯是确保生产流程顺畅、高效运行的关键。本文详细介绍了安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯的具体方法。 一.软硬件需求 1.一台安川MP3300CPU301,其IP地址是192.…...

react18中如何实现同步的setState来实现所见即所得的效果

在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…...

深入理解MVP架构模式

引言 MVP(Model-View-Presenter,模型-视图-提供者)是一种广泛应用于软件开发中的架构模式,是经典MVC(Model-View-Controller)的变种。在传统的MVC模式中,Model和View之间存在直接的依赖和数据交…...

Java面试题七

一、Java中的集合框架是如何组织的?列举几个常用的集合类。 Java中的集合框架是一个设计用来存储和操作对象集合的统一架构。它主要由两大接口派生出来:Collection和Map。这两个接口及其子接口和实现类共同构成了Java集合框架的主体。 集合框架的组织结…...

linux网络编程3——http服务器的实现和性能测试

http服务器的实现 本文使用上一篇博文实现的epollreactor百万并发的服务器实现了一个使用http协议和WebSocket协议的WebServer。 完整代码请看我的github项目 1. 水平触发(Level Trigger)与边沿触发(Edge Trigger) 1.1 水平触发 水平触发是一种状态驱动机制。当文件描述符&a…...

Docker部署Kamailio,并使用LinPhone实现网络通话

前提条件 准备一个路由器,一个服务器,两个终端设备(手机或电脑) docker部署安装 我使用的是windows系统,docker desktop 先启动Docker desktop打开cmd,输入docker命令docker run --name kamailio --rm…...

JAVA-石头迷阵小游戏

采用企业式项目结构,接下来我将分享全部代码和结构,希望大家点点关注! 这是我的结构。首先使用IDE创建一个Module,命名stone-maze,接着把自带src下的main方法删除,接着在src下创建包,包名为com.wmuj,接着创建APP类代码如下: package com.wmuj;public class App {publ…...

鸿蒙--进度条通知

主要介绍如何使用通知能力和基础组件,实现模拟下载文件,发送通知的案例。 效果 代码结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ ├──Logger.ets //…...

如何用轻量工具实现Windows 11系统深度净化?

如何用轻量工具实现Windows 11系统深度净化? 【免费下载链接】Win11Debloat 一个简单的PowerShell脚本,用于从Windows中移除预装的无用软件,禁用遥测,从Windows搜索中移除Bing,以及执行各种其他更改以简化和改善你的Wi…...

革新性硬件控制工具:OmenSuperHub实现游戏本性能优化与完全掌控

革新性硬件控制工具:OmenSuperHub实现游戏本性能优化与完全掌控 【免费下载链接】OmenSuperHub 项目地址: https://gitcode.com/gh_mirrors/om/OmenSuperHub OmenSuperHub是一款专为惠普暗影精灵系列游戏本设计的开源硬件控制工具,提供完全离线的…...

Datawhale AI冬令营-学习笔记-task1

很多企业训练出来的通用模型,我们在使用时并不能很好得解答我们生活中的疑惑,故我们需要一些定制专属大模型来解答在特殊情境下的特定问题,通过投喂一些特定的数据,使得让专属模型在特定领域有着更出色的表现。本次学习将 基于《甄…...

centos7安装MySQL8.4手册

目录前言一、首先更新插件,并查看当前系统版本二、安装步骤--在线安装1、创建mysql目录2、安装rpm包3、安装 mysql-community-server4、启动MySQL服务5、查看MySQL状态6、设置开机自启动三、查看默认密码四、登录mysql五、修改密码六、开启远程访问1. 修改 MySQL 配…...

AI教材生成大揭秘!工具选择与低查重教材编写的实用干货

在教材编写的过程中,许多编辑者常常会感到遗憾:尽管正文章节已经经过了反复打磨,但因为缺乏必要的配套资源,整体教学效果却受到影响。课后练习的设计需要具有层次感,但缺乏灵活的想法;教学课件希望能做到形…...

保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍)

保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍) 在保研竞争日益激烈的当下,一篇高质量的学术论文往往能成为决定成败的关键。对于大多数本科生来说,科研经历有限、资源匮乏是普遍面临的困境。但…...

linux内核故障分析及调测工具使用能力

Linux内核的故障分析和调测工具非常丰富,根据使用场景大致可以分为静态分析/代码检查、动态跟踪、性能分析、内存调试、以及崩溃转储分析这几大类。 下面我为你整理了典型工具的归类表,方便快速查阅,后面再详细解读几个核心工具的实战能力。 …...

OpenClaw自动化测试:Qwen3.5-9B在API接口校验中的实战应用

OpenClaw自动化测试:Qwen3.5-9B在API接口校验中的实战应用 1. 为什么选择OpenClaw做接口自动化测试 去年接手一个个人项目时,我遇到了接口测试的痛点:每次后端更新都要手动验证几十个API,不仅耗时还容易遗漏边缘case。尝试过Pos…...

从ADC的‘胃口’说起:深入浅出解析电平移位电路中基准源VREF与滤波电容的选型玄学

从ADC的"胃口"说起:深入浅出解析电平移位电路中基准源VREF与滤波电容的选型玄学 在模拟电路设计中,ADC(模数转换器)就像一位挑剔的美食家,对输入信号的"口味"有着严苛的要求。而电平移位电路则如同…...

GaussDB JDBC SSL加密全攻略:从零配置到生产环境最佳实践

GaussDB JDBC SSL加密全攻略:从零配置到生产环境最佳实践 在数据驱动的时代,数据库连接的安全性已成为企业级应用不可忽视的生命线。作为华为云推出的分布式关系型数据库,GaussDB在金融、政务等对安全性要求极高的场景中广泛应用。而JDBC作为…...