「Kafka」监控、集成篇
Kafka-Eagle 监控
Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
MySQL环境准备
Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。
安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备
Kafka 环境准备
-
关闭 Kafka 集群
[atguigu@hadoop102 kafka]$ kf.sh stop
-
修改
/opt/module/kafka/bin/kafka-server-start.sh
[atguigu@hadoop102 kafka]$ vim bin/kafka-server-start.sh
修改如下参数值:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
为
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
初始内存只分配1G,如果要使用 Eagle 功能,我们可以将内存设置为 2G。
注意:修改之后在启动 Kafka 之前要分发至其他节点。
[atguigu@hadoop102 bin]$ xsync kafka-server-start.sh
Kafka-Eagle 安装
-
官网:https://www.kafka-eagle.org/
-
上传压缩包
kafka-eagle-bin-2.0.8.tar.gz
到集群/opt/software
目录 -
解压到本地
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
-
进入刚才解压的目录
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ ll
总用量 79164 -rw-rw-r--. 1 atguigu atguigu 81062577 10 月 13 00:00 efak-web-2.0.8-bin.tar.gz
-
将
efak-web-2.0.8-bin.tar.gz
解压至/opt/module
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
-
修改名称
[atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak
-
修改配置文件
/opt/module/efak/conf/system-config.properties
[atguigu@hadoop102 conf]$ vim system-config.properties
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.'instead ###################################### efak.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### # offset 保存在 kafka cluster1.efak.offset.storage=kafka ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramL oginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= cluster2.efak.sasl.enable=false cluster2.efak.sasl.protocol=SASL_PLAINTEXT cluster2.efak.sasl.mechanism=PLAIN cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainL oginModule required username="kafka" password="kafka-eagle"; cluster2.efak.sasl.client.id= cluster2.efak.blacklist.topics= cluster2.efak.sasl.cgroup.enable=false cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### # 配置 mysql 连接 efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=000000 ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #efak.username=root #efak.password=123456
-
添加环境变量
[atguigu@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK export KE_HOME=/opt/module/efak export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
[atguigu@hadoop102 conf]$ source /etc/profile
-
启动
-
注意:启动之前需要先启动 zk 以及 kafka
[atguigu@hadoop102 kafka]$ kf.sh start
-
启动 efak
[atguigu@hadoop102 efak]$ bin/ke.sh start
Version 2.0.8 -- Copyright 2016-2021 ***************************************************************** * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.10.102:8048' * Account:admin ,Password:123456 ***************************************************************** * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *****************************************************************
-
如果停止 efak,执行命令:
[atguigu@hadoop102 efak]$ bin/ke.sh stop
-
Kafka-Eagle 页面操作
- 登录页面查看监控数据
- http://192.168.10.102:8048/
主面板
Brokers
Topics
Zookeepers
Consumers
大屏信息
Kafka-Kraft 模式
Kafka-Kraft 架构
左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。
右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:
- Kafka 不再依赖外部框架,而是能够独立运行;
- controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
- 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
- controller 不再动态选举,而是由配置文件规定。
- 这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。
Kafka-Kraft 集群部署
Kafka-Kraft 集群启动停止脚本
-
在
/home/atguigu/bin
目录下创建文件kf2.sh
脚本文件[atguigu@hadoop102 bin]$ vim kf2.sh
脚本如下:
#! /bin/bashcase $1 in "start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"done };; "stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "done };; esac
-
添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf2.sh
-
启动集群命令
[atguigu@hadoop102 ~]$ kf2.sh start
-
停止集群命令
[atguigu@hadoop102 ~]$ kf2.sh stop
Kafka 集成
Kafka 集成 Flume
Flume 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Flume 环境准备
-
启动 kafka 集群
[atguigu@hadoop102 ~]$ zk.sh start [atguigu@hadoop102 ~]$ kf.sh start
-
启动 kafka 消费者
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
Flume 安装步骤
- 参考:P66 尚硅谷 Kafka 集成 Flume 环境准备
Flume 生产者
- 通过 Flume 实时监控
app.log
文件数据的变化 - 使用 taildir source,支持断点续传、实时监控文件变化,并获取到数据
- 由于我们传输的就是普通的日志,没有必要追求太高的可靠性,使用 memory channel,完全基于内存,速度非常快;断电后会丢数据,最多丢 100 条日志(因为内存大小最大上线就是 100)
- 数据是发往到 kafka 的,所以使用 kafka sink
- 发到 first 主题中,启动消费者消费。
-
配置 Flume
-
在 hadoop102 节点的 Flume 的 job 目录下创建
file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir jobs [atguigu@hadoop102 flume]$ vim jobs/file_to_kafka.conf
-
配置文件内容如下:
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 2 配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/app.* # 监控文件目录 a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json # offset文件 支持断点续传# 3 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 4 配置sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
-
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &
-
向
/opt/module/applog/app.log
里追加数据,查看 kafka 消费者消费情况[atguigu@hadoop102 module]$ mkdir applog [atguigu@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log
-
观察 kafka 消费者,能够看到消费的 hello 数据
Flume 消费者
- Flume 作为消费者,首先肯定选用 kafka source
- 通道选择 memory channel
- 打印到控制台选择 logger sink
-
配置 Flume
-
在 hadoop102 节点的 Flume 的
/opt/module/flume/jobs
目录下创建kafka_to_file.conf
[atguigu@hadoop102 jobs]$ vim kafka_to_file.conf
-
配置文件内容如下:
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1# 2 配置source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# 4 配置sink a1.sinks.k1.type = logger# 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
-
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
-
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
并输入数据,例如:hello
-
观察控制台输出的日志
Kafka 集成 Flink
Flink是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Flink 环境准备
-
创建一个 maven 项目
flink-kafka
-
添加配置文件
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.0</version></dependency> </dependencies>
-
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=../log/agent.log log4j.appender.R.MaxFileSize=1024KB log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
-
在 java 文件夹下创建包名为
com.atguigu.flink
Flink 生产者
-
在
com.atguigu.flink
包下创建 java 类:FlinkKafkaProducer1
(系统也有一个 FlinkKafkaProducer,会重名,所以这里命名为 1)。import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList; import java.util.Properties;public class FlinkKafkaProducer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 3个槽 对应kafka主题题的3个分区// 1 准备数据源 读取集合中数据ArrayList<String> wordsList = new ArrayList<>();wordsList.add("hello");wordsList.add("atguigu");DataStream<String> stream = env.fromCollection(wordsList);// 2 kafka生产者配置信息Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 3 创建kafka生产者FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(), // 序列化和反序列化模板类 string类型properties);// 4 生产者和flink流关联stream.addSink(kafkaProducer);// 5 执行env.execute();} }
-
启动Kafka消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
执行
FlinkKafkaProducer1
程序,观察 kafka 消费者控制台情况Q:
- 为什么先接收到 atguigu,然后才是 hello 呢?
A:
- 在 Flink 中,对于并行度大于 1 的情况,不同的算子实例是并行运行的,也就是说当你的
env.setParallelism(3)
时,会有3
个线程同时运行。在你的例子中,"hello"
和"atguigu"
可能由不同的线程处理,并且处理的顺序是不确定的。 - 如果你希望严格按照顺序处理,你可以将并行度设置为
1
,即env.setParallelism(1)
。但是这样可能会影响处理速度。此外,Flink 也提供了一些方法来保证在并行处理时的顺序,可以查阅相关资料来了解更多。
Flink 消费者
-
在
com.atguigu.flink
包下创建 java 类:FlinkKafkaConsumer1
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class FlinkKafkaConsumer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 1 kafka消费者配置信息Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// group.id可选,不配置不会报错// 2 创建kafka消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleStringSchema(),properties);// 3 消费者和flink流关联env.addSource(kafkaConsumer).print();// 4 执行env.execute();} }
-
启动
FlinkKafkaConsumer1
消费者 -
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
-
观察 IDEA 控制台数据打印
有 3 个消费者并行消费,因为只发了两条消息,所以这里只有 1 和 3。
Kafka 集成 SpringBoot
SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。
跟之前不太一样的是,外部数据是通过接口的方式发送到 SpringBoot 程序,然后 SpringBoot 接收到这个接口的数据,然后再发送到 kafka 集群。
SpringBoot 环境准备
-
在 IDEA 中安装
lombok
插件在 Plugins 下搜索
lombok
然后在线安装即可,安装后注意重启 -
创建一个 Spring Initializr
注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址:https://start.aliyun.com
-
项目名称 springboot
-
添加项目依赖
-
检查自动生成的配置文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
SpringBoot 生产者
-
修改 SpringBoot 核心配置文件 application.propeties,添加生产者相关信息
# 应用名称 spring.application.name=atguigu_springboot_kafka# 指定kafka的地址 spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的序列化器 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
-
创建 controller 从浏览器接收数据,并写入指定的 topic
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;@RestController public class ProducerController {// Kafka模板用来向kafka发送数据@AutowiredKafkaTemplate<String, String> kafka;@RequestMapping("/atguigu")public String data(String msg) {kafka.send("first", msg);return "ok";} }
-
在浏览器中给 /atguigu 接口发送数据
http://localhost:8080/atguigu?msg=hello
-
kafka 消费者接收到数据
SpringBoot 消费者
-
修改 SpringBoot 核心配置文件 application.propeties
# =========消费者配置开始========= # 指定kafka的地址 spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的反序列化器 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 指定消费者组的group_id spring.kafka.consumer.group-id=atguigu # =========消费者配置结束=========
-
创建类消费 Kafka 中指定 topic 的数据
import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener;@Configuration public class KafkaConsumer {// 指定要监听的topic@KafkaListener(topics = "first")public void consumeTopic(String msg) { // 参数: 收到的valueSystem.out.println("收到的信息: " + msg);} }
-
向 first 主题发送数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first > atguigu
-
SpringBoot 消费者接收到数据
Kafka 集成 Spark
Spark 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。
Spark 环境准备
-
Scala 环境准备
- 参考:P73 尚硅谷 Kafka 集成 Spark 生产者
Spark 的底层源码是用 Scala 编写的。
-
创建一个 maven 项目 spark-kafka
-
在项目 spark-kafka 上点击右键,Add Framework Support => 勾选
scala
-
在 main 下创建 scala 文件夹,并右键 Mark Directory as Sources Root => 在 scala 下创建包名为 com.atguigu.spark
-
添加配置文件
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency> </dependencies>
-
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=../log/agent.log log4j.appender.R.MaxFileSize=1024KB log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
Spark 生产者
-
在 com.atguigu.spark 包下创建
scala Object
:SparkKafkaProducer
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object SparkKafkaProducer {def main(args: Array[String]): Unit = {// 0 kafka配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])// 1 创建kafka生产者var producer = new KafkaProducer[String, String](properties)// 2 发送数据for (i <- 1 to 5) {producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))}// 3 关闭资源producer.close()} }
-
启动 Kafka 消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
-
执行 SparkKafkaProducer 程序,观察 kafka 消费者控制台情况
Spark 消费者
-
添加配置文件
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency> </dependencies>
-
在 com.atguigu.spark 包下创建
scala Object
:SparkKafkaConsumer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {// 1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")// 2.创建StreamingContext 初始化上下文环境// Seconds(3):时间窗口,批处理间隔,表示每隔3秒钟,Spark Streaming就会收集一次数据进行处理。val ssc = new StreamingContext(sparkConf, Seconds(3))// 3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup")// 4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, // 上下文环境LocationStrategies.PreferConsistent, // 数据存储位置 优先位置ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara) // 消费策略:(订阅多个主题,配置参数) )// 5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())// 6.计算WordCountvalueDStream.print()// 7.开启任务 并阻塞(使程序一直执行)ssc.start()ssc.awaitTermination()} }
-
启动 SparkKafkaConsumer 消费者
-
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
-
观察IDEA控制台数据打印
笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)
相关文章:

「Kafka」监控、集成篇
Kafka-Eagle 监控 Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。 MySQL环境准备 Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。 安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备 …...
Linux之用户和用户组用户账号系统文件
一、简介 1.用户的定义 在linux系统中用户(User)需要用用户账号来访问系统,服务和信息,系统中的每个进程(运行的程序)都是使用一个特定的用户运行。每个文件都属于一个特定的用户所有。对文件和目录的访…...
ESP8266 (5),驱动屏幕
代码 第一步设置驱动库TFT_eSPI的默认配置文件 1,设置适配的屏幕 #define ST7789_DRIVER 2,设置屏幕大小 #define TFT_WIDTH 170 #define TFT_HEIGHT 320 3,设置屏幕驱动板端口和ESP8266对应的端口 // For NodeMCU - use pin numbers in the…...

ChatGPT-01 用ChatGPT指令,自学任何领域的系统知识
1. 指令位置 Github仓库:Mr Ranedeer AI Tutor 但是需要开通chatgtp plus版本,并且打开代码解释器 2 使用 学习内容 开始学习 GPT甚至可以给你思考题,给出的答案还能进行评价 配置 通过配置表修改 深度 学习风格 沟通风格 语气风格 …...

android studio模拟器不能打开
Andriod:The selected AVD is currently running in the Emulator. Please exit the emulator instance… 1.点击 2.删除下面文件 3.重新打开即可 参考...
设计模式学习笔记 - 面向对象 - 5.接口和抽象类的区别
简述 在面向对象编程中,抽象类和接口是常被用到的语法概念,是面向对象四大特性,以及很多设计模式、设计思想、设计原则实现的基础。它们之间的区别是什么?什么时候用接口?什么时候用抽象类?抽象类和接口存…...

PolarDN MISC做题笔记
cat flag 使用01打开flag.png,发现图片尾部有padding的数据。D0 CF 11 E0 A1 B1 1A E1为office2007以前版本的文件头。将其另存为flag.doc,打开发现提示需要密码。(可以注意到:D0CF11E0非常类似DOCFILE) 使用john的office2john.py 提取hash …...

Web安全之浅见
备注:这是我在2017年在自己的网站上写的文章,今天迁移过来。 昨天去参加了公司组织的一个关于网络安全的培训,了解了很多关于网络安全方面的知识,也才意识到网络安全是一项极其重要的领域。 本篇文章主要聊聊Web安全。不过我对于网…...

企业安全建设工具推荐
全自动化挖洞,助力企业安全建设,一键实现域名扫描、IP 发现、端口扫描、服务识别、网站识别、漏洞探测、分析发现、合规检查。 使用方式: 录入目标企业名称即可开始使用 技术细节: 第一步:通过企业主体关联企业备案…...
力扣(leetcode)第455题分发饼干(Python)
455.分发饼干 题目链接:455.分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],这是能让孩子们满足胃口的饼干的最小尺寸;并且每块饼干 j,都有一个尺寸 s[j] 。如果 s[j] >= g[i…...

隐私也要付费?Meta公司为收集用户数据再出“奇招”
Cybernews网站消息,有相关人士表示,如果欧洲数据保护委员会(EDPB)不明确指出Meta公司的“付费或同意”的模式违反了欧盟的隐私法规,那么这一模式很可能会被大规模复制,危及数百万欧洲公民的自由选择权。 自…...

Android14 InputManager-InputReader的处理
IMS启动时会调用InputReader.start()方法 InputReader.cpp status_t InputReader::start() {if (mThread) {return ALREADY_EXISTS;}mThread std::make_unique<InputThread>("InputReader", [this]() { loopOnce(); }, [this]() { mEventHub->wake(); });…...
web前端安全性——JSONP劫持
1、JSONP概念 JSONP(JSON with Padding)是JSON的一种“使用模式”,可用于解决主流浏览器的跨域数据访问的问题。由于同源策略,协议IP端口有任意不同都会导致请求跨域,而HTML的script元素是一个例外。利用script元素的这个开放策略࿰…...
从零开始学HCIA之广域网技术03
1、LCP中包含的报文类型 (1)Configure-Request(配置请求),链路层协商过程中发送的第一个报文,该报文表明点对点双方开始进行链路层参数的协商。 (2) Configure-Ack(配置…...
AI推介-大语言模型LLMs论文速览(arXiv方向):2024.01.01-2024.01.10
1.Pre-trained Large Language Models for Financial Sentiment Analysis 标题:用于金融情感分析的预训练大型语言模型 author:Wei Luo, Dihong Gong date Time:2024-01-10 paper pdf:http://arxiv.org/pdf/2401.05215v1 摘要: 金融情感分析是指将金融文本内容划分…...
Redis降低内存占用(二)分片结构
一、分区方法: 分片,也称为分区。Redis提供了多种分区实现方案: 1、哈希分区 2、区间分区 3、一致性哈希分区 4、虚拟分区 5、LUA脚本实现分片 二、...

vue大文件读取部分内容,避免重复加载大文件,造成流量浪费
使用场景:项目点云地图是pcd文件,但是文件可能上百兆,我需要获取到文件中的版本信息,跟本地的缓存文件做比较,如果不一致,才会加载整个文件。从而节省流量。 避免重复加载整个“.pcd文件,以最大…...

5G网络RedCap
RedCap:RedCap(Reduced Capability),即“降低能力”。它是3GPP在5G R17阶段,针对速率、时延要求不高的5G应用场景,专门推出的一种新技术标准协议,旨在全面提升5G网络质量和覆盖率,也…...

vue+springboot登录与注册功能的实现
①首先写一个登录页面 <template> <div style"background-color: #42b983;display: flex;align-items: center;justify-content: center;height: 100vh"><div style"background-color: white;display: flex;width: 50%;height: 50%;overflow: h…...

数据结构D3作业
1. 2. 按位插入 void insert_pos(seq_p L,datatype num,int pos) { if(LNULL) { printf("入参为空,请检查\n"); return; } if(seq_full(L)1) { printf("表已满,不能插入\n"); …...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...

网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...