Spark Streaming 数据流处理
一、创建Spark Streaming 环境
二、读取数据(监听端口)
三、任务处理
四、启动程序
我这里写的是简单的单词数量统计
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}
UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}
相关文章:
Spark Streaming 数据流处理
一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…...

高效规划神器 markmap:一键将 Markdown 变思维导图!
❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 微信公众号|搜一搜&…...

微服务基础架构(图)
微服务基础架构是一种现代化的软件架构模式,旨在将大型复杂的应用程序拆分为多个小型、独立的服务。每个微服务专注于特定的业务功能,可独立开发、部署和扩展。 在微服务基础架构中,通常会使用轻量级的通信机制,如 RESTful API 或…...

中电金信:大模型时代 金融机构企业架构转型如何更智能化?
随着人工智能技术的不断进步,AI大模型在金融行业已经广泛应用,推动金融机构实现更高效、智能化的服务,同时也为金融科技领域的发展带来新的挑战。中电金信基于业务建模的企业架构转型解决方案也顺势而动,关注大模型在具体场景上的…...

基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】
基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 背景意义 多位手写数字识别,即计算机从纸张文档、照片、触摸屏等来源接收并解释可理解的手写数字输入的能力。 随着…...

windows中命令行批处理脚本学习
目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…...

版本工具报错:Error Unity Version Control
NotConfiguredClientException: Unity VCS client is not correctly configured for the current user:Client config file....

ECharts饼图-饼图标签对齐,附视频讲解与代码下载
一、图表效果预览 引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外&#…...

Python实现基于WebSocket的stomp协议调试助手工具分享
stomp协议很简单,但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧!可是即便这样,假如有一可视化的工具,将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多…...
《语音识别方案选型研究》
《语音识别方案选型研究》 一、引言二、语音识别技术概述(一)语音识别的基本原理(二)语音识别技术的发展历程 三、语音识别方案的分类(一)基于云端的语音识别方案(二)基于本地的语音…...

解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题
1、前后端不分离情况 在处理session过程中,如果前后端项目在一个容器中,session是可以被获取的。例如如下项目结构: 结构 后端的代码是基本的设置值、获取值、销毁值的内容: 运行结果 由此可见,在前后统一的项目中&a…...

ECharts饼图-饼图34,附视频讲解与代码下载
引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…...

如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯
在工业自动化中,实现不同品牌、不同型号设备之间的通讯是确保生产流程顺畅、高效运行的关键。本文详细介绍了安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯的具体方法。 一.软硬件需求 1.一台安川MP3300CPU301,其IP地址是192.…...

react18中如何实现同步的setState来实现所见即所得的效果
在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…...

深入理解MVP架构模式
引言 MVP(Model-View-Presenter,模型-视图-提供者)是一种广泛应用于软件开发中的架构模式,是经典MVC(Model-View-Controller)的变种。在传统的MVC模式中,Model和View之间存在直接的依赖和数据交…...
Java面试题七
一、Java中的集合框架是如何组织的?列举几个常用的集合类。 Java中的集合框架是一个设计用来存储和操作对象集合的统一架构。它主要由两大接口派生出来:Collection和Map。这两个接口及其子接口和实现类共同构成了Java集合框架的主体。 集合框架的组织结…...

linux网络编程3——http服务器的实现和性能测试
http服务器的实现 本文使用上一篇博文实现的epollreactor百万并发的服务器实现了一个使用http协议和WebSocket协议的WebServer。 完整代码请看我的github项目 1. 水平触发(Level Trigger)与边沿触发(Edge Trigger) 1.1 水平触发 水平触发是一种状态驱动机制。当文件描述符&a…...

Docker部署Kamailio,并使用LinPhone实现网络通话
前提条件 准备一个路由器,一个服务器,两个终端设备(手机或电脑) docker部署安装 我使用的是windows系统,docker desktop 先启动Docker desktop打开cmd,输入docker命令docker run --name kamailio --rm…...

JAVA-石头迷阵小游戏
采用企业式项目结构,接下来我将分享全部代码和结构,希望大家点点关注! 这是我的结构。首先使用IDE创建一个Module,命名stone-maze,接着把自带src下的main方法删除,接着在src下创建包,包名为com.wmuj,接着创建APP类代码如下: package com.wmuj;public class App {publ…...

鸿蒙--进度条通知
主要介绍如何使用通知能力和基础组件,实现模拟下载文件,发送通知的案例。 效果 代码结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ ├──Logger.ets //…...

CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...

练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...