当前位置: 首页 > news >正文

Spark Streaming 数据流处理

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

相关文章:

Spark Streaming 数据流处理

一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…...

高效规划神器 markmap:一键将 Markdown 变思维导图!

❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 微信公众号|搜一搜&…...

微服务基础架构(图)

微服务基础架构是一种现代化的软件架构模式,旨在将大型复杂的应用程序拆分为多个小型、独立的服务。每个微服务专注于特定的业务功能,可独立开发、部署和扩展。 在微服务基础架构中,通常会使用轻量级的通信机制,如 RESTful API 或…...

中电金信:大模型时代 金融机构企业架构转型如何更智能化?

随着人工智能技术的不断进步,AI大模型在金融行业已经广泛应用,推动金融机构实现更高效、智能化的服务,同时也为金融科技领域的发展带来新的挑战。中电金信基于业务建模的企业架构转型解决方案也顺势而动,关注大模型在具体场景上的…...

基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】

基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 背景意义 多位手写数字识别,即计算机从纸张文档、照片、触摸屏等来源接收并解释可理解的手写数字输入的能力。 随着…...

windows中命令行批处理脚本学习

目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…...

版本工具报错:Error Unity Version Control

NotConfiguredClientException: Unity VCS client is not correctly configured for the current user:Client config file....

ECharts饼图-饼图标签对齐,附视频讲解与代码下载

一、图表效果预览 引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外&#…...

Python实现基于WebSocket的stomp协议调试助手工具分享

stomp协议很简单,但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧!可是即便这样,假如有一可视化的工具,将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多&#xf…...

《语音识别方案选型研究》

《语音识别方案选型研究》 一、引言二、语音识别技术概述(一)语音识别的基本原理(二)语音识别技术的发展历程 三、语音识别方案的分类(一)基于云端的语音识别方案(二)基于本地的语音…...

解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题

1、前后端不分离情况 在处理session过程中,如果前后端项目在一个容器中,session是可以被获取的。例如如下项目结构: 结构 后端的代码是基本的设置值、获取值、销毁值的内容: 运行结果 由此可见,在前后统一的项目中&a…...

ECharts饼图-饼图34,附视频讲解与代码下载

引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…...

如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯

在工业自动化中,实现不同品牌、不同型号设备之间的通讯是确保生产流程顺畅、高效运行的关键。本文详细介绍了安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯的具体方法。 一.软硬件需求 1.一台安川MP3300CPU301,其IP地址是192.…...

react18中如何实现同步的setState来实现所见即所得的效果

在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…...

深入理解MVP架构模式

引言 MVP(Model-View-Presenter,模型-视图-提供者)是一种广泛应用于软件开发中的架构模式,是经典MVC(Model-View-Controller)的变种。在传统的MVC模式中,Model和View之间存在直接的依赖和数据交…...

Java面试题七

一、Java中的集合框架是如何组织的?列举几个常用的集合类。 Java中的集合框架是一个设计用来存储和操作对象集合的统一架构。它主要由两大接口派生出来:Collection和Map。这两个接口及其子接口和实现类共同构成了Java集合框架的主体。 集合框架的组织结…...

linux网络编程3——http服务器的实现和性能测试

http服务器的实现 本文使用上一篇博文实现的epollreactor百万并发的服务器实现了一个使用http协议和WebSocket协议的WebServer。 完整代码请看我的github项目 1. 水平触发(Level Trigger)与边沿触发(Edge Trigger) 1.1 水平触发 水平触发是一种状态驱动机制。当文件描述符&a…...

Docker部署Kamailio,并使用LinPhone实现网络通话

前提条件 准备一个路由器,一个服务器,两个终端设备(手机或电脑) docker部署安装 我使用的是windows系统,docker desktop 先启动Docker desktop打开cmd,输入docker命令docker run --name kamailio --rm…...

JAVA-石头迷阵小游戏

采用企业式项目结构,接下来我将分享全部代码和结构,希望大家点点关注! 这是我的结构。首先使用IDE创建一个Module,命名stone-maze,接着把自带src下的main方法删除,接着在src下创建包,包名为com.wmuj,接着创建APP类代码如下: package com.wmuj;public class App {publ…...

鸿蒙--进度条通知

主要介绍如何使用通知能力和基础组件,实现模拟下载文件,发送通知的案例。 效果 代码结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ ├──Logger.ets //…...

搜维尔科技:varjo xr-4开箱测评,工业用途头显,一流视觉保真度

varjo xr-4开箱测评,工业用途头显,一流视觉保真度 搜维尔科技:varjo xr-4开箱测评,工业用途头显,一流视觉保真度...

mysql数据量分库分表

一、分库分表参考阈值 分库分表是解决大规模数据和高并发访问问题的常用策略。虽然没有绝对的阈值来决定何时进行分库分表,但以下是一些参考阈值和考虑因素,可以帮助你做出决策: 1.1 数据量阈值 单表数据行数:当单表的数据行数…...

Vite创建Vue3项目以及Vue3相关基础知识

1.创建Vue3项目 1.运行创建项目命令 # 使用 npm npm create vitelatest2、填写项目名称 3、选择前端框架 4、选择语法类型 5、按提示运行代码 不出意外的话,运行之后应该会出现 下边这个页面 6.延伸学习:对比webpack和vite(这个是面试必考…...

Elasticsearch封装公共索引增删改查

什么是索引? 定义:索引是 Elasticsearch 中用于存储数据的逻辑命名空间。它由多个文档组成,每个文档是一个 JSON 格式的结构化数据对应关系:在关系数据库中,索引类似于表;而在 Elasticsearch 中&#xff0…...

Python异常检测:Isolation Forest与局部异常因子(LOF)详解

这里写目录标题 Python异常检测:Isolation Forest与局部异常因子(LOF)详解引言一、异常检测的基本原理1.1 什么是异常检测?1.2 异常检测的应用场景 二、Isolation Forest2.1 Isolation Forest的原理2.1.1 算法步骤 2.2 Python实现…...

Git的原理和使用(二)

1. git的版本回退 之前我们也提到过,Git 能够管理⽂件的历史版本,这也是版本控制器重要的能⼒。如果有⼀天你发现 之前前的⼯作做的出现了很⼤的问题,需要在某个特定的历史版本重新开始,这个时候,就需要版本 回退的功能…...

docker 发布镜像

如果要推广自己的软件,势必要自己制作 image 文件。 1 制作自己的 Docker 容器 基于 centos 镜像构建自己的 centos 镜像,可以在 centos 镜像基础上,安装相关的软件,之后进行构建新的镜像。 1.1 dockerfile 文件编写 首先&…...

投了15亿美元,芯片创新公司Ampere为何成了Oracle真爱?

【科技明说 | 科技热点关注】 一个数据库软件公司却想要操控一家芯片厂商,这样的想法不错。也真大胆。 目前,全球数据库巨头甲骨文Oracle已经持有Ampere Computing LLC 29%的股份,并有可能通过未来的投资选择权获得对这家芯片制造…...

vue 报告标题时间来自 elementUI的 el-date-picker 有开始时间和结束时间

要在Vue中使用 Element UI 的 el-date-picker 来选择开始时间和结束时间,并将其展示在报告中,以下是详细的实现步骤。 实现思路: 使用 Element UI 的 el-date-picker 组件,让用户选择时间范围(开始时间和结束时间&am…...

简单几何问题的通解

来,这道题怎么做?边长为2的正方形内,2个扇形的交集面积是多少?这道题一定要画辅助线,因为要用到两个扇形的交点,如果不画辅助线,这个交点相关的4个子图一个都无法求出面积,只能求出子…...