尚硅谷大数据项目《在线教育之采集系统》笔记002
视频地址:尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili
目录
P032
P033
P033
P034
P035
P036
P032
P033
# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
P033
2023-07-26 11:13:42,136 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2023-07-26 11:13:42,139 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:42,241 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2023-07-26 11:13:43,157 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:43,164 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
[2023-07-26 11:03:06,989] INFO Opening socket connection to server node002/192.168.10.102:2181. (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,989] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,992] WARN Session 0x0 for sever node002/192.168.10.102:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)
flume生效!
node001
启动hadoop、zookeeper、kafka,再启动flume。[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
Info: Sourcing environment configuration script /opt/module/flume/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
...
[atguigu@node001 ~]$ jpsall
================ node001 ================
6368 NodeManager
5793 NameNode
2819 QuorumPeerMain
6598 JobHistoryServer
5960 DataNode
6681 Application
4955 Kafka
7532 Jps
================ node002 ================
4067 NodeManager
2341 Kafka
3942 ResourceManager
4586 ConsoleConsumer
5131 Jps
1950 QuorumPeerMain
3742 DataNode
================ node003 ================
3472 NodeManager
3235 DataNode
1959 QuorumPeerMain
3355 SecondaryNameNode
2347 Kafka
3679 Jps
[atguigu@node001 ~]$
[atguigu@node002 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_log
[atguigu@node001 ~]$ mock.sh
[atguigu@node001 ~]$
P034
# /opt/module/flume/flume-1.9.0/job# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
package com.atguigu.flume.interceptor;import com.atguigu.flume.interceptor.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 过滤掉脏数据(不完整的json)** @param event* @return*/@Overridepublic Event intercept(Event event) {//1、获取body当中的数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断数据是否为完整的jsonif (JSONUtil.isJSONValidate(log)) {return event;}return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
package com.atguigu.flume.interceptor.utils;import com.alibaba.fastjson.JSONObject;public class JSONUtil {public static boolean isJSONValidate(String log) {try {JSONObject.parseObject(log);return true;} catch (Exception e) {e.printStackTrace();return false;}}
}
[atguigu@node001 log]$ echo '{"id":1}' >> app.log
[atguigu@node001 log]$ echo '{"id": }' >> app.log
[atguigu@node001 log]$ echo '{"id":2}' >> app.log
[atguigu@node001 log]$
P035
#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"done
};;
"stop"){for i in hadoop102 hadoop103doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};;
esac
#! /bin/bashcase $1 in
"start"){for i in node001 node002doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/flume-1.9.0/log1.txt 2>&1 &"done
};;
"stop"){for i in node001 node002doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done
};;
esac
#! /bin/bashcase $1 in
"start") {echo " --------采集flume启动-------"ssh node001 "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume/flume-1.9.0/conf/ -f /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf >/dev /null 2>&1 &"
};;
"stop") {echo " --------采集flume关闭-------"ssh node001 "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
};;
esac
P036
## 1、定义组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1## 2、配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false## 3、配置channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/flume-1.9.0/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 3## 4、配置sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#a1.sinks.k1.hdfs.rollInterval = 10
#a1.sinks.k1.hdfs.rollSize = 134217728
#a1.sinks.k1.hdfs.rollCount = 0## 5、组装 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf
相关文章:

尚硅谷大数据项目《在线教育之采集系统》笔记002
视频地址:尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili 目录 P032 P033 P033 P034 P035 P036 P032 P033 # 1、定义组件,为各组件命名 a1.sources r1 a1.channels c1 a1.sinks - k1# 2、配置sources,描述source a1.sour…...

校园跑腿小程序功能分享
提起校园跑腿小程序大家都不陌生,尤其是对上大学的伙伴们来说,更是熟悉得不能再熟悉了,和我们的生活息息相关,密不可分。 对于现在的年轻人来说,网购是非常简单和方便的一种购物方式,随之快递也会越来越多。在我们国家…...

PHP8的变量-PHP8知识详解
昨天我们讲解了PHP8的常量,今天讲解PHP8的变量。常量有定义常量和预定义常量,变量呢?那就没有定义变量了,那叫给变量赋值,但是还是有预定义变量的。下面就给大家讲解什么是变量、变量赋值及使用及预定义变量。 一、什么…...

图解TCP 三次握手和四次挥手的高频面试题(2023最新版)
大家好,最近重新整理了一版 TCP 三次握手和四次挥手的面试题(2023最新版)。 ----- 任 TCP 虐我千百遍,我仍待 TCP 如初恋。 巨巨巨巨长的提纲,发车!发车! img TCP 基本认识 TCP 头格式有哪些…...

【mysql】Win10安装配置MySQL8.0简要
下载 MySQL官网下载安装包 安装...

SQL SERVER使用发布订阅同步数据库遇到的坑
可能遇到的各种坑 1.在执行 xp_cmdshell 的过程中出错。调用 ‘CreateProcess’ 失败,错误代码: ‘5’ 网上有各种解决办法,包括改本地安全策略,将sql server服务的网络权限改为本机系统,改cmd用户的读写权限,退出360…...

3个命令定位CPU飙高
top 指令找出消耗CPU最厉害的那个进程的pid top -H -p 进程pid 找出耗用CPU资源最多的线程pid printf ‘0x%x\n’ 线程pid 将线程pid转换为16进制 结合jstack 找出哪个代码有问题 jstack 进程pid | grep 16进制的线程pid -A 多少行日志 jstack 进程pid | grep 16进制的线程…...

Java版知识付费 Spring Cloud+Spring Boot+Mybatis+uniapp+前后端分离实现知识付费平台免费搭建
提供职业教育、企业培训、知识付费系统搭建服务。系统功能包含:录播课、直播课、题库、营销、公司组织架构、员工入职培训等。 提供私有化部署,免费售后,专业技术指导,支持PC、APP、H5、小程序多终端同步,支持二次开发…...

使用多数据源dynamic-datasource-spring-boot-starter遇到的问题记录
记录使用多数据源dynamic-datasource-spring-boot-starter遇到的问题: 1、工程启动失败 缺少clickhouse连接驱动,引入对应的maven依赖 <!--ck连接驱动--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>…...

构建语言模型:BERT 分步实施指南
学习目标 了解 BERT 的架构和组件。了解 BERT 输入所需的预处理步骤以及如何处理不同的输入序列长度。获得使用 TensorFlow 或 PyTorch 等流行机器学习框架实施 BERT 的实践知识。了解如何针对特定下游任务(例如文本分类或命名实体识别)微调 BERT。为什么我们需要 BERT? 正…...

⛳ Java多线程 一,线程基础
线程基础 ⛳ Java多线程 一,线程基础🐾 一,线程基础💭 1.1,什么是程序,进程,线程🏭 1.2,什么是并行和并发👣 1.3,线程使用的场景🎨 1.…...

【iOS】多线程 锁问题总结
文章目录 前言1. 你理解的多线程优点缺点 2. atomic 和 nonatomic 的区别及其作用3. GCD的队列类型 - 三种队列类型4. GCD的死锁问题5. 多线程之间的区别和联系6. 进程和线程?进程间的通信方式线程间的通信方式 6. iOS的线程安全手段如何保证 前言 iOS 锁和多线程的…...

Pytorch深度学习-----神经网络之池化层用法详解及其最大池化的使用
系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用(ToTensor,Normalize,Resize ,Co…...

Docker啥是数据持久化?
文章目录 数据持久化数据卷相关命令创建读写数据卷创建只读数据卷数据卷共享数据卷容器实现数据卷共享nginx实现数据卷共享nfs总结 Dockerfile持久化Dockerfile方式docker run总结 数据持久化 在容器层的 UnionFS(联合文件系统)中对文件/目录的任何修…...

CGAL 线段简化算法(2D)
文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 线段简化是指:在减少一组折线中顶点数量的同时,尽可能保持整体形状的过程。CGAL中提供了一种迭代算法:通过从一条折线上移除顶点 q q q,迭代地将边 ( p , q...

在CentOS 7上挂载硬盘到系统的步骤及操作
目录 1:查询未挂载硬盘2:创建挂载目录3:检查磁盘是否被分区4:格式化硬盘5:挂载目录6:检查挂载状态7:设置开机自动挂载总结: 本文介绍了在CentOS 7上挂载硬盘到系统的详细步骤。通过确…...

螺旋矩阵(JS)
螺旋矩阵 题目 给你一个正整数 n ,生成一个包含 1 到 n2 所有元素,且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1: 输入:n 3 输出:[[1,2,3],[8,9,4],[7,6,5]]示例 2: 输入ÿ…...

C#常用数学插值法
目录 1、分段线性插值 2、三次样条插值 3、拉格朗日插值 (1)一元全区间不等距插值 (2)一元全区间等距插值 4、埃尔米特插值 (1)埃尔米特不等距插值 (2)埃尔米特等距插值 1、…...

ELK日志管理平台架构和使用说明
一、部署架构 二、服务注册 2.1 日志解析服务 服务名:日志解析服务(Logstash) 服务默认端口:9600 2.2 日志查询服务 服务名:日志查询服务(Kibana) 服务默认端口:5601 三、对接…...

抖音短视频seo矩阵系统源码开发部署技术分享
抖音短视频的SEO矩阵系统是一个非常重要的部分,它可以帮助视频更好地被搜索引擎识别和推荐。以下是一些关于开发和部署抖音短视频SEO矩阵系统的技术分享: 一、 抖音短视频SEO矩阵系统的技术分享: 关键词研究:在开发抖音短视频SEO矩…...

docker 部署一个单节点的rocketmq
拉取镜像 sudo docker pull rocketmqinc/rocketmq创建数据挂载目录 mkdir -p /docker/rocketmq/data/namesrv/logs mkdir -p /docker/rocketmq/data/namesrv/store mkdir -p /docker/rocketmq/data/broker/logs mkdir -p /docker/rocketmq/data/broker/store /docker/…...

MySQL优化
目录 一. 优化 SQL 查询语句 1.1. 分析慢查询日志 1.2. 优化 SQL 查询语句的性能 1.2.1 优化查询中的索引 1.2.2 减少表的连接(join) 1.2.3 优化查询语句中的过滤条件 1.2.4 避免使用SELECT * 1.2.5 优化存储过程和函数 1.2.6 使用缓存 二. 优化表结构…...

【C++】总结9
文章目录 C从源代码到可执行程序经过什么步骤静态链接和动态链接类的对象存储空间C的内存分区内存池在成员函数中调用delete this会出现什么问题?如果在类的析构函数中调用delete this,会发生什么? C从源代码到可执行程序经过什么步骤 预处理…...

C++报错 XX does not name a type;field `XX’ has incomplete type解决方案
C报错 XX does not name a type;field XX’ has incomplete type解决方案 两个C编译错误及解决办法–does not name a type和field XX’ has incomplete type 编译错误一:XX does not name a type 编译错误二:field XX’ has incomplete t…...

28.利用fminsearch、fminunc 求解最大利润问题(matlab程序)
1.简述 1.无约束(无条件)的最优化 fminunc函数 : - 可用于任意函数求最小值 - 统一求最小值问题 - 如求最大值问题: >对函数取相反数而变成求最小值问题,最后把函数值取反即为函数的最大值。 使用格式如下 1.必须预先把函数存…...

图像 检测 - FCOS: Fully Convolutional One-Stage Object Detection (ICCV 2019)
FCOS: Fully Convolutional One-Stage Object Detection - 全卷积一阶段目标检测(ICCV 2019) 摘要1. 引言2. 相关工作3. 我们的方法3.1 全卷积一阶目标检测器3.2 FCOS的FPN多级预测3.3 FCOS中心度 4. 实验4.1 消融研究4.1.1 FPN多级预测4.1.2 有无中心度…...

C# NDArray System.IO.FileLoadException报错原因分析
C# NDArray System.IO.FileLoadException 报错原因分析: 1.NuGet程序包版本有冲突 2.统一项目版本 1.打开解决方案NuGet程序包设置 2.查看是否有版本冲突 3.统一版本冲突...

快速响应,上门维修小程序让您享受无忧生活
随着科技的不断发展和智能手机的普及,上门维修小程序成为了现代人生活中越来越重要的一部分。上门维修小程序通过将维修服务与互联网相结合,为用户提供了更加便捷、高效的维修服务体验。下面将介绍上门维修小程序开发的优势。 提供便捷的预约方式&am…...

05、性能分析思路?
工具操作:包括压力工具、监控工具、剖析工具、调试工具。数值理解:包括上面工具中所有输出的数据。趋势分析、相关性分析、证据链分析:就是理解了工具产生的数值之后,还要把它们的逻辑关系想明白。这才是性能测试分析中最重要的一…...

【编程语言 · C语言 · calloc和realloc】
【编程语言 C语言 calloc和realloc】https://mp.weixin.qq.com/s?__bizMzg4NTE5MDAzOA&mid2247491544&idx1&sn72d8f9931cfa7ce7441a3248475ab619&chksmcfade321f8da6a374a5935bb46441a03a007c0589db6b8afa8c1991854d632a3201553e37b0b&payreadticketHGy…...