当前位置: 首页 > news >正文

day10_Structured Steaming

文章目录

  • Structured Steaming
    • 一、结构化流介绍(了解)
      • 1、有界和无界数据
      • 2、基本介绍
      • 3、使用三大步骤(掌握)
      • 4.回顾sparkSQL的词频统计案例
    • 二、结构化流的编程模型(掌握)
      • 1、数据结构
      • 2、读取数据源
        • 2.1 File Source
        • 2.2 Socket Source
        • 2.3 Rate Source
      • 3、数据处理
      • 4、数据输出
        • 4.1 输出模式
          • 4.1.1 append 模式
          • 4.1.2 complete模式
          • 4.1.3 update模式
        • 4.2 输出终端/位置
      • 5、综合案例(练习)
        • 词频统计_读取文件方式
        • 词频统计_Socket方式
        • 自动生成数据_Rate方式
      • 6、设置触发器Trigger
      • 7、CheckPoint检查点目录设置
  • JSON是什么?
    • 三、Spark 和 Kafka 整合(掌握)
      • 0、整合Kafka准备工作
      • 1.spark和kafka集成
        • 1.1 官网文档链接:
        • 1.2 常见选项:
        • 1.3 常见参数
      • 2、从kafka中读取数据
        • 2.1 流式处理
          • 官方示例:
          • 练习示例
        • 2.2 批处理
          • 官方示例:
          • 演示示例
      • 3、数据写入Kafka中
        • 3.1 流式处理
          • 官方示例:
          • 练习示例
        • 3.2 批处理
          • 官方示例:
          • 演示示例
  • 01_回顾sparkSQL词频统计过程.py
  • 02_结构化流词频统计案例_读取文件方式.py
  • 03_结构化流词频统计案例_socket方式.py
  • 04_结构化流词频统计案例_设置触发器和检查点.py
  • 05_流方式读取kafka数据.py
  • 06_流方式写数据到kafka.py

Structured Steaming

  1. 简单来说:Structured Streaming是Spark提供的一种流处理引擎,就像是“实时数据处理的流水线”,能够以批处理的方式处理实时数据流。

  2. 具体而言

    • 核心概念
      • 流式DataFrame:将实时数据流视为一个无限扩展的DataFrame,支持类似批处理的API。
      • 触发器:控制流处理的时间间隔,如每1秒处理一次数据。
      • 输出模式:支持多种输出模式,如append(追加)、update(更新)和complete(完整输出)。
    • 特点
      • 易用性:提供与Spark SQL一致的API,降低学习成本。
      • 容错性:通过检查点机制(Checkpoint)确保数据处理的可靠性。
      • 扩展性:支持从Kafka、文件系统等多种数据源读取数据,并输出到多种目标系统。
  3. 实际生产场景

    • 在实时监控中,使用Structured Streaming处理传感器数据,实时生成报警。
    • 在用户行为分析中,使用Structured Streaming处理点击流数据,实时更新用户画像。
  4. 总之:Structured Streaming通过易用的API和强大的容错机制,为实时数据处理提供了高效、可靠的解决方案,广泛应用于实时监控、用户行为分析等场景。

在这里插入图片描述

一、结构化流介绍(了解)

1、有界和无界数据

  1. 简单来说:有界数据就像是“有限的书本”,数据量固定且已知;无界数据则像是“无限的河流”,数据持续生成且量未知。

  2. 具体而言

    • 有界数据
      • 定义:数据量固定且已知,处理完成后任务结束。
      • 示例:存储在文件或数据库中的历史数据。
      • 处理方式:适合批处理(Batch Processing),如使用Spark的RDD或DataFrame处理。
    • 无界数据
      • 定义:数据持续生成且量未知,处理任务通常不会结束。
      • 示例:实时日志流、传感器数据、用户点击流。
      • 处理方式:适合流处理(Stream Processing),如使用Spark的Structured Streaming或Flink处理。
  3. 实际生产场景

    • 在历史数据分析中,使用有界数据进行批处理,生成报表和洞察。
    • 在实时监控中,使用无界数据进行流处理,实时生成报警和推荐。
  4. 总之:有界数据和无界数据分别适合批处理和流处理,根据数据特点选择合适的处理方式,能够高效地完成数据分析和处理任务。

  • 有界数据:
有界数据: 指的数据有固定的开始和固定的结束,数据大小是固定。我们称之为有界数据。对于有界数据,一般采用批处理方案(离线计算)特点:1-数据大小是固定2-程序处理有界数据,程序最终一定会停止
  • 无界数据:
无界数据: 指的数据有固定的开始,但是没有固定的结束。我们称之为无界数据
注意: 对于无界数据,我们一般采用流式处理方案(实时计算)特点:1-数据没有明确的结束,也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据,程序会一直运行不会结束

2、基本介绍

结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL …

​ Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

​ 真正的流处理引擎: Storm(早期流式处理引擎)、Flink、Flume(流式数据采集)

3、使用三大步骤(掌握)

StructuredStreaming在进行数据流开发时的三个步骤

  • 1、读取数据流数据 : 指定数据源模式
    • sparksession对象.readStream.format(指定读取的数据源).option(指定读取的参数).load()
  • 2、数据处理: 使用dsl或者sql方式计算数据和SparkSQL操作一样
  • 3、将计算的结果保存 : 指定输出模式,指定位置
    • writeStream.outputMode(输出模式).option(输出的参数配置).format(指定输出位置).start().awaitTermination()

4.回顾sparkSQL的词频统计案例

# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/w1.txt')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出sql_df.show()dsl_df.show()# 5.关闭资源spark.stop()

二、结构化流的编程模型(掌握)

1、数据结构

在这里插入图片描述

在结构化流中,我们可以将DataFrame称为无界的DataFrame或者无界的二维表

2、读取数据源

对应官网文档内容:

https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources

结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作。目前提供了如下数据源:

  • File Source:文件数据源。读取文件系统,一般用于测试。如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行

  • Socket Source:网络套接字数据源,一般用于测试。也就是从网络上消费/读取数据

  • Rate Source:速率数据源。了解即可,一般用于基准测试。通过配置参数,由结构化流自动生成测试数据。

  • Kafka Source:Kafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。

2.1 File Source

在这里插入图片描述

相关的参数:

option参数描述说明
maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)
latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)
fileNameOnly是否检查新文件只有文件名而不是完整路径(默认值:false)将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt”

将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet。。。。

文件数据源特点:
1- 只能监听目录,不能监听具体的文件
2- 可以通过*通配符的形式监听目录中满足条件的文件 
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况

读取代码通用格式:

# 原生API
sparksession.readStream.format('CSV|JSON|Text|Parquet|ORC...').option('参数名1','参数值1').option('参数名2','参数值2').option('参数名N','参数值N').schema(元数据信息).load('需要监听的目录地址')# 简化API	
针对具体数据格式,还有对应的简写API格式,例如:sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)

可能遇到的错误一:

在这里插入图片描述

原因: 如果是文件数据源,需要手动指定schema信息

可能遇到的错误二:

在这里插入图片描述

原因: File source只能监听目录,不能监听具体文件
2.2 Socket Source

在这里插入图片描述

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 端口号查看端口号是否被使用命令: netstat -nlp | grep 要查询的端口

注意: 要先启动nc,再启动我们的程序

代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
2.3 Rate Source

在这里插入图片描述

此数据源的提供, 主要是用于进行基准测试

option参数描述说明
rowsPerSecond每秒应该生成多少行 : (例如 100,默认值:1)
rampUpTime在生成速度变为rowsPerSecond之前应该经过多久的加速时间(例如5 s,默认0)
numPartitions生成行的分区: (例如 10,默认值:Spark 的默认并行度)

3、数据处理

​ 指的是数据处理部分,该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理,也可以使用DSL方式进行处理。

4、数据输出

​ 在结构化流中定义好DataFrame或者处理好DataFrame之后,调用**writeStream()**方法完成数据的输出操作。在输出的过程中,我们可以设置一些相关的属性,然后启动结构化流程序运行。

在这里插入图片描述

4.1 输出模式

可能遇到的错误:

在这里插入图片描述

原因: 在结构化流中不能调用show()方法
解决办法: 需要使用writeStream().start()进行结果数据的输出

在进行数据输出的时候,必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式:

append模式
- 定义:只输出新增的数据,适用于不需要更新历史结果的场景。
- 示例:实时日志处理中,只输出新产生的日志记录。
update模式
- 定义:输出新增或更新的数据,适用于需要更新历史结果的场景。
- 示例:实时用户行为分析中,输出用户的最新行为数据。
complete模式
- 定义:输出完整的结果集,适用于需要全局统计结果的场景。
- 示例:实时销售统计中,输出所有销售数据的汇总结果。

实际生产场景

  • 在实时日志处理中,使用append模式输出新日志记录。
  • 在实时用户行为分析中,使用update模式输出用户的最新行为数据。
  • 在实时销售统计中,使用complete模式输出所有销售数据的汇总结果。
  • 1- append模式:增量模式 (默认)

    特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

  • 2- complete模式:完全(全量)模式

    特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

  • 3- update模式:更新模式

    特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

4.1.1 append 模式

1- append模式:增量模式

特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。

如果有了聚合操作,会报如下错误:

在这里插入图片描述

如果有了排序操作,会报如下错误:

在这里插入图片描述

4.1.2 complete模式

2- complete模式:完全(全量)模式

特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。

如果没有聚合操作,会报如下错误:

在这里插入图片描述

4.1.3 update模式

3- update模式:更新模式

特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。

如果有了排序操作,会报如下错误:
在这里插入图片描述

4.2 输出终端/位置

默认情况下,Spark的结构化流支持多种输出方案:

1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

5、综合案例(练习)

需求: 已知文件中存储了多个单词,要求计算统计出现的次数

词频统计_读取文件方式
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
词频统计_Socket方式
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc# 注意: 端口号: 范围0-65535   但是0-1024都是知名端口号查看端口号是否被使用命令: netstat -nlp | grep 55555执行nc命令, 开启端口号(选择没有被占用), 写入数据: nc -lk 55555

注意: 要先启动nc,再启动我们的程序

代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('socket')\.option('host',"192.168.88.161")\.option('port',"55555")\.load()# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
自动生成数据_Rate方式
from pyspark.sql import SparkSession
import osos.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder \.appName("StructuredStream_rate") \.master('local[*]') \.getOrCreate()# 2。读取数据df = spark.readStream \.format('rate') \.option("rowsPerSecond", "5") \.option('numPartitions', 1) \.load()# 3.数据处理# 略# 4.数据输出:df.writeStream \.format('console') \.outputMode('update') \.option('truncate', 'false') \.start() \.awaitTermination()# 5.关闭资源spark.stop()

6、设置触发器Trigger

触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据

主要提供如下几种触发器:

  • 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短

    result_df.writeStream\.outputMode('append')\.start()\.awaitTermination()
    
  • 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**

    result_df.writeStream\.outputMode('append')\.trigger(processingTime='5 seconds')\.start()\.awaitTermination()情形说明:
    1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
    2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
    3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
    
  • 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等

    result_df.writeStream.foreachBatch(func)\.outputMode('append')\.trigger(once=True)\.start()\.awaitTermination()
    

7、CheckPoint检查点目录设置

设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题

默认位置: hdfs的/tmp/xxx

如何设置检查点:

1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径

检查点目录主要包含以下几个目录位置:
在这里插入图片描述

1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id4-数据源sources: 是数据源(Source)各个批次的读取的详情5-数据接收端sinks: 是数据接收端各个批次的写出的详情6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态

JSON是什么?

  1. 简单来说:JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,就像是“数据的通用语言”,易于人阅读和编写,也易于机器解析和生成。

  2. 具体而言

    • 结构
      • 对象:用花括号{}表示,包含键值对,键和值之间用冒号:分隔,键值对之间用逗号,分隔。
      • 数组:用方括号[]表示,包含多个值,值之间用逗号,分隔。
      • :可以是字符串、数字、布尔值、对象、数组或null
    • 示例
      {"name": "Alice","age": 30,"isStudent": false,"courses": ["Math", "Science"],"address": {"city": "Beijing","zip": "100000"}
      }
      
    • 特点
      • 轻量级:相比于XML,JSON格式更简洁,数据量更小。
      • 易读性:结构清晰,易于人阅读和编写。
      • 跨平台:支持多种编程语言,如JavaScript、Python、Java等。
  3. 实际生产场景

    • 在Web开发中,使用JSON作为前后端数据交换的格式。
    • 在API设计中,使用JSON作为请求和响应的数据格式。
    • 在配置文件中,使用JSON存储配置信息。
  4. 总之:JSON是一种轻量级、易读、跨平台的数据交换格式,广泛应用于Web开发、API设计和配置文件等领域。

三、Spark 和 Kafka 整合(掌握)

​ Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表

0、整合Kafka准备工作

说明: Jar包上传的位置说明

如何放置相关的Jar包?  1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars jar包路径jar包下载地址: https://mvnrepository.com/

1.spark和kafka集成

1.1 官网文档链接:

https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

1.2 常见选项:
选项解释
kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地址
subscribe以逗号分隔的Topic主题列表订阅一个主题topic1或者多个主题topic1,topic2
subscribePattern正则表达式字符串订阅主题的模式。可以用 topic.* 代表多个主题
assign通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]}要使用的特定TopicPartitions
includeHeaders默认false是否在行中包含Kafka headers。
startingOffsets流或者批的查询开始时的起始点: “earliest”(批默认), “latest” (流默认), or json string json串格式如下 { “topicA”: {“0”:23,“1”:-1}, “topicB”:{“0”:-2} }“earliest”表示最早的偏移量, “latest”表示最近的偏移量, 或每个TopicPartition起始偏移量的json字符串。在json中,-2作为偏移量表示最早,-1表示最晚。注意: 对于批量查询:不允许使用latest(无论是隐式查询还是在json中使用-1)。 对于流查询: 这只适用于新查询开始时,恢复总是从查询结束的地方继续。在查询期间新发现的分区将最早开始。
endingOffsets批量查询结束时的结束点: latest(默认) , or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}“latest”,指的是最新的, 或每个TopicPartition结束偏移量的json字符串。在json中,-1可以用来表示最近的偏移量,-2(最早的)是不允许的!
1.3 常见参数
参数类型解释
topicstring表示消息是从哪个Topic中消费出来
valuebinary最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
keybinary发送数据的key值。如果没有,就为null
partitionint分区编号。表示消费到的该条数据来源于Topic的哪个分区
offsetlong表示消息偏移量
timestamptimestamp接收的时间戳

2、从kafka中读取数据

2.1 流式处理
官方示例:
# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1,topic2") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅一个Topic,并且指定header信息
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.option("includeHeaders", "true") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
练习示例

从某一个Topic中读取消息数据

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092")\.option("subscribe","itheima")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.value, 'utf8').alias('key'),F.decode(df.key,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 获取数据etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()# 3- 数据处理# result_df1 = df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = df.selectExpr("cast(value as string) as value")# result_df3 = df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出# 5- 启动流式任务"""如果有多个输出,那么只能在最后一个start的后面写awaitTermination()"""# result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()# result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()# result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
2.2 批处理
官方示例:
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.option("startingOffsets", "earliest") \.option("endingOffsets", "latest") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅多个主题,明确指定Kafka偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
演示示例

订阅一个Topic

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.expr("cast(key as string) as key"),F.decode(df.key,'utf8'),F.expr("cast(value as string) as value"),F.decode(df.value, 'utf8'),df.topic,df.partition,df.offset)# 获取数据etl_df.show()# # 3- 数据处理# result_df1 = init_df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = init_df.selectExpr("cast(value as string) as value")# result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# # 4- 数据输出# print("result_df1")# result_df1.show()# print("result_df2")# result_df2.show()# print("result_df3")# result_df3.show()# # 5- 释放资源# spark.stop()

3、数据写入Kafka中

3.1 流式处理
官方示例:
# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
# 的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
练习示例

写出到指定Topic

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))# 4- 数据输出# 注意: 咱们修改完直接保存到kafka的itcast主题中,所以控制台没有数据,这是正常的哦!!!# 5- 启动流式任务result_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.start()\.awaitTermination()
3.2 批处理
官方示例:
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "topic1") \.save()# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.save()
演示示例
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_666') as value"))# 4- 数据输出# 5- 启动流式任务result_df.write.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.save()

01_回顾sparkSQL词频统计过程.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取文件生成dfdf = spark.read.text("file:///export/data/spark_project/09_结构化流/data3.txt")# df.show()# 2.数据处理etl_df = df.dropDuplicates().fillna('未知')# 3.数据分析# 需求: 统计每个单词出现的次数# 方式1: sql方式etl_df.createTempView("word_tb")sql_result_df = spark.sql("""with t as (select explode(split(value," ")) as wordfrom word_tb)select word,count(*) as cnt from t group by word""")# 方式2: dsl方式dsl_result_df = etl_df.select(F.explode(F.split("value", " ")).alias("word")).groupby("word").agg(F.count("word").alias("cnt"))# 4.数据展示/导出sql_result_df.show()dsl_result_df.show()# 注意: 最后一定释放资源spark.stop()

02_结构化流词频统计案例_读取文件方式.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1) \.appName('pyspark_demo') \.master('local[*]') \.getOrCreate()# 2.TODO 数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream \.format('text') \.load('file:///export/data/spark_project/09_结构化流/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# 注意: 输出不能使用原来sparksql的show(),否则报错# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

03_结构化流词频统计案例_socket方式.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

04_结构化流词频统计案例_设置触发器和检查点.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 设置检查点路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/ckpt2")# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# TODO: .trigger(processingTime='5 seconds')添加触发器sql_df.writeStream.format('console').outputMode('complete').trigger(processingTime='5 seconds').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()

05_流方式读取kafka数据.py

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.key, 'utf8').alias('key'),F.decode(df.value,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 展示数据# 直接展示到控制台etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()

06_流方式写数据到kafka.py

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.decode(df.value,'utf8').alias('value'))# TODO: 原来默认展示到控制台,接下来演示如何把数据存储到kafka中etl_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("topic","kafka_spark2")\.option("checkpointLocation", "hdfs://node1:8020/ckpt3")\.start()\.awaitTermination()

相关文章:

day10_Structured Steaming

文章目录 Structured Steaming一、结构化流介绍(了解)1、有界和无界数据2、基本介绍3、使用三大步骤(掌握)4.回顾sparkSQL的词频统计案例 二、结构化流的编程模型(掌握)1、数据结构2、读取数据源2.1 File Source2.2 Socket Source…...

Python的秘密基地--[章节11] Python 性能优化与多线程编程

第11章:Python 性能优化与多线程编程 在开发复杂系统时,性能优化和并发编程是不可忽视的重点。Python 提供了多种工具和技术用于优化代码性能,并通过多线程、多进程等方式实现并发处理。本章将探讨如何在 Python 中提升性能,并实…...

drawDB docker部属

docker pull xinsodev/drawdb docker run --name some-drawdb -p 3000:80 -d xinsodev/drawdb浏览器访问:http://192.168.31.135:3000/...

探索图像编辑的无限可能——Adobe Photoshop全解析

文章目录 前言一、PS的历史二、PS的应用场景三、PS的功能及工具用法四、图层的概念五、调整与滤镜六、创建蒙版七、绘制形状与路径八、实战练习结语 前言 在当今数字化的世界里,视觉内容无处不在,而创建和编辑这些内容的能力已经成为许多行业的核心技能…...

【Vim Masterclass 笔记13】第 7 章:Vim 核心操作之——文本对象与宏操作 + S07L28:Vim 文本对象

文章目录 Section 7:Text Objects and MacrosS07L28 Text Objects1 文本对象的含义2 操作文本对象的基本语法3 操作光标所在的整个单词4 删除光标所在的整个句子5 操作光标所在的整个段落6 删除光标所在的中括号内的文本7 删除光标所在的小括号内的文本8 操作尖括号…...

Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

Spring Boot Kafka 消费者示例 Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可…...

统计有序矩阵中的负数

统计有序矩阵中的负数 描述 给你一个 m * n 的矩阵 grid,矩阵中的元素无论是按行还是按列,都以非递增顺序排列。 请你统计并返回 grid 中 负数 的数目 示例 1: 输入:grid [[4,3,2,-1],[3,2,1,-1],[1,1,-1,-2],[-1,-1,-2,-3]]…...

【6】Word:海名公司文秘❗

目录 题目 List.docx Word.docx List.docx和Word.docx 题目 List.docx 选中1/4全角空格复制→选中全部文本→开始→替换:粘贴将1/4全角空格 替换成 空格选中全部文本→插入→表格→将文本转化成表格→勾选和布局→自动调整→勾选 选中第一列,单机右键…...

c语言 --- 字符串

创建字符串 1. 使用字符数组创建字符串 #include <stdio.h>int main() {char str[20] "Hello, world!";str[0] h; // 修改字符串的第一个字符printf("%s\n", str); // 输出&#xff1a;hello, world!return 0; }解释&#xff1a; 数组大小 20 表…...

LeetCode 热题 100_二叉树的最近公共祖先(49_236_中等_C++)(二叉树;深度优先搜索)

LeetCode 热题 100_二叉树的最近公共祖先&#xff08;49_236&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;深度优先搜索&#xff09;&#xff1a; 代码实现代码实现&#xff08;思路一&#xff08;深度优…...

(三)c#中const、static、readonly的区别

在 C# 中&#xff0c;const、static 和 readonly 都是用来定义不可变的值&#xff0c;但它们有一些关键的区别。让我们详细比较一下这三者的用途和特点&#xff1a; 1. const&#xff08;常量&#xff09; 编译时常量&#xff1a;const 用于声明常量&#xff0c;其值必须在编…...

人工智能任务19-基于BERT、ELMO模型对诈骗信息文本进行识别与应用

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能任务19-基于BERT、ELMO模型对诈骗信息文本进行识别与应用。近日&#xff0c;演员王星因接到一份看似来自知名公司的拍戏邀约&#xff0c;被骗至泰国并最终被带到缅甸。这一事件迅速引发了社会的广泛关注。该…...

【C++】函数(下)

1、函数的常见样式 常见的函数样式有四种&#xff1a; &#xff08;1&#xff09;无参数无返回值 &#xff08;2&#xff09;有参数无返回值 &#xff08;3&#xff09;无参数有返回值 &#xff08;4&#xff09;有参数有返回值 &#xff08;1&#xff09;无参数无返回值 示例…...

一个使用 Golang 编写的新一代网络爬虫框架,支持JS动态内容爬取

大家好&#xff0c;今天给大家分享一个由ProjectDiscovery组织开发的开源“下一代爬虫框架”Katana&#xff0c;旨在提供高效、灵活且功能丰富的网络爬取体验&#xff0c;适用于各种自动化管道和数据收集任务。 项目介绍 Katana 是 ProjectDiscovery 精心打造的命令行界面&…...

深入探讨 Vue.js 的动态组件渲染与性能优化

Vue.js 作为一款前端领域中备受欢迎的渐进式框架&#xff0c;以其简单优雅的 API 和灵活性受到开发者的喜爱。在开发复杂应用时&#xff0c;动态组件渲染是一项极其重要的技术&#xff0c;它能够在页面中动态地加载或切换组件&#xff0c;从而显著提升应用的灵活性与用户体验。…...

vulnhub靶场【IA系列】之Tornado

前言 靶机&#xff1a;IA-Tornado&#xff0c;IP地址为192.168.10.11 攻击&#xff1a;kali&#xff0c;IP地址为192.168.10.2 都采用虚拟机&#xff0c;网卡为桥接模式 本文所用靶场、kali镜像以及相关工具&#xff0c;我放置在网盘中&#xff0c;可以复制后面链接查看 htt…...

简要认识JAVAWeb技术三剑客:HTMLCSSJavaScript

目录 一、web标准二、什么是HTML三、什么是CSS四、什么是JavaScript 黑马JAVAWeb飞书在线讲义地址&#xff1a; https://heuqqdmbyk.feishu.cn/wiki/LYVswfK4eigRIhkW0pvcqgH9nWd 一、web标准 Web标准也称网页标准&#xff0c;由一系列的标准组成&#xff0c;大部分由W3C&…...

C# 修改项目类型 应用程序程序改类库

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…...

卡通风格渲染

1、卡通风格渲染是什么 卡通风格渲染&#xff08;Cartoon Shading&#xff09;&#xff0c;也称为非真实感渲染&#xff08;NPR&#xff09;或卡通渲染&#xff08;Toon Shading&#xff09; 主要目的是使3D模型看起来更像手绘的二维卡通或漫画风格&#xff0c;而不是逼真写实…...

ubuntu各分区的用途

在 Ubuntu 中&#xff0c;分区是将硬盘划分为多个逻辑部分的过程&#xff0c;每个分区可以用于不同的用途。合理分区可以提高系统性能、数据安全性和管理效率。以下是 Ubuntu 中常见分区及其用途的详细说明&#xff1a; 1. 根分区 (/) 用途&#xff1a;存放操作系统核心文件、…...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架&#xff0c;它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用&#xff0c;和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...