当前位置: 首页 > news >正文

Flink日志采集-ELK可视化实现

一、各组件版本

组件版本
Flink1.16.1
kafka2.0.0
Logstash6.5.4
Elasticseach6.3.1
Kibana6.3.1

  针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执行完毕或者任务报错后container会被回收从而导致日志丢失,为了方便排查问题可以把⽇志⽂件通过KafkaAppender写⼊到kafka中,然后通过ELK等进⾏⽇志搜索甚⾄是分析告警。

二、Flink配置将日志写入Kafka

2.1 flink-conf.yaml增加下面两行配置信息

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

2.2 log4j.properties配置案例如下

##################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
##################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30# This affects logging for both user code and Flink
#rootLogger.appenderRef.file.ref = MainAppender
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.appenderRef.file.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 500MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10#appender.main.name = MainAppender
#appender.main.type = RollingFile
#appender.main.append = true
#appender.main.fileName = ${sys:log.file}
#appender.main.filePattern = ${sys:log.file}.%i
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#appender.main.policies.type = Policies
#appender.main.policies.size.type = SizeBasedTriggeringPolicy
#appender.main.policies.size.size = 100MB
#appender.main.policies.startup.type = OnStartupTriggeringPolicy
#appender.main.strategy.type = DefaultRolloverStrategy
#appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.syncSend = true
appender.kafka.ignoreExceptions = false
appender.kafka.topic = flink_logs
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = xxx1:9092,xxx2:9092,xxx3:9092
appender.kafka.layout.type = JSONLayout
apender.kafka.layout.value = net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact = true
appender.kafka.layout.complete = false# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF#通过 flink on yarn 模式还可以添加⾃定义字段
# 日志路径
appender.kafka.layout.additionalField1.type = KeyValuePair
appender.kafka.layout.additionalField1.key = logdir
appender.kafka.layout.additionalField1.value = ${sys:log.file}
# flink-job-name
appender.kafka.layout.additionalField2.type = KeyValuePair
appender.kafka.layout.additionalField2.key = flinkJobName
appender.kafka.layout.additionalField2.value = ${sys:flinkJobName}
# 提交到yarn的containerId
appender.kafka.layout.additionalField3.type = KeyValuePair
appender.kafka.layout.additionalField3.key = yarnContainerId
appender.kafka.layout.additionalField3.value = ${sys:yarnContainerId}

  上⾯的 appender.kafka.layout.type 可以使⽤ JSONLayout ,也可以⾃定义。

  ⾃定义需要将上⾯的appender.kafka.layout.type 和 appender.kafka.layout.value 修改成如下:

appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern ={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F","l
og_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink
_job_name}"}%n

2.3 基于Flink on yarn模式提交任务前期准备

2.3.1 需要根据kafka的版本在flink/lib⽬录下放⼊kafka-clients的jar包

在这里插入图片描述

2.3.2 kafka处于启动状态

2.3.3 Flink Standalone集群

# 根据kafka的版本放⼊kafka-clients
kafka-clients-3.1.0.jar
# jackson对应的jar包
jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar

2.4 Flink on yarn任务提交案例

/root/software/flink-1.16.1/bin/flink run-application \
-t yarn-application \
-D yarn.application.name=TopSpeedWindowing \
-D parallelism.default=3 \
-D jobmanager.memory.process.size=2g \
-D taskmanager.memory.process.size=2g \
-D env.java.opts="-DflinkJobName=TopSpeedWindowing" \
/root/software/flink-1.16.1/examples/streaming/TopSpeedWindowing.jar

【注意】启动脚本需要加入这个参数,日志才能采集到任务名称(-D env.java.opts="-DflinkJobName=xxx")

  消费flink_logs案例

{instant: {epochSecond: 1698723428,nanoOfSecond: 544000000,},thread: 'flink-akka.actor.default-dispatcher-17',level: 'INFO',loggerName: 'org.apache.flink.runtime.rpc.akka.AkkaRpcService',message: 'Stopped Akka RPC service.',endOfBatch: false,loggerFqcn: 'org.apache.logging.slf4j.Log4jLogger',threadId: 68,threadPriority: 5,logdir: '/yarn/container-logs/application_1697779774806_0046/container_1697779774806_0046_01_000002/taskmanager.log',flinkJobName: 'flink-log-collect-test',yarnContainerId: 'container_1697779774806_0046_01_000002',
}

  ⽇志写⼊Kafka之后可以通过Logstash接⼊elasticsearch,然后通过kibana进⾏查询或搜索

三、LogStash部署

  部署过程略,网上都有

  需要注意Logstash内部kafka-clients和Kafka版本兼容问题,需要根据Kafka版本选择合适的Logstash版本

  将以下内容写⼊config/logstash-sample.conf ⽂件中

input {kafka {bootstrap_servers => ["xxx1:9092,xxx2:9092,xxx3:9092"] group_id => "logstash-group"topics => ["flink_logs"] consumer_threads => 3 type => "flink-logs" codec => "json"auto_offset_reset => "latest"}
}output {elasticsearch {hosts => ["192.168.1.249:9200"] index => "flink-log-%{+YYYY-MM-dd}"}
}

  Logstash启动:

logstash-6.5.4/bin/logstash -f logstash-6.5.4/config/logstash-sample.conf 2>&1 >logstash-6.5.4/logs/logstash.log &

四、Elasticsearch部署

  部署过程略,网上都有

  注意需要用root用户以外的用户启动Elasticsearch

  启动脚本:

Su elasticsearchlogtestelasticsearch-6.3.1/bin/elasticsearch

在这里插入图片描述

  Windows访问ES客户端推荐使用ElasticHD,本地运行后可以直连ES
在这里插入图片描述

五、Kibana部署

  部署过程略,网上都有

  启动脚本:

  kibana-6.3.1-linux-x86_64/bin/kibana

5.1 配置规则

在这里插入图片描述
在这里插入图片描述

5.2 日志分析

在这里插入图片描述

相关文章:

Flink日志采集-ELK可视化实现

一、各组件版本 组件版本Flink1.16.1kafka2.0.0Logstash6.5.4Elasticseach6.3.1Kibana6.3.1 针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执…...

iOS NSKeyedUnarchiver归档和读取

使用NSKeyedUnarchiver归档数据到本地&#xff0c;很多时候保存的并不是基础数据类型&#xff0c;更多是自己定义的Model。有时会碰到归档或者读取的内容跟自己保存的数据类型不匹配。 现在按照思路一步一步解决&#xff1a; 1.先保存文件 保存的数据的类型 #import <Fou…...

算法通关村第五关|青铜|基于链表实现队列

基于链表实现队列 public class LinkQueue {// front的next指向首部结点private Node front;// rear记录尾部结点private Node rear;private int size;public LinkQueue() {this.front new Node(0);this.rear new Node(0);}// 入队public void push(int value) {Node newNod…...

【Vue】使用v-model实现控制子组件显隐

v-model 可以实现双向绑定的效果&#xff0c;允许父组件控制子组件的显示/隐藏&#xff0c;同时允许子组件自己控制自身的显示/隐藏。以下是如何使用 v-model 实现这个需求&#xff1a; 在父组件中&#xff0c;你可以使用 v-model 来双向绑定一个变量&#xff0c;这个变量用于…...

一篇博客读懂顺序表 —— Sequence-List

目录 一、顺序表的初始定义 1.1新建头文件和源文件 1.2 SeqList.h 中的准备工作 二、顺序表的初始化与销毁 三、首尾插入元素 四、首尾删除元素 五、中间插入元素 六、中间删除元素 七、查找指定元素下标 八、源代码 一、顺序表的初始定义 1.1新建头文件和源文件 当我…...

OceanBase:02-单机部署(生产环境)

目录 一、部署规划 二、配置要求 三、部署前配置 1.配置 limits.conf 2.配置 sysctl.conf 3.关闭防火墙 4.关闭 SELinux 5.创建数据目录&#xff0c;修改文件所有者信息 6.设置无密码 SSH 登录 7.安装jdk 四、解压执行安装 五、OBD命令行部署 1.修改配置文件(all-c…...

【嵌入式 C 常用算法 2 -- 变量值交换函数异或方式实现】

文章目录 变量值交换函数异或方式实现 变量值交换函数异或方式实现 在C语言中&#xff0c;可以使用异或运算符&#xff08;^&#xff09;来进行两个数的交换&#xff0c;而不需要使用额外的临时变量。这种交换方式的基础是异或运算的以下性质&#xff1a; 任何数和 0 做异或运…...

Hadoop HDFS(分布式文件系统)

一、Hadoop HDFS(分布式文件系统) 为什么要分布式存储数据 假设一个文件有100tb&#xff0c;我们就把文件划分为多个部分&#xff0c;放入到多个服务器 靠数量取胜&#xff0c;多台服务器组合&#xff0c;才能Hold住 数据量太大&#xff0c;单机存储能力有上限&#xff0c;需要…...

力扣1.两数之和

原题链接&#xff1a;1.两数之和 根据题意可以得出 需要找出数组nums内 有两个元素相加等于target的两个整数&#xff0c;并且返回这两个证书的下标。并且数组内有重复元素&#xff0c;但是返回的答案不能有重复元素出现 要记住的就是&#xff0c;需要判断元素是否出现过&…...

JTA分布式事务管理器

XA协议:是一种标准协议&#xff0c;允许事务管理器协调多个资源管理器&#xff0c;确保在分布式事务中的一致性和原子性。 JTA:是JavaEE规范中的一种&#xff0c;用于管理分布式事务的 API&#xff0c;提供了事务的控制和协调机制 Atomikos理解成JTA的实现 XA是JTA的基础(JT…...

晨控CK-GW08系列网关控制器与CODESYS软件MODBUSTCP通讯手册

晨控CK-GW08系列是一款支持标准工业通讯协议ModbusTCP的网关控制器,方便用户集成到PLC等控制系统中。系统还集成了8路读写接口&#xff0c;用户可通过通信接口使用Modbus TCP协议对8路读写接口所连接的读卡器进行相对独立的读写操作。 晨控CK-GW08系列网关控制器适用于本公司多…...

读书笔记——labuladong算法笔记

读书笔记——labuladong算法笔记 序言计算机算法世界观计算机算法方法论二叉树遍历广度遍历BFS二叉树的前中后序遍历回溯算法动态规划算法二分搜索算法 其他算法滑动窗口双指针Union-Find算法 序言 labuladong算法笔记是一本讲解算法题求解技巧的书。本次读书笔记为2023年8月第…...

Linux中阶教程:bash shell基础

文章目录 输入输出赋值和计算条件判断函数for 循环数组及其遍历其他控制语句 输入输出 echo表示打印字符串&#xff1b;read表示获取用户输入&#xff1b;$用于引用变量。 # test1.sh bash中用#进行单行注释 echo "input your name:" read user_name echo "h…...

Golang 编译原理

简介 Golang&#xff08;Go语言&#xff09;是一种开源的编程语言&#xff0c;由Google开发并于2009年首次发布。它具备高效、可靠的特性&#xff0c;被广泛应用于云计算、分布式系统、网络服务等领域。Golang的编译原理是理解和掌握这门语言的重要基础之一。本文将介绍Golang…...

基于深度学习的动物识别 - 卷积神经网络 机器视觉 图像识别 计算机竞赛

文章目录 0 前言1 背景2 算法原理2.1 动物识别方法概况2.2 常用的网络模型2.2.1 B-CNN2.2.2 SSD 3 SSD动物目标检测流程4 实现效果5 部分相关代码5.1 数据预处理5.2 构建卷积神经网络5.3 tensorflow计算图可视化5.4 网络模型训练5.5 对猫狗图像进行2分类 6 最后 0 前言 &#…...

计算机视觉基础——基于yolov5-face算法的车牌检测

文章目录 车牌检测算法检测实现1.环境布置2.数据处理2.1 CCPD数据集介绍2.1.1 ccpd2019及20202.1.2 文件名字解析 2.2数据集处理2.2.1 CCPD数据处理2.2.2 CPRD数据集处理 2.3 检测算法2.3.1 数据配置car_plate.yaml2.3.2 模型配置2.3.3 train.py2.3.4 训练结果 2.4 部署2.4.1 p…...

【好书推荐】AI时代架构师修炼之道:ChatGPT让架构师插上翅膀

目录 前言 ChatGPT对架构师工作的帮助 快速理解和分析需求 提供代码建议和解决方案 辅助系统设计和优化 提高团队协作效率 如何使用ChatGPT提高架构师工作效率 了解用户需求和分析问题 编码实践和问题解决 系统设计和优化建议 团队协作和沟通效率提升 知识管理和文…...

全局代理和局部代理的区别

在计算机领域中&#xff0c;代理是一种常见的网络技术&#xff0c;它可以帮助用户更好地控制网络访问和数据传输。代理可以分为全局代理和局部代理两种&#xff0c;它们有着不同的作用和适用场景。 一、全局代理 全局代理指的是在系统级别设置的代理&#xff0c;它可以代理所…...

基于EPICS stream模块的直流电源的IOC控制程序实例

本实例程序实现了对优利德UDP6720系列直流电源的网络控制和访问&#xff0c;先在此介绍这个项目中使用的硬件&#xff1a; 1、UDP6721直流电源&#xff1a;受控设备 2、moxa串口服务器5150&#xff1a;将UDP6721直流电源设备串口连接转成网络连接 3、香橙派Zero3&#xff1a;运…...

Unity3D ECS架构适合作为主架构还是局部架构

前言 前言 Unity3D是一款广泛应用于游戏开发的跨平台游戏引擎&#xff0c;提供了丰富的功能和工具来简化游戏开发的过程。而Entity-Component-System&#xff08;ECS&#xff09;架构则是一种面向数据的设计模式&#xff0c;它将游戏对象&#xff08;Entity&#xff09;分解为…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...

Webpack性能优化:构建速度与体积优化策略

一、构建速度优化 1、​​升级Webpack和Node.js​​ ​​优化效果​​&#xff1a;Webpack 4比Webpack 3构建时间降低60%-98%。​​原因​​&#xff1a; V8引擎优化&#xff08;for of替代forEach、Map/Set替代Object&#xff09;。默认使用更快的md4哈希算法。AST直接从Loa…...