当前位置: 首页 > news >正文

(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

相关文章:

(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

前言 本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产…...

vue3:22、vue-router的使用

import { createRouter, createWebHistory } from vue-router//history模式:createWebHistory //hash模式:createWebHashHistory//vite中的环境变量 import.meta.env.BASE_URL 就是vite.config.js中的base配置项 const router createRouter({history:…...

深入理解JVM虚拟机第五篇:一些常用的JVM虚拟机(二)

文章目录 一:JRockit VM的介绍 二:J9 VM的介绍 三:KVM和CDC/CLDC Hotspot 四:Azul VM的介绍 五:Liquid VM的介绍 六:Apache Harmoney 七:Microsoft JVM 八:Taobao JVM 九&a…...

导数公式及求导法则

目录 基本初等函数的导数公式 求导法则 有理运算法则 复合函数求导法 隐函数求导法 反函数求导法 参数方程求导法 对数求导法 基本初等函数的导数公式 基本初等函数的导数公式包括: C0(x^n)nx^(n-1)(a^x)a^x*lna(e^x)e^x(loga(x))1/(xlna)(lnx)1/x(sinx)cos…...

SpringMVC系列(六)之JSON数据返回以及异常处理机制

目录 前言 一. JSON概述 二. JSON数据返回 1. 导入pom依赖 2. 添加配置文件(spring-mvc.xml) 3. ResponseBody注解使用 4. 效果展示 5. Jackson介绍 三. 全局异常处理 1. 为什么要全局异常处理 2. 异常处理思路 3. 异常处理方式一 4. 异常处…...

民安智库(北京第三方窗口测评)开展汽车消费者焦点小组座谈会调查

民安智库近日开展了一场汽车消费者焦点小组座谈会,旨在深入了解目标消费者对汽车功能的需求和消费习惯,为汽车企业提供有针对性的解决方案。 在焦点小组座谈会中,民安智库公司(第三方市容环境指数测评)邀请了一群具有…...

【CVPR2021】MVDNet论文阅读分析与总结

Challenge: 现有的目标检测器主要融合激光雷达和相机,通常提供丰富和冗余的视觉信息 利用最先进的成像雷达,其分辨率比RadarNet和LiRaNet中使用的分辨率要细得多,提出了一种有效的深度后期融合方法来结合雷达和激光雷达信号。 MV…...

IDEA指定Maven settings file文件未生效

背景:在自己电脑上配置的时候,由于公司项目和我自己的项目的Maven仓库不一致,我就在项目中指定了各自的Maven配置文件。但是我发现公司的项目私有仓库地址IDEA总是识别不到! 俩个配置文件分别是: /Users/sml/Mine/研发…...

swift UI 和UIKIT 如何配合使用

SwiftUI和UIKit可以在同一个iOS应用程序中配合使用。它们是两个不同的用户界面框架,各自有自己的优势和特点。在现实开发中,很多iOS应用程序并不是一开始就完全采用SwiftUI或UIKit,而是根据需要逐步引入SwiftUI或者使用两者共存。 SwiftUI的…...

c语言练习题55:IP 地址⽆效化

IP 地址⽆效化 题⽬描述: 给你⼀个有效的 IPv4 地址 address ,返回这个 IP 地址的⽆效化版本。 所谓⽆效化 IP 地址,其实就是⽤ "[.]" 代替了每个 "."。 • ⽰例 1: 输⼊:address "1.1.1.…...

nvidia-persistenced 常驻

本文地址&#xff1a;blog.lucien.ink/archives/542 发现每次执行 nvidia-smi 都特别慢&#xff0c;发现是需要 nvidia-persistenced 常驻才可以&#xff0c;这个并不会在安装完驱动之后自动配置&#xff0c;需要手动设置一个自启。 cat <<EOF >> /etc/systemd/sy…...

leetcode 42, 58, 14(*)

42. Trapping Rain Water 1.暴力解法&#xff08;未通过&#xff09; class Solution { public:int trap(vector<int>& height) {int n height.size();int res 0;for(int i0; i<n; i){int r_max 0, l_max 0;for(int j i; j<n; j)r_max max(r_max, heigh…...

SpringCloud-微服务CAP原则

接上文 SpringCloud-Config配置中心 到此部分即微服务的入门。 总的来说&#xff0c;数据存放的节点数越多&#xff0c;分区容忍性就越高&#xff0c;但要复制更新的次数就越多&#xff0c;一致性就越难保证。同时为了保证一致性&#xff0c;更新所有节点数据所需要的时间就…...

K8S:Yaml文件详解

目录 一.Yaml文件详解 1.Yaml文件格式 2.YAML 语法格式 二.Yaml文件编写及相关概念 1.查看 api 资源版本标签 2.yaml编写案例 &#xff08;2&#xff09;Deployment类型编写nginx服务 (3&#xff09;k8s集群中的port介绍 &#xff08;5&#xff09;快速编写yaml文件 …...

机器人连续位姿同步插值轨迹规划—对数四元数、b样条曲线、c2连续位姿同步规划

简介&#xff1a;Smooth orientation planning is benefificial for the working performance and service life of industrial robots, keeping robots from violent impacts and shocks caused by discontinuous orientation planning. Nevertheless, the popular used quate…...

三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析

三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析 三维模型的轻量化压缩是一项技术挑战&#xff0c;特别是在处理复杂的3DTile格式时。下面列举了一些处理过程中可能遇到的常见问题以及相应的处理方法&#xff1a; 模型精度损失&#xff1a;在进行压缩处理时&#x…...

2023-简单点-开启防火墙后,ping显示请求超时;windows共享盘挂在不上

情景描述 树莓派 挂载 windows共享盘 之前一直可以&#xff0c;突然有一天不行了 ping xxxx不通了 一查&#xff0c;或许是服务器被同事开了防火墙&#xff0c;默认关闭了ping的回显 操作&#xff1a; 开启ping回显cmd ping通了&#xff0c;但是挂载还是不行, 显示 dmesg命…...

华为Java工程师面试题

常见问题&#xff1a; 什么是Java虚拟机&#xff08;JVM&#xff09;&#xff1f;它与现实中的计算机有什么不同&#xff1f;Java中的基本数据类型有哪些&#xff1f;它们的范围是什么&#xff1f;什么是引用类型&#xff1f;Java中的引用类型有哪些&#xff1f;什么是对象&am…...

大数据Flink(七十四):SQL的滑动窗口(HOP)

文章目录 SQL的滑动窗口(HOP) SQL的滑动窗口(HOP) 滑动窗口定义:滑动窗口也是将元素指定给固定长度的窗口。与滚动窗口功能一样,也有窗口大小的概念。不一样的地方在于,滑动窗口有另一个参数控制窗口计算的频率(滑动窗口滑动的步长)。因此,如果滑动的步长小于窗口大…...

Hystrix和Sentinel熔断降级设计理念

目录 1 基本介绍2 Hystrix信号量和线程池区别2.1 信号量模式2.2 线程池模式2.3 注意 3 Sentinel介绍 1 基本介绍 Sentinel 和 Hystrix 的原则是一致的: 当检测到调用链路中某个资源出现不稳定的表现&#xff0c;例如请求响应时间长或异常比例升高的时候&#xff0c;则对这个资源…...

大数据学习栈记——Neo4j的安装与使用

本文介绍图数据库Neofj的安装与使用&#xff0c;操作系统&#xff1a;Ubuntu24.04&#xff0c;Neofj版本&#xff1a;2025.04.0。 Apt安装 Neofj可以进行官网安装&#xff1a;Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖

在Vuzix M400 AR智能眼镜的助力下&#xff0c;卢森堡罗伯特舒曼医院&#xff08;the Robert Schuman Hospitals, HRS&#xff09;凭借在无菌制剂生产流程中引入增强现实技术&#xff08;AR&#xff09;创新项目&#xff0c;荣获了2024年6月7日由卢森堡医院药剂师协会&#xff0…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...