Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲
- 创建RabbitMQ队列
- 新建工程
- 新增依赖
- 编码
- 设置数据源配置
- 读取、处理数据
- 完整代码
- 打包、上传和运行任务
- 测试
- 工程代码
在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。
创建RabbitMQ队列
我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。

后续我们将在后台通过默认交换器,给这个队列新增消息。
新建工程
我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1

新增依赖
在pom.xml中新增RabbitMQ连接器
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>
编码
设置数据源配置
String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());
读取、处理数据
下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。
final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);
完整代码
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}
打包、上传和运行任务




测试
在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq

然后使用下面指令可以看到Flink读取到消息并执行了print方法
tail log/flink-*-taskexecutor-*.out
==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default
工程代码
https://github.com/f304646673/FlinkDemo
相关文章:
Java版Flink使用指南——从RabbitMQ中队列中接入消息流
大纲 创建RabbitMQ队列新建工程新增依赖编码设置数据源配置读取、处理数据完整代码 打包、上传和运行任务测试 工程代码 在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而…...
Python酷库之旅-第三方库Pandas(013)
目录 一、用法精讲 31、pandas.read_feather函数 31-1、语法 31-2、参数 31-3、功能 31-4、返回值 31-5、说明 31-6、用法 31-6-1、数据准备 31-6-2、代码示例 31-6-3、结果输出 32、pandas.DataFrame.to_feather函数 32-1、语法 32-2、参数 32-3、功能 32-4、…...
Linux 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率
【Linux】 高级 Shell 脚本编程:掌握 Shell 脚本精髓,提升工作效率 Shell 脚本编程是 Linux 系统管理员和开发人员的必备技能。通过学习高级 Shell 脚本编程,你可以编写更高效、更灵活和更易于维护的脚本。本文将介绍 Shell 脚本编程中的函数…...
【ARMv8/v9 GIC 系列 1.5 -- Enabling the distribution of interrupts】
请阅读【ARM GICv3/v4 实战学习 】 文章目录 Enabling the distribution of interruptsGIC Distributor 中断组分发控制CPU Interface 中断组分发控制Physical LPIs 的启用Summary Enabling the distribution of interrupts 在ARM GICv3和GICv4体系结构中,中断分发…...
《mysql篇》--索引事务
索引 索引的介绍 索引是帮助MySQL高效获取数据的数据结构,是一种特殊的文件,包含着对数据表里所有记录的引用指针,因为索引本身也比较大,所以索引一般是存储在磁盘上的,索引的种类有很多,不过如果没有特殊…...
科研绘图系列:R语言STAMP图(STAMP Plot)
介绍 STAMP图(STAMP plot)并非一个广泛认知的、具有特定名称的图表类型,而是可能指在STAMP(Statistical Analysis of Metagenomic Profiles:“STAMP: statistical analysis of taxonomic and functional profiles”)软件使用过程中生成的各种统计和可视化图表的总称。ST…...
运维团队如何应对动环监控与IT监控分离的挑战
IT与机房动环监控的一体化是当下及未来的必然趋势,这一模式显著节省了运维过程中的时间与成本。一体化平台不仅消除了频繁切换系统的繁琐,更在一个统一界面上实现了多元化的管理运维功能,极大地提升了工作效率。 在机房升级或新建项目中&…...
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
大数据领域内的诸多概念常常让人困惑,其中数据平台、数据中台、数据湖和数据仓库是最为关键的几个。 1. 数据平台 定义: 数据平台是一个综合性的技术框架,旨在支持整个数据生命周期的管理和使用。它包含数据采集、存储、处理、分析和可视化…...
开发指南040-业务操作日志
平台所有业务操作都存储在核心库,以便统一分析处理。各业务微服务通过feign调用核心日志服务。底层提供了API: <dependency><groupId>org.qlm</groupId><artifactId>qlm-api</artifactId><version>1.0-SNAPSHOT<…...
如何构建数据驱动的企业?爬虫管理平台是关键桥梁吗?
一、数据驱动时代:为何选择爬虫管理平台? 在信息爆炸的今天,数据驱动已成为企业发展的核心战略之一。爬虫管理平台,作为数据采集的第一站,它的重要性不言而喻。这类平台通过自动化手段,从互联网的各个角落…...
多线程Thread
线程Thread简介 任务、线程、金城、多线程 多任务:短时间切换不同得任务 多线程:通过同一条道路,增加道多条道路,提高使用率,解决堵塞问题 普通方法调多线程只有主线一台执行路径是主线程调run()方法,方…...
计算机网络之WPAN 和 WLAN
上一篇文章内容:无线局域网 1.WPAN(无线个人区域网) WPAN 是以个人为中心来使用的无线个人区域网,它实际上就是一个低功率、小范围、低速率和低价格的电缆替代技术。 (1) 蓝牙系统(Bluetooth) &#…...
TikTok海外运营,云手机多种变现方法
从现阶段来看,TikTok 的用户基数不断增长,已然成为全球创业者和品牌的全新竞争舞台。其用户数量近乎 20 亿,年轻用户占据主导,市场渗透率也逐年提高。不管是大型企业、著名品牌,还是个体创业者,都绝不能小觑…...
kubekey在ubuntu24实现kubernetes快速安装
基于Ubunut24.04安装 设置主机名 hostnamectl set-hostname kkmain hostnamectl set-hostname kknode1 hostnamectl set-hostname kknode2关闭swap sudo swapoff -a sudo sed -i s/.*swap.*/#&/ /etc/fstab安装kubekey export KKZONEcn curl -sfL https://get-kk.kubes…...
根据关键词query获取google_img(api方式)
文章目录 说明代码第一部分:链接保存为Json第二部分:链接转换为img 说明 根据关键词query获取google_img USERNAME “xxx” PASSWORD “xxx” 官网申请。 代码 首先获取图片链接,保存为json之后下载。 第一部分:链接保存为…...
西安明德理工学院师生莅临泰迪智能科技开展参观见习活动
为进一步深化校企合作,落实高校应用型人才培养。7月8日,西安明德理工学院与广东泰迪智能科技股份有限公司联合开展学生企业见习活动。西安明德理工学院金融产业学院副院长刘敏、金融学专业负责人张莉萍、金融学专业教师曹艳飞、赵浚妤、泰迪智能科技董事…...
通用机器人里程碑!MIT提出策略组合框架PoCo,解决数据源异构难题,实现机器人多任务灵活执行
18 位人形机器人充当「迎宾」人员,整齐划一向嘉宾挥手,这是 2024 世界人工智能大会上的一个震撼场景,让人们直观感受到了今年机器人的飞速发展。 图源:甲子光年 1954 年,世界上第一台可编程机器人「尤尼梅特」在通用汽…...
基于Java中的SSM框架实现疫情冷链追溯系统项目【项目源码+论文说明】
基于Java中的SSM框架实现疫情冷链追溯系统演示 摘要 近几年随着城镇化发展和居民消费水平的不断提升,人们对健康生活方式的追求意识逐渐加强,生鲜食品逐渐受到大众青睐,诸如盒马鲜生、7-fresh等品牌生鲜超市,一时间如雨后春笋般迅…...
想在vue中预览doxc,excel,pdf文件? vue-office提供包支持
在浩瀚的Vue生态中,vue-office犹如一颗璀璨的星辰,以其独特的魅力照亮了开发者处理多种文件格式的预览之路。这款精心打造的Vue组件库,不仅拥抱了Vue2的经典,也紧密跟随Vue3的步伐,展现了卓越的技术前瞻性和兼容性。它…...
PostgreSQL16安装Mac(brew)
问题 最近需要从MySQL切换到PostgreSQL。我得在本地准备一个PostgreSQL。 步骤 使用brew安装postgresql16: arch -arm64 brew install postgresql16启动postgresql16: brew services start postgresql16配置postgresql环境变量,打开环境变量文件: …...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing
Muffin 论文 现有方法 CRADLE 和 LEMON,依赖模型推理阶段输出进行差分测试,但在训练阶段是不可行的,因为训练阶段直到最后才有固定输出,中间过程是不断变化的。API 库覆盖低,因为各个 API 都是在各种具体场景下使用。…...
【LeetCode】算法详解#6 ---除自身以外数组的乘积
1.题目介绍 给定一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O…...
