当前位置: 首页 > news >正文

SparkStreaming在实时处理的两个场景示例

简介

Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
在这里插入图片描述

本文以 TCP、kafka场景讲解spark streaming的使用

消息队列下的信息铺抓

类似消息队列的有redis、kafka等核心组件。
本文以kafka为例,向kafka中实时抓取数据,

pom.xml中添加以下依赖

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark SQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.0</version></dependency><!-- Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Spark Streaming Kafka Connector --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.0</version></dependency><!-- PostgreSQL JDBC --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.24</version></dependency>
</dependencies>

创建项目编写以下代码实现功能

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[*]").setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("auto.offset.reset", "earliest");// auto.offset.reset可指定参数有// latest:从分区的最新偏移量开始读取消息。// earliest:从分区的最早偏移量开始读取消息。// none:如果没有有效的偏移量,则抛出异常。kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offsetkafkaParams.put("group.id", "spark_kafka"); //消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka);//定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合}), schema);// 写入到 PostgreSQLdf.write()//选择写入数据库的模式.mode(SaveMode.Append)//采用追加的写入模式//协议.format("jdbc")//option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL//确定表名.option("dbtable", "public.spark_kafka")//指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在执行代码前,向创建名为spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主题进行随机推数

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

运行过程中消费的offset会一直被提交到每一个分区
在这里插入图片描述

此时在数据库中查看,数据已经实时落地到库中
在这里插入图片描述

TCP

TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka") // 设置应用程序名称.setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心// 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀.setExecutorEnv("setLogLevel", "ERROR");// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offsetkafkaParams.put("group.id", "spark_kafka"); // 消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka);// 定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合}), schema);// 写入到 PostgreSQLdf.write()// 选择写入数据库的模式.mode(SaveMode.Append) // 采用追加的写入模式// 协议.format("jdbc")// option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL// 确定表名.option("dbtable", "public.spark_kafka") // 指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

nc -lk 9999

开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
在这里插入图片描述

相关文章:

SparkStreaming在实时处理的两个场景示例

简介 Spark Streaming是Apache Spark生态系统中的一个组件&#xff0c;用于实时流式数据处理。它提供了类似于Spark的API&#xff0c;使开发者可以使用相似的编程模型来处理实时数据流。 Spark Streaming的工作原理是将连续的数据流划分成小的批次&#xff0c;并将每个批次作…...

02点亮一个LED

书接上回 上回讲到创建一个示例工程 今天讲如何实现LED的点亮 点亮一个led 所需代码 参考来源网络 延时函数参考&#xff1a; Delay.c #include "stm32f10x.h"/*** brief 微秒级延时* param xus 延时时长&#xff0c;范围&#xff1a;0~233015* retval 无*/ vo…...

【代码分享】

//插入排序 void lnsertionSort(int a[], int n) { int end 0; int tmp 0; int i 0; for (i 0;i < n - 1; i) { end i; tmp a[end 1]; while (end > 0) { if (a[end] > tmp) { a[end 1] a[end]; end–; } else { break; } } a[end 1] tmp; } } //希尔排序…...

windows 使用ffmpeg .a静态库:读取Wav音频并保存PCM

ffmpeg读取Wav音频并保存PCM&#xff08;源代码保存成 c 文件&#xff09;&#xff1a; // test_ffmpeg.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 ////#include <iostream>#include <libavcodec/avcodec.h> #include <libavform…...

Docker部署ZooKeeper

在分布式系统中,ZooKeeper是一个关键的组件,用于协调和管理多个节点之间的状态。本文将详细介绍如何使用Docker安装和部署ZooKeeper,包括非集群部署和集群部署两种情况。 非集群部署 前期准备 在开始之前,请确保你已经安装了Docker,并且拥有sudo权限。 关闭防火墙和SEL…...

在PyCharm中使用Git

安装Git CMD检查Git版本 打开cmd&#xff0c;输入git version&#xff0c;检查当前下载版本 配置git的user信息 在cmd中输入 git config --global user.name "用户名"git config --global user.email "用户邮箱"输入&#xff1a;git config --list&…...

【JavaSE】 P165 ~ P194 抽象方法,抽象类,接口,接口内容,多接口实现和父类继承,多态,向上转型,向下转型

目录 抽象抽象的概念抽象方法和抽象类的格式抽象方法和抽象类的使用抽象方法和抽象类的注意事项● 练习1. 写一个父类图形类&#xff0c;其中有方法&#xff0c;功能计算面积为抽象方法。2. 抽象类继承。判断对错,没错的分析运行结果3. 发红包,群内用户类作为父类&#xff0c;有…...

LeetCode: 数组中的第K个最大元素

问题描述 在未排序的数组中找到第k个最大的元素。请注意&#xff0c;你需要找的是数组排序后的第k个最大的元素&#xff0c;而不是第k个不同的元素。 解题思路 解决这个问题有多种方法&#xff0c;下面是几种常见的解题策略&#xff1a; 排序后选择: 将数组排序&#xff0c…...

亚马逊自养号测评:如何安全搭建环境,有效规避风险

要在亚马逊上进行自养号测评&#xff0c;构建一个真实的国外环境至关重要。这包括模拟国外的服务器、IP地址、浏览器环境&#xff0c;甚至支付方式&#xff0c;以创建一个完整的国际操作环境。这样的环境能让我们自由注册、养号并下单&#xff0c;确保所有操作均符合国际规范。…...

uniApp 调整小程序 单个/全部界面横屏展示效果

我们打开uni项目 小程序端运行 默认是竖着的一个效果 我们打开项目的 pages.json 给需要横屏的界面 的 style 属性 加上 "mp-weixin": {"pageOrientation": "landscape" }界面就横屏了 如果是要所有界面都横屏的话 就直接在pages.json 的 gl…...

【java】18:内部类(2)匿名内部类

&#xff08;1&#xff09;本质是类&#xff08;2&#xff09;内部类&#xff08;3&#xff09;该类没有名字&#xff08;4&#xff09;同时还是一个对象 说明&#xff1a;匿名内部类是定义在外部类的局部位置&#xff0c;比如方法中&#xff0c;并且没有类名 1.匿名内部类的…...

c语言之字符串的输入和输出

c语言在输出字符串时&#xff0c;用格式符‘%s"&#xff0c;代码比较简洁 如果说数组长度大于字符串长度&#xff0c;也只输出\0前的内容 字符串默认后面有\0. 如果字符串有多个\0&#xff0c;会默认在第一个\0结束 #include<stdio.h> int main() {int i;char a…...

戏说c第二十六篇: 测试完备性衡量(代码覆盖率)

前言 师弟&#xff1a;“师兄&#xff0c;我又被鄙视了。说我的系统太差&#xff0c;测试不过关。” 我&#xff1a;“怎么说&#xff1f;” 师弟&#xff1a;“每次发布版本给程夏&#xff0c;都被她发现一些bug&#xff0c;太丢人了。师兄&#xff0c;有什么方法来衡量测试的…...

C语言初阶—函数

函数&#xff1a;子程序&#xff0c;是一个大型程序中的某部分代码&#xff0c;由一个或多个语句块组成&#xff0c;它负责完成某项特定任务&#xff0c;而且相较于其他代码&#xff0c;具有相对独立性。一般会有输入参数并有返回值&#xff0c;提供对过程的封装和细节的隐藏&a…...

vue3的router

需求 路由组件一般放在&#xff0c;pages或views文件夹, 一般组件通常放在component文件夹 路由的2中写法 子路由 其实就是在News组件里面&#xff0c;再定义一个router-view组件 他的子组件&#xff0c;机会渲染在router-view区域 路由传参 <RouterLink :to"/news…...

云时代【5】—— LXC 与 容器

云时代【5】—— LXC 与 容器 三、LXC&#xff08;一&#xff09;基本介绍&#xff08;二&#xff09;相关 Linux 指令实战&#xff1a;使用 LXC 操作容器 四、Docker&#xff08;一&#xff09;删除、安装、配置&#xff08;二&#xff09;镜像仓库1. 分类2. 相关指令&#xf…...

npm digital envelope routines::unsupported

问题描述&#xff1a;npm运行命令报错&#xff1a;digital envelope routines::unsupported 原因&#xff1a;node版本过高 解决方案&#xff1a;在运行命令之前加上 SET NODE_OPTIONS--openssl-legacy-provider && SET NODE_OPTIONS--openssl-legacy-provider &&a…...

深入理解Flutter中的StreamSubscription和StreamController

在Flutter中&#xff0c;StreamSubscription和StreamController是处理异步数据流的重要工具。它们提供了一种方便的方式来处理来自异步事件源的数据。本文将深入探讨它们的区别以及在实际应用中的使用场景。 StreamSubscription StreamSubscription代表了对数据流的订阅&…...

聊聊 HTTP 性能优化

作为用户的我们在 "上网冲浪" 的时候总是希望快一点&#xff0c;尤其是抢演唱会门票的时候&#xff0c;但是现实并非如此&#xff0c;有时候我们会遇到页面加载缓慢、响应延迟的情况。 而 HTTP 协议作为互联网世界的基础&#xff0c;从网站打开速度到移动应用的响应…...

六、防御保护---防火墙内容安全篇

攻击可能只是一个点&#xff0c;防御需要全方面进行 DPI --- 深度包检测技术 --- 主要针对完整的数据包&#xff08;数据包分片&#xff0c;分段需要重组&#xff09;&#xff0c;之后对 数据包的内容进行识别。&#xff08;应用层&#xff09; 1&#xff0c;基于“特征字”的…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

java 实现excel文件转pdf | 无水印 | 无限制

文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...