【Storm】【五】Storm集成Kafka
Storm集成Kafka
一、整合说明
Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下:
- Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持;
- Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。
这里我服务端安装的 Kafka 版本为 2.2.0(Released Mar 22, 2019) ,按照官方 0.10.x+ 的整合文档进行整合,不适用于 0.8.x 版本的 Kafka。
二、写入数据到Kafka
2.1 项目结构
2.2 项目主要依赖
<properties><storm.version>1.2.2</storm.version><kafka.version>2.2.0</kafka.version>
</properties><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka-client</artifactId><version>${storm.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency>
</dependencies>
2.3 DataSourceSpout
/*** 产生词频样本的数据源*/
public class DataSourceSpout extends BaseRichSpout {private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");private SpoutOutputCollector spoutOutputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector = spoutOutputCollector;}@Overridepublic void nextTuple() {// 模拟产生数据String lineData = productData();spoutOutputCollector.emit(new Values(lineData));Utils.sleep(1000);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}/*** 模拟数据*/private String productData() {Collections.shuffle(list);Random random = new Random();int endIndex = random.nextInt(list.size()) % (list.size()) + 1;return StringUtils.join(list.toArray(), "\t", 0, endIndex);}}
产生的模拟数据格式如下:
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
2.4 WritingToKafkaApp
/*** 写入数据到 Kafka 中*/
public class WritingToKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();// 定义 Kafka 生产者属性Properties props = new Properties();/** 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。* 不过建议至少要提供两个 broker 的信息作为容错。*/props.put("bootstrap.servers", BOOTSTRAP_SERVERS);/** acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。* acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。* acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。* acks=all : 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。*/props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaBolt bolt = new KafkaBolt<String, String>().withProducerProperties(props).withTopicSelector(new DefaultTopicSelector(TOPIC_NAME)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());builder.setSpout("sourceSpout", new DataSourceSpout(), 1);builder.setBolt("kafkaBolt", bolt, 1).shuffleGrouping("sourceSpout");if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterWritingToKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalWritingToKafkaApp",new Config(), builder.createTopology());}}
}
2.5 测试准备工作
进行测试前需要启动 Kakfa:
1. 启动Kakfa
Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:
# zookeeper启动命令
bin/zkServer.sh start# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
启动单节点 kafka 用于测试:
# bin/kafka-server-start.sh config/server.properties
2. 创建topic
# 创建用于测试主题
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 1 --partitions 1 --topic storm-topic# 查看所有主题bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. 启动消费者
启动一个消费者用于观察写入情况,启动命令如下:
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic storm-topic --from-beginning
2.6 测试
可以用直接使用本地模式运行,也可以打包后提交到服务器集群运行。本仓库提供的源码默认采用 maven-shade-plugin
进行打包,打包命令如下:
# mvn clean package -D maven.test.skip=true
启动后,消费者监听情况如下:
三、从Kafka中读取数据
3.1 项目结构
3.2 ReadingFromKafkaApp
/*** 从 Kafka 中读取数据*/
public class ReadingFromKafkaApp {private static final String BOOTSTRAP_SERVERS = "hadoop001:9092";private static final String TOPIC_NAME = "storm-topic";public static void main(String[] args) {final TopologyBuilder builder = new TopologyBuilder();builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(BOOTSTRAP_SERVERS, TOPIC_NAME)), 1);builder.setBolt("bolt", new LogConsoleBolt()).shuffleGrouping("kafka_spout");// 如果外部传参 cluster 则代表线上环境启动,否则代表本地启动if (args.length > 0 && args[0].equals("cluster")) {try {StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {e.printStackTrace();}} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("LocalReadingFromKafkaApp",new Config(), builder.createTopology());}}private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {return KafkaSpoutConfig.builder(bootstrapServers, topic)// 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")// 定义重试策略.setRetry(getRetryService())// 定时提交偏移量的时间间隔,默认是 15s.setOffsetCommitPeriodMs(10_000).build();}// 定义重试策略private static KafkaSpoutRetryService getRetryService() {return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));}
}
3.3 LogConsoleBolt
/*** 打印从 Kafka 中获取的数据*/
public class LogConsoleBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector=collector;}public void execute(Tuple input) {try {String value = input.getStringByField("value");System.out.println("received from kafka : "+ value);// 必须 ack,否则会重复消费 kafka 中的消息collector.ack(input);}catch (Exception e){e.printStackTrace();collector.fail(input);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
这里从 value
字段中获取 kafka 输出的值数据。
在开发中,我们可以通过继承 RecordTranslator
接口定义了 Kafka 中 Record 与输出流之间的映射关系,可以在构建 KafkaSpoutConfig
的时候通过构造器或者 setRecordTranslator()
方法传入,并最后传递给具体的 KafkaSpout
。
默认情况下使用内置的 DefaultRecordTranslator
,其源码如下,FIELDS
中 定义了 tuple 中所有可用的字段:主题,分区,偏移量,消息键,值。
public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {private static final long serialVersionUID = -5782462870112305750L;public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");@Overridepublic List<Object> apply(ConsumerRecord<K, V> record) {return new Values(record.topic(),record.partition(),record.offset(),record.key(),record.value());}@Overridepublic Fields getFieldsFor(String stream) {return FIELDS;}@Overridepublic List<String> streams() {return DEFAULT_STREAM;}
}
3.4 启动测试
这里启动一个生产者用于发送测试数据,启动命令如下:
# bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic storm-topic
本地运行的项目接收到从 Kafka 发送过来的数据:
用例源码下载地址:storm-kafka-integration
参考资料
- Storm Kafka Integration (0.10.x+)
相关文章:

【Storm】【五】Storm集成Kafka
Storm集成Kafka 一、整合说明二、写入数据到Kafka三、从Kafka中读取数据一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持;Storm Kafka …...

GVRP-LNP-VCMP讲解
目录 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP消息类型 GVRP工作原理 LNP讲解 动态修改接口链路类型 VCMP讲解 动态创建Vlan 相关概念 Vlan同步 VCMP与GVRP的区别 GVRP讲解 动态创建Vlan并将端口加入Vlan GVRP(GARR Vlan Registration Protocol…...

28个精品Python爬虫实战项目
先来说说Python的优势!然后给大家看下这28个实战项目的实用性!Python跟其他语言相比,有以下优点:1. 简单Python是所有编程语言里面,代码量最低,非常易于读写,遇到问题时,程序员可以把…...

相信人还是相信ChatGPT,龙测首席AI专家给出了意料之外的答案
最近,关于ChatGPT的话题太火了!各大社交软件都是他的消息!从去年12月份ChatGPT横空出世,再到近期百度文心一言、复旦Moss的陆续宣布,点燃了全球对AIGC(内容人工智能自动生成)领域的热情…...

安卓逆向_5 --- jeb 和 AndroidStudio 动态调试 smali
Jeb 工具的使用 :https://www.52pojie.cn/forum.php?modviewthread&tid742250:https://zhuanlan.zhihu.com/p/302856081动态调试 smali 有两种方法: Jeb 调试AndroidStudio smalidea 插件动态调试。1、Jeb 动态调试 smali JEB是一个…...

docker-容器命令
1.新建启动 docker run options image command [arg..] options: --name"容器新名字" -d:后台运行程序 -it:交互式运行 -P: 随机端口 -p: 指定端口 docker run -it ubuntu /bin/bash docker run -it ubuntu:v1 /bin/bash docker run -it 1c352…...

Spring——是什么?作用?内容?用到的设计模式?
目录 什么是spring? spring是为了解决什么问题而衍生的?(历史)Spring解决了实际生产中的什么问题? spring包含了哪些部分?(组成) Spring的特点是什么? spring框架中…...

Qt交叉编译环境搭建
环境及版本: 编译机:Deepin 20.3 Qt 5.12.9 arm编译工具: gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-gnueabihf.tar.xz 运行机:创龙335X开发板 1.下载arm编译工具: gcc-linaro-6.5.0-2018.12-x86_64_arm-linux-…...

Java switch case 语句
Java 的 switch case 语句是一种常用的控制流语句,用于基于不同的输入值执行不同的操作。本文将详细介绍 Java switch case 语句的作用、用法以及在实际工作中的应用。 一、switch case 语句的作用 switch case 语句是一种多分支条件语句,它基于不同的输…...

Linux下MQTT客户端消息订阅与发布实现
MQTT(消息队列遥测传输)是一个基于客户端-服务器的消息发布/订阅传输协议。它基于TCP协议,默认端口号为1883,为此,它也需要一个消息中间件 。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下…...

代码规范----编程规约(下)
目录 四、OOP规约 五、日期时间 六、集合处理 四、OOP规约 (1)、避免通过一个类的对象引用访问此类的静态变量或静态方法,无谓增加编译器解析成本,直接用类名来访问即可 (2)、所有的覆写方法࿰…...

c++连接mysql
开始想用mysql connector/c8.0 来操作数据库cmake加上配置后一直编译错误 我这里也没有截屏编译错误大概意思是driver.h里面声明的一个check_lib函数里面用了一个未定义的check找遍了资料都没有找到解决办法最后还是用了原始API如果有人有解决办法请留个位置先上在用的cmake配置…...

CentOS7操作系统安装nginx实战(多种方法,超详细)
文章目录前言一. 实验环境二. 使用yum安装nginx2.1 添加yum源2.1.1 使用官网提供的源地址(方法一)2.1.2 使用epel的方式进行安装(方法二)2.2 开始安装nginx2.3 启动并进行测试2.4 其他的一些用法:三. 编译方式安装ngin…...

【测绘程序设计】——空间直角坐标转换
测绘工程中经常遇到空间直角坐标转换——比如,北京54(或西安80)空间直角坐标转换成CGCS2000(或WGS-84)空间直角坐标,常用转换模型包括:①布尔沙模型(国家级及省级范围);②莫洛坚斯基模型(省级以下范围);③三维四参数(小于22局部区域) 等。 本文分享了基于布…...

数组--java--动态数组--有序数组--底层
java数组基础--java中的数组创建数组空间占用初始化数组访问元素插入查找删除元素动态数组扩容插入和添加重写toString删除二维数组二维数组注意点有序数组实现测试写在开头: 这篇文章包括数组的基础、一点底层的内容和一些稍微深入的东西。 作为第一个深入学习的数…...

Linux下使用C语言实现简单的聊天室程序
本文章介绍一种基于Linux使用C语言实现简单的局域网聊天室程序的方法,支持消息群发,历史数据查询,好友列表查看,好友上线下线提醒等功能。聊天界面如下图所示:下面将按步骤介绍该系统的设计实现,首先在linu…...

【数学】任意一个正整数n最多只有一个质因数大于根号n,怎么证明?
定理 任意一个正整数n最多只有一个大于n\sqrt{n}n的质因子,并且该大于n\sqrt{n}n质因子的幂次是1。 证明(反证法) 证明:最多只有一个大于n\sqrt{n}n的质因子 假设n存在两个大于n\sqrt{n}n的质因子,分别为p…...

【ES6】var let const 之面试题系列
关于 var、let、const 是前端开发人员经常用到的关键字,也是经典的面试题,接下来就站在面试题的角度来看待它们之间的区别。 一、区别 1. var 声明的范围是函数作用域,let 和 const 声明的范围是块作用域,块作用域是函数作用域的…...

Vue基础入门讲义(四)-组件化
文章目录1.引言2.定义全局组件3.组件的复用4.局部注册5.组件通信5.1.父向子传递props5.2.传递复杂数据5.3.子向父的通信1.引言 在大型应用开发的时候,页面可以划分成很多部分。往往不同的页面,也会有相同的部分。例如可能会有相同的头部导航。 但是如果…...

Android onLayout布局流程解析
组件布局流程结论 1.)layout流程始于ViewRootImpl的performLayout()方法,该方法会调用根View(DecorView)的layout()方法进行布局,因为DecorView是ViewGroup(FrameLayout),所以layout流程来到了ViewGroup(其…...

浅分析BIG-建筑展示系统
一、主页(主要界面)重点疑点(需要解决)1.云平台实时同步。是否可以电脑与hololens2同步或链接?并可以传输信息提醒?一级界面(启动界面)1.交互式启动激活效果(触发按钮旋转…...

模电基础(1) 半导体基础知识
基本内容: 1.本征半导体的基本介绍结构; 2.杂质半导体; 3.PN结的形成; 4.PN结的性质。 1.本征半导体 半导体:导电性能介于绝缘体和导体之间的物质。 本征半导体是纯净的晶体结构的半导体。 纯净→无杂质晶体结构→稳…...

阅读笔记:TF - IDF 原理
今天查阅 TF-IDF 资料,发现百度百科里面提供了一个例子,解释的很清楚,记下来备用。 原文链接:https://baike.baidu.com/item/tf-idf/8816134?fraladdin 例子:在某个一共有一千词的网页中 “原子能”、“的” 和 “应…...

【C语言】float 关键字
🚩write in front🚩 🔎大家好,我是謓泽,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎 🏅2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4…...

Linux 网络编程(实现多路IO转接服务器)
1.select函数实现多路IO转接服务器select函数原型:包含在头文件<sys/time.h>,<sys/types.h>和<unistd.h>int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);作用:确定…...

DC-4 靶场学习
信息搜集: 首先获取靶场ip,和之前一样。 arp-scan -l nmap -sP 192.168.28.0/24然后访问。 发现需要登录。 漏洞分析: 直接用bp爆破,爆破出来密码为happy,登录。 发现执行了命令,抓包。 修改命令可以执行ÿ…...

QML组件
一个QML文件定义了一个独立的、顶级的QML组件。 一个QML组件就是一个模板,被QML运行环境解释来创建一个带有一些预定义行为的对象。 一个独立的QML组件可以运行多次来禅城多个对象,每个对象都可以称为该组件的实例。 例子: 在项目中添加一…...

canvas 学习指南
canvas 学习指南 创建一个 canvas <! DOCTYPE html><html xmlns"http://www.w3.org/1999/xhtml"><head><title></title><meta charset"utf-8" /><script type"text/javascript">window.onload fun…...

【华为OD机试2023】开心消消乐 C++
【华为OD机试2023】开心消消乐 C++ 前言 如果您在准备华为的面试,期间有想了解的可以私信我,我会尽可能帮您解答,也可以给您一些建议! 本文解法非最优解(即非性能最优),不能保证通过率。 Tips1:机试为ACM 模式 你的代码需要处理输入输出,input/cin接收输入、print/cou…...

学历?能力?
一个面试官愿意看一张有形的总结报告,还是愿意相信看不到的人品?...