当前位置: 首页 > news >正文

尚硅谷大数据项目《在线教育之采集系统》笔记002

视频地址:尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili

目录

P032

P033

P033

P034

P035

P036


P032

P033

# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

P033

2023-07-26 11:13:42,136 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2023-07-26 11:13:42,139 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:42,241 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2023-07-26 11:13:43,157 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:43,164 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.

[2023-07-26 11:03:06,989] INFO Opening socket connection to server node002/192.168.10.102:2181. (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,989] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,992] WARN Session 0x0 for sever node002/192.168.10.102:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: 拒绝连接
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)

flume生效!

node001
启动hadoop、zookeeper、kafka,再启动flume。[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
Info: Sourcing environment configuration script /opt/module/flume/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
...
[atguigu@node001 ~]$ jpsall
================ node001 ================
6368 NodeManager
5793 NameNode
2819 QuorumPeerMain
6598 JobHistoryServer
5960 DataNode
6681 Application
4955 Kafka
7532 Jps
================ node002 ================
4067 NodeManager
2341 Kafka
3942 ResourceManager
4586 ConsoleConsumer
5131 Jps
1950 QuorumPeerMain
3742 DataNode
================ node003 ================
3472 NodeManager
3235 DataNode
1959 QuorumPeerMain
3355 SecondaryNameNode
2347 Kafka
3679 Jps
[atguigu@node001 ~]$ 
[atguigu@node002 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_log
[atguigu@node001 ~]$ mock.sh
[atguigu@node001 ~]$ 

P034

# /opt/module/flume/flume-1.9.0/job# 1、定义组件,为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks - k1# 2、配置sources,描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder# 3、配置channels,描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false# 4、组装,绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1
package com.atguigu.flume.interceptor;import com.atguigu.flume.interceptor.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}/*** 过滤掉脏数据(不完整的json)** @param event* @return*/@Overridepublic Event intercept(Event event) {//1、获取body当中的数据byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2、判断数据是否为完整的jsonif (JSONUtil.isJSONValidate(log)) {return event;}return null;}@Overridepublic List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
package com.atguigu.flume.interceptor.utils;import com.alibaba.fastjson.JSONObject;public class JSONUtil {public static boolean isJSONValidate(String log) {try {JSONObject.parseObject(log);return true;} catch (Exception e) {e.printStackTrace();return false;}}
}
[atguigu@node001 log]$ echo '{"id":1}' >> app.log
[atguigu@node001 log]$ echo '{"id": }' >> app.log
[atguigu@node001 log]$ echo '{"id":2}' >> app.log
[atguigu@node001 log]$ 

P035

#! /bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"done
};;	
"stop"){for i in hadoop102 hadoop103doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "done};;
esac
#! /bin/bashcase $1 in
"start"){for i in node001 node002doecho " --------启动 $i 采集flume-------"ssh $i "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/flume-1.9.0/log1.txt 2>&1 &"done
};;	
"stop"){for i in node001 node002doecho " --------停止 $i 采集flume-------"ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done
};;
esac
#! /bin/bashcase $1 in
"start") {echo " --------采集flume启动-------"ssh node001 "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume/flume-1.9.0/conf/ -f /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf >/dev /null 2>&1 &"
};;	
"stop") {echo " --------采集flume关闭-------"ssh node001 "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
};;
esac

P036

## 1、定义组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1## 2、配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false## 3、配置channels
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/flume-1.9.0/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 3## 4、配置sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#a1.sinks.k1.hdfs.rollInterval = 10
#a1.sinks.k1.hdfs.rollSize = 134217728
#a1.sinks.k1.hdfs.rollCount = 0## 5、组装 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf 

相关文章:

尚硅谷大数据项目《在线教育之采集系统》笔记002

视频地址&#xff1a;尚硅谷大数据项目《在线教育之采集系统》_哔哩哔哩_bilibili 目录 P032 P033 P033 P034 P035 P036 P032 P033 # 1、定义组件&#xff0c;为各组件命名 a1.sources r1 a1.channels c1 a1.sinks - k1# 2、配置sources&#xff0c;描述source a1.sour…...

校园跑腿小程序功能分享

提起校园跑腿小程序大家都不陌生&#xff0c;尤其是对上大学的伙伴们来说,更是熟悉得不能再熟悉了&#xff0c;和我们的生活息息相关&#xff0c;密不可分。 对于现在的年轻人来说&#xff0c;网购是非常简单和方便的一种购物方式&#xff0c;随之快递也会越来越多。在我们国家…...

PHP8的变量-PHP8知识详解

昨天我们讲解了PHP8的常量&#xff0c;今天讲解PHP8的变量。常量有定义常量和预定义常量&#xff0c;变量呢&#xff1f;那就没有定义变量了&#xff0c;那叫给变量赋值&#xff0c;但是还是有预定义变量的。下面就给大家讲解什么是变量、变量赋值及使用及预定义变量。 一、什么…...

图解TCP 三次握手和四次挥手的高频面试题(2023最新版)

大家好&#xff0c;最近重新整理了一版 TCP 三次握手和四次挥手的面试题&#xff08;2023最新版&#xff09;。 ----- 任 TCP 虐我千百遍&#xff0c;我仍待 TCP 如初恋。 巨巨巨巨长的提纲&#xff0c;发车&#xff01;发车&#xff01; img TCP 基本认识 TCP 头格式有哪些…...

【mysql】Win10安装配置MySQL8.0简要

下载 MySQL官网下载安装包 安装...

SQL SERVER使用发布订阅同步数据库遇到的坑

可能遇到的各种坑 1.在执行 xp_cmdshell 的过程中出错。调用 ‘CreateProcess’ 失败&#xff0c;错误代码: ‘5’ 网上有各种解决办法&#xff0c;包括改本地安全策略&#xff0c;将sql server服务的网络权限改为本机系统&#xff0c;改cmd用户的读写权限&#xff0c;退出360…...

3个命令定位CPU飙高

top 指令找出消耗CPU最厉害的那个进程的pid top -H -p 进程pid 找出耗用CPU资源最多的线程pid printf ‘0x%x\n’ 线程pid 将线程pid转换为16进制 结合jstack 找出哪个代码有问题 jstack 进程pid | grep 16进制的线程pid -A 多少行日志 jstack 进程pid | grep 16进制的线程…...

Java版知识付费 Spring Cloud+Spring Boot+Mybatis+uniapp+前后端分离实现知识付费平台免费搭建

提供职业教育、企业培训、知识付费系统搭建服务。系统功能包含&#xff1a;录播课、直播课、题库、营销、公司组织架构、员工入职培训等。 提供私有化部署&#xff0c;免费售后&#xff0c;专业技术指导&#xff0c;支持PC、APP、H5、小程序多终端同步&#xff0c;支持二次开发…...

使用多数据源dynamic-datasource-spring-boot-starter遇到的问题记录

记录使用多数据源dynamic-datasource-spring-boot-starter遇到的问题&#xff1a; 1、工程启动失败 缺少clickhouse连接驱动&#xff0c;引入对应的maven依赖 <!--ck连接驱动--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>…...

构建语言模型:BERT 分步实施指南

学习目标 了解 BERT 的架构和组件。了解 BERT 输入所需的预处理步骤以及如何处理不同的输入序列长度。获得使用 TensorFlow 或 PyTorch 等流行机器学习框架实施 BERT 的实践知识。了解如何针对特定下游任务(例如文本分类或命名实体识别)微调 BERT。为什么我们需要 BERT? 正…...

⛳ Java多线程 一,线程基础

线程基础 ⛳ Java多线程 一&#xff0c;线程基础&#x1f43e; 一&#xff0c;线程基础&#x1f4ad; 1.1&#xff0c;什么是程序&#xff0c;进程&#xff0c;线程&#x1f3ed; 1.2&#xff0c;什么是并行和并发&#x1f463; 1.3&#xff0c;线程使用的场景&#x1f3a8; 1.…...

【iOS】多线程 锁问题总结

文章目录 前言1. 你理解的多线程优点缺点 2. atomic 和 nonatomic 的区别及其作用3. GCD的队列类型 - 三种队列类型4. GCD的死锁问题5. 多线程之间的区别和联系6. 进程和线程&#xff1f;进程间的通信方式线程间的通信方式 6. iOS的线程安全手段如何保证 前言 iOS 锁和多线程的…...

Pytorch深度学习-----神经网络之池化层用法详解及其最大池化的使用

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…...

Docker啥是数据持久化?

文章目录 数据持久化数据卷相关命令创建读写数据卷创建只读数据卷数据卷共享数据卷容器实现数据卷共享nginx实现数据卷共享nfs总结 Dockerfile持久化Dockerfile方式docker run总结 数据持久化 ​ 在容器层的 UnionFS&#xff08;联合文件系统&#xff09;中对文件/目录的任何修…...

CGAL 线段简化算法(2D)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 线段简化是指:在减少一组折线中顶点数量的同时,尽可能保持整体形状的过程。CGAL中提供了一种迭代算法:通过从一条折线上移除顶点 q q q,迭代地将边 ( p , q...

在CentOS 7上挂载硬盘到系统的步骤及操作

目录 1&#xff1a;查询未挂载硬盘2&#xff1a;创建挂载目录3&#xff1a;检查磁盘是否被分区4&#xff1a;格式化硬盘5&#xff1a;挂载目录6&#xff1a;检查挂载状态7&#xff1a;设置开机自动挂载总结&#xff1a; 本文介绍了在CentOS 7上挂载硬盘到系统的详细步骤。通过确…...

螺旋矩阵(JS)

螺旋矩阵 题目 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]]示例 2&#xff1a; 输入&#xff…...

C#常用数学插值法

目录 1、分段线性插值 2、三次样条插值 3、拉格朗日插值 &#xff08;1&#xff09;一元全区间不等距插值 &#xff08;2&#xff09;一元全区间等距插值 4、埃尔米特插值 &#xff08;1&#xff09;埃尔米特不等距插值 &#xff08;2&#xff09;埃尔米特等距插值 1、…...

ELK日志管理平台架构和使用说明

一、部署架构 二、服务注册 2.1 日志解析服务 服务名&#xff1a;日志解析服务&#xff08;Logstash&#xff09; 服务默认端口&#xff1a;9600 2.2 日志查询服务 服务名&#xff1a;日志查询服务&#xff08;Kibana&#xff09; 服务默认端口&#xff1a;5601 三、对接…...

抖音短视频seo矩阵系统源码开发部署技术分享

抖音短视频的SEO矩阵系统是一个非常重要的部分&#xff0c;它可以帮助视频更好地被搜索引擎识别和推荐。以下是一些关于开发和部署抖音短视频SEO矩阵系统的技术分享&#xff1a; 一、 抖音短视频SEO矩阵系统的技术分享&#xff1a; 关键词研究&#xff1a;在开发抖音短视频SEO矩…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...