Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲
- 创建RabbitMQ队列
- 新建工程
- 新增依赖
- 编码
- 设置数据源配置
- 读取、处理数据
- 完整代码
- 打包、上传和运行任务
- 测试
- 工程代码
在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。
创建RabbitMQ队列
我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。

后续我们将在后台通过默认交换器,给这个队列新增消息。
新建工程
我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1

新增依赖
在pom.xml中新增RabbitMQ连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
编码
设置数据源配置
String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());
读取、处理数据
下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。
final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);
完整代码
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}
打包、上传和运行任务




测试
在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq

然后使用下面指令可以看到Flink读取到消息并执行了print方法
tail log/flink-*-taskexecutor-*.out
==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default
工程代码
https://github.com/f304646673/FlinkDemo
相关文章:
Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲 创建RabbitMQ队列新建工程新增依赖编码设置数据源配置读取、处理数据完整代码 打包、上传和运行任务测试 工程代码 在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而…...
Python酷库之旅-第三方库Pandas(013)
目录 一、用法精讲 31、pandas.read_feather函数 31-1、语法 31-2、参数 31-3、功能 31-4、返回值 31-5、说明 31-6、用法 31-6-1、数据准备 31-6-2、代码示例 31-6-3、结果输出 32、pandas.DataFrame.to_feather函数 32-1、语法 32-2、参数 32-3、功能 32-4、…...
Linux 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率
【Linux】 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率 Shell 脚本编程是 Linux 系统管理员和开发人员的必备技能。通过学习高级 Shell 脚本编程,你可以编写更高效、更灵活和更易于维护的脚本。本文将介绍 Shell 脚本编程中的函数…...
【ARMv8/v9 GIC 系列 1.5 -- Enabling the distribution of interrupts】
请阅读【ARM GICv3/v4 实战学习 】 文章目录 Enabling the distribution of interruptsGIC Distributor 中断组分发控制CPU Interface 中断组分发控制Physical LPIs 的启用Summary Enabling the distribution of interrupts 在ARM GICv3和GICv4体系结构中,中断分发…...
《mysql篇》--索引事务
索引 索引的介绍 索引是帮助MySQL高效获取数据的数据结构,是一种特殊的文件,包含着对数据表里所有记录的引用指针,因为索引本身也比较大,所以索引一般是存储在磁盘上的,索引的种类有很多,不过如果没有特殊…...
科研绘图系列:R语言STAMP图(STAMP Plot)
介绍 STAMP图(STAMP plot)并非一个广泛认知的、具有特定名称的图表类型,而是可能指在STAMP(Statistical Analysis of Metagenomic Profiles:“STAMP: statistical analysis of taxonomic and functional profiles”)软件使用过程中生成的各种统计和可视化图表的总称。ST…...
运维团队如何应对动环监控与IT监控分离的挑战
IT与机房动环监控的一体化是当下及未来的必然趋势,这一模式显著节省了运维过程中的时间与成本。一体化平台不仅消除了频繁切换系统的繁琐,更在一个统一界面上实现了多元化的管理运维功能,极大地提升了工作效率。 在机房升级或新建项目中&…...
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
大数据领域内的诸多概念常常让人困惑,其中数据平台、数据中台、数据湖和数据仓库是最为关键的几个。 1. 数据平台 定义: 数据平台是一个综合性的技术框架,旨在支持整个数据生命周期的管理和使用。它包含数据采集、存储、处理、分析和可视化…...
开发指南040-业务操作日志
平台所有业务操作都存储在核心库,以便统一分析处理。各业务微服务通过feign调用核心日志服务。底层提供了API: <dependency><groupId>org.qlm</groupId><artifactId>qlm-api</artifactId><version>1.0-SNAPSHOT<…...
如何构建数据驱动的企业?爬虫管理平台是关键桥梁吗?
一、数据驱动时代:为何选择爬虫管理平台? 在信息爆炸的今天,数据驱动已成为企业发展的核心战略之一。爬虫管理平台,作为数据采集的第一站,它的重要性不言而喻。这类平台通过自动化手段,从互联网的各个角落…...
多线程Thread
线程Thread简介 任务、线程、金城、多线程 多任务:短时间切换不同得任务 多线程:通过同一条道路,增加道多条道路,提高使用率,解决堵塞问题 普通方法调多线程只有主线一台执行路径是主线程调run()方法,方…...
计算机网络之WPAN 和 WLAN
上一篇文章内容:无线局域网 1.WPAN(无线个人区域网) WPAN 是以个人为中心来使用的无线个人区域网,它实际上就是一个低功率、小范围、低速率和低价格的电缆替代技术。 (1) 蓝牙系统(Bluetooth) &#…...
TikTok海外运营,云手机多种变现方法
从现阶段来看,TikTok 的用户基数不断增长,已然成为全球创业者和品牌的全新竞争舞台。其用户数量近乎 20 亿,年轻用户占据主导,市场渗透率也逐年提高。不管是大型企业、著名品牌,还是个体创业者,都绝不能小觑…...
kubekey在ubuntu24实现kubernetes快速安装
基于Ubunut24.04安装 设置主机名 hostnamectl set-hostname kkmain hostnamectl set-hostname kknode1 hostnamectl set-hostname kknode2关闭swap sudo swapoff -a sudo sed -i s/.*swap.*/#&/ /etc/fstab安装kubekey export KKZONEcn curl -sfL https://get-kk.kubes…...
根据关键词query获取google_img(api方式)
文章目录 说明代码第一部分:链接保存为Json第二部分:链接转换为img 说明 根据关键词query获取google_img USERNAME “xxx” PASSWORD “xxx” 官网申请。 代码 首先获取图片链接,保存为json之后下载。 第一部分:链接保存为…...
西安明德理工学院师生莅临泰迪智能科技开展参观见习活动
为进一步深化校企合作,落实高校应用型人才培养。7月8日,西安明德理工学院与广东泰迪智能科技股份有限公司联合开展学生企业见习活动。西安明德理工学院金融产业学院副院长刘敏、金融学专业负责人张莉萍、金融学专业教师曹艳飞、赵浚妤、泰迪智能科技董事…...
通用机器人里程碑!MIT提出策略组合框架PoCo,解决数据源异构难题,实现机器人多任务灵活执行
18 位人形机器人充当「迎宾」人员,整齐划一向嘉宾挥手,这是 2024 世界人工智能大会上的一个震撼场景,让人们直观感受到了今年机器人的飞速发展。 图源:甲子光年 1954 年,世界上第一台可编程机器人「尤尼梅特」在通用汽…...
基于Java中的SSM框架实现疫情冷链追溯系统项目【项目源码+论文说明】
基于Java中的SSM框架实现疫情冷链追溯系统演示 摘要 近几年随着城镇化发展和居民消费水平的不断提升,人们对健康生活方式的追求意识逐渐加强,生鲜食品逐渐受到大众青睐,诸如盒马鲜生、7-fresh等品牌生鲜超市,一时间如雨后春笋般迅…...
想在vue中预览doxc,excel,pdf文件? vue-office提供包支持
在浩瀚的Vue生态中,vue-office犹如一颗璀璨的星辰,以其独特的魅力照亮了开发者处理多种文件格式的预览之路。这款精心打造的Vue组件库,不仅拥抱了Vue2的经典,也紧密跟随Vue3的步伐,展现了卓越的技术前瞻性和兼容性。它…...
PostgreSQL16安装Mac(brew)
问题 最近需要从MySQL切换到PostgreSQL。我得在本地准备一个PostgreSQL。 步骤 使用brew安装postgresql16: arch -arm64 brew install postgresql16启动postgresql16: brew services start postgresql16配置postgresql环境变量,打开环境变量文件: …...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
基于IDIG-GAN的小样本电机轴承故障诊断
目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) 梯度归一化(Gradient Normalization) (2) 判别器梯度间隙正则化(Discriminator Gradient Gap Regularization) (3) 自注意力机制(Self-Attention) 3. 完整损失函数 二…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...
Caliper 负载(Workload)详细解析
Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...
Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...
日常一水C
多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...
