Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)
一、安装Kafka
1.执行以下命令完成Kafka的安装:
cd ~ //默认压缩包放在根目录
sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.6.0 kafka-2.6.0
sudo chown -R qiangzi ./kafka-2.6.0
二、启动Kafaka
1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd /usr/local/kafka-2.6.0
./bin/zookeeper-server-start.sh config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!
2.打开第二个终端,输入下面命令启动Kafka服务:
cd /usr/local/kafka-2.6.0
./bin/kafka-server-start.sh config/server.properties//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh config/server.properties &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。
三、创建Topic
1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.6.0
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.6.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender
注意,所有这些终端窗口都不要关闭,要继续留着后面使用。
四、Spark准备工作
Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。
cd ~ .jar文件默认放在根目录
sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)
spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:
Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)
进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源
1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala
它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.编写消费者(Consumer)程序
在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:
cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger} /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)} } }
5.编译打包程序
现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:
cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt
在simple.sbt中输入以下代码:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"
然后执行下面命令,进行编译打包:
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt package
打包成功界面

6. 运行程序
首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:
cd /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh
启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender 3 5
注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:
cd /usr/local/spark-3.5.1/mycode/kafka/
/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar
运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

相关文章:
Kafka和Spark Streaming的组合使用学习笔记(Spark 3.5.1)
一、安装Kafka 1.执行以下命令完成Kafka的安装: cd ~ //默认压缩包放在根目录 sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local cd /usr/local sudo mv kafka_2.12-2.6.0 kafka-2.6.0 sudo chown -R qiangzi ./kafka-2.6.0 二、启动Kafaka 1.首先需要启动K…...
5.9网络协议
由网卡发送数据通过网线进行发送,当网卡接收到信号以后将数据传给内核数据区,然后由操作系统交给相应的进程。 将数据进行发送的时候需要借助于网线实现,这个时候会出现当传输的数据比较远的时候就借助于中继器将信号进行再生扩大࿰…...
QT客户端开发的注意事项
QT客户端开发是一个涉及图形用户界面(GUI)设计、网络编程、数据库交互等多个方面的复杂过程。以下是在进行QT客户端开发时应注意的一些关键事项,通过关注这些事项,可以提高QT客户端应用的质量和开发效率。北京木奇移动技术有限公司…...
k8s源码编译失败:Makefile:1: *** 缺失分隔符。 停止。
目录 问题解决 更换Arch或系统 问题解决 编译k8s源码的kubelet时执行make失败:Makefile:1: *** 缺失分隔符。 停止。 首先,查看文件内容 # cat Makefile build/root/Makefile 修改Makefile,给第一行前增加include,如下&…...
服务器数据恢复—拯救raid5阵列数据行动,raid5数据恢复案例分享
Raid5数据恢复算法原理: 分布式奇偶校验的独立磁盘结构(被称之为raid5)的数据恢复有一个“奇偶校验”的概念。可以简单的理解为二进制运算中的“异或运算”,通常使用的标识是xor。运算规则:若二者值相同则结果为0&…...
旅游集市数仓建设
旅游集市数仓建设 小白如何从0到1成为大数据工程师 目录 旅游集市数仓建设 1.上传数据 2.可能用到的UDF函数 3.创建所需数据库及表 1)ODS层 ①ods_oidd ②ods_wcdr ③ods_ddr ④ods_dpi 2)DWD层 ①dwd_res_regn_mergelocation_msk_d ②dwm_s…...
vue实现点击高亮效果
<view class"tabs"><textv-for"(item, index) in subTypes":key"item.id"class"text":class"{ active: index activeIndex }"//动态绑定高亮类:判断下标是否等于当前下标tap"activeIndex index&…...
uniapp 配置请求代理+请求封装
uniapp官网提供了三种方式:什么是跨域 | uni-app官网 1. 通过uniapp自带浏览器 打开项目是不存在跨域的 第二种方式: "h5" : {"template" : "static/index.html","devServer": {"proxy": {&quo…...
代码随想录算法训练营第二十八天|216.组合总和III、17.电话号码的字母组合
216.组合总和III 文档讲解:代码随想录 题目链接:. - 力扣(LeetCode) 这一题与昨天的组合差不多,区别就在只有和是目标值的时候才会加入到result数组中,并且在回溯时,会处理sum的值 class Solution:def __i…...
大模型prompt实例:知识库信息质量校验模块
大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模…...
正则表达式和lambda表达式
正则表达式(Regular Expressions)和Lambda表达式虽然都包含“表达式”一词,但它们在编程中的作用和用法是完全不同的。让我们详细比较一下它们的定义、用途和应用场景: 正则表达式 定义:正则表达式是一种用于匹配文本…...
pyenv 之 python 多版本管理(win11)
1. 背景 常常会用到Python的多个版本,因此可以使用Pyenv来对Python版本进行管理。 2. win11下载 pyenv 在终端执行下载语句: pip install pyenv-win --target D:\software\pyenv 其中 D:\software\pyenv 为你想要下载到的文件目录,建议在 …...
nodemon运行ts文件
https://juejin.cn/post/7035637086451400734 nodemon经常用来调试js文件,大家都是知道的,但是用nodemon来调试ts文件,大家试过吗? 如果直接运行nodemon index.ts是会报错的。 ts 复制代码 //index.ts console.log(1) 需要全局…...
内网渗透瑞士军刀-impacket工具解析(二)
impacket工具解析之Kerberos认证协议 上一期我们介绍了impacket中ntlm协议的实现,在Windows认证中除了使用ntlm认证,还支持Kerberos认证协议,Kerberos认证也是Windows 活动目录中占比最高的认证方式。 什么是Kerberos协议? Kerb…...
huggingface 笔记:pipeline
1 介绍 pipeline() 是使用预训练模型进行推理的最简单和最快速的方式。可以针对不同模态的许多任务直接使用 pipeline() 2 举例:情感分析 2.1 创建pipeline实例 from transformers import pipelineclassifier pipeline("sentiment-analysis") #首先创…...
玩转Matlab-Simscape(初级)-01-从一个简单模型开始学习之旅
** 玩转Matlab-Simscape(初级)- 01 - 从一个简单模型开始学习之旅 ** 目录 玩转Matlab-Simscape(初级)- 01 - 从一个简单模型开始学习之旅 前言一、从模板开始建模二、建模一个简单的连杆2.1 建模2.2 生成子系统 总结 前言 在产…...
电脑录屏软件有哪些?这3款神器必须要知道
在当今现代社会,电脑录屏软件已经成为人们日常生活中不可或缺的一部分。无论是录制游戏精彩瞬间、制作教程、还是在线会议记录,一款好用的电脑录屏软件都能帮助我们更高效地完成任务。可是电脑录屏软件有哪些呢?接下来,我们将介绍…...
如何在华企盾DSC防泄密系统中设置文件自动加密?
在华企盾DSC系统中设置文件自动加密的过程,简单且用户友好,确保了企业数据的安全,同时不干扰日常工作流程。以下是设置文件自动加密的步骤: 系统安装与配置:确保华企盾DSC数据防泄密系统已经在企业的网络中正确安装和配…...
【DevOps】Dockerfile详解,做自己的docker镜像
学会使用DockerHub找自己想要的镜像以后,我们会很方便的使用一些公用镜像仓库的Docker镜像。但是开发和部署的过程中,能找到的镜像可能并不能满足我们需要,这样我们就需要自己制作Docker镜像。我们通过需要编写一个 Dockerfile,然…...
CSRF 攻击实验:Token 不存在绕过验证
前言 CSRF(Cross-Site Request Forgery),也称为XSRF,是一种安全漏洞,攻击者通过欺骗用户在受信任网站上执行非自愿的操作,以实现未经授权的请求。 CSRF攻击利用了网站对用户提交的请求缺乏充分验证和防范…...
逻辑回归:给不确定性划界的分类大师
想象你是一名医生。面对患者的检查报告(肿瘤大小、血液指标),你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...
Axure 下拉框联动
实现选省、选完省之后选对应省份下的市区...
使用SSE解决获取状态不一致问题
使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中…...
若依登录用户名和密码加密
/*** 获取公钥:前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...
