Kafka集成flume
1.flume作为生产者集成Kafka
kafka作为flume的sink,扮演消费者角色
1.1 flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = TAILDIR
#记录最后监控文件的断点的文件,此文件位置可不改
a1.sources.r1.positionFile = /export/server/flume/job/data/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /export/server/flume/job/data/.*file.*
a1.sources.r1.filegroups.f2 =/export/server/flume/job/data/.*log.*# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = customers
a1.sinks.k1.kafka.bootstrap.servers =node1:9092,node2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume.conf
1.3开启Kafka消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers --from-beginning
1.4生产数据
往被监控文件输入数据
[ljr@node1 data]$echo hello >>file2.txt
[ljr@node1 data]$ echo ============== >>file2.txt
查看Kafka消费者

可见Kafka集成flume生产者成功。
2.flume作为消费者集成Kafka
kafka作为flume的source,扮演生产者角色
2.1flume配置文件
vim $kafka/jobs/flume-kafka.conf
# agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#注意不要大于channel transactionCapacity的值100
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers =node1:9092, node1:9092
a1.sources.r1.kafka.topics = consumers
a1.sources.r1.kafka.consumer.group.id = custom.g.id# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#注意transactionCapacity的值不要小于sources batchSize的值50
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2开启flume监控
flume-ng agent -n a1 -c conf/ -f /export/server/kafka/jobs/kafka-flume1.conf
2.3开启Kafka生产者并生产数据
kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092 --topic consumers

查看flume监控台

可见Kafka集成flume消费者成功。
相关文章:
Kafka集成flume
1.flume作为生产者集成Kafka kafka作为flume的sink,扮演消费者角色 1.1 flume配置文件 vim $kafka/jobs/flume-kafka.conf # agent a1.sources r1 a1.sinks k1 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type TAILDIR #记录最后监控文件…...
如何让视频有高级感 高级感视频制作方法 高级感视频怎么剪 会声会影视频剪辑制作教程 会声会影中文免费下载
高质量视频通常具有清晰的画面、优质的音频和令人印象深刻的视觉效果。这篇文章来了解如何让视频有高级感,高级感视频制作方法。 一、如何让视频有高级感 要让视频有高级感,要注意以下几个要点: 1、剧本和故事性:一个好的剧本和…...
详解|访问学者申请被拒原因有哪些?
访问学者项目为全球科研人员提供了一个难得的机会,让他们能够跨越国界,深入不同的学术环境,进行学术交流和合作。然而,并非所有申请者都能如愿以偿地获得这一机会。本文将对访问学者申请中常见的被拒原因进行详细解析,…...
[鹤城杯 2021]BabyRSA
题目: from Crypto.Util.number import getPrime, bytes_to_long from secret import flagp getPrime(1024) q getPrime(1024) n p * q e 65537 hint1 p >> 724 hint2 q % (2 ** 265) ct pow(bytes_to_long(flag), e, n) print(hint1) print(hint2) p…...
西安市工业倍增引导基金子基金申报条件流程和材料程序指南(2024年)
一、基本情况 产业投资基金是以产业发展为首要目标,围绕经济社会发展规划和产业发展政策,发挥“有效市场”作用,支持重点领域、重点产业、重点区域(如:全市六大支柱产业、五大新兴产业领域成熟期重点规模以上企业以及“…...
微型丝杆的耐用性和延长使用寿命的关键因素!
无论是机械设备,还是精密传动元件,高精度微型丝杆是各种机械设备中不可或缺的重要组件。它的精度和耐用性直接影响着工作效率和产品品质,在工业技术不断进步的情况下,对微型丝杆的性能要求也越来越高,如何提升微型丝杆…...
音频文件下载后,如何轻松转换格式?
在我们日常的数字生活中,下载各种音频文件是司空见惯的事情。然而,有时候我们可能需要将这些音频文件转换为不同的格式,以适应不同的设备或编辑需求。无论您是希望将下载的音频文件转换为通用的MP3格式,还是需要将其转换为高保真的…...
Intel平台,13600KF+3060Ti,虚拟机安装macOS 14(2024年6月)
距离上次装macOS虚拟机已经有一段时间了,macOS系统现在大版本升级的速度也是越来越快了,由于Office只支持最新三个版本的macOS,所以现在保底也得安装macOS 12了,我这次是用macOS 14做实验,13和12的安装方式和macOS 14一…...
Cookie、Session、Token的关系和区别
关系 Session与Cookie:Session通常依赖于Cookie来工作。当服务器为客户端创建一个Session时,它会在服务器上存储与客户端相关的信息,并将一个唯一的SessionID通过Cookie发送给客户端。客户端在后续的请求中会携带这个Cookie(包含…...
Windows 11 中安装 Docker Desktop 并安装镜像
本该主要介绍在 Windows 11 中安装 Docker Desktop 时的一些准备工作,以及该如何下载和安装,然后分别使用管理界面和 Docker 命令安装两个镜像。 一、准备工作 在 Windows 11 中安装 Docker Desktop 前,需要做一些准备。打开 【Windows 功能…...
深入剖析Java线程池之“newWorkStealingPool“
1. 概述 newWorkStealingPool 是Java 8中引入的一个新型线程池,它基于ForkJoinPool实现,并采用了“工作窃取”(Work-Stealing)算法。这种线程池特别适用于可并行化且计算密集型的任务,能够充分利用多核CPU资源,提高任务执行效率。 2. 工作窃取算法(Work-Stealing Algor…...
《跟我一起学“网络安全”》——安全设备
安全设备 一、安全设备–IDS IDS入侵检测 (1)什么是入侵检测: 入侵检测系统(intrusion detection system,简称“IDS”)是一种对网络传输进行即时监视,在发现可疑传输时发出警报或者采取主动反应措施的网络安全设备。…...
猜测Tomcat如何实现WebSocket协议
一、WebSocket协议的实现 (一)WebSocket是官方的协议接口标准。 (二)如果一门编程语言可以网络连接和并发,就能创建一种WebSocket实现。 (三)同一种编程语言,有不同的协议实现版本和框架。 二、Tomcat实现 在Tomcat容器中实现了对应的WebSocket版本&am…...
uniApp @input事件更改输入框值,值改变了但是页面没更新新的值
<uni-easyinputtype"text"trim"all":inputBorder"false"v-model"customFormData.completePercent"input"(val) > completeOnInput(val)"placeholder"请输入" /> function completeOnInput(val) {let num…...
两行css 实现瀑布流
html <ul ><li><a href"" ><img src"05094532gc6w.jpg" alt"111" /><p>传奇</p></a></li><li><a href"" ><img src"05094532gc6w.jpg" alt"111"…...
Centos7.9部署单节点K8S环境
Centos7.9部署单节点K8S环境 通过Centos extras镜像源安装K8S环境,优点是方便快捷,缺点是版本较低,安装后的版本为1.5.2。 1. 准备工作 关闭selinux [rootlocalhost ~]# cat /etc/selinux/config# This file controls the state of SELin…...
【CV】stable diffusion初步理解
来自gpt-4o Stable diffusion 和DALLE的关系 Stable Diffusion 和 DALL-E 都是生成图像的人工智能模型,但它们有不同的开发背景和技术实现。 Stable Diffusion: 开发者: 由Stability AI开发,并与CompVis和LAION等组织合作。技术: 基于扩散模型…...
足底筋膜炎最好的恢复办法
足底筋膜炎是一种由足底筋膜受到炎症刺激而引起的疼痛和不适的疾病。其典型症状主要包括: 1、足底疼痛:这是足底筋膜炎最常见的症状。疼痛通常位于足跟部位,患者可能感到刺痛或灼热感。尤其在早晨起床或长时间站立后,这种疼痛感会…...
Fiddler抓包工具介绍
下载 下载:Web Debugging Proxy and Troubleshooting Tools|Fiddler 进去要填一个表 汉化版 百度网盘 请输入提取码 提取码:xq9t 下载过附件之后分别把两个文件 点开fiddler就ok了 配置https fiddler要想抓到https包(解密的),点击tools->options勾选三个对…...
知乎号开始运营了,宣传一波
知乎号开始发布一些小说、散文还有诗歌了,欢迎大家多来关注 知乎链接:姜亚轲 每篇小说都改编成网易云音乐,文章中也有链接,我做的词,Suno编曲和演唱,欢迎大家来听听...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
实战设计模式之模板方法模式
概述 模板方法模式定义了一个操作中的算法骨架,并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下,重新定义算法中的某些步骤。简单来说,就是在一个方法中定义了要执行的步骤顺序或算法框架,但允许子类…...
