当前位置: 首页 > news >正文

SparkStreaming在实时处理的两个场景示例

简介

Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
在这里插入图片描述

本文以 TCP、kafka场景讲解spark streaming的使用

消息队列下的信息铺抓

类似消息队列的有redis、kafka等核心组件。
本文以kafka为例,向kafka中实时抓取数据,

pom.xml中添加以下依赖

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark SQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.0</version></dependency><!-- Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Spark Streaming Kafka Connector --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.0</version></dependency><!-- PostgreSQL JDBC --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.24</version></dependency>
</dependencies>

创建项目编写以下代码实现功能

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[*]").setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("auto.offset.reset", "earliest");// auto.offset.reset可指定参数有// latest:从分区的最新偏移量开始读取消息。// earliest:从分区的最早偏移量开始读取消息。// none:如果没有有效的偏移量,则抛出异常。kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offsetkafkaParams.put("group.id", "spark_kafka"); //消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka);//定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合}), schema);// 写入到 PostgreSQLdf.write()//选择写入数据库的模式.mode(SaveMode.Append)//采用追加的写入模式//协议.format("jdbc")//option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL//确定表名.option("dbtable", "public.spark_kafka")//指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在执行代码前,向创建名为spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主题进行随机推数

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

运行过程中消费的offset会一直被提交到每一个分区
在这里插入图片描述

此时在数据库中查看,数据已经实时落地到库中
在这里插入图片描述

TCP

TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka") // 设置应用程序名称.setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心// 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀.setExecutorEnv("setLogLevel", "ERROR");// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offsetkafkaParams.put("group.id", "spark_kafka"); // 消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka);// 定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合}), schema);// 写入到 PostgreSQLdf.write()// 选择写入数据库的模式.mode(SaveMode.Append) // 采用追加的写入模式// 协议.format("jdbc")// option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL// 确定表名.option("dbtable", "public.spark_kafka") // 指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

nc -lk 9999

开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
在这里插入图片描述

相关文章:

SparkStreaming在实时处理的两个场景示例

简介 Spark Streaming是Apache Spark生态系统中的一个组件&#xff0c;用于实时流式数据处理。它提供了类似于Spark的API&#xff0c;使开发者可以使用相似的编程模型来处理实时数据流。 Spark Streaming的工作原理是将连续的数据流划分成小的批次&#xff0c;并将每个批次作…...

02点亮一个LED

书接上回 上回讲到创建一个示例工程 今天讲如何实现LED的点亮 点亮一个led 所需代码 参考来源网络 延时函数参考&#xff1a; Delay.c #include "stm32f10x.h"/*** brief 微秒级延时* param xus 延时时长&#xff0c;范围&#xff1a;0~233015* retval 无*/ vo…...

【代码分享】

//插入排序 void lnsertionSort(int a[], int n) { int end 0; int tmp 0; int i 0; for (i 0;i < n - 1; i) { end i; tmp a[end 1]; while (end > 0) { if (a[end] > tmp) { a[end 1] a[end]; end–; } else { break; } } a[end 1] tmp; } } //希尔排序…...

windows 使用ffmpeg .a静态库:读取Wav音频并保存PCM

ffmpeg读取Wav音频并保存PCM&#xff08;源代码保存成 c 文件&#xff09;&#xff1a; // test_ffmpeg.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 ////#include <iostream>#include <libavcodec/avcodec.h> #include <libavform…...

Docker部署ZooKeeper

在分布式系统中,ZooKeeper是一个关键的组件,用于协调和管理多个节点之间的状态。本文将详细介绍如何使用Docker安装和部署ZooKeeper,包括非集群部署和集群部署两种情况。 非集群部署 前期准备 在开始之前,请确保你已经安装了Docker,并且拥有sudo权限。 关闭防火墙和SEL…...

在PyCharm中使用Git

安装Git CMD检查Git版本 打开cmd&#xff0c;输入git version&#xff0c;检查当前下载版本 配置git的user信息 在cmd中输入 git config --global user.name "用户名"git config --global user.email "用户邮箱"输入&#xff1a;git config --list&…...

【JavaSE】 P165 ~ P194 抽象方法,抽象类,接口,接口内容,多接口实现和父类继承,多态,向上转型,向下转型

目录 抽象抽象的概念抽象方法和抽象类的格式抽象方法和抽象类的使用抽象方法和抽象类的注意事项● 练习1. 写一个父类图形类&#xff0c;其中有方法&#xff0c;功能计算面积为抽象方法。2. 抽象类继承。判断对错,没错的分析运行结果3. 发红包,群内用户类作为父类&#xff0c;有…...

LeetCode: 数组中的第K个最大元素

问题描述 在未排序的数组中找到第k个最大的元素。请注意&#xff0c;你需要找的是数组排序后的第k个最大的元素&#xff0c;而不是第k个不同的元素。 解题思路 解决这个问题有多种方法&#xff0c;下面是几种常见的解题策略&#xff1a; 排序后选择: 将数组排序&#xff0c…...

亚马逊自养号测评:如何安全搭建环境,有效规避风险

要在亚马逊上进行自养号测评&#xff0c;构建一个真实的国外环境至关重要。这包括模拟国外的服务器、IP地址、浏览器环境&#xff0c;甚至支付方式&#xff0c;以创建一个完整的国际操作环境。这样的环境能让我们自由注册、养号并下单&#xff0c;确保所有操作均符合国际规范。…...

uniApp 调整小程序 单个/全部界面横屏展示效果

我们打开uni项目 小程序端运行 默认是竖着的一个效果 我们打开项目的 pages.json 给需要横屏的界面 的 style 属性 加上 "mp-weixin": {"pageOrientation": "landscape" }界面就横屏了 如果是要所有界面都横屏的话 就直接在pages.json 的 gl…...

【java】18:内部类(2)匿名内部类

&#xff08;1&#xff09;本质是类&#xff08;2&#xff09;内部类&#xff08;3&#xff09;该类没有名字&#xff08;4&#xff09;同时还是一个对象 说明&#xff1a;匿名内部类是定义在外部类的局部位置&#xff0c;比如方法中&#xff0c;并且没有类名 1.匿名内部类的…...

c语言之字符串的输入和输出

c语言在输出字符串时&#xff0c;用格式符‘%s"&#xff0c;代码比较简洁 如果说数组长度大于字符串长度&#xff0c;也只输出\0前的内容 字符串默认后面有\0. 如果字符串有多个\0&#xff0c;会默认在第一个\0结束 #include<stdio.h> int main() {int i;char a…...

戏说c第二十六篇: 测试完备性衡量(代码覆盖率)

前言 师弟&#xff1a;“师兄&#xff0c;我又被鄙视了。说我的系统太差&#xff0c;测试不过关。” 我&#xff1a;“怎么说&#xff1f;” 师弟&#xff1a;“每次发布版本给程夏&#xff0c;都被她发现一些bug&#xff0c;太丢人了。师兄&#xff0c;有什么方法来衡量测试的…...

C语言初阶—函数

函数&#xff1a;子程序&#xff0c;是一个大型程序中的某部分代码&#xff0c;由一个或多个语句块组成&#xff0c;它负责完成某项特定任务&#xff0c;而且相较于其他代码&#xff0c;具有相对独立性。一般会有输入参数并有返回值&#xff0c;提供对过程的封装和细节的隐藏&a…...

vue3的router

需求 路由组件一般放在&#xff0c;pages或views文件夹, 一般组件通常放在component文件夹 路由的2中写法 子路由 其实就是在News组件里面&#xff0c;再定义一个router-view组件 他的子组件&#xff0c;机会渲染在router-view区域 路由传参 <RouterLink :to"/news…...

云时代【5】—— LXC 与 容器

云时代【5】—— LXC 与 容器 三、LXC&#xff08;一&#xff09;基本介绍&#xff08;二&#xff09;相关 Linux 指令实战&#xff1a;使用 LXC 操作容器 四、Docker&#xff08;一&#xff09;删除、安装、配置&#xff08;二&#xff09;镜像仓库1. 分类2. 相关指令&#xf…...

npm digital envelope routines::unsupported

问题描述&#xff1a;npm运行命令报错&#xff1a;digital envelope routines::unsupported 原因&#xff1a;node版本过高 解决方案&#xff1a;在运行命令之前加上 SET NODE_OPTIONS--openssl-legacy-provider && SET NODE_OPTIONS--openssl-legacy-provider &&a…...

深入理解Flutter中的StreamSubscription和StreamController

在Flutter中&#xff0c;StreamSubscription和StreamController是处理异步数据流的重要工具。它们提供了一种方便的方式来处理来自异步事件源的数据。本文将深入探讨它们的区别以及在实际应用中的使用场景。 StreamSubscription StreamSubscription代表了对数据流的订阅&…...

聊聊 HTTP 性能优化

作为用户的我们在 "上网冲浪" 的时候总是希望快一点&#xff0c;尤其是抢演唱会门票的时候&#xff0c;但是现实并非如此&#xff0c;有时候我们会遇到页面加载缓慢、响应延迟的情况。 而 HTTP 协议作为互联网世界的基础&#xff0c;从网站打开速度到移动应用的响应…...

六、防御保护---防火墙内容安全篇

攻击可能只是一个点&#xff0c;防御需要全方面进行 DPI --- 深度包检测技术 --- 主要针对完整的数据包&#xff08;数据包分片&#xff0c;分段需要重组&#xff09;&#xff0c;之后对 数据包的内容进行识别。&#xff08;应用层&#xff09; 1&#xff0c;基于“特征字”的…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区&#xff08;Partitioning&#xff09;是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分&#xff08;分区&#xff09;可以独立存储、管理和优化&#xff0c;…...

适应性Java用于现代 API:REST、GraphQL 和事件驱动

在快速发展的软件开发领域&#xff0c;REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名&#xff0c;不断适应这些现代范式的需求。随着不断发展的生态系统&#xff0c;Java 在现代 API 方…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器

一、原理介绍 传统滑模观测器采用如下结构&#xff1a; 传统SMO中LPF会带来相位延迟和幅值衰减&#xff0c;并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF)&#xff0c;可以去除高次谐波&#xff0c;并且不用相位补偿就可以获得一个误差较小的转子位…...