当前位置: 首页 > news >正文

Flink消费Kafka实时写入Doris

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: logpaths:- /doc/input/*.log  # 更换为你的日志文件路径
processors:- include_fields:fields: ["message"]
output.kafka:# 更换为你的Kafka地址和主题.hosts: ["192.168.235.130:9092"]topic: k2ggcodec:format:string: '%{[message]}'

运行Filebeat采集日志

./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

   <dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>24.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency>

主程序

package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props = new Properties();//Kafka broker的地址props.put("bootstrap.servers", "192.168.235.130:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes("192.168.235.130:8030").setTableIdentifier("test.transactions").setUsername("root").setPassword("1445413748");Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("read_json_by_line", "true");DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println("value"+value);// 分割字符串String[] parts = value.split(",");// 创建JSON字符串StringBuilder jsonString = new StringBuilder();jsonString.append("{");jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");jsonString.append("\"user_id\":").append(parts[1]).append(",");jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");jsonString.append("\"amount\":").append(parts[3]).append(",");jsonString.append("\"currency\":\"").append(parts[4]).append("\",");jsonString.append("\"status\":\"").append(parts[5]).append("\",");jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");jsonString.append("}");return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute("flink kafka to doris by datastream");}
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。

相关文章:

Flink消费Kafka实时写入Doris

本文模拟实际生产环境&#xff0c;通过FileBeat采集日志信息到Kafka&#xff0c;再通过Flink消费Kafka实时写入Doris。 文章目录 Filebeat采集日志到KafkaFlink消费Kafka实时写入Doris总结 Filebeat采集日志到Kafka 常见的日志采集工具有以下几种&#xff1a;Flume、Logstash和…...

实现Web QQ音乐打开现有新标签页切换音乐

若没有打开播放音乐标签页&#xff0c;则打开新标签页播放所选音乐如果已打开新标签页&#xff0c;则直接切换所选音乐 pageA.vue <script setup lang"ts"> const tab2 ref<any>(null); const router useRouter();interface Track {id: number;name: …...

从底层结构开始学习FPGA(15)----时钟结构(通俗版)

目录 0、前言 1、IO Bank和Clock Region(时钟区域)是一个东西吗? 2、时钟输入管脚 3、时钟架构 3.1、全局时钟BUFG 3.2、水平时钟BUFH 3.3、IO时钟BUFIO 3.4、区域时钟BUFR/BUFMR 4、总结 《从底层结构开始学习FPGA》目录与传送门 0、前言 我思来想去,总觉…...

MacOS Sublime Text 解决中乱码

1. 安装Package Control 官方安装指南 手动安装 通过以此点击菜单 Sublime Text > Preferences > Browse Packages 打开Packages目录找到Packages的同级目录Installed Packages下载PackageControl.sublime-package并保存到Installed Packages中在菜单 Sublime Text &g…...

Python画笔案例-084 绘制 3D立方体

1、绘制 3D立方体 通过 python 的turtle 库绘制 3D立方体,如下图: 2、实现代码 绘制 3D立方体,以下为实现代码: import turtle import timeviewfactor = 150 xshift = 0 yshift = 0 zshift = 50...

“八股文”面试:助力、阻力还是空谈?

在当今的IT行业&#xff0c;面试程序员时提及“八股文”已成为一种普遍现象。所谓“八股文”&#xff0c;通常指的是一系列固定的、标准化的面试问题及其解答&#xff0c;这些问题往往涵盖了计算机科学和软件工程的基础知识&#xff0c;以及一些流行的技术框架和算法。然而&…...

如何实现弹出式窗口

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了Sliver综合示例相关的内容,本章回中将介绍PopupMenuButton组件.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在本章回中介绍的PopupMenuButton组件位于AppBar右侧,通常显示三个圆点图标,点击该图标…...

Lua 函数

Lua 函数 Lua 是一种轻量级的编程语言&#xff0c;广泛用于游戏开发、脚本编写和其他应用程序中。在 Lua 中&#xff0c;函数是一等公民&#xff0c;这意味着它们可以被存储在变量中&#xff0c;作为参数传递给其他函数&#xff0c;以及作为其他函数的返回值。本文将详细介绍 …...

HTML_文本标签

概念&#xff1a; 1、用于包裹&#xff1a;词汇、短语等。 2、通常写在排版标签里面。 3、排版标签更宏观(大段的文字)&#xff0c;文本标签更微观(词汇、短语)。 4、文本标签通常都是行内元素。 常用的文本标签 标签名 全称 标签语义em Emphasized 加重(文本)。要着重阅…...

基于SpringBoot+Vue+uniapp的诗词学习系统的详细设计和实现(源码+lw+部署文档+讲解等)

详细视频演示 请联系我获取更详细的演示视频 项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不…...

健康睡眠的重要性

在快节奏的现代生活中&#xff0c;健康养生已成为人们日益关注的话题&#xff0c;而睡眠&#xff0c;这一看似平凡却至关重要的生理需求&#xff0c;往往被忽视在忙碌的缝隙中。今天&#xff0c;让我们深入探讨健康养生中的睡眠艺术&#xff0c;它不仅关乎身体的休息与恢复&…...

知道ip地址怎么看网络地址

在计算机网络的世界里&#xff0c;IP地址是设备之间通信的基础。然而&#xff0c;仅仅知道一个设备的IP地址并不足以完全理解它在网络中的位置和作用。网络地址&#xff0c;作为IP地址的一个重要组成部分&#xff0c;为我们提供了关于设备所属网络的更多信息。本文将深入探讨如…...

精心整理85道Java微服务面试题(含答案)

微服务 面试题 1、您对微服务有何了解&#xff1f; 2、微服务架构有哪些优势&#xff1f; 3。微服务有哪些特点&#xff1f; 4、设计微服务的最佳实践是什么&#xff1f; 5、微服务架构如何运作&#xff1f; 6、微服务架构的优缺点是什么&#xff1f; 7、单片&#xff0…...

MongoDB聚合管道(Aggregation Pipeline)

聚合管道&#xff08;Aggregation Pipeline&#xff09;是MongoDB中用于对数据进行处理和分析的一种强大机制。它由一系列的阶段&#xff08;Stage&#xff09;组成&#xff0c;每个阶段对输入的数据进行一种特定的操作&#xff0c;然后将结果传递给下一个阶段&#xff0c;就像…...

移情别恋c++ ദ്ദി˶ー̀֊ー́ ) ——6.vector(无习题)

C 中的 vector 容器详细总结 1. 什么是 vector&#xff1f; vector 是 C 标准模板库 (STL) 中的一种动态数组容器。它的底层实现是一个可以自动扩展的数组&#xff0c;支持随机访问和动态调整大小&#xff0c;是 C 中最常用的序列容器之一。vector 在插入、删除、遍历以及随机…...

SpringBoot技术支持的桂林景点导航

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…...

利用vmware在移动硬盘安装Ubuntu2go

安装 买个移动硬盘&#xff0c;usb插电脑&#xff0c;磁盘管理看磁盘序列号 vmware新建虚拟机 这一步选择磁盘管理里面看到的磁盘4 先不要开机&#xff0c;选择设置里面UEFI 和安装正常Ubuntu一致操作即可&#xff0c;这里可以不选高级&#xff0c;默认一个引导分区&…...

Spring Boot:中小型医院网站的敏捷开发

摘 要 本基于Spring Boot的中小型医院网站设计目标是实现用户网络预约挂号的功能&#xff0c;同时提高医院管理效率&#xff0c;更好的为广大用户服务。 本文重点阐述了中小型医院网站的开发过程&#xff0c;以实际运用为开发背景&#xff0c;基于Spring Boot框架&#xff0c;运…...

241011-在jupyter中实现文件夹压缩后下载

241011-在jupyter中实现文件夹压缩后下载 在使用jupyter notebook过程中&#xff0c;我们经常会遇到成堆的文件无法批量下载的问题&#xff0c;这里提供压缩文件夹代码&#xff0c;压缩后即可右键文件选择download实现批量下载 import zipfile import os# 设置你想要压缩的文…...

.NET 一款用于转储指定进程内存的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目&#xff0c;该项目是一个 Spring AI 快速入门的样例工程项目&#xff0c;旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计&#xff0c;每个模块都专注于特定的功能领域&#xff0c;便于学习和…...

【大模型】RankRAG:基于大模型的上下文排序与检索增强生成的统一框架

文章目录 A 论文出处B 背景B.1 背景介绍B.2 问题提出B.3 创新点 C 模型结构C.1 指令微调阶段C.2 排名与生成的总和指令微调阶段C.3 RankRAG推理&#xff1a;检索-重排-生成 D 实验设计E 个人总结 A 论文出处 论文题目&#xff1a;RankRAG&#xff1a;Unifying Context Ranking…...

2025.6.9总结(利与弊)

凡事都有两面性。在大厂上班也不例外。今天找开发定位问题&#xff0c;从一个接口人不断溯源到另一个 接口人。有时候&#xff0c;不知道是谁的责任填。将工作内容分的很细&#xff0c;每个人负责其中的一小块。我清楚的意识到&#xff0c;自己就是个可以随时替换的螺丝钉&…...