(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例
前言
本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:
正文
-
启动Kafka集群,创建first主题
- 启动Kafka集群
- 创建first主题
kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3
- 查看first主题详情
kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first
- 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务
- 创建nc监听的flume任务:job-netcat-flume-kafka.conf
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置 source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop101 a1.sources.r1.port = 1111 # 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置 sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务
- 创建kafka监听的flume任务:job-kafka-flume-console.conf
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置 source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id # 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置 sink a1.sinks.k1.type = logger # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf
- 启动job-kafka-flume-console.conf任务
bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console
- 在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf
- 启动job-netcat-flume-kafka.conf任务
bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console
- 使用netcat工具发送数据到nc服务1111端口
- 发送nc消息
- 查看结果
- 控制台结果
结语
该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。
相关文章:

(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例
前言 本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产…...

vue3:22、vue-router的使用
import { createRouter, createWebHistory } from vue-router//history模式:createWebHistory //hash模式:createWebHashHistory//vite中的环境变量 import.meta.env.BASE_URL 就是vite.config.js中的base配置项 const router createRouter({history:…...

深入理解JVM虚拟机第五篇:一些常用的JVM虚拟机(二)
文章目录 一:JRockit VM的介绍 二:J9 VM的介绍 三:KVM和CDC/CLDC Hotspot 四:Azul VM的介绍 五:Liquid VM的介绍 六:Apache Harmoney 七:Microsoft JVM 八:Taobao JVM 九&a…...

导数公式及求导法则
目录 基本初等函数的导数公式 求导法则 有理运算法则 复合函数求导法 隐函数求导法 反函数求导法 参数方程求导法 对数求导法 基本初等函数的导数公式 基本初等函数的导数公式包括: C0(x^n)nx^(n-1)(a^x)a^x*lna(e^x)e^x(loga(x))1/(xlna)(lnx)1/x(sinx)cos…...

SpringMVC系列(六)之JSON数据返回以及异常处理机制
目录 前言 一. JSON概述 二. JSON数据返回 1. 导入pom依赖 2. 添加配置文件(spring-mvc.xml) 3. ResponseBody注解使用 4. 效果展示 5. Jackson介绍 三. 全局异常处理 1. 为什么要全局异常处理 2. 异常处理思路 3. 异常处理方式一 4. 异常处…...
民安智库(北京第三方窗口测评)开展汽车消费者焦点小组座谈会调查
民安智库近日开展了一场汽车消费者焦点小组座谈会,旨在深入了解目标消费者对汽车功能的需求和消费习惯,为汽车企业提供有针对性的解决方案。 在焦点小组座谈会中,民安智库公司(第三方市容环境指数测评)邀请了一群具有…...

【CVPR2021】MVDNet论文阅读分析与总结
Challenge: 现有的目标检测器主要融合激光雷达和相机,通常提供丰富和冗余的视觉信息 利用最先进的成像雷达,其分辨率比RadarNet和LiRaNet中使用的分辨率要细得多,提出了一种有效的深度后期融合方法来结合雷达和激光雷达信号。 MV…...

IDEA指定Maven settings file文件未生效
背景:在自己电脑上配置的时候,由于公司项目和我自己的项目的Maven仓库不一致,我就在项目中指定了各自的Maven配置文件。但是我发现公司的项目私有仓库地址IDEA总是识别不到! 俩个配置文件分别是: /Users/sml/Mine/研发…...
swift UI 和UIKIT 如何配合使用
SwiftUI和UIKit可以在同一个iOS应用程序中配合使用。它们是两个不同的用户界面框架,各自有自己的优势和特点。在现实开发中,很多iOS应用程序并不是一开始就完全采用SwiftUI或UIKit,而是根据需要逐步引入SwiftUI或者使用两者共存。 SwiftUI的…...
c语言练习题55:IP 地址⽆效化
IP 地址⽆效化 题⽬描述: 给你⼀个有效的 IPv4 地址 address ,返回这个 IP 地址的⽆效化版本。 所谓⽆效化 IP 地址,其实就是⽤ "[.]" 代替了每个 "."。 • ⽰例 1: 输⼊:address "1.1.1.…...
nvidia-persistenced 常驻
本文地址:blog.lucien.ink/archives/542 发现每次执行 nvidia-smi 都特别慢,发现是需要 nvidia-persistenced 常驻才可以,这个并不会在安装完驱动之后自动配置,需要手动设置一个自启。 cat <<EOF >> /etc/systemd/sy…...
leetcode 42, 58, 14(*)
42. Trapping Rain Water 1.暴力解法(未通过) class Solution { public:int trap(vector<int>& height) {int n height.size();int res 0;for(int i0; i<n; i){int r_max 0, l_max 0;for(int j i; j<n; j)r_max max(r_max, heigh…...

SpringCloud-微服务CAP原则
接上文 SpringCloud-Config配置中心 到此部分即微服务的入门。 总的来说,数据存放的节点数越多,分区容忍性就越高,但要复制更新的次数就越多,一致性就越难保证。同时为了保证一致性,更新所有节点数据所需要的时间就…...

K8S:Yaml文件详解
目录 一.Yaml文件详解 1.Yaml文件格式 2.YAML 语法格式 二.Yaml文件编写及相关概念 1.查看 api 资源版本标签 2.yaml编写案例 (2)Deployment类型编写nginx服务 (3)k8s集群中的port介绍 (5)快速编写yaml文件 …...

机器人连续位姿同步插值轨迹规划—对数四元数、b样条曲线、c2连续位姿同步规划
简介:Smooth orientation planning is benefificial for the working performance and service life of industrial robots, keeping robots from violent impacts and shocks caused by discontinuous orientation planning. Nevertheless, the popular used quate…...

三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析
三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析 三维模型的轻量化压缩是一项技术挑战,特别是在处理复杂的3DTile格式时。下面列举了一些处理过程中可能遇到的常见问题以及相应的处理方法: 模型精度损失:在进行压缩处理时&#x…...

2023-简单点-开启防火墙后,ping显示请求超时;windows共享盘挂在不上
情景描述 树莓派 挂载 windows共享盘 之前一直可以,突然有一天不行了 ping xxxx不通了 一查,或许是服务器被同事开了防火墙,默认关闭了ping的回显 操作: 开启ping回显cmd ping通了,但是挂载还是不行, 显示 dmesg命…...
华为Java工程师面试题
常见问题: 什么是Java虚拟机(JVM)?它与现实中的计算机有什么不同?Java中的基本数据类型有哪些?它们的范围是什么?什么是引用类型?Java中的引用类型有哪些?什么是对象&am…...

大数据Flink(七十四):SQL的滑动窗口(HOP)
文章目录 SQL的滑动窗口(HOP) SQL的滑动窗口(HOP) 滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大…...

Hystrix和Sentinel熔断降级设计理念
目录 1 基本介绍2 Hystrix信号量和线程池区别2.1 信号量模式2.2 线程池模式2.3 注意 3 Sentinel介绍 1 基本介绍 Sentinel 和 Hystrix 的原则是一致的: 当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间长或异常比例升高的时候,则对这个资源…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...

VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...

Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...