Kafka集成flume
1.flume作为生产者集成Kafka
kafka作为flume的sink,扮演消费者角色
1.1 flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile = /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf
1.3开启Kafka消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning
1.4生产数据
往被监控文件输入数据
[ljr@node1 data]$echo hello >>file2.txt
[ljr@node1 data]$ echo ============== >>file2.txt
查看Kafka消费者

可见Kafka集成flume生产者成功。
2.flume作为消费者集成Kafka
kafka作为flume的source,扮演生产者角色
2.1flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf
2.3开启Kafka生产者并生产数据
kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

查看flume监控台

可见Kafka集成flume消费者成功。
相关文章:
Kafka集成flume
1.flume作为生产者集成Kafka kafka作为flume的sink,扮演消费者角色 1.1 flume配置文件 vim $kafka/jobs/flume-kafka.conf # agent a1.sources r1 a1.sinks k1 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type TAILDIR #记录最后监控文件…...
如何让视频有高级感 高级感视频制作方法 高级感视频怎么剪 会声会影视频剪辑制作教程 会声会影中文免费下载
高质量视频通常具有清晰的画面、优质的音频和令人印象深刻的视觉效果。这篇文章来了解如何让视频有高级感,高级感视频制作方法。 一、如何让视频有高级感 要让视频有高级感,要注意以下几个要点: 1、剧本和故事性:一个好的剧本和…...
详解|访问学者申请被拒原因有哪些?
访问学者项目为全球科研人员提供了一个难得的机会,让他们能够跨越国界,深入不同的学术环境,进行学术交流和合作。然而,并非所有申请者都能如愿以偿地获得这一机会。本文将对访问学者申请中常见的被拒原因进行详细解析,…...
[鹤城杯 2021]BabyRSA
题目: from Crypto.Util.number import getPrime, bytes_to_long from secret import flagp getPrime(1024) q getPrime(1024) n p * q e 65537 hint1 p >> 724 hint2 q % (2 ** 265) ct pow(bytes_to_long(flag), e, n) print(hint1) print(hint2) p…...
西安市工业倍增引导基金子基金申报条件流程和材料程序指南(2024年)
一、基本情况 产业投资基金是以产业发展为首要目标,围绕经济社会发展规划和产业发展政策,发挥“有效市场”作用,支持重点领域、重点产业、重点区域(如:全市六大支柱产业、五大新兴产业领域成熟期重点规模以上企业以及“…...
微型丝杆的耐用性和延长使用寿命的关键因素!
无论是机械设备,还是精密传动元件,高精度微型丝杆是各种机械设备中不可或缺的重要组件。它的精度和耐用性直接影响着工作效率和产品品质,在工业技术不断进步的情况下,对微型丝杆的性能要求也越来越高,如何提升微型丝杆…...
音频文件下载后,如何轻松转换格式?
在我们日常的数字生活中,下载各种音频文件是司空见惯的事情。然而,有时候我们可能需要将这些音频文件转换为不同的格式,以适应不同的设备或编辑需求。无论您是希望将下载的音频文件转换为通用的MP3格式,还是需要将其转换为高保真的…...
Intel平台,13600KF+3060Ti,虚拟机安装macOS 14(2024年6月)
距离上次装macOS虚拟机已经有一段时间了,macOS系统现在大版本升级的速度也是越来越快了,由于Office只支持最新三个版本的macOS,所以现在保底也得安装macOS 12了,我这次是用macOS 14做实验,13和12的安装方式和macOS 14一…...
Cookie、Session、Token的关系和区别
关系 Session与Cookie:Session通常依赖于Cookie来工作。当服务器为客户端创建一个Session时,它会在服务器上存储与客户端相关的信息,并将一个唯一的SessionID通过Cookie发送给客户端。客户端在后续的请求中会携带这个Cookie(包含…...
Windows 11 中安装 Docker Desktop 并安装镜像
本该主要介绍在 Windows 11 中安装 Docker Desktop 时的一些准备工作,以及该如何下载和安装,然后分别使用管理界面和 Docker 命令安装两个镜像。 一、准备工作 在 Windows 11 中安装 Docker Desktop 前,需要做一些准备。打开 【Windows 功能…...
深入剖析Java线程池之“newWorkStealingPool“
1. 概述 newWorkStealingPool 是Java 8中引入的一个新型线程池,它基于ForkJoinPool实现,并采用了“工作窃取”(Work-Stealing)算法。这种线程池特别适用于可并行化且计算密集型的任务,能够充分利用多核CPU资源,提高任务执行效率。 2. 工作窃取算法(Work-Stealing Algor…...
《跟我一起学“网络安全”》——安全设备
安全设备 一、安全设备–IDS IDS入侵检测 (1)什么是入侵检测: 入侵检测系统(intrusion detection system,简称“IDS”)是一种对网络传输进行即时监视,在发现可疑传输时发出警报或者采取主动反应措施的网络安全设备。…...
猜测Tomcat如何实现WebSocket协议
一、WebSocket协议的实现 (一)WebSocket是官方的协议接口标准。 (二)如果一门编程语言可以网络连接和并发,就能创建一种WebSocket实现。 (三)同一种编程语言,有不同的协议实现版本和框架。 二、Tomcat实现 在Tomcat容器中实现了对应的WebSocket版本&am…...
uniApp @input事件更改输入框值,值改变了但是页面没更新新的值
<uni-easyinputtype"text"trim"all":inputBorder"false"v-model"customFormData.completePercent"input"(val) > completeOnInput(val)"placeholder"请输入" /> function completeOnInput(val) {let num…...
两行css 实现瀑布流
html <ul ><li><a href"" ><img src"05094532gc6w.jpg" alt"111" /><p>传奇</p></a></li><li><a href"" ><img src"05094532gc6w.jpg" alt"111"…...
Centos7.9部署单节点K8S环境
Centos7.9部署单节点K8S环境 通过Centos extras镜像源安装K8S环境,优点是方便快捷,缺点是版本较低,安装后的版本为1.5.2。 1. 准备工作 关闭selinux [rootlocalhost ~]# cat /etc/selinux/config# This file controls the state of SELin…...
【CV】stable diffusion初步理解
来自gpt-4o Stable diffusion 和DALLE的关系 Stable Diffusion 和 DALL-E 都是生成图像的人工智能模型,但它们有不同的开发背景和技术实现。 Stable Diffusion: 开发者: 由Stability AI开发,并与CompVis和LAION等组织合作。技术: 基于扩散模型…...
足底筋膜炎最好的恢复办法
足底筋膜炎是一种由足底筋膜受到炎症刺激而引起的疼痛和不适的疾病。其典型症状主要包括: 1、足底疼痛:这是足底筋膜炎最常见的症状。疼痛通常位于足跟部位,患者可能感到刺痛或灼热感。尤其在早晨起床或长时间站立后,这种疼痛感会…...
Fiddler抓包工具介绍
下载 下载:Web Debugging Proxy and Troubleshooting Tools|Fiddler 进去要填一个表 汉化版 百度网盘 请输入提取码 提取码:xq9t 下载过附件之后分别把两个文件 点开fiddler就ok了 配置https fiddler要想抓到https包(解密的),点击tools->options勾选三个对…...
知乎号开始运营了,宣传一波
知乎号开始发布一些小说、散文还有诗歌了,欢迎大家多来关注 知乎链接:姜亚轲 每篇小说都改编成网易云音乐,文章中也有链接,我做的词,Suno编曲和演唱,欢迎大家来听听...
别再折腾了!STM32CubeMX+Keil 5+Proteus 8.9保姆级联调配置,一次搞定
STM32开发环境联调实战:从零搭建CubeMXKeilProteus高效工作流 第一次接触STM32开发时,我被各种工具链的配置折磨得焦头烂额——CubeMX生成的工程在Keil里报错、Proteus仿真时芯片毫无反应、Debug选项神秘消失...如果你也经历过这种绝望,这篇文…...
Arm CoreLink CMN-600硬件错误解析与解决方案
1. Arm CoreLink CMN-600硬件错误深度解析在复杂SoC设计中,互连架构的质量直接决定整个系统的稳定性和性能。作为Arm Neoverse平台的核心组件,CoreLink CMN-600(Coherent Mesh Network)承担着处理器集群、内存控制器和I/O设备之间…...
从零到一:Lmbench 性能测试实战与结果深度解读
1. 为什么你需要Lmbench性能测试 第一次听说Lmbench时,我也和大多数新手一样困惑:系统性能测试工具那么多,为什么非要选这个老古董?直到在服务器部署项目时连续遇到三次性能瓶颈,我才真正理解它的价值。那次我们用某款…...
如何为iOS 14.0-16.6.1设备安装TrollStore:TrollInstallerX完整指南
如何为iOS 14.0-16.6.1设备安装TrollStore:TrollInstallerX完整指南 【免费下载链接】TrollInstallerX A TrollStore installer for iOS 14.0 - 16.6.1 项目地址: https://gitcode.com/gh_mirrors/tr/TrollInstallerX 如果你正在寻找一种可靠且简单的方法在i…...
【云原生问题集】容器内存监控避坑:90%工程师踩过的“free命令雷区”
你有没有遇到过这种怪事?压测跑得好好的,容器突然被 OOM Kill 了。你赶紧进容器敲了个 free -h,一看内存快吃满了,心想“资源不够,加!” 加完内存,跑一会儿又被杀了。坑爹的是,你明明…...
为AI智能体注入人类洞察:用户研究技能全链路实践指南
1. 项目概述:为AI智能体注入“人类洞察层”如果你正在构建或使用AI智能体,无论是Claude Code、Cursor还是其他基于代码的智能助手,你可能会发现一个核心瓶颈:这些智能体虽然能处理代码、分析数据,但在涉及产品决策、功…...
Google Docs接入Gemini后,这6类高频写作场景效率飙升210%(附可复制Prompt库)
更多请点击: https://intelliparadigm.com 第一章:Gemini深度集成Google Docs的底层机制解析 Gemini 与 Google Docs 的深度集成并非简单的 API 调用叠加,而是依托 Google 的统一 AI 基础设施(AISI)和文档实时协作协议…...
7个DevPod自动化脚本技巧:批量操作工作空间的终极指南
7个DevPod自动化脚本技巧:批量操作工作空间的终极指南 【免费下载链接】devpod Codespaces but open-source, client-only and unopinionated: Works with any IDE and lets you use any cloud, kubernetes or just localhost docker. 项目地址: https://gitcode.…...
a16n:实现AI编程助手配置可移植性的插件化转换工具
1. 项目概述:AI编程助手配置的“翻译官”如果你和我一样,同时在使用 Cursor 和 Claude Code 这类 AI 编程工具,那你一定遇到过这个痛点:好不容易在 Cursor 里调教好了一套完美的.cursorrules文件,定义了代码风格、项目…...
解决Modelsim SE 10.6c仿真Vivado 2019乘法器IP核的“.vhd only”难题(附完整脚本)
解决Modelsim SE 10.6c仿真Vivado 2019乘法器IP核的“.vhd only”难题(附完整脚本) 在FPGA设计流程中,Xilinx Vivado与Mentor Modelsim的组合是许多工程师的首选工具链。但当Vivado 2019生成的乘法器IP核仅提供VHDL接口文件(.vhd)时ÿ…...
