当前位置: 首页 > news >正文

【Storm】【五】Storm集成Kafka

Storm集成Kafka

一、整合说明

Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下:

  • Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持;
  • Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。

这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档进行整合,不适用于 0.8.x 版本的 Kafka。

二、写入数据到Kafka

2.1 项目结构

在这里插入图片描述

2.2 项目主要依赖

<properties><storm.version>1.2.2</storm.version><kafka.version>2.2.0</kafka.version>
</properties><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency>
</dependencies>

2.3 DataSourceSpout

/*** 产生词频样本的数据源*/
public class DataSourceSpout extends BaseRichSpout {private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");private SpoutOutputCollector spoutOutputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}@Overridepublic void nextTuple() {// 模拟产生数据String lineData = productData();spoutOutputCollector.emit(new Values(lineData));Utils.sleep(1000);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}/*** 模拟数据*/private String productData() {Collections.shuffle(list);Random random = new Random();int endIndex = random.nextInt(list.size()) % (list.size()) + 1;return StringUtils.join(list.toArray(), "\t", 0, endIndex);}}

产生的模拟数据格式如下:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

2.4 WritingToKafkaApp

/*** 写入数据到 Kafka 中*/
public class WritingToKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();// 定义 Kafka 生产者属性Properties props = new Properties();/** 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。* 不过建议至少要提供两个 broker 的信息作为容错。*/props.put("bootstrap.servers", BOOTSTRAP_SERVERS);/** acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。* acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。* acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。* acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。*/props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaBolt bolt = new KafkaBolt<String, String>().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector(TOPIC_NAME)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());builder.setSpout("sourceSpout", new DataSourceSpout(), 1);builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalWritingToKafkaApp",new Config(), builder.createTopology());}}
}

2.5 测试准备工作

进行测试前需要启动 Kakfa:

1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

# zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties

2. 创建topic

# 创建用于测试主题
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic# 查看所有主题bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动消费者

启动一个消费者用于观察写入情况,启动命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning

2.6 测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

# mvn clean package -D maven.test.skip=true

启动后,消费者监听情况如下:

在这里插入图片描述

三、从Kafka中读取数据

3.1 项目结构

在这里插入图片描述

3.2 ReadingFromKafkaApp

/*** 从 Kafka 中读取数据*/
public class ReadingFromKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");// 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalReadingFromKafkaApp",new Config(), builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {return KafkaSpoutConfig.builder(bootstrapServers, topic)// 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")// 定义重试策略.setRetry(getRetryService())// 定时提交偏移量的时间间隔,默认是 15s.setOffsetCommitPeriodMs(10_000).build();}// 定义重试策略private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));}
}

3.3 LogConsoleBolt

/*** 打印从 Kafka 中获取的数据*/
public class LogConsoleBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector=collector;}public void execute(Tuple input) {try {String value = input.getStringByField("value");System.out.println("received from kafka : "+ value);// 必须 ack,否则会重复消费 kafka 中的消息collector.ack(input);}catch (Exception e){e.printStackTrace();collector.fail(input);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

这里从 value 字段中获取 kafka 输出的值数据。

在开发中,我们可以通过继承 RecordTranslator 接口定义了 Kafka 中 Record 与输出流之间的映射关系,可以在构建 KafkaSpoutConfig 的时候通过构造器或者 setRecordTranslator() 方法传入,并最后传递给具体的 KafkaSpout

默认情况下使用内置的 DefaultRecordTranslator,其源码如下,FIELDS 中 定义了 tuple 中所有可用的字段:主题,分区,偏移量,消息键,值。

public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {private static final long serialVersionUID = -5782462870112305750L;public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");@Overridepublic List<Object> apply(ConsumerRecord<K, V> record) {return new Values(record.topic(),record.partition(),record.offset(),record.key(),record.value());}@Overridepublic Fields getFieldsFor(String stream) {return FIELDS;}@Overridepublic List<String> streams() {return DEFAULT_STREAM;}
}

3.4 启动测试

这里启动一个生产者用于发送测试数据,启动命令如下:

# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic

在这里插入图片描述

本地运行的项目接收到从 Kafka 发送过来的数据:

在这里插入图片描述


用例源码下载地址:storm-kafka-integration

参考资料

  1. Storm Kafka Integration (0.10.x+)

相关文章:

【Storm】【五】Storm集成Kafka

Storm集成Kafka 一、整合说明二、写入数据到Kafka三、从Kafka中读取数据一、整合说明 Storm 官方对 Kafka 的整合分为两个版本&#xff0c;官方说明文档分别如下&#xff1a; Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持&#xff1b;Storm Kafka …...

GVRP-LNP-VCMP讲解

目录 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP消息类型 GVRP工作原理 LNP讲解 动态修改接口链路类型 VCMP讲解 动态创建Vlan 相关概念 Vlan同步 VCMP与GVRP的区别 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP&#xff08;GARR Vlan Registration Protocol&#xf…...

28个精品Python爬虫实战项目

先来说说Python的优势&#xff01;然后给大家看下这28个实战项目的实用性&#xff01;Python跟其他语言相比&#xff0c;有以下优点&#xff1a;1. 简单Python是所有编程语言里面&#xff0c;代码量最低&#xff0c;非常易于读写&#xff0c;遇到问题时&#xff0c;程序员可以把…...

相信人还是相信ChatGPT,龙测首席AI专家给出了意料之外的答案

最近&#xff0c;关于ChatGPT的话题太火了&#xff01;各大社交软件都是他的消息&#xff01;从去年12月份ChatGPT横空出世&#xff0c;再到近期百度文心一言、复旦Moss的陆续宣布&#xff0c;点燃了全球对AIGC&#xff08;内容人工智能自动生成&#xff09;领域的热情&#xf…...

安卓逆向_5 --- jeb 和 AndroidStudio 动态调试 smali

Jeb 工具的使用 &#xff1a;https://www.52pojie.cn/forum.php?modviewthread&tid742250&#xff1a;https://zhuanlan.zhihu.com/p/302856081动态调试 smali 有两种方法&#xff1a; Jeb 调试AndroidStudio smalidea 插件动态调试。1、Jeb 动态调试 smali ​JEB是一个…...

docker-容器命令

1.新建启动 docker run options image command [arg..] options: --name"容器新名字" -d&#xff1a;后台运行程序 -it&#xff1a;交互式运行 -P: 随机端口 -p: 指定端口 docker run -it ubuntu /bin/bash docker run -it ubuntu:v1 /bin/bash docker run -it 1c352…...

Spring——是什么?作用?内容?用到的设计模式?

目录 什么是spring&#xff1f; spring是为了解决什么问题而衍生的&#xff1f;&#xff08;历史&#xff09;Spring解决了实际生产中的什么问题&#xff1f; spring包含了哪些部分&#xff1f;&#xff08;组成&#xff09; Spring的特点是什么&#xff1f; spring框架中…...

Qt交叉编译环境搭建

环境及版本&#xff1a; 编译机&#xff1a;Deepin 20.3 Qt 5.12.9 arm编译工具&#xff1a; gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-gnueabihf.tar.xz 运行机&#xff1a;创龙335X开发板 1.下载arm编译工具&#xff1a; gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-…...

Java switch case 语句

Java 的 switch case 语句是一种常用的控制流语句&#xff0c;用于基于不同的输入值执行不同的操作。本文将详细介绍 Java switch case 语句的作用、用法以及在实际工作中的应用。 一、switch case 语句的作用 switch case 语句是一种多分支条件语句&#xff0c;它基于不同的输…...

Linux下MQTT客户端消息订阅与发布实现

MQTT(消息队列遥测传输)是一个基于客户端-服务器的消息发布/订阅传输协议。它基于TCP协议&#xff0c;默认端口号为1883&#xff0c;为此&#xff0c;它也需要一个消息中间件 。MQTT协议是轻量、简单、开放和易于实现的&#xff0c;这些特点使它适用范围非常广泛。在很多情况下…...

代码规范----编程规约(下)

目录 四、OOP规约 五、日期时间 六、集合处理 四、OOP规约 &#xff08;1&#xff09;、避免通过一个类的对象引用访问此类的静态变量或静态方法&#xff0c;无谓增加编译器解析成本&#xff0c;直接用类名来访问即可 &#xff08;2&#xff09;、所有的覆写方法&#xff0…...

c++连接mysql

开始想用mysql connector/c8.0 来操作数据库cmake加上配置后一直编译错误 我这里也没有截屏编译错误大概意思是driver.h里面声明的一个check_lib函数里面用了一个未定义的check找遍了资料都没有找到解决办法最后还是用了原始API如果有人有解决办法请留个位置先上在用的cmake配置…...

CentOS7操作系统安装nginx实战(多种方法,超详细)

文章目录前言一. 实验环境二. 使用yum安装nginx2.1 添加yum源2.1.1 使用官网提供的源地址&#xff08;方法一&#xff09;2.1.2 使用epel的方式进行安装&#xff08;方法二&#xff09;2.2 开始安装nginx2.3 启动并进行测试2.4 其他的一些用法&#xff1a;三. 编译方式安装ngin…...

【测绘程序设计】——空间直角坐标转换

测绘工程中经常遇到空间直角坐标转换——比如,北京54(或西安80)空间直角坐标转换成CGCS2000(或WGS-84)空间直角坐标,常用转换模型包括:①布尔沙模型(国家级及省级范围);②莫洛坚斯基模型(省级以下范围);③三维四参数(小于22局部区域) 等。   本文分享了基于布…...

数组--java--动态数组--有序数组--底层

java数组基础--java中的数组创建数组空间占用初始化数组访问元素插入查找删除元素动态数组扩容插入和添加重写toString删除二维数组二维数组注意点有序数组实现测试写在开头&#xff1a; 这篇文章包括数组的基础、一点底层的内容和一些稍微深入的东西。 作为第一个深入学习的数…...

Linux下使用C语言实现简单的聊天室程序

本文章介绍一种基于Linux使用C语言实现简单的局域网聊天室程序的方法&#xff0c;支持消息群发&#xff0c;历史数据查询&#xff0c;好友列表查看&#xff0c;好友上线下线提醒等功能。聊天界面如下图所示&#xff1a;下面将按步骤介绍该系统的设计实现&#xff0c;首先在linu…...

【数学】任意一个正整数n最多只有一个质因数大于根号n,怎么证明?

定理 任意一个正整数n最多只有一个大于n\sqrt{n}n​的质因子&#xff0c;并且该大于n\sqrt{n}n​质因子的幂次是1。 证明&#xff08;反证法&#xff09; 证明&#xff1a;最多只有一个大于n\sqrt{n}n​的质因子 假设n存在两个大于n\sqrt{n}n​的质因子&#xff0c;分别为p…...

【ES6】var let const 之面试题系列

关于 var、let、const 是前端开发人员经常用到的关键字&#xff0c;也是经典的面试题&#xff0c;接下来就站在面试题的角度来看待它们之间的区别。 一、区别 1. var 声明的范围是函数作用域&#xff0c;let 和 const 声明的范围是块作用域&#xff0c;块作用域是函数作用域的…...

Vue基础入门讲义(四)-组件化

文章目录1.引言2.定义全局组件3.组件的复用4.局部注册5.组件通信5.1.父向子传递props5.2.传递复杂数据5.3.子向父的通信1.引言 在大型应用开发的时候&#xff0c;页面可以划分成很多部分。往往不同的页面&#xff0c;也会有相同的部分。例如可能会有相同的头部导航。 但是如果…...

Android onLayout布局流程解析

组件布局流程结论 1.&#xff09;layout流程始于ViewRootImpl的performLayout()方法&#xff0c;该方法会调用根View&#xff08;DecorView&#xff09;的layout()方法进行布局&#xff0c;因为DecorView是ViewGroup(FrameLayout),所以layout流程来到了ViewGroup&#xff08;其…...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...