【大数据之Kafka】十六、Kafka集成外部系统之集成Flume
Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

Flume安装和部署:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501
1 Flume生产者

(1)在hadoop102启动Kafka集群。
zk.sh start
kf.sh start
(2)在hadoop103启动Kafka消费者。
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
(3)在hadoop102上安装Flume:https://blog.csdn.net/qq_18625571/article/details/131678589?spm=1001.2014.3001.5501 第一章Flume安装部署。
(4)在/opt/module/flume-1.9.0/job目录下创建配置文件file_to_kafka.conf。
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(5)启动Flume
bin/flume-ng agent -c conf/ -n a1 -f job/file_to_kafka.conf
(6)向/opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况。
mkdir applog
echo hello >> /opt/module/applog/app.log
(7)观察 kafka 消费者,能够看到消费的 hello 数据。
2 Flume消费者

(1)在 hadoop102 节点的 Flume 的/opt/module/flume/job 目录下创建 kafka_to_file.conf。
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 4 配置 sink
a1.sinks.k1.type = logger# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)启动 Flume。
bin/flume-ng agent -c conf/ -n a1 -f job/kafka_to_file.conf -Dflume.root.logger=INFO,console
(3)启动 kafka 生产者,并输入数据,例如:hello world
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
(4)观察控制台输出的日志
相关文章:
【大数据之Kafka】十六、Kafka集成外部系统之集成Flume
Flume 是一个在大数据开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。 Flume安装和部署:https://blog.csdn.net/qq_18625571/article/details/131678589?spm1001.2014.3001.5501 1 Flume生产者 (1)…...
java学习--day3 (运算符、if循环、switch-case结构)
文章目录 今天的内容1.运算符1.1关系运算符1.2逻辑运算符1.3逻辑运算符的短路原则 2.分支结构【重点】2.1if分支2.2if-else分支2.3if-else的嵌套写法2.4if-else if 分支结构2.5swicth-case结构 扩展知识点 1.八大基本数据类型整型: byte short int long浮点: float double字…...
ActiveMQ、RabbitMQ、RocketMQ、Kafka区别
一、消息中间件的使用场景 消息中间件的使用场景总结就是六个字:解耦、异步、削峰 1.解耦 如果我方系统A要与三方B系统进行数据对接,推送系统人员信息,通常我们会使用接口开发来进行。但是如果运维期间B系统进行了调整,或者推送…...
csp初赛总结 那些年编程走过的坑 初高中信竞常考语法算法点
😘个人主页:曲终酣兴晚的小书屋💖 😕作者介绍:一个莽莽撞撞的🐻 💖专栏介绍:日常生活&往事回忆 😶🌫️每日金句:祝大家心有山水不造作&…...
DollarTree(美元树)验厂需要注意哪些方面?
【DollarTree(美元树)验厂需要注意哪些方面?】 美元树(Dollar tree),是美国的一元店。每件商品都只卖一美元,吃的、用的和玩的应有尽有。美元树在美国共拥有4900家门店,其中一半的连…...
vector使用和模拟实现
💓博主个人主页:不是笨小孩👀 ⏩专栏分类:数据结构与算法👀 C👀 刷题专栏👀 C语言👀 🚚代码仓库:笨小孩的代码库👀 ⏩社区:不是笨小孩👀 🌹欢迎大…...
token登录的实现
token登录的实现 我这种token只是简单的实现token,就是后端利用UUID 生成简单随机码,利用随机码作为在Redis中的键,然后存储的用户信息作为值,在每次合理请求的时候对token的有效时间进行刷新(利用拦截器)&…...
GO语言从入门到实战-Go语言课程介绍
为什么选择 Go 语言来完成这么大一个项目呢?我们不妨回到 Go 语言的源头看一看。 Go 语言的初步设想始于 2007 年,当时 Go 语言的三位创始人是想通过开发一种新型的语言来解决 Google 在软件开发中面临的问题: 多核硬件架构;超大…...
七天学会C语言-第六天(指针)
1.指针变量与普通变量 指针变量与普通变量是C语言中的两种不同类型的变量,它们有一些重要的区别和联系。 普通变量是一种存储数据的容器,可以直接存储和访问数据的值。: int num 10; // 定义一个整数型普通变量num,赋值为10在例…...
2023年腾讯云轻量服务器测评:16核 32G 28M 配置CPU测试
腾讯云轻量应用服务器16核32G28M配置优惠价3468元15个月(支持免费续3个月/送同配置3个月),轻量应用服务器具有100%CPU性能,系统盘为380GB SSD盘,28M带宽下载速度3584KB/秒,月流量6000GB,折合每天…...
macos (M2芯片)搭建flutter环境
安装的版本3.13.4、电脑上没有安装过android studio、安装过brew 1.在终端运行sudo softwareupdate --install-rosetta --agree-to-license,下图展示安装成功的效果 2.下载以下安装包来获取最新的 stable Flutter SDK 3.解压,⚠️注意下载安装sdk的包名…...
Xilinx FPGA未使用管脚上下拉状态配置(ISE和Vivado环境)
文章目录 ISE开发环境Vivado开发环境方式1:XDC文件约束方式2:生成选项配置 ISE开发环境 ISE开发环境,可在如下Bit流文件生成选项中配置。 右键点击Generate Programming File,选择Process Properties, 在弹出的窗口选…...
数据结构---链表(java)
目录 1. 链表 2. 创建Node 3. 增加 4. 获取元素 5. 删除 6. 遍历链表 7. 查找元素是否存在 8. 链栈的实现 9. 链队的实现 1. 链表 数据存放在"Node"结点中 优点:不用考虑扩容和缩容的问题,实现了动态存储数据 缺点:没有…...
Qt --- Day02
实现效果: 点击登录,检验用户密码是否正确,正确则弹出消息框,点击ok转到另一个页面 不正确跳出错误消息框,默认选线为Cancel,点击Yes继续登录 点击Cancel跳出问题消息框,默认选项No,…...
Redis 集合(Set)快速指南 | Navicat
Redis 支持通过多种数据类型来存储项目集合。其中,包括列表、集合和哈希。上周的博文介绍了列表(List)数据类型并重点介绍了一些用于管理列表(List)的主要命令。在今天的文章中,我们将转向关注集合…...
【华为云云耀云服务器L实例评测】- 云原生实践,快捷部署人才招聘平台容器化技术方案!
🤵♂️ 个人主页: AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!&…...
【Java】泛型 之 什么是泛型
什么是泛型 泛型是一种“代码模板”,可以用一套代码套用各种类型。 在讲解什么是泛型之前,我们先观察Java标准库提供的ArrayList,它可以看作“可变长度”的数组,因为用起来比数组更方便。 实际上ArrayList内部就是一个Object[]…...
Python yaml 详解
文章目录 1 概述1.1 特点1.2 导入 2 对象2.1 字典2.2 数组2.3 复合结构 3 操作3.1 读取3.2 写入 1 概述 1.1 特点 yaml 文件是一种数据序列化语言,广泛用于配置文件、日志文件等特点: ① 大小写敏感。② 使用缩进表示层级关系。缩进时不允许使用 Tab 键…...
RabbitMQ消息可靠性(二)-- 消费者消息确认
一、消费者消息确认是什么? 在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入…...
【python第7课 实例,类】
文章目录 一、实例1.1实例的变量1.2实例方法1.3 构造方法1.4析构函数1.4预置实例属性: 二,类1.1类变量1.2类方法1.3静态方法1.4类属性的增删改查 一、实例 1.1实例的变量 使用示例 class dog:def __init__(self,k,c,a):self.kinds kself.color csel…...
DeepSeek代码能力实测:3大编程范式通过率对比,92.7%准确率背后的5个隐藏陷阱
更多请点击: https://intelliparadigm.com 第一章:DeepSeek HumanEval测试全景概览 HumanEval 是由 OpenAI 提出的函数级代码生成基准测试集,包含 164 道 Python 编程题,每道题提供函数签名、文档字符串(docstring&am…...
Turms开发者定制指南:如何基于源码进行二次开发
Turms开发者定制指南:如何基于源码进行二次开发 【免费下载链接】turms 🕊️ The worlds most advanced open source instant messaging engine for 100K~10M concurrent users https://turms-im.github.io/docs 项目地址: https://gitcode.com/gh_mir…...
AI 驱动单元测试生成:智能优先级与自动化验证实践
1. 项目概述如果你和我一样,长期在维护一个中大型的 TypeScript 项目,那么“补单元测试”这件事,大概率是你技术债清单上那个永远在滚动、却很少被真正划掉的任务。手动写测试枯燥耗时,尤其是面对那些遗留的、逻辑复杂的业务函数时…...
Starter计划配额耗尽预警失效?我们逆向解析其API响应头,发现3个未文档化的速率控制暗门
更多请点击: https://intelliparadigm.com 第一章:Starter计划配额耗尽预警失效?我们逆向解析其API响应头,发现3个未文档化的速率控制暗门 在对 Starter 计划的 API 调用行为进行深度监控时,我们观察到配额耗尽告警频…...
[具身智能-670]:ROS2 Node内部的工作原理:rclpy.init()、node = MyNode() 、rclpy.spin(node)
一、三个函数的一句话功能rclpy.init()初始化 ROS2 全局系统(上下文、信号处理、DDS)。node MyNode()创建节点对象,注册名字,分配通信句柄,不创建线程。rclpy.spin(node)进入主线程死循环,不断检查消息 / …...
机器学习模型安全防护与TEE技术实践
1. 机器学习模型安全与完整性挑战概述 在金融风控、医疗诊断等关键领域,机器学习模型的安全与完整性已成为AI落地的首要考量。过去三年中,恶意数据投毒攻击增长了近300%,而预训练模型供应链中的安全漏洞更是导致了多起重大数据泄露事件。这些…...
Nintendo Switch游戏安装终极指南:3种方法解决所有格式兼容问题
Nintendo Switch游戏安装终极指南:3种方法解决所有格式兼容问题 【免费下载链接】Awoo-Installer A No-Bullshit NSP, NSZ, XCI, and XCZ Installer for Nintendo Switch 项目地址: https://gitcode.com/gh_mirrors/aw/Awoo-Installer 还在为Nintendo Switch…...
Jeandle:基于LLVM的Java JIT编译器架构解析与实战
1. 项目概述与核心价值最近在Java性能优化这个老生常谈的话题里,我又看到了一个新面孔——Jeandle。简单来说,这是一个基于OpenJDK和LLVM构建的Java即时编译器。如果你对JVM的JIT(Just-in-Time Compilation)机制有所了解ÿ…...
利用Taotoken模型广场为不同AI应用场景挑选合适模型
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 利用Taotoken模型广场为不同AI应用场景挑选合适模型 面对文本生成、代码审查、智能对话、翻译等多样化的AI应用场景,如…...
Java SE 与 Spring Boot 在电商场景中的应用
面试:Java SE 与 Spring Boot 在电商场景中的应用 今天,我们将围绕一位求职者在一家电商公司的面试场景,与面试官进行一场激烈的技术问答。第一轮提问 面试官: 首先,请你简单介绍一下 JVM 的工作原理。 燕双非…...
