Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)
一、安装Kafka
1.执行以下命令完成Kafka的安装:
cd ~ //默认压缩包放在根目录
sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0
二、启动Kafaka
1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!
2.打开第二个终端,输入下面命令启动Kafka服务:
cd /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh config/server.properties//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh config/server.properties &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。
三、创建Topic
1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender
注意,所有这些终端窗口都不要关闭,要继续留着后面使用。
四、Spark准备工作
Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。
cd ~ .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)
进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源
1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala
它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.编写消费者(Consumer)程序
在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:
cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger} /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)} } }
5.编译打包程序
现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:
cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt
在simple.sbt中输入以下代码:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"
然后执行下面命令,进行编译打包:
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt package
打包成功界面

6. 运行程序
首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:
cd /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh
启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender 3 5
注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

相关文章:
Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)
一、安装Kafka 1.执行以下命令完成Kafka的安装: cd ~ //默认压缩包放在根目录 sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local cd /usr/local sudo mv kafka_2.12-2.6.0 kafka-2.6.0 sudo chown -R qiangzi ./kafka-2.6.0 二、启动Kafaka 1.首先需要启动K…...
5.9网络协议
由网卡发送数据通过网线进行发送,当网卡接收到信号以后将数据传给内核数据区,然后由操作系统交给相应的进程。 将数据进行发送的时候需要借助于网线实现,这个时候会出现当传输的数据比较远的时候就借助于中继器将信号进行再生扩大࿰…...
QT客户端开发的注意事项
QT客户端开发是一个涉及图形用户界面(GUI)设计、网络编程、数据库交互等多个方面的复杂过程。以下是在进行QT客户端开发时应注意的一些关键事项,通过关注这些事项,可以提高QT客户端应用的质量和开发效率。北京木奇移动技术有限公司…...
k8s源码编译失败:Makefile:1: *** 缺失分隔符。 停止。
目录 问题解决 更换Arch或系统 问题解决 编译k8s源码的kubelet时执行make失败:Makefile:1: *** 缺失分隔符。 停止。 首先,查看文件内容 # cat Makefile build/root/Makefile 修改Makefile,给第一行前增加include,如下&…...
服务器数据恢复—拯救raid5阵列数据行动,raid5数据恢复案例分享
Raid5数据恢复算法原理: 分布式奇偶校验的独立磁盘结构(被称之为raid5)的数据恢复有一个“奇偶校验”的概念。可以简单的理解为二进制运算中的“异或运算”,通常使用的标识是xor。运算规则:若二者值相同则结果为0&…...
旅游集市数仓建设
旅游集市数仓建设 小白如何从0到1成为大数据工程师 目录 旅游集市数仓建设 1.上传数据 2.可能用到的UDF函数 3.创建所需数据库及表 1)ODS层 ①ods_oidd ②ods_wcdr ③ods_ddr ④ods_dpi 2)DWD层 ①dwd_res_regn_mergelocation_msk_d ②dwm_s…...
vue实现点击高亮效果
<view class"tabs"><textv-for"(item, index) in subTypes":key"item.id"class"text":class"{ active: index activeIndex }"//动态绑定高亮类:判断下标是否等于当前下标tap"activeIndex index&…...
uniapp 配置请求代理+请求封装
uniapp官网提供了三种方式:什么是跨域 | uni-app官网 1. 通过uniapp自带浏览器 打开项目是不存在跨域的 第二种方式: "h5" : {"template" : "static/index.html","devServer": {"proxy": {&quo…...
代码随想录算法训练营第二十八天|216.组合总和III、17.电话号码的字母组合
216.组合总和III 文档讲解:代码随想录 题目链接:. - 力扣(LeetCode) 这一题与昨天的组合差不多,区别就在只有和是目标值的时候才会加入到result数组中,并且在回溯时,会处理sum的值 class Solution:def __i…...
大模型prompt实例:知识库信息质量校验模块
大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模…...
正则表达式和lambda表达式
正则表达式(Regular Expressions)和Lambda表达式虽然都包含“表达式”一词,但它们在编程中的作用和用法是完全不同的。让我们详细比较一下它们的定义、用途和应用场景: 正则表达式 定义:正则表达式是一种用于匹配文本…...
pyenv 之 python 多版本管理(win11)
1. 背景 常常会用到Python的多个版本,因此可以使用Pyenv来对Python版本进行管理。 2. win11下载 pyenv 在终端执行下载语句: pip install pyenv-win --target D:\software\pyenv 其中 D:\software\pyenv 为你想要下载到的文件目录,建议在 …...
nodemon运行ts文件
https://juejin.cn/post/7035637086451400734 nodemon经常用来调试js文件,大家都是知道的,但是用nodemon来调试ts文件,大家试过吗? 如果直接运行nodemon index.ts是会报错的。 ts 复制代码 //index.ts console.log(1) 需要全局…...
内网渗透瑞士军刀-impacket工具解析(二)
impacket工具解析之Kerberos认证协议 上一期我们介绍了impacket中ntlm协议的实现,在Windows认证中除了使用ntlm认证,还支持Kerberos认证协议,Kerberos认证也是Windows 活动目录中占比最高的认证方式。 什么是Kerberos协议? Kerb…...
huggingface 笔记:pipeline
1 介绍 pipeline() 是使用预训练模型进行推理的最简单和最快速的方式。可以针对不同模态的许多任务直接使用 pipeline() 2 举例:情感分析 2.1 创建pipeline实例 from transformers import pipelineclassifier pipeline("sentiment-analysis") #首先创…...
玩转Matlab-Simscape(初级)-01-从一个简单模型开始学习之旅
** 玩转Matlab-Simscape(初级)- 01 - 从一个简单模型开始学习之旅 ** 目录 玩转Matlab-Simscape(初级)- 01 - 从一个简单模型开始学习之旅 前言一、从模板开始建模二、建模一个简单的连杆2.1 建模2.2 生成子系统 总结 前言 在产…...
电脑录屏软件有哪些?这3款神器必须要知道
在当今现代社会,电脑录屏软件已经成为人们日常生活中不可或缺的一部分。无论是录制游戏精彩瞬间、制作教程、还是在线会议记录,一款好用的电脑录屏软件都能帮助我们更高效地完成任务。可是电脑录屏软件有哪些呢?接下来,我们将介绍…...
如何在华企盾DSC防泄密系统中设置文件自动加密?
在华企盾DSC系统中设置文件自动加密的过程,简单且用户友好,确保了企业数据的安全,同时不干扰日常工作流程。以下是设置文件自动加密的步骤: 系统安装与配置:确保华企盾DSC数据防泄密系统已经在企业的网络中正确安装和配…...
【DevOps】Dockerfile详解,做自己的docker镜像
学会使用DockerHub找自己想要的镜像以后,我们会很方便的使用一些公用镜像仓库的Docker镜像。但是开发和部署的过程中,能找到的镜像可能并不能满足我们需要,这样我们就需要自己制作Docker镜像。我们通过需要编写一个 Dockerfile,然…...
CSRF 攻击实验:Token 不存在绕过验证
前言 CSRF(Cross-Site Request Forgery),也称为XSRF,是一种安全漏洞,攻击者通过欺骗用户在受信任网站上执行非自愿的操作,以实现未经授权的请求。 CSRF攻击利用了网站对用户提交的请求缺乏充分验证和防范…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
