当前位置: 首页 > news >正文

Java版Flink使用指南——将消息写入到RabbitMQ的队列中

大纲

  • 新建工程
    • 新增依赖
  • 编码
    • 自动产生数据
    • 写入RabbitMQ
  • 测试
  • 工程代码

在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);	

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——将消息写入到RabbitMQ的队列中

大纲 新建工程新增依赖 编码自动产生数据写入RabbitMQ 测试工程代码 在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据&#xff0c;并将其写入日志中。本文将通过代码产生一些数据&#xf…...

python excel openpyxl

python excel LTS 在开始之前&#xff0c;确保已经安装了 Python 和所需的库。 主要使用以下库&#xff1a; openpyxl&#xff1a;用于读取和写入 Excel 文件。 pandas&#xff1a;用于数据处理和分析。 xlwings&#xff1a;用于将 Python 与 Excel 连接&#xff0c;实现双向…...

C++八股(一)

目录 一、new和malloc ⭐ 二、class和struct的区别 ⭐ 三、char和int之间的转换 四、什么是野指针和悬挂指针 ⭐ 五、NULL和nullptr区别⭐ 六、指针常量和常量指针有何区别⭐ 七、物理内存和虚拟内存的区别⭐ 八、重载、重写和隐藏的区别⭐ 九、简述面向对象(OOP)的…...

【Git的基本操作】版本回退 | 撤销修改的三种情况 | 删除文件

目录 5.版本回退 5.1选项hard&后悔药 5.2后悔药&commit id 5.3版本回退的原理 6.撤销修改 6.1情况一 6.2情况二 6.3情况三 ​7.删除文件 Git重要能力之一马&#xff0c;版本回退功能。Git是版本控制系统&#xff0c;能够管理文件历史版本。本篇以ReadMe文件为…...

STM32-I2C

本内容基于江协科技STM32视频学习之后整理而得。 文章目录 1. I2C通信1.1 I2C通信简介1.2 硬件电路1.3 I2C时序基本单元1.3.1 起始条件和终止条件1.3.2 发送一个字节1.3.3 接收一个字节1.3.4 发送应答和接收应答 1.4 I2C时序1.4.1 指定地址写1.4.2 当前地址读1.4.3 指定地址读…...

04.ffmpeg打印音视频媒体信息

目录 1、相关头文件 2、相关结构体 3、相关函数 4、函数详解 5、源码附上 1、相关头文件 #include <libavformat/avformat.h> 包含格式相关的函数和数据结构 #include <libavutil/avutil.h> 包含一些通用实用函数 2、相关结构体 AV…...

微信开发授权登录梳理总结

授权登录流程对比 微信公众号/网页 微信文档地址&#xff1a;https://developers.weixin.qq.com/doc/offiaccount/OA_Web_Apps/Wechat_webpage_authorization.html 流程图如下&#xff1a; 特殊说明&#xff1a; 步骤1拼接的微信地址是&#xff1a;https://open.weixin.qq…...

HTML5实现我的音乐网站源码

文章目录 作者&#xff1a;[xcLeigh](https://blog.csdn.net/weixin_43151418) 1.设计来源1.1 界面效果1.2 轮播图界面1.3 音乐播放界面1.4 视频播放界面 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作…...

UNI_App平台调试指南 debug(十五)

App平台调试指南 debug 常规开发里,在 HBuilderX 的运行菜单里运行 App,手机端的错误或 console.log 日志信息会直接打印到控制台。 如果需要更多功能,比如审查元素、打断点 debug,则需要启动调试模式。自 HBuilderX 2.0.3+ 版本起开始支持 App 端的调试。 #打开调试窗口…...

LLM之RAG实战(四十一)| 使用LLamaIndex和Gemini构建高级搜索引擎

Retriever 是 RAG&#xff08;Retrieval Augmented Generation&#xff09;管道中最重要的部分。在本文中&#xff0c;我们将使用 LlamaIndex 实现一个结合关键字和向量搜索检索器的自定义检索器&#xff0c;并且使用 Gemini大模型来进行多个文档聊天。 通过本文&#xff0c;我…...

【错题集-编程题】AOE还是单体?(贪心)

牛客对应链接&#xff1a;AOE还是单体&#xff1f; (nowcoder.com) 一、分析题目 如果使用一次 AOE 造成的伤害比消耗的蓝量多&#xff0c;那就使用。否则就一直使用单体伤害。 二、代码 //值得学习的代码 #include <iostream> #include <algorithm>using namespa…...

怎么办?我的C盘又爆红了!别慌!博主手把手带你管理你的C盘空间~

怎么办&#xff1f;我的C盘又爆红了&#xff01;别慌&#xff01;博主手把手带你管理你的C盘空间~ 文章目录 怎么办&#xff1f;我的C盘又爆红了&#xff01;别慌&#xff01;博主手把手带你管理你的C盘空间~0. 在开始清理之前1. 推荐执行的操作1.1 清理系统缓存文件1.2 磁盘清…...

react启用mobx @decorators装饰器语法

react如果没有经过配置&#xff0c;直接使用decorators装饰器语法会报错&#xff1a; Support for the experimental syntax ‘decorators’ isn’t currently enabled 因为react默认是不支持装饰器语法&#xff0c;需要做一些配置来启用装饰器语法。 step1: 在 tsconfig.js…...

计算机如何学习

1. 不要只盯着计算机语言学习&#xff0c;你现在已经学习了C语言和Java&#xff0c;暑假又规划学习Python&#xff0c;最后你掌握的就是计算机语言包而已。 2. 建议你找一门想要深挖的语言&#xff0c;沿着这个方向继续往后学习知识就行。计算机语言是学不完的&#xff0c;而未…...

【Python 基础】函数 - 1

函数 从前面的章节中,你已经熟悉了 print()、input()和 len()函数。Python 提供了这样一些内建函数,但你也可以编写自己的函数。“函数”就像一个程序内的小程序。 为了更好地理解函数的工作原理,让我们来创建一个 函 数 。 在 文 件 编 辑器 中 输 入 下 面 的 程 序 , …...

从0到1开发一个Vue3的新手引导组件(附带遇到的问题以及解决方式)

1. 前言: 新手引导组件,顾名思义,就是强制性的要求第一次使用的用户跟随引导使用应用,可以让一些第一次使用系统的新手快速上手,正好我最近也遇到了这个需求,于是就想着开发一个通用组件拿出来使用(写完之后才发现element就有,后悔了哈哈哈&#x1f62d;&#x1f62d;) 示例图…...

概率统计(二)

二维离散型 联合分布律 样本总数为16是因为&#xff0c;两封信分别可以放在4个信箱 边缘分布律 条件分布律 独立性 选填才能用秒杀 联合概率乘积不等于边缘概率的乘积则不独立 二维连续型 区间用一重积分面积用二重积分 离散型随机变量 常见6个分布的期望和方差 离散型随机变…...

文件类:如何将excel文件转为csv文件(且保留时间格式)?

最近有个场景&#xff0c;在ftp服务器上&#xff0c;读取csv文件并入库&#xff0c;但是客户提供的一部分文件却是xls文件&#xff0c;就得搞个将excel转为csv文件的方法&#xff0c;话不多说直接开干。 方法 public static void convertExcelToCSV(String excelFilePath, Str…...

FiddlerScript Rules修改-更改发包中的cookie

直接在fiddler script editor中增加如下处理代码即可 推荐文档oSession -- 参数说明 测试笔记 看云...

直升机停机坪的H代表什么

可为什么直升机的停机坪为什么要用“H”来表示呢&#xff1f; Helicopter 直升机停机坪的“H”来自直升机的英文Helicopter的首字母&#xff0c;也是停机坪的识别标志&#xff0c;表示可用于直升机的垂直起降&#xff0c;方便于直升机飞行员在空中能快速识别降落位置。 另外…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

Java多线程实现之Thread类深度解析

Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

Mysql中select查询语句的执行过程

目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析&#xff08;Parser&#xff09; 2.4、执行sql 1. 预处理&#xff08;Preprocessor&#xff09; 2. 查询优化器&#xff08;Optimizer&#xff09; 3. 执行器…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...