(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例
前言
本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文
-
启动Kafka集群,创建first主题
- 启动Kafka集群
- 创建first主题
kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3
- 查看first主题详情
kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first
- 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务
- 创建nc监听的flume任务:job-netcat-flume-kafka.conf
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置 source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop101 a1.sources.r1.port = 1111 # 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置 sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务
- 创建kafka监听的flume任务:job-kafka-flume-console.conf
# 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置 source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 50 a1.sources.r1.batchDurationMillis = 200 a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sources.r1.kafka.topics = first a1.sources.r1.kafka.consumer.group.id = custom.g.id # 3 配置 channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置 sink a1.sinks.k1.type = logger # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf
- 启动job-kafka-flume-console.conf任务
bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console
- 在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf
- 启动job-netcat-flume-kafka.conf任务
bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console
- 使用netcat工具发送数据到nc服务1111端口
- 发送nc消息
- 查看结果
- 控制台结果
结语
该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。
相关文章:
(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例
前言 本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产…...
vue3:22、vue-router的使用
import { createRouter, createWebHistory } from vue-router//history模式:createWebHistory //hash模式:createWebHashHistory//vite中的环境变量 import.meta.env.BASE_URL 就是vite.config.js中的base配置项 const router createRouter({history:…...
深入理解JVM虚拟机第五篇:一些常用的JVM虚拟机(二)
文章目录 一:JRockit VM的介绍 二:J9 VM的介绍 三:KVM和CDC/CLDC Hotspot 四:Azul VM的介绍 五:Liquid VM的介绍 六:Apache Harmoney 七:Microsoft JVM 八:Taobao JVM 九&a…...
导数公式及求导法则
目录 基本初等函数的导数公式 求导法则 有理运算法则 复合函数求导法 隐函数求导法 反函数求导法 参数方程求导法 对数求导法 基本初等函数的导数公式 基本初等函数的导数公式包括: C0(x^n)nx^(n-1)(a^x)a^x*lna(e^x)e^x(loga(x))1/(xlna)(lnx)1/x(sinx)cos…...
SpringMVC系列(六)之JSON数据返回以及异常处理机制
目录 前言 一. JSON概述 二. JSON数据返回 1. 导入pom依赖 2. 添加配置文件(spring-mvc.xml) 3. ResponseBody注解使用 4. 效果展示 5. Jackson介绍 三. 全局异常处理 1. 为什么要全局异常处理 2. 异常处理思路 3. 异常处理方式一 4. 异常处…...
民安智库(北京第三方窗口测评)开展汽车消费者焦点小组座谈会调查
民安智库近日开展了一场汽车消费者焦点小组座谈会,旨在深入了解目标消费者对汽车功能的需求和消费习惯,为汽车企业提供有针对性的解决方案。 在焦点小组座谈会中,民安智库公司(第三方市容环境指数测评)邀请了一群具有…...
【CVPR2021】MVDNet论文阅读分析与总结
Challenge: 现有的目标检测器主要融合激光雷达和相机,通常提供丰富和冗余的视觉信息 利用最先进的成像雷达,其分辨率比RadarNet和LiRaNet中使用的分辨率要细得多,提出了一种有效的深度后期融合方法来结合雷达和激光雷达信号。 MV…...
IDEA指定Maven settings file文件未生效
背景:在自己电脑上配置的时候,由于公司项目和我自己的项目的Maven仓库不一致,我就在项目中指定了各自的Maven配置文件。但是我发现公司的项目私有仓库地址IDEA总是识别不到! 俩个配置文件分别是: /Users/sml/Mine/研发…...
swift UI 和UIKIT 如何配合使用
SwiftUI和UIKit可以在同一个iOS应用程序中配合使用。它们是两个不同的用户界面框架,各自有自己的优势和特点。在现实开发中,很多iOS应用程序并不是一开始就完全采用SwiftUI或UIKit,而是根据需要逐步引入SwiftUI或者使用两者共存。 SwiftUI的…...
c语言练习题55:IP 地址⽆效化
IP 地址⽆效化 题⽬描述: 给你⼀个有效的 IPv4 地址 address ,返回这个 IP 地址的⽆效化版本。 所谓⽆效化 IP 地址,其实就是⽤ "[.]" 代替了每个 "."。 • ⽰例 1: 输⼊:address "1.1.1.…...
nvidia-persistenced 常驻
本文地址:blog.lucien.ink/archives/542 发现每次执行 nvidia-smi 都特别慢,发现是需要 nvidia-persistenced 常驻才可以,这个并不会在安装完驱动之后自动配置,需要手动设置一个自启。 cat <<EOF >> /etc/systemd/sy…...
leetcode 42, 58, 14(*)
42. Trapping Rain Water 1.暴力解法(未通过) class Solution { public:int trap(vector<int>& height) {int n height.size();int res 0;for(int i0; i<n; i){int r_max 0, l_max 0;for(int j i; j<n; j)r_max max(r_max, heigh…...
SpringCloud-微服务CAP原则
接上文 SpringCloud-Config配置中心 到此部分即微服务的入门。 总的来说,数据存放的节点数越多,分区容忍性就越高,但要复制更新的次数就越多,一致性就越难保证。同时为了保证一致性,更新所有节点数据所需要的时间就…...
K8S:Yaml文件详解
目录 一.Yaml文件详解 1.Yaml文件格式 2.YAML 语法格式 二.Yaml文件编写及相关概念 1.查看 api 资源版本标签 2.yaml编写案例 (2)Deployment类型编写nginx服务 (3)k8s集群中的port介绍 (5)快速编写yaml文件 …...
机器人连续位姿同步插值轨迹规划—对数四元数、b样条曲线、c2连续位姿同步规划
简介:Smooth orientation planning is benefificial for the working performance and service life of industrial robots, keeping robots from violent impacts and shocks caused by discontinuous orientation planning. Nevertheless, the popular used quate…...
三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析
三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析 三维模型的轻量化压缩是一项技术挑战,特别是在处理复杂的3DTile格式时。下面列举了一些处理过程中可能遇到的常见问题以及相应的处理方法: 模型精度损失:在进行压缩处理时&#x…...
2023-简单点-开启防火墙后,ping显示请求超时;windows共享盘挂在不上
情景描述 树莓派 挂载 windows共享盘 之前一直可以,突然有一天不行了 ping xxxx不通了 一查,或许是服务器被同事开了防火墙,默认关闭了ping的回显 操作: 开启ping回显cmd ping通了,但是挂载还是不行, 显示 dmesg命…...
华为Java工程师面试题
常见问题: 什么是Java虚拟机(JVM)?它与现实中的计算机有什么不同?Java中的基本数据类型有哪些?它们的范围是什么?什么是引用类型?Java中的引用类型有哪些?什么是对象&am…...
大数据Flink(七十四):SQL的滑动窗口(HOP)
文章目录 SQL的滑动窗口(HOP) SQL的滑动窗口(HOP) 滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大…...
Hystrix和Sentinel熔断降级设计理念
目录 1 基本介绍2 Hystrix信号量和线程池区别2.1 信号量模式2.2 线程池模式2.3 注意 3 Sentinel介绍 1 基本介绍 Sentinel 和 Hystrix 的原则是一致的: 当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间长或异常比例升高的时候,则对这个资源…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...








