当前位置: 首页 > news >正文

Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——自定义无界流生成器

大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们让外部组件RabbitMQ充当了无界流的数据源&#xff0c;使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ…...

Vue3框架搭建4:配置说明-eslint配置

配置说明&#xff1a; .eslintrc.cjs&#xff1a; /* eslint-env node */ //node环境&#xff0c;并引入一个模块解析补丁 require(rushstack/eslint-patch/modern-module-resolution)module.exports { //继承其他配置root: true, //跟配置文件&#xff0c;ESLint不会在父目…...

JavaFx+MySql学生管理系统

前言: 上个月学习了javafx和mysql数据库,于是写了一个学生管理系统,因为上个月在复习并且有一些事情,比较忙,所以没有更新博客了,这个项目页面虽然看着有点简陋了,但是大致内容还是比较简单的,于是现在跟大家分享一下我的学生管理系统,希望对这方面有兴趣的同学提供一些帮助 &a…...

Java--抽象类

1.抽象--abstract 2.不能对抽象类进行实例化&#xff0c;也就是不能new这个抽象类 3.抽象类的应用&#xff0c;就是在class前加入abstract这个单词&#xff0c;同理抽象方法也是在void前加入abstract 4.在抽象类中可以写普通方法&#xff0c;但抽象方法只能写在抽象类中 5.…...

26.Labview波形图、XY图、强度图使用精讲

我们如何使用Labview显示曲线或者制作出下面这种我们想要的曲线并随着我们输入值的变化而变化呢&#xff1f; 本文详细讲解一下每种波形图的使用方式&#xff0c;帮助大家深入了解波形图的使用技巧。 文章中所有程序均可在百度网盘下载&#xff0c;下载方式&#xff1a;复制下…...

系统启动 | 安全启动时 “地址线” 被篡改了怎么办?

Hi&#xff0c;你们有没有想过&#xff0c;在咱们启动的过程中&#xff0c;就算是开了安全启动。但是如果在执行最后一条跳转指令时&#xff0c;如果此时改变FLASH或者DDR的地址线&#xff0c;相当于跳转到了非法的地址&#xff0c;那安全启动功能不就丧失了吗&#xff1f; 提到…...

Kafka基础组件图推演

文章目录 1. Controller Broker保障机制 2. 组件架构1. Log Manager2. Replication Manager3. SocketServer4. NetworkServer5. ZKClient 1. Controller Broker Kafka集群中有一个Controller Broker&#xff0c;负责元数据管理和协调。 Kafka使用Zookeeper作为集群元数据的存储…...

k8s中使用cert-manager生成自签名证书

一、安装 cert-manager 注意查看cert-manager和K8S支持的对应版本 我的 k8sv1.28.2&#xff0c;cert-manager v1.12.11 下载 cert-manager.yaml 文件&#xff0c;执行 kubectl apply -f cert-manager.yaml注意:安装成功后如果应用路由使用了Ingress&#xff0c;应用路由不需要…...

处于群晖Docker中的HomeAssistant监控宿主机CPU温度

处于群晖Docker中的HomeAssistant监控宿主机CPU温度 解决方案 在configuration.yaml中添加&#xff1a; sensor: - platform: filename: "Host Temperature" # 可以自定义file_path: /sys/class/hwmon/hwmon0/temp1_inputvalue_template: "{{ int(value)/100…...

STM32串口工作原理

STM32的串口是相当丰富的&#xff0c;功能也很强劲。最多可提供5 路串口&#xff0c;有分数波特率发生器、支持单线光通信和半双工单线通讯、支持LIN、智能卡协议和IrDA SIRENDEC 规范(仅串口3支持)、具有DMA等。 串口最基本的设置&#xff0c;就是波特率的设置。STM32的串口使…...

STM32杂交版(HAL库、音乐盒、闹钟、点阵屏、温湿度)

一、设计描述 本设计精心构建了一个以STM32MP157A高性能单片机为核心控制单元的综合性嵌入式系统。该系统巧妙融合了蜂鸣器、数码管显示器、点阵屏、温湿度传感器、LED指示灯以及按键等多种外设模块&#xff0c;形成了一个功能丰富、操作便捷的杂交版智能设备。通过串口…...

多输入多输出 | Matlab实现Transformer多输入多输出预测

多输入多输出 | Matlab实现Transformer多输入多输出预测 目录 多输入多输出 | Matlab实现Transformer多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 多输入多输出 | Matlab实现Transformer多输入多输出预测&#xff08;完整源码和数据&#xff09; 1.da…...

Linux文件编程(标准C库)

目录 一、标准C库打开/创建文件&#xff0c;读写文件&#xff0c;光标移动 二、标准C库写入结构体到文件 三、其他函数补充 1.fputc函数 2.feof函数和fgetc函数 前面讲到的open函数都是基于linux内核的&#xff0c;也就是说在Windows系统上无法运行&#xff0c;移植性比较…...

生产英特尔CPU处理器繁忙的一天

早晨&#xff1a;准备与检查 7:00 AM - 起床与准备 工厂员工们早早起床&#xff0c;快速洗漱并享用早餐。为了在一天的工作中保持高效&#xff0c;他们会进行一些晨间锻炼&#xff0c;保持头脑清醒和身体活力。 8:00 AM - 到达工厂 员工们到达英特尔的半导体制造工厂&#…...

MVC拦截器、ThreadLocal来进行登录拦截

MVC拦截器、ThreadLocal来进行登录拦截 1. 对登录进行拦截1.1 什么是ThreadLocal1.2 定义UserHolder 类&#xff0c;来封装ThreadLocal方法1.3 拦截器WebMvcConfigurer 的配置1.4 登录的配置&#xff0c;当碰到拦截的方法的时候调用1.5 UserServiceImpl1.6 controller&#xff…...

小程序问题

1.获取节点 wx.createSelectorQuery() wx.createSelectorQuery().in(this) //组件中加in(this)&#xff0c;不然获取不到 2.使用实例 wx.createSelectorQuery().in(this).select(#share).fields({node: true,size: true}).exec(async (res) > {const canvas res[0].node;…...

arm 版的 deb、rpm、AppImage 都有什么区别

qq arm 版的 deb、rpm 和 AppImage 格式之间存在几个关键区别。以下是对这些区别的详细解释&#xff1a; 包管理系统与兼容性&#xff1a; deb&#xff1a;是Debian及其衍生发行版&#xff08;如Ubuntu&#xff09;中使用的软件包格式。这些系统使用dpkg命令来管理deb包&#…...

docker中mysql设置lower_case_table_names配置的坑

前沿 今天在使用flowable流程框架的时候&#xff0c;遇到一个问题。需要配置MySQL数据库以实现表名大小写不敏感。本以为这是一个简单的任务&#xff0c;却耗费了我两个多小时的时间。 docker容器中修改配置&#xff0c;重启不成功 我们前提是容器中的mysql中已经有很多数据…...

python日志记录工具:loguru日志库使用

文章目录 一、使用loguru1、安装2、简单使用3、详细使用4、工具类&#xff08;1&#xff09;logUtil.py&#xff08;2&#xff09;测试类&#xff08;3&#xff09;效果 参考资料 一、使用loguru 1、安装 pip install loguru2、简单使用 from loguru import logger# 打印到文…...

python入门基础知识·二

""" # Python介绍 # Python注释 # 单行注释&#xff1a; # # 多行注释&#xff1a; r """""" # Python输出和输入 # print: 输出 # input: 输入 ①会让程序暂停&#xff0c;②得到的是字符串内容 int(&…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...