Kafka集成flume
1.flume作为生产者集成Kafka
kafka作为flume的sink,扮演消费者角色
1.1 flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile = /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf
1.3开启Kafka消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning
1.4生产数据
往被监控文件输入数据
[ljr@node1 data]$echo hello >>file2.txt
[ljr@node1 data]$ echo ============== >>file2.txt
查看Kafka消费者

可见Kafka集成flume生产者成功。
2.flume作为消费者集成Kafka
kafka作为flume的source,扮演生产者角色
2.1flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf
2.3开启Kafka生产者并生产数据
kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

查看flume监控台

可见Kafka集成flume消费者成功。
相关文章:
Kafka集成flume
1.flume作为生产者集成Kafka kafka作为flume的sink,扮演消费者角色 1.1 flume配置文件 vim $kafka/jobs/flume-kafka.conf # agent a1.sources r1 a1.sinks k1 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type TAILDIR #记录最后监控文件…...
如何让视频有高级感 高级感视频制作方法 高级感视频怎么剪 会声会影视频剪辑制作教程 会声会影中文免费下载
高质量视频通常具有清晰的画面、优质的音频和令人印象深刻的视觉效果。这篇文章来了解如何让视频有高级感,高级感视频制作方法。 一、如何让视频有高级感 要让视频有高级感,要注意以下几个要点: 1、剧本和故事性:一个好的剧本和…...
详解|访问学者申请被拒原因有哪些?
访问学者项目为全球科研人员提供了一个难得的机会,让他们能够跨越国界,深入不同的学术环境,进行学术交流和合作。然而,并非所有申请者都能如愿以偿地获得这一机会。本文将对访问学者申请中常见的被拒原因进行详细解析,…...
[鹤城杯 2021]BabyRSA
题目: from Crypto.Util.number import getPrime, bytes_to_long from secret import flagp getPrime(1024) q getPrime(1024) n p * q e 65537 hint1 p >> 724 hint2 q % (2 ** 265) ct pow(bytes_to_long(flag), e, n) print(hint1) print(hint2) p…...
西安市工业倍增引导基金子基金申报条件流程和材料程序指南(2024年)
一、基本情况 产业投资基金是以产业发展为首要目标,围绕经济社会发展规划和产业发展政策,发挥“有效市场”作用,支持重点领域、重点产业、重点区域(如:全市六大支柱产业、五大新兴产业领域成熟期重点规模以上企业以及“…...
微型丝杆的耐用性和延长使用寿命的关键因素!
无论是机械设备,还是精密传动元件,高精度微型丝杆是各种机械设备中不可或缺的重要组件。它的精度和耐用性直接影响着工作效率和产品品质,在工业技术不断进步的情况下,对微型丝杆的性能要求也越来越高,如何提升微型丝杆…...
音频文件下载后,如何轻松转换格式?
在我们日常的数字生活中,下载各种音频文件是司空见惯的事情。然而,有时候我们可能需要将这些音频文件转换为不同的格式,以适应不同的设备或编辑需求。无论您是希望将下载的音频文件转换为通用的MP3格式,还是需要将其转换为高保真的…...
Intel平台,13600KF+3060Ti,虚拟机安装macOS 14(2024年6月)
距离上次装macOS虚拟机已经有一段时间了,macOS系统现在大版本升级的速度也是越来越快了,由于Office只支持最新三个版本的macOS,所以现在保底也得安装macOS 12了,我这次是用macOS 14做实验,13和12的安装方式和macOS 14一…...
Cookie、Session、Token的关系和区别
关系 Session与Cookie:Session通常依赖于Cookie来工作。当服务器为客户端创建一个Session时,它会在服务器上存储与客户端相关的信息,并将一个唯一的SessionID通过Cookie发送给客户端。客户端在后续的请求中会携带这个Cookie(包含…...
Windows 11 中安装 Docker Desktop 并安装镜像
本该主要介绍在 Windows 11 中安装 Docker Desktop 时的一些准备工作,以及该如何下载和安装,然后分别使用管理界面和 Docker 命令安装两个镜像。 一、准备工作 在 Windows 11 中安装 Docker Desktop 前,需要做一些准备。打开 【Windows 功能…...
深入剖析Java线程池之“newWorkStealingPool“
1. 概述 newWorkStealingPool 是Java 8中引入的一个新型线程池,它基于ForkJoinPool实现,并采用了“工作窃取”(Work-Stealing)算法。这种线程池特别适用于可并行化且计算密集型的任务,能够充分利用多核CPU资源,提高任务执行效率。 2. 工作窃取算法(Work-Stealing Algor…...
《跟我一起学“网络安全”》——安全设备
安全设备 一、安全设备–IDS IDS入侵检测 (1)什么是入侵检测: 入侵检测系统(intrusion detection system,简称“IDS”)是一种对网络传输进行即时监视,在发现可疑传输时发出警报或者采取主动反应措施的网络安全设备。…...
猜测Tomcat如何实现WebSocket协议
一、WebSocket协议的实现 (一)WebSocket是官方的协议接口标准。 (二)如果一门编程语言可以网络连接和并发,就能创建一种WebSocket实现。 (三)同一种编程语言,有不同的协议实现版本和框架。 二、Tomcat实现 在Tomcat容器中实现了对应的WebSocket版本&am…...
uniApp @input事件更改输入框值,值改变了但是页面没更新新的值
<uni-easyinputtype"text"trim"all":inputBorder"false"v-model"customFormData.completePercent"input"(val) > completeOnInput(val)"placeholder"请输入" /> function completeOnInput(val) {let num…...
两行css 实现瀑布流
html <ul ><li><a href"" ><img src"05094532gc6w.jpg" alt"111" /><p>传奇</p></a></li><li><a href"" ><img src"05094532gc6w.jpg" alt"111"…...
Centos7.9部署单节点K8S环境
Centos7.9部署单节点K8S环境 通过Centos extras镜像源安装K8S环境,优点是方便快捷,缺点是版本较低,安装后的版本为1.5.2。 1. 准备工作 关闭selinux [rootlocalhost ~]# cat /etc/selinux/config# This file controls the state of SELin…...
【CV】stable diffusion初步理解
来自gpt-4o Stable diffusion 和DALLE的关系 Stable Diffusion 和 DALL-E 都是生成图像的人工智能模型,但它们有不同的开发背景和技术实现。 Stable Diffusion: 开发者: 由Stability AI开发,并与CompVis和LAION等组织合作。技术: 基于扩散模型…...
足底筋膜炎最好的恢复办法
足底筋膜炎是一种由足底筋膜受到炎症刺激而引起的疼痛和不适的疾病。其典型症状主要包括: 1、足底疼痛:这是足底筋膜炎最常见的症状。疼痛通常位于足跟部位,患者可能感到刺痛或灼热感。尤其在早晨起床或长时间站立后,这种疼痛感会…...
Fiddler抓包工具介绍
下载 下载:Web Debugging Proxy and Troubleshooting Tools|Fiddler 进去要填一个表 汉化版 百度网盘 请输入提取码 提取码:xq9t 下载过附件之后分别把两个文件 点开fiddler就ok了 配置https fiddler要想抓到https包(解密的),点击tools->options勾选三个对…...
知乎号开始运营了,宣传一波
知乎号开始发布一些小说、散文还有诗歌了,欢迎大家多来关注 知乎链接:姜亚轲 每篇小说都改编成网易云音乐,文章中也有链接,我做的词,Suno编曲和演唱,欢迎大家来听听...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
Linux 下 DMA 内存映射浅析
序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存,但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程,可以参考这篇文章,我觉得写的非常…...
GAN模式奔溃的探讨论文综述(一)
简介 简介:今天带来一篇关于GAN的,对于模式奔溃的一个探讨的一个问题,帮助大家更好的解决训练中遇到的一个难题。 论文题目:An in-depth review and analysis of mode collapse in GAN 期刊:Machine Learning 链接:...
用 FFmpeg 实现 RTMP 推流直播
RTMP(Real-Time Messaging Protocol) 是直播行业中常用的传输协议。 一般来说,直播服务商会给你: ✅ 一个 RTMP 推流地址(你推视频上去) ✅ 一个 HLS 或 FLV 拉流地址(观众观看用)…...
