当前位置: 首页 > news >正文

Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——自定义无界流生成器

大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们让外部组件RabbitMQ充当了无界流的数据源&#xff0c;使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ…...

Vue3框架搭建4:配置说明-eslint配置

配置说明&#xff1a; .eslintrc.cjs&#xff1a; /* eslint-env node */ //node环境&#xff0c;并引入一个模块解析补丁 require(rushstack/eslint-patch/modern-module-resolution)module.exports { //继承其他配置root: true, //跟配置文件&#xff0c;ESLint不会在父目…...

JavaFx+MySql学生管理系统

前言: 上个月学习了javafx和mysql数据库,于是写了一个学生管理系统,因为上个月在复习并且有一些事情,比较忙,所以没有更新博客了,这个项目页面虽然看着有点简陋了,但是大致内容还是比较简单的,于是现在跟大家分享一下我的学生管理系统,希望对这方面有兴趣的同学提供一些帮助 &a…...

Java--抽象类

1.抽象--abstract 2.不能对抽象类进行实例化&#xff0c;也就是不能new这个抽象类 3.抽象类的应用&#xff0c;就是在class前加入abstract这个单词&#xff0c;同理抽象方法也是在void前加入abstract 4.在抽象类中可以写普通方法&#xff0c;但抽象方法只能写在抽象类中 5.…...

26.Labview波形图、XY图、强度图使用精讲

我们如何使用Labview显示曲线或者制作出下面这种我们想要的曲线并随着我们输入值的变化而变化呢&#xff1f; 本文详细讲解一下每种波形图的使用方式&#xff0c;帮助大家深入了解波形图的使用技巧。 文章中所有程序均可在百度网盘下载&#xff0c;下载方式&#xff1a;复制下…...

系统启动 | 安全启动时 “地址线” 被篡改了怎么办?

Hi&#xff0c;你们有没有想过&#xff0c;在咱们启动的过程中&#xff0c;就算是开了安全启动。但是如果在执行最后一条跳转指令时&#xff0c;如果此时改变FLASH或者DDR的地址线&#xff0c;相当于跳转到了非法的地址&#xff0c;那安全启动功能不就丧失了吗&#xff1f; 提到…...

Kafka基础组件图推演

文章目录 1. Controller Broker保障机制 2. 组件架构1. Log Manager2. Replication Manager3. SocketServer4. NetworkServer5. ZKClient 1. Controller Broker Kafka集群中有一个Controller Broker&#xff0c;负责元数据管理和协调。 Kafka使用Zookeeper作为集群元数据的存储…...

k8s中使用cert-manager生成自签名证书

一、安装 cert-manager 注意查看cert-manager和K8S支持的对应版本 我的 k8sv1.28.2&#xff0c;cert-manager v1.12.11 下载 cert-manager.yaml 文件&#xff0c;执行 kubectl apply -f cert-manager.yaml注意:安装成功后如果应用路由使用了Ingress&#xff0c;应用路由不需要…...

处于群晖Docker中的HomeAssistant监控宿主机CPU温度

处于群晖Docker中的HomeAssistant监控宿主机CPU温度 解决方案 在configuration.yaml中添加&#xff1a; sensor: - platform: filename: "Host Temperature" # 可以自定义file_path: /sys/class/hwmon/hwmon0/temp1_inputvalue_template: "{{ int(value)/100…...

STM32串口工作原理

STM32的串口是相当丰富的&#xff0c;功能也很强劲。最多可提供5 路串口&#xff0c;有分数波特率发生器、支持单线光通信和半双工单线通讯、支持LIN、智能卡协议和IrDA SIRENDEC 规范(仅串口3支持)、具有DMA等。 串口最基本的设置&#xff0c;就是波特率的设置。STM32的串口使…...

STM32杂交版(HAL库、音乐盒、闹钟、点阵屏、温湿度)

一、设计描述 本设计精心构建了一个以STM32MP157A高性能单片机为核心控制单元的综合性嵌入式系统。该系统巧妙融合了蜂鸣器、数码管显示器、点阵屏、温湿度传感器、LED指示灯以及按键等多种外设模块&#xff0c;形成了一个功能丰富、操作便捷的杂交版智能设备。通过串口…...

多输入多输出 | Matlab实现Transformer多输入多输出预测

多输入多输出 | Matlab实现Transformer多输入多输出预测 目录 多输入多输出 | Matlab实现Transformer多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 多输入多输出 | Matlab实现Transformer多输入多输出预测&#xff08;完整源码和数据&#xff09; 1.da…...

Linux文件编程(标准C库)

目录 一、标准C库打开/创建文件&#xff0c;读写文件&#xff0c;光标移动 二、标准C库写入结构体到文件 三、其他函数补充 1.fputc函数 2.feof函数和fgetc函数 前面讲到的open函数都是基于linux内核的&#xff0c;也就是说在Windows系统上无法运行&#xff0c;移植性比较…...

生产英特尔CPU处理器繁忙的一天

早晨&#xff1a;准备与检查 7:00 AM - 起床与准备 工厂员工们早早起床&#xff0c;快速洗漱并享用早餐。为了在一天的工作中保持高效&#xff0c;他们会进行一些晨间锻炼&#xff0c;保持头脑清醒和身体活力。 8:00 AM - 到达工厂 员工们到达英特尔的半导体制造工厂&#…...

MVC拦截器、ThreadLocal来进行登录拦截

MVC拦截器、ThreadLocal来进行登录拦截 1. 对登录进行拦截1.1 什么是ThreadLocal1.2 定义UserHolder 类&#xff0c;来封装ThreadLocal方法1.3 拦截器WebMvcConfigurer 的配置1.4 登录的配置&#xff0c;当碰到拦截的方法的时候调用1.5 UserServiceImpl1.6 controller&#xff…...

小程序问题

1.获取节点 wx.createSelectorQuery() wx.createSelectorQuery().in(this) //组件中加in(this)&#xff0c;不然获取不到 2.使用实例 wx.createSelectorQuery().in(this).select(#share).fields({node: true,size: true}).exec(async (res) > {const canvas res[0].node;…...

arm 版的 deb、rpm、AppImage 都有什么区别

qq arm 版的 deb、rpm 和 AppImage 格式之间存在几个关键区别。以下是对这些区别的详细解释&#xff1a; 包管理系统与兼容性&#xff1a; deb&#xff1a;是Debian及其衍生发行版&#xff08;如Ubuntu&#xff09;中使用的软件包格式。这些系统使用dpkg命令来管理deb包&#…...

docker中mysql设置lower_case_table_names配置的坑

前沿 今天在使用flowable流程框架的时候&#xff0c;遇到一个问题。需要配置MySQL数据库以实现表名大小写不敏感。本以为这是一个简单的任务&#xff0c;却耗费了我两个多小时的时间。 docker容器中修改配置&#xff0c;重启不成功 我们前提是容器中的mysql中已经有很多数据…...

python日志记录工具:loguru日志库使用

文章目录 一、使用loguru1、安装2、简单使用3、详细使用4、工具类&#xff08;1&#xff09;logUtil.py&#xff08;2&#xff09;测试类&#xff08;3&#xff09;效果 参考资料 一、使用loguru 1、安装 pip install loguru2、简单使用 from loguru import logger# 打印到文…...

python入门基础知识·二

""" # Python介绍 # Python注释 # 单行注释&#xff1a; # # 多行注释&#xff1a; r """""" # Python输出和输入 # print: 输出 # input: 输入 ①会让程序暂停&#xff0c;②得到的是字符串内容 int(&…...

深度学习中的正则化技术 - 噪声鲁棒性篇

序言 在深度学习的蓬勃发展中&#xff0c;模型的性能与泛化能力成为了研究者们关注的焦点。然而&#xff0c;实际应用中的数据往往伴随着各种噪声&#xff0c;这些噪声不仅来源于数据采集过程中的硬件限制&#xff0c;还可能由环境干扰、传输错误等因素引入。噪声的存在严重影…...

如何通过 Java 来完成 zip 文件与 rar 文件的解压缩?

目录 一、用到的知识点 二、代码展示(分解版) 三、代码展示(整体版) 一、用到的知识点 1.IO流&#xff1a; Input:输入&#xff0c;通过“输入流”进行文件的读取操作 Output:输出&#xff0c;通过“输出流”进行文件的写入操作 2.文件操作相关&#xff1a; File类&#xff…...

C 语言中的联合(Union)的用途是什么?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01; &#x1f4d9;C 语言百万年薪修炼课程 通俗易懂&#xff0c;深入浅出&#xff0c;匠心打磨&#xff0c;死磕细节&#xff0c;6年迭代&#xff0c;看过的人都说好。 文章目…...

汽车电子助力转向系统研究

汽车电子助力转向系统研究 摘要 电子助力转向系统&#xff08;Electric Power Steering&#xff0c;EPS&#xff09;是一种利用电动机辅助驾驶员进行车辆转向的系统。相比于传统的液压助力转向系统&#xff0c;EPS具有更高的效率、精确性和可控性。本文将详细探讨EPS的工作原理…...

大数据学习之 scala基础(补充)

scala基础&#xff1a; hello world: 写scala可运行文件的注意事项1、如果一个scala文件要运行&#xff0c;class要改成object2、如果是class&#xff0c;就仅单纯代表一个类&#xff0c;如果是object代表的是单例对象3、scala语法中&#xff0c;一句话结束不需要加分号4、sca…...

正向传播和反向传播

正向传播&#xff08;Forward Propagation&#xff09; 正向传播是指将输入数据通过神经网络&#xff0c;计算出预测值的过程。具体步骤如下&#xff1a; 输入层&#xff1a;接受输入数据。隐藏层&#xff1a;每个隐藏层中的神经元接收上一层的输出&#xff0c;进行加权求和&…...

前端文件下载的方式

方式一&#xff1a;a标签直接下载 <a href"链接" >下载</a>一个文件链接&#xff08;一般是服务器上的某个文件&#xff09;&#xff0c;这个链接一般地址栏输入是预览&#xff0c;不是附件下载 如果想改成附件下载&#xff0c;以下两种方式任选一个均…...

视图库对接系列(GA-T 1400)十六、视图库对接系列(本级)通知(订阅回调)

说明 之前我们实现了订阅接口,其中有一个receiveAddr参数, 这个就是对应的回调的地址。一般情况下对应的是同一个服务。 我们推荐使用http://xxx:xxx/VIID/SubscribeNotifications接口文档 SubscribeNotificationList对象对象如下: 文档中是xml,但实际上目前使用的都是jso…...

Python | Leetcode Python题解之第230题二叉搜索树中第K小的元素

题目&#xff1a; 题解&#xff1a; class AVL:"""平衡二叉搜索树&#xff08;AVL树&#xff09;&#xff1a;允许重复值"""class Node:"""平衡二叉搜索树结点"""__slots__ ("val", "parent&quo…...

Python酷库之旅-第三方库Pandas(018)

目录 一、用法精讲 44、pandas.crosstab函数 44-1、语法 44-2、参数 44-3、功能 44-4、返回值 44-5、说明 44-6、用法 44-6-1、数据准备 44-6-2、代码示例 44-6-3、结果输出 45、pandas.cut函数 45-1、语法 45-2、参数 45-3、功能 45-4、返回值 45-5、说明 4…...