当前位置: 首页 > news >正文

Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——自定义无界流生成器

大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们让外部组件RabbitMQ充当了无界流的数据源&#xff0c;使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ…...

Vue3框架搭建4:配置说明-eslint配置

配置说明&#xff1a; .eslintrc.cjs&#xff1a; /* eslint-env node */ //node环境&#xff0c;并引入一个模块解析补丁 require(rushstack/eslint-patch/modern-module-resolution)module.exports { //继承其他配置root: true, //跟配置文件&#xff0c;ESLint不会在父目…...

JavaFx+MySql学生管理系统

前言: 上个月学习了javafx和mysql数据库,于是写了一个学生管理系统,因为上个月在复习并且有一些事情,比较忙,所以没有更新博客了,这个项目页面虽然看着有点简陋了,但是大致内容还是比较简单的,于是现在跟大家分享一下我的学生管理系统,希望对这方面有兴趣的同学提供一些帮助 &a…...

Java--抽象类

1.抽象--abstract 2.不能对抽象类进行实例化&#xff0c;也就是不能new这个抽象类 3.抽象类的应用&#xff0c;就是在class前加入abstract这个单词&#xff0c;同理抽象方法也是在void前加入abstract 4.在抽象类中可以写普通方法&#xff0c;但抽象方法只能写在抽象类中 5.…...

26.Labview波形图、XY图、强度图使用精讲

我们如何使用Labview显示曲线或者制作出下面这种我们想要的曲线并随着我们输入值的变化而变化呢&#xff1f; 本文详细讲解一下每种波形图的使用方式&#xff0c;帮助大家深入了解波形图的使用技巧。 文章中所有程序均可在百度网盘下载&#xff0c;下载方式&#xff1a;复制下…...

系统启动 | 安全启动时 “地址线” 被篡改了怎么办?

Hi&#xff0c;你们有没有想过&#xff0c;在咱们启动的过程中&#xff0c;就算是开了安全启动。但是如果在执行最后一条跳转指令时&#xff0c;如果此时改变FLASH或者DDR的地址线&#xff0c;相当于跳转到了非法的地址&#xff0c;那安全启动功能不就丧失了吗&#xff1f; 提到…...

Kafka基础组件图推演

文章目录 1. Controller Broker保障机制 2. 组件架构1. Log Manager2. Replication Manager3. SocketServer4. NetworkServer5. ZKClient 1. Controller Broker Kafka集群中有一个Controller Broker&#xff0c;负责元数据管理和协调。 Kafka使用Zookeeper作为集群元数据的存储…...

k8s中使用cert-manager生成自签名证书

一、安装 cert-manager 注意查看cert-manager和K8S支持的对应版本 我的 k8sv1.28.2&#xff0c;cert-manager v1.12.11 下载 cert-manager.yaml 文件&#xff0c;执行 kubectl apply -f cert-manager.yaml注意:安装成功后如果应用路由使用了Ingress&#xff0c;应用路由不需要…...

处于群晖Docker中的HomeAssistant监控宿主机CPU温度

处于群晖Docker中的HomeAssistant监控宿主机CPU温度 解决方案 在configuration.yaml中添加&#xff1a; sensor: - platform: filename: "Host Temperature" # 可以自定义file_path: /sys/class/hwmon/hwmon0/temp1_inputvalue_template: "{{ int(value)/100…...

STM32串口工作原理

STM32的串口是相当丰富的&#xff0c;功能也很强劲。最多可提供5 路串口&#xff0c;有分数波特率发生器、支持单线光通信和半双工单线通讯、支持LIN、智能卡协议和IrDA SIRENDEC 规范(仅串口3支持)、具有DMA等。 串口最基本的设置&#xff0c;就是波特率的设置。STM32的串口使…...

STM32杂交版(HAL库、音乐盒、闹钟、点阵屏、温湿度)

一、设计描述 本设计精心构建了一个以STM32MP157A高性能单片机为核心控制单元的综合性嵌入式系统。该系统巧妙融合了蜂鸣器、数码管显示器、点阵屏、温湿度传感器、LED指示灯以及按键等多种外设模块&#xff0c;形成了一个功能丰富、操作便捷的杂交版智能设备。通过串口…...

多输入多输出 | Matlab实现Transformer多输入多输出预测

多输入多输出 | Matlab实现Transformer多输入多输出预测 目录 多输入多输出 | Matlab实现Transformer多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 多输入多输出 | Matlab实现Transformer多输入多输出预测&#xff08;完整源码和数据&#xff09; 1.da…...

Linux文件编程(标准C库)

目录 一、标准C库打开/创建文件&#xff0c;读写文件&#xff0c;光标移动 二、标准C库写入结构体到文件 三、其他函数补充 1.fputc函数 2.feof函数和fgetc函数 前面讲到的open函数都是基于linux内核的&#xff0c;也就是说在Windows系统上无法运行&#xff0c;移植性比较…...

生产英特尔CPU处理器繁忙的一天

早晨&#xff1a;准备与检查 7:00 AM - 起床与准备 工厂员工们早早起床&#xff0c;快速洗漱并享用早餐。为了在一天的工作中保持高效&#xff0c;他们会进行一些晨间锻炼&#xff0c;保持头脑清醒和身体活力。 8:00 AM - 到达工厂 员工们到达英特尔的半导体制造工厂&#…...

MVC拦截器、ThreadLocal来进行登录拦截

MVC拦截器、ThreadLocal来进行登录拦截 1. 对登录进行拦截1.1 什么是ThreadLocal1.2 定义UserHolder 类&#xff0c;来封装ThreadLocal方法1.3 拦截器WebMvcConfigurer 的配置1.4 登录的配置&#xff0c;当碰到拦截的方法的时候调用1.5 UserServiceImpl1.6 controller&#xff…...

小程序问题

1.获取节点 wx.createSelectorQuery() wx.createSelectorQuery().in(this) //组件中加in(this)&#xff0c;不然获取不到 2.使用实例 wx.createSelectorQuery().in(this).select(#share).fields({node: true,size: true}).exec(async (res) > {const canvas res[0].node;…...

arm 版的 deb、rpm、AppImage 都有什么区别

qq arm 版的 deb、rpm 和 AppImage 格式之间存在几个关键区别。以下是对这些区别的详细解释&#xff1a; 包管理系统与兼容性&#xff1a; deb&#xff1a;是Debian及其衍生发行版&#xff08;如Ubuntu&#xff09;中使用的软件包格式。这些系统使用dpkg命令来管理deb包&#…...

docker中mysql设置lower_case_table_names配置的坑

前沿 今天在使用flowable流程框架的时候&#xff0c;遇到一个问题。需要配置MySQL数据库以实现表名大小写不敏感。本以为这是一个简单的任务&#xff0c;却耗费了我两个多小时的时间。 docker容器中修改配置&#xff0c;重启不成功 我们前提是容器中的mysql中已经有很多数据…...

python日志记录工具:loguru日志库使用

文章目录 一、使用loguru1、安装2、简单使用3、详细使用4、工具类&#xff08;1&#xff09;logUtil.py&#xff08;2&#xff09;测试类&#xff08;3&#xff09;效果 参考资料 一、使用loguru 1、安装 pip install loguru2、简单使用 from loguru import logger# 打印到文…...

python入门基础知识·二

""" # Python介绍 # Python注释 # 单行注释&#xff1a; # # 多行注释&#xff1a; r """""" # Python输出和输入 # print: 输出 # input: 输入 ①会让程序暂停&#xff0c;②得到的是字符串内容 int(&…...

数字孪生赋能设备预测性维护:构建工业设备全生命周期智能运维新模式

在智能制造加速推进的今天&#xff0c;工业设备作为生产体系的核心资产&#xff0c;其稳定运行直接决定着企业的生产效率、产品质量与经济效益。但据行业统计&#xff0c;全球制造业每年因设备非计划停机造成的损失超过 5000 亿美元&#xff0c;单台关键设备每分钟停机损失可达…...

如何在浏览器中直接查看SQLite数据库文件?WebAssembly技术带来的零安装解决方案

如何在浏览器中直接查看SQLite数据库文件&#xff1f;WebAssembly技术带来的零安装解决方案 【免费下载链接】sqlite-viewer View SQLite file online 项目地址: https://gitcode.com/gh_mirrors/sq/sqlite-viewer 你是否曾经需要快速查看一个SQLite数据库文件&#xff…...

值得收藏的27个Linux文档编辑命令

Linux col命令Linux col命令用于过滤控制字符。在许多UNIX说明文件里&#xff0c;都有RLF控制字符。当我们运用shell特殊字符">"和">>"&#xff0c;把说明文件的内容输出成纯文本文件时&#xff0c;控制字符会变成乱码&#xff0c;col指令则能有效…...

STM32H7 QSPI Flash程序调试全攻略:从MDK配置到单步调试,解决‘算法加载失败’的常见问题

STM32H7 QSPI Flash程序调试实战&#xff1a;破解算法加载失败的终极指南 当你第一次看到MDK弹窗提示"Download Algorithm Failed"时&#xff0c;那种挫败感我深有体会。作为使用STM32H7系列开发过多个量产项目的工程师&#xff0c;我曾在QSPI Flash调试过程中踩过所…...

WordPress靶场构建指南:从渗透测试流程到GetShell实战

1. 为什么这个靶场不是“玩具”&#xff0c;而是渗透测试能力的试金石WordPress靶场搭建这件事&#xff0c;圈内很多人第一反应是&#xff1a;“不就是下个DVWA或者bWAPP&#xff1f;点几下就完事。”但真正带过红队新人、做过甲方渗透评估的同行都清楚&#xff1a;一个能支撑从…...

Android动态调试实战:密钥提取四步法与JEB深度用法

1. 这不是“破解游戏”&#xff0c;而是一场对Android应用安全边界的系统性测绘你有没有遇到过这样的情况&#xff1a;一个内部工具APK&#xff0c;文档里写着“密钥已硬编码在so中”&#xff0c;但反编译Java层完全找不到明文&#xff1b;或者某SDK的初始化方法里反复调用getS…...

如何用开源自动化工具告别抢票焦虑:大麦自动抢票系统完全指南

如何用开源自动化工具告别抢票焦虑&#xff1a;大麦自动抢票系统完全指南 【免费下载链接】ticket-purchase 大麦自动抢票&#xff0c;支持人员、城市、日期场次、价格选择 项目地址: https://gitcode.com/GitHub_Trending/ti/ticket-purchase 还在为心仪演唱会门票秒空…...

GitLab CVE-2025-1477:URI编码绕过身份验证的应急防护指南

1. 这个漏洞不是“修个补丁就完事”的普通问题GitLab 安全漏洞 CVE-2025-1477&#xff0c;光看编号容易误以为是又一个常规的权限绕过或信息泄露类CVE——毕竟GitLab每年披露几十个中低危漏洞&#xff0c;运维同学看到CVE编号第一反应往往是查CVSS评分、翻官方通告、打补丁、走…...

技术人准备英文面试:除了刷题,这五个表达习惯更关键

许多软件测试工程师在准备英文面试时&#xff0c;往往会陷入一个误区&#xff1a;将大量时间花在背诵专业术语&#xff08;如“Equivalence Partitioning”、“Regression Testing”&#xff09;&#xff0c;或者在技术问答环节机械地复述测试用例的设计逻辑。诚然&#xff0c;…...

ChatGPT生成的SQL注入漏洞代码竟通过了87%静态扫描器?安全团队紧急避坑指南(含检测脚本)

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;ChatGPT生成的SQL注入漏洞代码竟通过了87%静态扫描器&#xff1f;安全团队紧急避坑指南&#xff08;含检测脚本&#xff09; 近期&#xff0c;某金融企业安全团队在代码审计中发现&#xff0c;一段由ChatGPT生…...