Spark Streaming 数据流处理
一、创建Spark Streaming 环境
二、读取数据(监听端口)
三、任务处理
四、启动程序
我这里写的是简单的单词数量统计
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}
UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}
相关文章:
Spark Streaming 数据流处理
一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…...
高效规划神器 markmap:一键将 Markdown 变思维导图!
❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 微信公众号|搜一搜&…...
微服务基础架构(图)
微服务基础架构是一种现代化的软件架构模式,旨在将大型复杂的应用程序拆分为多个小型、独立的服务。每个微服务专注于特定的业务功能,可独立开发、部署和扩展。 在微服务基础架构中,通常会使用轻量级的通信机制,如 RESTful API 或…...
中电金信:大模型时代 金融机构企业架构转型如何更智能化?
随着人工智能技术的不断进步,AI大模型在金融行业已经广泛应用,推动金融机构实现更高效、智能化的服务,同时也为金融科技领域的发展带来新的挑战。中电金信基于业务建模的企业架构转型解决方案也顺势而动,关注大模型在具体场景上的…...
基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】
基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 背景意义 多位手写数字识别,即计算机从纸张文档、照片、触摸屏等来源接收并解释可理解的手写数字输入的能力。 随着…...
windows中命令行批处理脚本学习
目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…...
版本工具报错:Error Unity Version Control
NotConfiguredClientException: Unity VCS client is not correctly configured for the current user:Client config file....
ECharts饼图-饼图标签对齐,附视频讲解与代码下载
一、图表效果预览 引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外&#…...
Python实现基于WebSocket的stomp协议调试助手工具分享
stomp协议很简单,但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧!可是即便这样,假如有一可视化的工具,将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多…...
《语音识别方案选型研究》
《语音识别方案选型研究》 一、引言二、语音识别技术概述(一)语音识别的基本原理(二)语音识别技术的发展历程 三、语音识别方案的分类(一)基于云端的语音识别方案(二)基于本地的语音…...
解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题
1、前后端不分离情况 在处理session过程中,如果前后端项目在一个容器中,session是可以被获取的。例如如下项目结构: 结构 后端的代码是基本的设置值、获取值、销毁值的内容: 运行结果 由此可见,在前后统一的项目中&a…...
ECharts饼图-饼图34,附视频讲解与代码下载
引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…...
如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯
在工业自动化中,实现不同品牌、不同型号设备之间的通讯是确保生产流程顺畅、高效运行的关键。本文详细介绍了安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯的具体方法。 一.软硬件需求 1.一台安川MP3300CPU301,其IP地址是192.…...
react18中如何实现同步的setState来实现所见即所得的效果
在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…...
深入理解MVP架构模式
引言 MVP(Model-View-Presenter,模型-视图-提供者)是一种广泛应用于软件开发中的架构模式,是经典MVC(Model-View-Controller)的变种。在传统的MVC模式中,Model和View之间存在直接的依赖和数据交…...
Java面试题七
一、Java中的集合框架是如何组织的?列举几个常用的集合类。 Java中的集合框架是一个设计用来存储和操作对象集合的统一架构。它主要由两大接口派生出来:Collection和Map。这两个接口及其子接口和实现类共同构成了Java集合框架的主体。 集合框架的组织结…...
linux网络编程3——http服务器的实现和性能测试
http服务器的实现 本文使用上一篇博文实现的epollreactor百万并发的服务器实现了一个使用http协议和WebSocket协议的WebServer。 完整代码请看我的github项目 1. 水平触发(Level Trigger)与边沿触发(Edge Trigger) 1.1 水平触发 水平触发是一种状态驱动机制。当文件描述符&a…...
Docker部署Kamailio,并使用LinPhone实现网络通话
前提条件 准备一个路由器,一个服务器,两个终端设备(手机或电脑) docker部署安装 我使用的是windows系统,docker desktop 先启动Docker desktop打开cmd,输入docker命令docker run --name kamailio --rm…...
JAVA-石头迷阵小游戏
采用企业式项目结构,接下来我将分享全部代码和结构,希望大家点点关注! 这是我的结构。首先使用IDE创建一个Module,命名stone-maze,接着把自带src下的main方法删除,接着在src下创建包,包名为com.wmuj,接着创建APP类代码如下: package com.wmuj;public class App {publ…...
鸿蒙--进度条通知
主要介绍如何使用通知能力和基础组件,实现模拟下载文件,发送通知的案例。 效果 代码结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ ├──Logger.ets //…...
ClawdOS:为AI Agent构建可视化操作系统的全栈实践
1. 项目概述:为你的AI大脑装上眼睛和手如果你和我一样,是OpenClaw(前身是Moltbot/Clawdbot)的早期用户,那你一定经历过这种场景:在终端里,你的AI助手聪明绝顶,能写代码、查资料、分析…...
玩转OurBMC第二十六期:OpenBMC固件远程更新原理与实践(下)
栏目介绍:“玩转OurBMC” 是OurBMC社区开创的知识分享类栏目,主要聚焦于社区和BMC全栈技术相关基础知识的分享,全方位涵盖了从理论原理到实践操作的知识传递。OurBMC社区将通过 “玩转OurBMC” 栏目,帮助开发者们深入了解到社区文…...
Windows上的APK安装革命:如何用开源工具无缝运行安卓应用
Windows上的APK安装革命:如何用开源工具无缝运行安卓应用 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 还在为Windows和安卓生态之间的鸿沟而烦恼吗&…...
SAP ABAP开发避坑指南:NATIVE SQL里那个冒号和MANDT字段,你写对了吗?
SAP ABAP开发实战:NATIVE SQL高频陷阱与性能优化全解析 在SAP ABAP开发领域,NATIVE SQL就像一把双刃剑——它既能突破Open SQL的限制直接操作底层数据库,又隐藏着无数让开发者"踩坑"的语法细节。根据SAP官方统计,超过60…...
LangGraph 生产级部署全解:FastAPI + Docker
一、部署架构总览 我们将基于你之前的带人工干预的双智能体系统,构建一个完整的生产级部署方案,包含三个核心部分: FastAPI 接口层:封装 Agent 为标准 HTTP 接口,支持任务启动、人工干预、状态查询Redis 持久化层&am…...
Doramagic:AI助手开源项目专家技能提取引擎架构与实战
1. 项目概述:Doramagic,一个为AI助手注入项目“灵魂”的提取引擎如果你和我一样,每天都在和各种各样的开源项目打交道,从FastAPI到Home Assistant,从Next.js到LangChain,那你肯定也遇到过这样的困境&#x…...
SpringBoot生产级监控与异常日志运维实战,线上项目稳定排查不慌
SpringBoot项目本地开发调试正常,部署到生产环境后频繁出现接口报错、服务卡顿、内存溢出、接口响应缓慢、数据库连接耗尽等线上问题,开发者无法实时查看项目运行状态,报错无精准日志定位,排查问题耗时费力,严重影响业…...
Cursor智能体监控工具:本地部署与API成本可视化实战
1. 项目概述:一个为开发者量身打造的Cursor智能体监控工具如果你和我一样,是一位重度依赖Cursor进行编码的开发者,那你一定对它的“智能体”(Agent)功能又爱又恨。爱的是,它能理解上下文、自动补全代码、甚…...
小白/程序员必备!收藏这份大模型AI学习资料,抓住高薪职业赛道!
小白/程序员必备!收藏这份大模型AI学习资料,抓住高薪职业赛道! 随着AI技术发展,AI人才需求激增,薪资待遇飙升。本文针对小白和程序员学习大模型AI的三大难题:缺乏理论、资源受限、底层逻辑难懂,…...
数字遗产:我们写的代码,在死后将归于何处?
一行注释里的永恒追问测试工程师的日常,往往是从一行日志或一个断言开始的。但你是否注意过,在那些被反复修改的代码文件最顶端,常常躺着一行注释:“Author: [某位早已离职的同事]”。这行注释像一座小小的墓碑,标记着…...
