当前位置: 首页 > news >正文

【Storm】【五】Storm集成Kafka

Storm集成Kafka

一、整合说明

Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下:

  • Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持;
  • Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。

这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档进行整合,不适用于 0.8.x 版本的 Kafka。

二、写入数据到Kafka

2.1 项目结构

在这里插入图片描述

2.2 项目主要依赖

<properties><storm.version>1.2.2</storm.version><kafka.version>2.2.0</kafka.version>
</properties><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency>
</dependencies>

2.3 DataSourceSpout

/*** 产生词频样本的数据源*/
public class DataSourceSpout extends BaseRichSpout {private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");private SpoutOutputCollector spoutOutputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}@Overridepublic void nextTuple() {// 模拟产生数据String lineData = productData();spoutOutputCollector.emit(new Values(lineData));Utils.sleep(1000);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}/*** 模拟数据*/private String productData() {Collections.shuffle(list);Random random = new Random();int endIndex = random.nextInt(list.size()) % (list.size()) + 1;return StringUtils.join(list.toArray(), "\t", 0, endIndex);}}

产生的模拟数据格式如下:

Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm

2.4 WritingToKafkaApp

/*** 写入数据到 Kafka 中*/
public class WritingToKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();// 定义 Kafka 生产者属性Properties props = new Properties();/** 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。* 不过建议至少要提供两个 broker 的信息作为容错。*/props.put("bootstrap.servers", BOOTSTRAP_SERVERS);/** acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。* acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。* acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。* acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。*/props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaBolt bolt = new KafkaBolt<String, String>().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector(TOPIC_NAME)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());builder.setSpout("sourceSpout", new DataSourceSpout(), 1);builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalWritingToKafkaApp",new Config(), builder.createTopology());}}
}

2.5 测试准备工作

进行测试前需要启动 Kakfa:

1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

# zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties

2. 创建topic

# 创建用于测试主题
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic# 查看所有主题bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 启动消费者

启动一个消费者用于观察写入情况,启动命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning

2.6 测试

可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin 进行打包,打包命令如下:

# mvn clean package -D maven.test.skip=true

启动后,消费者监听情况如下:

在这里插入图片描述

三、从Kafka中读取数据

3.1 项目结构

在这里插入图片描述

3.2 ReadingFromKafkaApp

/*** 从 Kafka 中读取数据*/
public class ReadingFromKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");// 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalReadingFromKafkaApp",new Config(), builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {return KafkaSpoutConfig.builder(bootstrapServers, topic)// 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")// 定义重试策略.setRetry(getRetryService())// 定时提交偏移量的时间间隔,默认是 15s.setOffsetCommitPeriodMs(10_000).build();}// 定义重试策略private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));}
}

3.3 LogConsoleBolt

/*** 打印从 Kafka 中获取的数据*/
public class LogConsoleBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector=collector;}public void execute(Tuple input) {try {String value = input.getStringByField("value");System.out.println("received from kafka : "+ value);// 必须 ack,否则会重复消费 kafka 中的消息collector.ack(input);}catch (Exception e){e.printStackTrace();collector.fail(input);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

这里从 value 字段中获取 kafka 输出的值数据。

在开发中,我们可以通过继承 RecordTranslator 接口定义了 Kafka 中 Record 与输出流之间的映射关系,可以在构建 KafkaSpoutConfig 的时候通过构造器或者 setRecordTranslator() 方法传入,并最后传递给具体的 KafkaSpout

默认情况下使用内置的 DefaultRecordTranslator,其源码如下,FIELDS 中 定义了 tuple 中所有可用的字段:主题,分区,偏移量,消息键,值。

public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {private static final long serialVersionUID = -5782462870112305750L;public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");@Overridepublic List<Object> apply(ConsumerRecord<K, V> record) {return new Values(record.topic(),record.partition(),record.offset(),record.key(),record.value());}@Overridepublic Fields getFieldsFor(String stream) {return FIELDS;}@Overridepublic List<String> streams() {return DEFAULT_STREAM;}
}

3.4 启动测试

这里启动一个生产者用于发送测试数据,启动命令如下:

# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic

在这里插入图片描述

本地运行的项目接收到从 Kafka 发送过来的数据:

在这里插入图片描述


用例源码下载地址:storm-kafka-integration

参考资料

  1. Storm Kafka Integration (0.10.x+)

相关文章:

【Storm】【五】Storm集成Kafka

Storm集成Kafka 一、整合说明二、写入数据到Kafka三、从Kafka中读取数据一、整合说明 Storm 官方对 Kafka 的整合分为两个版本&#xff0c;官方说明文档分别如下&#xff1a; Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持&#xff1b;Storm Kafka …...

GVRP-LNP-VCMP讲解

目录 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP消息类型 GVRP工作原理 LNP讲解 动态修改接口链路类型 VCMP讲解 动态创建Vlan 相关概念 Vlan同步 VCMP与GVRP的区别 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP&#xff08;GARR Vlan Registration Protocol&#xf…...

28个精品Python爬虫实战项目

先来说说Python的优势&#xff01;然后给大家看下这28个实战项目的实用性&#xff01;Python跟其他语言相比&#xff0c;有以下优点&#xff1a;1. 简单Python是所有编程语言里面&#xff0c;代码量最低&#xff0c;非常易于读写&#xff0c;遇到问题时&#xff0c;程序员可以把…...

相信人还是相信ChatGPT,龙测首席AI专家给出了意料之外的答案

最近&#xff0c;关于ChatGPT的话题太火了&#xff01;各大社交软件都是他的消息&#xff01;从去年12月份ChatGPT横空出世&#xff0c;再到近期百度文心一言、复旦Moss的陆续宣布&#xff0c;点燃了全球对AIGC&#xff08;内容人工智能自动生成&#xff09;领域的热情&#xf…...

安卓逆向_5 --- jeb 和 AndroidStudio 动态调试 smali

Jeb 工具的使用 &#xff1a;https://www.52pojie.cn/forum.php?modviewthread&tid742250&#xff1a;https://zhuanlan.zhihu.com/p/302856081动态调试 smali 有两种方法&#xff1a; Jeb 调试AndroidStudio smalidea 插件动态调试。1、Jeb 动态调试 smali ​JEB是一个…...

docker-容器命令

1.新建启动 docker run options image command [arg..] options: --name"容器新名字" -d&#xff1a;后台运行程序 -it&#xff1a;交互式运行 -P: 随机端口 -p: 指定端口 docker run -it ubuntu /bin/bash docker run -it ubuntu:v1 /bin/bash docker run -it 1c352…...

Spring——是什么?作用?内容?用到的设计模式?

目录 什么是spring&#xff1f; spring是为了解决什么问题而衍生的&#xff1f;&#xff08;历史&#xff09;Spring解决了实际生产中的什么问题&#xff1f; spring包含了哪些部分&#xff1f;&#xff08;组成&#xff09; Spring的特点是什么&#xff1f; spring框架中…...

Qt交叉编译环境搭建

环境及版本&#xff1a; 编译机&#xff1a;Deepin 20.3 Qt 5.12.9 arm编译工具&#xff1a; gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-gnueabihf.tar.xz 运行机&#xff1a;创龙335X开发板 1.下载arm编译工具&#xff1a; gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-…...

Java switch case 语句

Java 的 switch case 语句是一种常用的控制流语句&#xff0c;用于基于不同的输入值执行不同的操作。本文将详细介绍 Java switch case 语句的作用、用法以及在实际工作中的应用。 一、switch case 语句的作用 switch case 语句是一种多分支条件语句&#xff0c;它基于不同的输…...

Linux下MQTT客户端消息订阅与发布实现

MQTT(消息队列遥测传输)是一个基于客户端-服务器的消息发布/订阅传输协议。它基于TCP协议&#xff0c;默认端口号为1883&#xff0c;为此&#xff0c;它也需要一个消息中间件 。MQTT协议是轻量、简单、开放和易于实现的&#xff0c;这些特点使它适用范围非常广泛。在很多情况下…...

代码规范----编程规约(下)

目录 四、OOP规约 五、日期时间 六、集合处理 四、OOP规约 &#xff08;1&#xff09;、避免通过一个类的对象引用访问此类的静态变量或静态方法&#xff0c;无谓增加编译器解析成本&#xff0c;直接用类名来访问即可 &#xff08;2&#xff09;、所有的覆写方法&#xff0…...

c++连接mysql

开始想用mysql connector/c8.0 来操作数据库cmake加上配置后一直编译错误 我这里也没有截屏编译错误大概意思是driver.h里面声明的一个check_lib函数里面用了一个未定义的check找遍了资料都没有找到解决办法最后还是用了原始API如果有人有解决办法请留个位置先上在用的cmake配置…...

CentOS7操作系统安装nginx实战(多种方法,超详细)

文章目录前言一. 实验环境二. 使用yum安装nginx2.1 添加yum源2.1.1 使用官网提供的源地址&#xff08;方法一&#xff09;2.1.2 使用epel的方式进行安装&#xff08;方法二&#xff09;2.2 开始安装nginx2.3 启动并进行测试2.4 其他的一些用法&#xff1a;三. 编译方式安装ngin…...

【测绘程序设计】——空间直角坐标转换

测绘工程中经常遇到空间直角坐标转换——比如,北京54(或西安80)空间直角坐标转换成CGCS2000(或WGS-84)空间直角坐标,常用转换模型包括:①布尔沙模型(国家级及省级范围);②莫洛坚斯基模型(省级以下范围);③三维四参数(小于22局部区域) 等。   本文分享了基于布…...

数组--java--动态数组--有序数组--底层

java数组基础--java中的数组创建数组空间占用初始化数组访问元素插入查找删除元素动态数组扩容插入和添加重写toString删除二维数组二维数组注意点有序数组实现测试写在开头&#xff1a; 这篇文章包括数组的基础、一点底层的内容和一些稍微深入的东西。 作为第一个深入学习的数…...

Linux下使用C语言实现简单的聊天室程序

本文章介绍一种基于Linux使用C语言实现简单的局域网聊天室程序的方法&#xff0c;支持消息群发&#xff0c;历史数据查询&#xff0c;好友列表查看&#xff0c;好友上线下线提醒等功能。聊天界面如下图所示&#xff1a;下面将按步骤介绍该系统的设计实现&#xff0c;首先在linu…...

【数学】任意一个正整数n最多只有一个质因数大于根号n,怎么证明?

定理 任意一个正整数n最多只有一个大于n\sqrt{n}n​的质因子&#xff0c;并且该大于n\sqrt{n}n​质因子的幂次是1。 证明&#xff08;反证法&#xff09; 证明&#xff1a;最多只有一个大于n\sqrt{n}n​的质因子 假设n存在两个大于n\sqrt{n}n​的质因子&#xff0c;分别为p…...

【ES6】var let const 之面试题系列

关于 var、let、const 是前端开发人员经常用到的关键字&#xff0c;也是经典的面试题&#xff0c;接下来就站在面试题的角度来看待它们之间的区别。 一、区别 1. var 声明的范围是函数作用域&#xff0c;let 和 const 声明的范围是块作用域&#xff0c;块作用域是函数作用域的…...

Vue基础入门讲义(四)-组件化

文章目录1.引言2.定义全局组件3.组件的复用4.局部注册5.组件通信5.1.父向子传递props5.2.传递复杂数据5.3.子向父的通信1.引言 在大型应用开发的时候&#xff0c;页面可以划分成很多部分。往往不同的页面&#xff0c;也会有相同的部分。例如可能会有相同的头部导航。 但是如果…...

Android onLayout布局流程解析

组件布局流程结论 1.&#xff09;layout流程始于ViewRootImpl的performLayout()方法&#xff0c;该方法会调用根View&#xff08;DecorView&#xff09;的layout()方法进行布局&#xff0c;因为DecorView是ViewGroup(FrameLayout),所以layout流程来到了ViewGroup&#xff08;其…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比

在机器学习的回归分析中&#xff0c;损失函数的选择对模型性能具有决定性影响。均方误差&#xff08;MSE&#xff09;作为经典的损失函数&#xff0c;在处理干净数据时表现优异&#xff0c;但在面对包含异常值的噪声数据时&#xff0c;其对大误差的二次惩罚机制往往导致模型参数…...

Mysql故障排插与环境优化

前置知识点 最上层是一些客户端和连接服务&#xff0c;包含本 sock 通信和大多数jiyukehuduan/服务端工具实现的TCP/IP通信。主要完成一些简介处理、授权认证、及相关的安全方案等。在该层上引入了线程池的概念&#xff0c;为通过安全认证接入的客户端提供线程。同样在该层上可…...

深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙

WebGL&#xff1a;在浏览器中解锁3D世界的魔法钥匙 引言&#xff1a;网页的边界正在消失 在数字化浪潮的推动下&#xff0c;网页早已不再是静态信息的展示窗口。如今&#xff0c;我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室&#xff0c;甚至沉浸式的V…...