「Kafka」监控、集成篇
Kafka-Eagle 监控
Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
MySQL环境准备
Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。
安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备
Kafka 环境准备
-
关闭 Kafka 集群
[atguigu@hadoop102 kafka]$ kf.sh stop
-
修改
/opt/module/kafka/bin/kafka-server-start.sh
[atguigu@hadoop102 kafka]$ vim bin/kafka-server-start.sh
修改如下参数值:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
为
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
初始内存只分配1G,如果要使用 Eagle 功能,我们可以将内存设置为 2G。
注意:修改之后在启动 Kafka 之前要分发至其他节点。
[atguigu@hadoop102 bin]$ xsync kafka-server-start.sh
Kafka-Eagle 安装
-
官网:https://www.kafka-eagle.org/
-
上传压缩包
kafka-eagle-bin-2.0.8.tar.gz
到集群/opt/software
目录 -
解压到本地
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
-
进入刚才解压的目录
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ ll
总用量 79164 -rw-rw-r--. 1 atguigu atguigu 81062577 10 月 13 00:00 efak-web-2.0.8-bin.tar.gz
-
将
efak-web-2.0.8-bin.tar.gz
解压至/opt/module
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
-
修改名称
[atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak
-
修改配置文件
/opt/module/efak/conf/system-config.properties
[atguigu@hadoop102 conf]$ vim system-config.properties
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.'instead ###################################### efak.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### # offset 保存在 kafka cluster1.efak.offset.storage=kafka ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramL oginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= cluster2.efak.sasl.enable=false cluster2.efak.sasl.protocol=SASL_PLAINTEXT cluster2.efak.sasl.mechanism=PLAIN cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainL oginModule required username="kafka" password="kafka-eagle"; cluster2.efak.sasl.client.id= cluster2.efak.blacklist.topics= cluster2.efak.sasl.cgroup.enable=false cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### # 配置 mysql 连接 efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=000000 ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #efak.username=root #efak.password=123456
-
添加环境变量
[atguigu@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK export KE_HOME=/opt/module/efak export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
[atguigu@hadoop102 conf]$ source /etc/profile
-
启动
-
注意:启动之前需要先启动 zk 以及 kafka
[atguigu@hadoop102 kafka]$ kf.sh start
-
启动 efak
[atguigu@hadoop102 efak]$ bin/ke.sh start
Version 2.0.8 -- Copyright 2016-2021 ***************************************************************** * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.10.102:8048' * Account:admin ,Password:123456 ***************************************************************** * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *****************************************************************
-
如果停止 efak,执行命令:
[atguigu@hadoop102 efak]$ bin/ke.sh stop
-
Kafka-Eagle 页面操作
- 登录页面查看监控数据
- http://192.168.10.102:8048/
主面板
Brokers
Topics
Zookeepers
Consumers
大屏信息
Kafka-Kraft 模式
Kafka-Kraft 架构
左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。
右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:
- Kafka 不再依赖外部框架,而是能够独立运行;
- controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
- 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
- controller 不再动态选举,而是由配置文件规定。
- 这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。
Kafka-Kraft 集群部署
Kafka-Kraft 集群启动停止脚本
-
在
/home/atguigu/bin
目录下创建文件kf2.sh
脚本文件[atguigu@hadoop102 bin]$ vim kf2.sh
脚本如下:
#! /bin/bashcase $1 in "start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"done };; "stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "done };; esac
-
添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf2.sh
-
启动集群命令
[atguigu@hadoop102 ~]$ kf2.sh start
-
停止集群命令
[atguigu@hadoop102 ~]$ kf2.sh stop
Kafka 集成
Kafka 集成 Flume
Flume 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Flume 环境准备
-
启动 kafka 集群
[atguigu@hadoop102 ~]$ zk.sh start [atguigu@hadoop102 ~]$ kf.sh start
-
启动 kafka 消费者
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
Flume 安装步骤
- 参考:P66 尚硅谷 Kafka 集成 Flume 环境准备
Flume 生产者
- 通过 Flume 实时监控
app.log
文件数据的变化 - 使用 taildir source,支持断点续传、实时监控文件变化,并获取到数据
- 由于我们传输的就是普通的日志,没有必要追求太高的可靠性,使用 memory channel,完全基于内存,速度非常快;断电后会丢数据,最多丢 100 条日志(因为内存大小最大上线就是 100)
- 数据是发往到 kafka 的,所以使用 kafka sink
- 发到 first 主题中,启动消费者消费。
-
配置 Flume
-
在 hadoop102 节点的 Flume 的 job 目录下创建
file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir jobs [atguigu@hadoop102 flume]$ vim jobs/file_to_kafka.conf
-
配置文件内容如下:
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 2 配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/app.* # 监控文件目录 a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json # offset文件 支持断点续传# 3 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 4 配置sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
-
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &
-
向
/opt/module/applog/app.log
里追加数据,查看 kafka 消费者消费情况[atguigu@hadoop102 module]$ mkdir applog [atguigu@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log
-
观察 kafka 消费者,能够看到消费的 hello 数据
Flume 消费者
- Flume 作为消费者,首先肯定选用 kafka source
- 通道选择 memory channel
- 打印到控制台选择 logger sink
-
配置 Flume
-
在 hadoop102 节点的 Flume 的
/opt/module/flume/jobs
目录下创建kafka_to_file.conf
[atguigu@hadoop102 jobs]$ vim kafka_to_file.conf
-
配置文件内容如下:
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 2 配置source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 4 配置sink a1.sinks.k1.type = logger# 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
-
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
-
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
并输入数据,例如:hello
-
观察控制台输出的日志
Kafka 集成 Flink
Flink是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Flink 环境准备
-
创建一个 maven 项目
flink-kafka
-
添加配置文件
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.0</version></dependency> </dependencies>
-
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=../log/agent.log log4j.appender.R.MaxFileSize=1024KB log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
-
在 java 文件夹下创建包名为
com.atguigu.flink
Flink 生产者
-
在
com.atguigu.flink
包下创建 java 类:FlinkKafkaProducer1
(系统也有一个 FlinkKafkaProducer,会重名,所以这里命名为 1)。import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList; import java.util.Properties;public class FlinkKafkaProducer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 3个槽 对应kafka主题题的3个分区// 1 准备数据源 读取集合中数据ArrayList<String> wordsList = new ArrayList<>();wordsList.add("hello");wordsList.add("atguigu");DataStream<String> stream = env.fromCollection(wordsList);// 2 kafka生产者配置信息Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 3 创建kafka生产者FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(), // 序列化和反序列化模板类 string类型properties);// 4 生产者和flink流关联stream.addSink(kafkaProducer);// 5 执行env.execute();} }
-
启动Kafka消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
执行
FlinkKafkaProducer1
程序,观察 kafka 消费者控制台情况Q:
- 为什么先接收到 atguigu,然后才是 hello 呢?
A:
- 在 Flink 中,对于并行度大于 1 的情况,不同的算子实例是并行运行的,也就是说当你的
env.setParallelism(3)
时,会有3
个线程同时运行。在你的例子中,"hello"
和"atguigu"
可能由不同的线程处理,并且处理的顺序是不确定的。 - 如果你希望严格按照顺序处理,你可以将并行度设置为
1
,即env.setParallelism(1)
。但是这样可能会影响处理速度。此外,Flink 也提供了一些方法来保证在并行处理时的顺序,可以查阅相关资料来了解更多。
Flink 消费者
-
在
com.atguigu.flink
包下创建 java 类:FlinkKafkaConsumer1
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class FlinkKafkaConsumer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 1 kafka消费者配置信息Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// group.id可选,不配置不会报错// 2 创建kafka消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleStringSchema(),properties);// 3 消费者和flink流关联env.addSource(kafkaConsumer).print();// 4 执行env.execute();} }
-
启动
FlinkKafkaConsumer1
消费者 -
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
-
观察 IDEA 控制台数据打印
有 3 个消费者并行消费,因为只发了两条消息,所以这里只有 1 和 3。
Kafka 集成 SpringBoot
SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。
跟之前不太一样的是,外部数据是通过接口的方式发送到 SpringBoot 程序,然后 SpringBoot 接收到这个接口的数据,然后再发送到 kafka 集群。
SpringBoot 环境准备
-
在 IDEA 中安装
lombok
插件在 Plugins 下搜索
lombok
然后在线安装即可,安装后注意重启 -
创建一个 Spring Initializr
注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址:https://start.aliyun.com
-
项目名称 springboot
-
添加项目依赖
-
检查自动生成的配置文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
SpringBoot 生产者
-
修改 SpringBoot 核心配置文件 application.propeties,添加生产者相关信息
# 应用名称 spring.application.name=atguigu_springboot_kafka# 指定kafka的地址 spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的序列化器 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
-
创建 controller 从浏览器接收数据,并写入指定的 topic
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;@RestController public class ProducerController {// Kafka模板用来向kafka发送数据@AutowiredKafkaTemplate<String, String> kafka;@RequestMapping("/atguigu")public String data(String msg) {kafka.send("first", msg);return "ok";} }
-
在浏览器中给 /atguigu 接口发送数据
http://localhost:8080/atguigu?msg=hello
-
kafka 消费者接收到数据
SpringBoot 消费者
-
修改 SpringBoot 核心配置文件 application.propeties
# =========消费者配置开始========= # 指定kafka的地址 spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的反序列化器 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 指定消费者组的group_id spring.kafka.consumer.group-id=atguigu # =========消费者配置结束=========
-
创建类消费 Kafka 中指定 topic 的数据
import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener;@Configuration public class KafkaConsumer {// 指定要监听的topic@KafkaListener(topics = "first")public void consumeTopic(String msg) { // 参数: 收到的valueSystem.out.println("收到的信息: " + msg);} }
-
向 first 主题发送数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first > atguigu
-
SpringBoot 消费者接收到数据
Kafka 集成 Spark
Spark 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Spark 环境准备
-
Scala 环境准备
- 参考:P73 尚硅谷 Kafka 集成 Spark 生产者
Spark 的底层源码是用 Scala 编写的。
-
创建一个 maven 项目 spark-kafka
-
在项目 spark-kafka 上点击右键,Add Framework Support => 勾选
scala
-
在 main 下创建 scala 文件夹,并右键 Mark Directory as Sources Root => 在 scala 下创建包名为 com.atguigu.spark
-
添加配置文件
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency> </dependencies>
-
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=../log/agent.log log4j.appender.R.MaxFileSize=1024KB log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
Spark 生产者
-
在 com.atguigu.spark 包下创建
scala Object
:SparkKafkaProducer
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object SparkKafkaProducer {def main(args: Array[String]): Unit = {// 0 kafka配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])// 1 创建kafka生产者var producer = new KafkaProducer[String, String](properties)// 2 发送数据for (i <- 1 to 5) {producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))}// 3 关闭资源producer.close()} }
-
启动 Kafka 消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
执行 SparkKafkaProducer 程序,观察 kafka 消费者控制台情况
Spark 消费者
-
添加配置文件
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency> </dependencies>
-
在 com.atguigu.spark 包下创建
scala Object
:SparkKafkaConsumer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {// 1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")// 2.创建StreamingContext 初始化上下文环境// Seconds(3):时间窗口,批处理间隔,表示每隔3秒钟,Spark Streaming就会收集一次数据进行处理。val ssc = new StreamingContext(sparkConf, Seconds(3))// 3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup")// 4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, // 上下文环境LocationStrategies.PreferConsistent, // 数据存储位置 优先位置ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara) // 消费策略:(订阅多个主题,配置参数) )// 5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())// 6.计算WordCountvalueDStream.print()// 7.开启任务 并阻塞(使程序一直执行)ssc.start()ssc.awaitTermination()} }
-
启动 SparkKafkaConsumer 消费者
-
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
-
观察IDEA控制台数据打印
笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
相关文章:

「Kafka」监控、集成篇
Kafka-Eagle 监控 Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。 MySQL环境准备 Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。 安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备 …...
Linux之用户和用户组用户账号系统文件
一、简介 1.用户的定义 在linux系统中用户(User)需要用用户账号来访问系统,服务和信息,系统中的每个进程(运行的程序)都是使用一个特定的用户运行。每个文件都属于一个特定的用户所有。对文件和目录的访…...
ESP8266 (5),驱动屏幕
代码 第一步设置驱动库TFT_eSPI的默认配置文件 1,设置适配的屏幕 #define ST7789_DRIVER 2,设置屏幕大小 #define TFT_WIDTH 170 #define TFT_HEIGHT 320 3,设置屏幕驱动板端口和ESP8266对应的端口 // For NodeMCU - use pin numbers in the…...

ChatGPT-01 用ChatGPT指令,自学任何领域的系统知识
1. 指令位置 Github仓库:Mr Ranedeer AI Tutor 但是需要开通chatgtp plus版本,并且打开代码解释器 2 使用 学习内容 开始学习 GPT甚至可以给你思考题,给出的答案还能进行评价 配置 通过配置表修改 深度 学习风格 沟通风格 语气风格 …...

android studio模拟器不能打开
Andriod:The selected AVD is currently running in the Emulator. Please exit the emulator instance… 1.点击 2.删除下面文件 3.重新打开即可 参考...
设计模式学习笔记 - 面向对象 - 5.接口和抽象类的区别
简述 在面向对象编程中,抽象类和接口是常被用到的语法概念,是面向对象四大特性,以及很多设计模式、设计思想、设计原则实现的基础。它们之间的区别是什么?什么时候用接口?什么时候用抽象类?抽象类和接口存…...

PolarDN MISC做题笔记
cat flag 使用01打开flag.png,发现图片尾部有padding的数据。D0 CF 11 E0 A1 B1 1A E1为office2007以前版本的文件头。将其另存为flag.doc,打开发现提示需要密码。(可以注意到:D0CF11E0非常类似DOCFILE) 使用john的office2john.py 提取hash …...

Web安全之浅见
备注:这是我在2017年在自己的网站上写的文章,今天迁移过来。 昨天去参加了公司组织的一个关于网络安全的培训,了解了很多关于网络安全方面的知识,也才意识到网络安全是一项极其重要的领域。 本篇文章主要聊聊Web安全。不过我对于网…...

企业安全建设工具推荐
全自动化挖洞,助力企业安全建设,一键实现域名扫描、IP 发现、端口扫描、服务识别、网站识别、漏洞探测、分析发现、合规检查。 使用方式: 录入目标企业名称即可开始使用 技术细节: 第一步:通过企业主体关联企业备案…...
力扣(leetcode)第455题分发饼干(Python)
455.分发饼干 题目链接:455.分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],这是能让孩子们满足胃口的饼干的最小尺寸;并且每块饼干 j,都有一个尺寸 s[j] 。如果 s[j] >= g[i…...

隐私也要付费?Meta公司为收集用户数据再出“奇招”
Cybernews网站消息,有相关人士表示,如果欧洲数据保护委员会(EDPB)不明确指出Meta公司的“付费或同意”的模式违反了欧盟的隐私法规,那么这一模式很可能会被大规模复制,危及数百万欧洲公民的自由选择权。 自…...

Android14 InputManager-InputReader的处理
IMS启动时会调用InputReader.start()方法 InputReader.cpp status_t InputReader::start() {if (mThread) {return ALREADY_EXISTS;}mThread std::make_unique<InputThread>("InputReader", [this]() { loopOnce(); }, [this]() { mEventHub->wake(); });…...
web前端安全性——JSONP劫持
1、JSONP概念 JSONP(JSON with Padding)是JSON的一种“使用模式”,可用于解决主流浏览器的跨域数据访问的问题。由于同源策略,协议IP端口有任意不同都会导致请求跨域,而HTML的script元素是一个例外。利用script元素的这个开放策略࿰…...
从零开始学HCIA之广域网技术03
1、LCP中包含的报文类型 (1)Configure-Request(配置请求),链路层协商过程中发送的第一个报文,该报文表明点对点双方开始进行链路层参数的协商。 (2) Configure-Ack(配置…...
AI推介-大语言模型LLMs论文速览(arXiv方向):2024.01.01-2024.01.10
1.Pre-trained Large Language Models for Financial Sentiment Analysis 标题:用于金融情感分析的预训练大型语言模型 author:Wei Luo, Dihong Gong date Time:2024-01-10 paper pdf:http://arxiv.org/pdf/2401.05215v1 摘要: 金融情感分析是指将金融文本内容划分…...
Redis降低内存占用(二)分片结构
一、分区方法: 分片,也称为分区。Redis提供了多种分区实现方案: 1、哈希分区 2、区间分区 3、一致性哈希分区 4、虚拟分区 5、LUA脚本实现分片 二、...

vue大文件读取部分内容,避免重复加载大文件,造成流量浪费
使用场景:项目点云地图是pcd文件,但是文件可能上百兆,我需要获取到文件中的版本信息,跟本地的缓存文件做比较,如果不一致,才会加载整个文件。从而节省流量。 避免重复加载整个“.pcd文件,以最大…...

5G网络RedCap
RedCap:RedCap(Reduced Capability),即“降低能力”。它是3GPP在5G R17阶段,针对速率、时延要求不高的5G应用场景,专门推出的一种新技术标准协议,旨在全面提升5G网络质量和覆盖率,也…...

vue+springboot登录与注册功能的实现
①首先写一个登录页面 <template> <div style"background-color: #42b983;display: flex;align-items: center;justify-content: center;height: 100vh"><div style"background-color: white;display: flex;width: 50%;height: 50%;overflow: h…...

数据结构D3作业
1. 2. 按位插入 void insert_pos(seq_p L,datatype num,int pos) { if(LNULL) { printf("入参为空,请检查\n"); return; } if(seq_full(L)1) { printf("表已满,不能插入\n"); …...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...

论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing
Muffin 论文 现有方法 CRADLE 和 LEMON,依赖模型推理阶段输出进行差分测试,但在训练阶段是不可行的,因为训练阶段直到最后才有固定输出,中间过程是不断变化的。API 库覆盖低,因为各个 API 都是在各种具体场景下使用。…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...

aardio 自动识别验证码输入
技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”,于是尝试整合图像识别与网页自动化技术,完成了这套模拟登录流程。核心思路是:截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...

PH热榜 | 2025-06-08
1. Thiings 标语:一套超过1900个免费AI生成的3D图标集合 介绍:Thiings是一个不断扩展的免费AI生成3D图标库,目前已有超过1900个图标。你可以按照主题浏览,生成自己的图标,或者下载整个图标集。所有图标都可以在个人或…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!
今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等,设置经线、纬线都以10间隔显示。 2、需要插入背会归线…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...