当前位置: 首页 > news >正文

Flink日志采集-ELK可视化实现

一、各组件版本

组件版本
Flink1.16.1
kafka2.0.0
Logstash6.5.4
Elasticseach6.3.1
Kibana6.3.1

  针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执行完毕或者任务报错后container会被回收从而导致日志丢失,为了方便排查问题可以把⽇志⽂件通过KafkaAppender写⼊到kafka中,然后通过ELK等进⾏⽇志搜索甚⾄是分析告警。

二、Flink配置将日志写入Kafka

2.1 flink-conf.yaml增加下面两行配置信息

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

2.2 log4j.properties配置案例如下

##################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
##################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30# This affects logging for both user code and Flink
#rootLogger.appenderRef.file.ref = MainAppender
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.appenderRef.file.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 500MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10#appender.main.name = MainAppender
#appender.main.type = RollingFile
#appender.main.append = true
#appender.main.fileName = ${sys:log.file}
#appender.main.filePattern = ${sys:log.file}.%i
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#appender.main.policies.type = Policies
#appender.main.policies.size.type = SizeBasedTriggeringPolicy
#appender.main.policies.size.size = 100MB
#appender.main.policies.startup.type = OnStartupTriggeringPolicy
#appender.main.strategy.type = DefaultRolloverStrategy
#appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.syncSend = true
appender.kafka.ignoreExceptions = false
appender.kafka.topic = flink_logs
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = xxx1:9092,xxx2:9092,xxx3:9092
appender.kafka.layout.type = JSONLayout
apender.kafka.layout.value = net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact = true
appender.kafka.layout.complete = false# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF#通过 flink on yarn 模式还可以添加⾃定义字段
# 日志路径
appender.kafka.layout.additionalField1.type = KeyValuePair
appender.kafka.layout.additionalField1.key = logdir
appender.kafka.layout.additionalField1.value = ${sys:log.file}
# flink-job-name
appender.kafka.layout.additionalField2.type = KeyValuePair
appender.kafka.layout.additionalField2.key = flinkJobName
appender.kafka.layout.additionalField2.value = ${sys:flinkJobName}
# 提交到yarn的containerId
appender.kafka.layout.additionalField3.type = KeyValuePair
appender.kafka.layout.additionalField3.key = yarnContainerId
appender.kafka.layout.additionalField3.value = ${sys:yarnContainerId}

  上⾯的 appender.kafka.layout.type 可以使⽤ JSONLayout ,也可以⾃定义。

  ⾃定义需要将上⾯的appender.kafka.layout.type 和 appender.kafka.layout.value 修改成如下:

appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern ={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F","l
og_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink
_job_name}"}%n

2.3 基于Flink on yarn模式提交任务前期准备

2.3.1 需要根据kafka的版本在flink/lib⽬录下放⼊kafka-clients的jar包

在这里插入图片描述

2.3.2 kafka处于启动状态

2.3.3 Flink Standalone集群

# 根据kafka的版本放⼊kafka-clients
kafka-clients-3.1.0.jar
# jackson对应的jar包
jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar

2.4 Flink on yarn任务提交案例

/root/software/flink-1.16.1/bin/flink run-application \
-t yarn-application \
-D yarn.application.name=TopSpeedWindowing \
-D parallelism.default=3 \
-D jobmanager.memory.process.size=2g \
-D taskmanager.memory.process.size=2g \
-D env.java.opts="-DflinkJobName=TopSpeedWindowing" \
/root/software/flink-1.16.1/examples/streaming/TopSpeedWindowing.jar

【注意】启动脚本需要加入这个参数,日志才能采集到任务名称(-D env.java.opts="-DflinkJobName=xxx")

  消费flink_logs案例

{instant: {epochSecond: 1698723428,nanoOfSecond: 544000000,},thread: 'flink-akka.actor.default-dispatcher-17',level: 'INFO',loggerName: 'org.apache.flink.runtime.rpc.akka.AkkaRpcService',message: 'Stopped Akka RPC service.',endOfBatch: false,loggerFqcn: 'org.apache.logging.slf4j.Log4jLogger',threadId: 68,threadPriority: 5,logdir: '/yarn/container-logs/application_1697779774806_0046/container_1697779774806_0046_01_000002/taskmanager.log',flinkJobName: 'flink-log-collect-test',yarnContainerId: 'container_1697779774806_0046_01_000002',
}

  ⽇志写⼊Kafka之后可以通过Logstash接⼊elasticsearch,然后通过kibana进⾏查询或搜索

三、LogStash部署

  部署过程略,网上都有

  需要注意Logstash内部kafka-clients和Kafka版本兼容问题,需要根据Kafka版本选择合适的Logstash版本

  将以下内容写⼊config/logstash-sample.conf ⽂件中

input {kafka {bootstrap_servers => ["xxx1:9092,xxx2:9092,xxx3:9092"] group_id => "logstash-group"topics => ["flink_logs"] consumer_threads => 3 type => "flink-logs" codec => "json"auto_offset_reset => "latest"}
}output {elasticsearch {hosts => ["192.168.1.249:9200"] index => "flink-log-%{+YYYY-MM-dd}"}
}

  Logstash启动:

logstash-6.5.4/bin/logstash -f logstash-6.5.4/config/logstash-sample.conf 2>&1 >logstash-6.5.4/logs/logstash.log &

四、Elasticsearch部署

  部署过程略,网上都有

  注意需要用root用户以外的用户启动Elasticsearch

  启动脚本:

Su elasticsearchlogtestelasticsearch-6.3.1/bin/elasticsearch

在这里插入图片描述

  Windows访问ES客户端推荐使用ElasticHD,本地运行后可以直连ES
在这里插入图片描述

五、Kibana部署

  部署过程略,网上都有

  启动脚本:

  kibana-6.3.1-linux-x86_64/bin/kibana

5.1 配置规则

在这里插入图片描述
在这里插入图片描述

5.2 日志分析

在这里插入图片描述

相关文章:

Flink日志采集-ELK可视化实现

一、各组件版本 组件版本Flink1.16.1kafka2.0.0Logstash6.5.4Elasticseach6.3.1Kibana6.3.1 针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执…...

iOS NSKeyedUnarchiver归档和读取

使用NSKeyedUnarchiver归档数据到本地&#xff0c;很多时候保存的并不是基础数据类型&#xff0c;更多是自己定义的Model。有时会碰到归档或者读取的内容跟自己保存的数据类型不匹配。 现在按照思路一步一步解决&#xff1a; 1.先保存文件 保存的数据的类型 #import <Fou…...

算法通关村第五关|青铜|基于链表实现队列

基于链表实现队列 public class LinkQueue {// front的next指向首部结点private Node front;// rear记录尾部结点private Node rear;private int size;public LinkQueue() {this.front new Node(0);this.rear new Node(0);}// 入队public void push(int value) {Node newNod…...

【Vue】使用v-model实现控制子组件显隐

v-model 可以实现双向绑定的效果&#xff0c;允许父组件控制子组件的显示/隐藏&#xff0c;同时允许子组件自己控制自身的显示/隐藏。以下是如何使用 v-model 实现这个需求&#xff1a; 在父组件中&#xff0c;你可以使用 v-model 来双向绑定一个变量&#xff0c;这个变量用于…...

一篇博客读懂顺序表 —— Sequence-List

目录 一、顺序表的初始定义 1.1新建头文件和源文件 1.2 SeqList.h 中的准备工作 二、顺序表的初始化与销毁 三、首尾插入元素 四、首尾删除元素 五、中间插入元素 六、中间删除元素 七、查找指定元素下标 八、源代码 一、顺序表的初始定义 1.1新建头文件和源文件 当我…...

OceanBase:02-单机部署(生产环境)

目录 一、部署规划 二、配置要求 三、部署前配置 1.配置 limits.conf 2.配置 sysctl.conf 3.关闭防火墙 4.关闭 SELinux 5.创建数据目录&#xff0c;修改文件所有者信息 6.设置无密码 SSH 登录 7.安装jdk 四、解压执行安装 五、OBD命令行部署 1.修改配置文件(all-c…...

【嵌入式 C 常用算法 2 -- 变量值交换函数异或方式实现】

文章目录 变量值交换函数异或方式实现 变量值交换函数异或方式实现 在C语言中&#xff0c;可以使用异或运算符&#xff08;^&#xff09;来进行两个数的交换&#xff0c;而不需要使用额外的临时变量。这种交换方式的基础是异或运算的以下性质&#xff1a; 任何数和 0 做异或运…...

Hadoop HDFS(分布式文件系统)

一、Hadoop HDFS(分布式文件系统) 为什么要分布式存储数据 假设一个文件有100tb&#xff0c;我们就把文件划分为多个部分&#xff0c;放入到多个服务器 靠数量取胜&#xff0c;多台服务器组合&#xff0c;才能Hold住 数据量太大&#xff0c;单机存储能力有上限&#xff0c;需要…...

力扣1.两数之和

原题链接&#xff1a;1.两数之和 根据题意可以得出 需要找出数组nums内 有两个元素相加等于target的两个整数&#xff0c;并且返回这两个证书的下标。并且数组内有重复元素&#xff0c;但是返回的答案不能有重复元素出现 要记住的就是&#xff0c;需要判断元素是否出现过&…...

JTA分布式事务管理器

XA协议:是一种标准协议&#xff0c;允许事务管理器协调多个资源管理器&#xff0c;确保在分布式事务中的一致性和原子性。 JTA:是JavaEE规范中的一种&#xff0c;用于管理分布式事务的 API&#xff0c;提供了事务的控制和协调机制 Atomikos理解成JTA的实现 XA是JTA的基础(JT…...

晨控CK-GW08系列网关控制器与CODESYS软件MODBUSTCP通讯手册

晨控CK-GW08系列是一款支持标准工业通讯协议ModbusTCP的网关控制器,方便用户集成到PLC等控制系统中。系统还集成了8路读写接口&#xff0c;用户可通过通信接口使用Modbus TCP协议对8路读写接口所连接的读卡器进行相对独立的读写操作。 晨控CK-GW08系列网关控制器适用于本公司多…...

读书笔记——labuladong算法笔记

读书笔记——labuladong算法笔记 序言计算机算法世界观计算机算法方法论二叉树遍历广度遍历BFS二叉树的前中后序遍历回溯算法动态规划算法二分搜索算法 其他算法滑动窗口双指针Union-Find算法 序言 labuladong算法笔记是一本讲解算法题求解技巧的书。本次读书笔记为2023年8月第…...

Linux中阶教程:bash shell基础

文章目录 输入输出赋值和计算条件判断函数for 循环数组及其遍历其他控制语句 输入输出 echo表示打印字符串&#xff1b;read表示获取用户输入&#xff1b;$用于引用变量。 # test1.sh bash中用#进行单行注释 echo "input your name:" read user_name echo "h…...

Golang 编译原理

简介 Golang&#xff08;Go语言&#xff09;是一种开源的编程语言&#xff0c;由Google开发并于2009年首次发布。它具备高效、可靠的特性&#xff0c;被广泛应用于云计算、分布式系统、网络服务等领域。Golang的编译原理是理解和掌握这门语言的重要基础之一。本文将介绍Golang…...

基于深度学习的动物识别 - 卷积神经网络 机器视觉 图像识别 计算机竞赛

文章目录 0 前言1 背景2 算法原理2.1 动物识别方法概况2.2 常用的网络模型2.2.1 B-CNN2.2.2 SSD 3 SSD动物目标检测流程4 实现效果5 部分相关代码5.1 数据预处理5.2 构建卷积神经网络5.3 tensorflow计算图可视化5.4 网络模型训练5.5 对猫狗图像进行2分类 6 最后 0 前言 &#…...

计算机视觉基础——基于yolov5-face算法的车牌检测

文章目录 车牌检测算法检测实现1.环境布置2.数据处理2.1 CCPD数据集介绍2.1.1 ccpd2019及20202.1.2 文件名字解析 2.2数据集处理2.2.1 CCPD数据处理2.2.2 CPRD数据集处理 2.3 检测算法2.3.1 数据配置car_plate.yaml2.3.2 模型配置2.3.3 train.py2.3.4 训练结果 2.4 部署2.4.1 p…...

【好书推荐】AI时代架构师修炼之道:ChatGPT让架构师插上翅膀

目录 前言 ChatGPT对架构师工作的帮助 快速理解和分析需求 提供代码建议和解决方案 辅助系统设计和优化 提高团队协作效率 如何使用ChatGPT提高架构师工作效率 了解用户需求和分析问题 编码实践和问题解决 系统设计和优化建议 团队协作和沟通效率提升 知识管理和文…...

全局代理和局部代理的区别

在计算机领域中&#xff0c;代理是一种常见的网络技术&#xff0c;它可以帮助用户更好地控制网络访问和数据传输。代理可以分为全局代理和局部代理两种&#xff0c;它们有着不同的作用和适用场景。 一、全局代理 全局代理指的是在系统级别设置的代理&#xff0c;它可以代理所…...

基于EPICS stream模块的直流电源的IOC控制程序实例

本实例程序实现了对优利德UDP6720系列直流电源的网络控制和访问&#xff0c;先在此介绍这个项目中使用的硬件&#xff1a; 1、UDP6721直流电源&#xff1a;受控设备 2、moxa串口服务器5150&#xff1a;将UDP6721直流电源设备串口连接转成网络连接 3、香橙派Zero3&#xff1a;运…...

Unity3D ECS架构适合作为主架构还是局部架构

前言 前言 Unity3D是一款广泛应用于游戏开发的跨平台游戏引擎&#xff0c;提供了丰富的功能和工具来简化游戏开发的过程。而Entity-Component-System&#xff08;ECS&#xff09;架构则是一种面向数据的设计模式&#xff0c;它将游戏对象&#xff08;Entity&#xff09;分解为…...

从零开始的目标检测和关键点检测(三):训练一个Glue的RTMPose模型

从零开始的目标检测和关键点检测&#xff08;三&#xff09;&#xff1a;训练一个Glue的RTMPose模型 一、重写config文件二、开始训练三、ncnn部署 从零开始的目标检测和关键点检测&#xff08;一&#xff09;&#xff1a;用labelme标注数据集 从零开始的目标检测和关键点检测…...

Qt6 中弹出消息框,一段时间后自动退出

以下代码功能&#xff0c;弹出模态消息框&#xff0c;然后&#xff0c;等待 3 秒&#xff0c;消息框自动退出 QMessageBox msgbox;msgbox.setText("sleep 3s");QTimer::singleShot(3000, &msgbox, &QMessageBox::close);msgbox.exec();...

elementUI树节点全选,反选,半选状态

// <template>部分 <div class"check-block"><el-divider></el-divider><el-checkbox :indeterminate"indeterminate" v-model"checkAll" change"handleCheckAllChange">全选</el-checkbox><e…...

Kafka、RabbitMQ、RocketMQ中间件的对比

消息中间件现在有不少&#xff0c;网上很多文章都对其做过对比&#xff0c;在这我对其做进一步总结与整理。 RocketMQ 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件&#xff0c;使用Mysql作为消息存储媒介&#xff0c;可完全水平扩容&#xff0c;为了进一步降低成…...

Mac 创建并使用 .zshrc 文件

1&#xff0c;打开终端输入指令 touch .zshrc 2&#xff0c;你可能希望将 .bash_profile 文件中的内容复制到 .zshrc 文件中&#xff0c;那建议复制过来。 3&#xff0c;使用 .zshrc 文件 执行以下指令&#xff1a; source .zshrc 注&#xff1a;以后希望使用 .bash_prof…...

Unity3D移动开发如何依据性能选择Shader

前言 在Unity3D移动开发中&#xff0c;选择合适的Shader是非常重要的&#xff0c;它直接影响到游戏的性能和画面效果。本文将介绍如何依据性能选择Shader&#xff0c;并给出相应的技术详解以及代码实现。 对惹&#xff0c;这里有一个游戏开发交流小组&#xff0c;希望大家可以…...

基于stm32F4的智能宠物喂食器的设计:LVGL界面、定时喂食喂水通风

宠物喂食器 一、功能设计二、元器件选型三、UI设计四、原理图设计五、源代码设计六、成品展示 实物链接&#xff1a;https://m.tb.cn/h.5iCUX6H?tkPL65WXCEipQ CZ3457 一、功能设计 1、设计一个触摸屏作为人机交互 2、通过触摸屏设置时间定时喂食喂水通风 3、获取当前水槽的…...

jumpserver堡垒机docker方式安装部署

1、环境要求 请先自行创建 数据库 和 Redis, 版本要求参考上面环境要求说明 mysql>5.7 redis >5.0 2、创建数据库 mysql&#xff1a; create database jumpserver default charset utf8; GRANT ALL PRIVILEGES ON jumpserver.* TO jumpserver% IDENTIFIED BY nu4x599…...

在基于亚马逊云科技的湖仓一体架构上构建数据血缘的探索和实践

背景介绍 随着大数据技术的进步&#xff0c;企业和组织越来越依赖数据驱动的决策。数据的质量、来源及其流动性因此显得非常关键。数据血缘分析为我们提供了一种追踪数据从起点到终点的方法&#xff0c;有助于理解数据如何被转换和消费&#xff0c;同时对数据治理和合规性起到关…...

VScode clangd 插件浏览 linux 源码

文章目录 VScode clangd 插件浏览 linux 源码clangd 安装与配置VScode 插件安装clangd 安装方法一方法二 clangd 配置 cmake 生成bear 生成 compile_commands.json触发 clangd linux 内核脚本生成 compile_commands.json 文件三种方式对比 VScode clangd 插件浏览 linux 源码 …...