当前位置: 首页 > news >正文

Spark Streaming 数据流处理

一、创建Spark Streaming 环境

二、读取数据(监听端口)

三、任务处理

四、启动程序

我这里写的是简单的单词数量统计

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}object Demo1WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val countDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

UpdateStateByKey(有状态算子)能统计之前的单词数量,可做实时更新 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object Demo2UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[2]")conf.setAppName("wc")val sc = new SparkContext(conf)//1、创建spark streaming环境//指定处理的间隔时间val ssc = new StreamingContext(sc, Durations.seconds(5))//设置checkpoint路径//用于保存状态ssc.checkpoint("data/checkpoint")//2、读取数据//nc -lk 8888//yum install ncval linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)//统计单词的数量val kvDS: DStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))//updateStateByKey(有状态算子): 每一次计算更新每一个key的状态(单词的数量)val countDS: DStream[(String, Int)] = kvDS.updateStateByKey {/*** seq: 当前批次一个key所有value* state: 之前的结果(状态:之前的单词的数量)*/case (seq: Seq[Int], state: Option[Int]) =>println(seq)println(state)//计算当前批次单词的数量val sum: Int = seq.sum//获取之前单词的数量val count: Int = state match {case Some(count) => countcase None => 0}//计算新的单词的数量并返回Option(sum + count)}countDS.print()//启动spark streaming程序ssc.start()ssc.awaitTermination()}
}

相关文章:

Spark Streaming 数据流处理

一、创建Spark Streaming 环境 二、读取数据(监听端口) 三、任务处理 四、启动程序 我这里写的是简单的单词数量统计 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkConte…...

高效规划神器 markmap:一键将 Markdown 变思维导图!

❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 微信公众号|搜一搜&…...

微服务基础架构(图)

微服务基础架构是一种现代化的软件架构模式,旨在将大型复杂的应用程序拆分为多个小型、独立的服务。每个微服务专注于特定的业务功能,可独立开发、部署和扩展。 在微服务基础架构中,通常会使用轻量级的通信机制,如 RESTful API 或…...

中电金信:大模型时代 金融机构企业架构转型如何更智能化?

随着人工智能技术的不断进步,AI大模型在金融行业已经广泛应用,推动金融机构实现更高效、智能化的服务,同时也为金融科技领域的发展带来新的挑战。中电金信基于业务建模的企业架构转型解决方案也顺势而动,关注大模型在具体场景上的…...

基于CRNN模型的多位数字序列识别的应用【代码+数据集+python环境+GUI系统】

基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 基于CRNN模型的多位数字序列识别的应用【代码数据集python环境GUI系统】 背景意义 多位手写数字识别,即计算机从纸张文档、照片、触摸屏等来源接收并解释可理解的手写数字输入的能力。 随着…...

windows中命令行批处理脚本学习

目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…...

版本工具报错:Error Unity Version Control

NotConfiguredClientException: Unity VCS client is not correctly configured for the current user:Client config file....

ECharts饼图-饼图标签对齐,附视频讲解与代码下载

一、图表效果预览 引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外&#…...

Python实现基于WebSocket的stomp协议调试助手工具分享

stomp协议很简单,但是搜遍网络竟没找到一款合适的客户端工具。大多数提供的都是客户端库的使用。可能是太简单了吧!可是即便这样,假如有一可视化的工具,将方便的对stomp协议进行抓包调试。网上类似MQTT的客户端工具有很多&#xf…...

《语音识别方案选型研究》

《语音识别方案选型研究》 一、引言二、语音识别技术概述(一)语音识别的基本原理(二)语音识别技术的发展历程 三、语音识别方案的分类(一)基于云端的语音识别方案(二)基于本地的语音…...

解决关于HTML+JS + Servlet 实现前后端请求Session不一致的问题

1、前后端不分离情况 在处理session过程中,如果前后端项目在一个容器中,session是可以被获取的。例如如下项目结构: 结构 后端的代码是基本的设置值、获取值、销毁值的内容: 运行结果 由此可见,在前后统一的项目中&a…...

ECharts饼图-饼图34,附视频讲解与代码下载

引言: 在数据可视化的世界里,ECharts凭借其丰富的图表类型和强大的配置能力,成为了众多开发者的首选。今天,我将带大家一起实现一个饼图图表,通过该图表我们可以直观地展示和分析数据。此外,我还将提供详…...

如何实现安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯

在工业自动化中,实现不同品牌、不同型号设备之间的通讯是确保生产流程顺畅、高效运行的关键。本文详细介绍了安川MP3300运动控制器与西门子1200系列PLC进行ModbusTCP通讯的具体方法。 一.软硬件需求 1.一台安川MP3300CPU301,其IP地址是192.…...

react18中如何实现同步的setState来实现所见即所得的效果

在react项目中,实现添加列表项,最后一项自动显示在可视区域范围!! 实现效果 代码实现 import { useState, useRef } from "react"; import { flushSync } from "react-dom"; function FlushSyncRef() {con…...

深入理解MVP架构模式

引言 MVP(Model-View-Presenter,模型-视图-提供者)是一种广泛应用于软件开发中的架构模式,是经典MVC(Model-View-Controller)的变种。在传统的MVC模式中,Model和View之间存在直接的依赖和数据交…...

Java面试题七

一、Java中的集合框架是如何组织的?列举几个常用的集合类。 Java中的集合框架是一个设计用来存储和操作对象集合的统一架构。它主要由两大接口派生出来:Collection和Map。这两个接口及其子接口和实现类共同构成了Java集合框架的主体。 集合框架的组织结…...

linux网络编程3——http服务器的实现和性能测试

http服务器的实现 本文使用上一篇博文实现的epollreactor百万并发的服务器实现了一个使用http协议和WebSocket协议的WebServer。 完整代码请看我的github项目 1. 水平触发(Level Trigger)与边沿触发(Edge Trigger) 1.1 水平触发 水平触发是一种状态驱动机制。当文件描述符&a…...

Docker部署Kamailio,并使用LinPhone实现网络通话

前提条件 准备一个路由器,一个服务器,两个终端设备(手机或电脑) docker部署安装 我使用的是windows系统,docker desktop 先启动Docker desktop打开cmd,输入docker命令docker run --name kamailio --rm…...

JAVA-石头迷阵小游戏

采用企业式项目结构,接下来我将分享全部代码和结构,希望大家点点关注! 这是我的结构。首先使用IDE创建一个Module,命名stone-maze,接着把自带src下的main方法删除,接着在src下创建包,包名为com.wmuj,接着创建APP类代码如下: package com.wmuj;public class App {publ…...

鸿蒙--进度条通知

主要介绍如何使用通知能力和基础组件,实现模拟下载文件,发送通知的案例。 效果 代码结构 ├──entry/src/main/ets // 代码区 │ ├──common │ │ ├──constants │ │ │ └──CommonConstants.ets // 公共常量类 │ │ └──utils │ │ ├──Logger.ets //…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

无法与IP建立连接,未能下载VSCode服务器

如题&#xff0c;在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈&#xff0c;发现是VSCode版本自动更新惹的祸&#xff01;&#xff01;&#xff01; 在VSCode的帮助->关于这里发现前几天VSCode自动更新了&#xff0c;我的版本号变成了1.100.3 才导致了远程连接出…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...