当前位置: 首页 > news >正文

Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

Java版Flink使用指南——自定义无界流生成器

大纲 新建工程自定义无界流 使用打包、提交、运行工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中&#xff0c;我们让外部组件RabbitMQ充当了无界流的数据源&#xff0c;使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ…...

Vue3框架搭建4:配置说明-eslint配置

配置说明&#xff1a; .eslintrc.cjs&#xff1a; /* eslint-env node */ //node环境&#xff0c;并引入一个模块解析补丁 require(rushstack/eslint-patch/modern-module-resolution)module.exports { //继承其他配置root: true, //跟配置文件&#xff0c;ESLint不会在父目…...

JavaFx+MySql学生管理系统

前言: 上个月学习了javafx和mysql数据库,于是写了一个学生管理系统,因为上个月在复习并且有一些事情,比较忙,所以没有更新博客了,这个项目页面虽然看着有点简陋了,但是大致内容还是比较简单的,于是现在跟大家分享一下我的学生管理系统,希望对这方面有兴趣的同学提供一些帮助 &a…...

Java--抽象类

1.抽象--abstract 2.不能对抽象类进行实例化&#xff0c;也就是不能new这个抽象类 3.抽象类的应用&#xff0c;就是在class前加入abstract这个单词&#xff0c;同理抽象方法也是在void前加入abstract 4.在抽象类中可以写普通方法&#xff0c;但抽象方法只能写在抽象类中 5.…...

26.Labview波形图、XY图、强度图使用精讲

我们如何使用Labview显示曲线或者制作出下面这种我们想要的曲线并随着我们输入值的变化而变化呢&#xff1f; 本文详细讲解一下每种波形图的使用方式&#xff0c;帮助大家深入了解波形图的使用技巧。 文章中所有程序均可在百度网盘下载&#xff0c;下载方式&#xff1a;复制下…...

系统启动 | 安全启动时 “地址线” 被篡改了怎么办?

Hi&#xff0c;你们有没有想过&#xff0c;在咱们启动的过程中&#xff0c;就算是开了安全启动。但是如果在执行最后一条跳转指令时&#xff0c;如果此时改变FLASH或者DDR的地址线&#xff0c;相当于跳转到了非法的地址&#xff0c;那安全启动功能不就丧失了吗&#xff1f; 提到…...

Kafka基础组件图推演

文章目录 1. Controller Broker保障机制 2. 组件架构1. Log Manager2. Replication Manager3. SocketServer4. NetworkServer5. ZKClient 1. Controller Broker Kafka集群中有一个Controller Broker&#xff0c;负责元数据管理和协调。 Kafka使用Zookeeper作为集群元数据的存储…...

k8s中使用cert-manager生成自签名证书

一、安装 cert-manager 注意查看cert-manager和K8S支持的对应版本 我的 k8sv1.28.2&#xff0c;cert-manager v1.12.11 下载 cert-manager.yaml 文件&#xff0c;执行 kubectl apply -f cert-manager.yaml注意:安装成功后如果应用路由使用了Ingress&#xff0c;应用路由不需要…...

处于群晖Docker中的HomeAssistant监控宿主机CPU温度

处于群晖Docker中的HomeAssistant监控宿主机CPU温度 解决方案 在configuration.yaml中添加&#xff1a; sensor: - platform: filename: "Host Temperature" # 可以自定义file_path: /sys/class/hwmon/hwmon0/temp1_inputvalue_template: "{{ int(value)/100…...

STM32串口工作原理

STM32的串口是相当丰富的&#xff0c;功能也很强劲。最多可提供5 路串口&#xff0c;有分数波特率发生器、支持单线光通信和半双工单线通讯、支持LIN、智能卡协议和IrDA SIRENDEC 规范(仅串口3支持)、具有DMA等。 串口最基本的设置&#xff0c;就是波特率的设置。STM32的串口使…...

STM32杂交版(HAL库、音乐盒、闹钟、点阵屏、温湿度)

一、设计描述 本设计精心构建了一个以STM32MP157A高性能单片机为核心控制单元的综合性嵌入式系统。该系统巧妙融合了蜂鸣器、数码管显示器、点阵屏、温湿度传感器、LED指示灯以及按键等多种外设模块&#xff0c;形成了一个功能丰富、操作便捷的杂交版智能设备。通过串口…...

多输入多输出 | Matlab实现Transformer多输入多输出预测

多输入多输出 | Matlab实现Transformer多输入多输出预测 目录 多输入多输出 | Matlab实现Transformer多输入多输出预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 多输入多输出 | Matlab实现Transformer多输入多输出预测&#xff08;完整源码和数据&#xff09; 1.da…...

Linux文件编程(标准C库)

目录 一、标准C库打开/创建文件&#xff0c;读写文件&#xff0c;光标移动 二、标准C库写入结构体到文件 三、其他函数补充 1.fputc函数 2.feof函数和fgetc函数 前面讲到的open函数都是基于linux内核的&#xff0c;也就是说在Windows系统上无法运行&#xff0c;移植性比较…...

生产英特尔CPU处理器繁忙的一天

早晨&#xff1a;准备与检查 7:00 AM - 起床与准备 工厂员工们早早起床&#xff0c;快速洗漱并享用早餐。为了在一天的工作中保持高效&#xff0c;他们会进行一些晨间锻炼&#xff0c;保持头脑清醒和身体活力。 8:00 AM - 到达工厂 员工们到达英特尔的半导体制造工厂&#…...

MVC拦截器、ThreadLocal来进行登录拦截

MVC拦截器、ThreadLocal来进行登录拦截 1. 对登录进行拦截1.1 什么是ThreadLocal1.2 定义UserHolder 类&#xff0c;来封装ThreadLocal方法1.3 拦截器WebMvcConfigurer 的配置1.4 登录的配置&#xff0c;当碰到拦截的方法的时候调用1.5 UserServiceImpl1.6 controller&#xff…...

小程序问题

1.获取节点 wx.createSelectorQuery() wx.createSelectorQuery().in(this) //组件中加in(this)&#xff0c;不然获取不到 2.使用实例 wx.createSelectorQuery().in(this).select(#share).fields({node: true,size: true}).exec(async (res) > {const canvas res[0].node;…...

arm 版的 deb、rpm、AppImage 都有什么区别

qq arm 版的 deb、rpm 和 AppImage 格式之间存在几个关键区别。以下是对这些区别的详细解释&#xff1a; 包管理系统与兼容性&#xff1a; deb&#xff1a;是Debian及其衍生发行版&#xff08;如Ubuntu&#xff09;中使用的软件包格式。这些系统使用dpkg命令来管理deb包&#…...

docker中mysql设置lower_case_table_names配置的坑

前沿 今天在使用flowable流程框架的时候&#xff0c;遇到一个问题。需要配置MySQL数据库以实现表名大小写不敏感。本以为这是一个简单的任务&#xff0c;却耗费了我两个多小时的时间。 docker容器中修改配置&#xff0c;重启不成功 我们前提是容器中的mysql中已经有很多数据…...

python日志记录工具:loguru日志库使用

文章目录 一、使用loguru1、安装2、简单使用3、详细使用4、工具类&#xff08;1&#xff09;logUtil.py&#xff08;2&#xff09;测试类&#xff08;3&#xff09;效果 参考资料 一、使用loguru 1、安装 pip install loguru2、简单使用 from loguru import logger# 打印到文…...

python入门基础知识·二

""" # Python介绍 # Python注释 # 单行注释&#xff1a; # # 多行注释&#xff1a; r """""" # Python输出和输入 # print: 输出 # input: 输入 ①会让程序暂停&#xff0c;②得到的是字符串内容 int(&…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略&#xff08;地理位置/文件&#xff09; 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型&#xff0c;核心实现方式&#xff1a; 标准消息类型&#xff1a;直接使用 SDK 内置类型&#xff08;文件、图片等&#xff09;自…...

云原生周刊:k0s 成为 CNCF 沙箱项目

开源项目推荐 HAMi HAMi&#xff08;原名 k8s‑vGPU‑scheduler&#xff09;是一款 CNCF Sandbox 级别的开源 K8s 中间件&#xff0c;通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度&#xff0c;为容器提供统一接口&#xff0c;实现细粒度资源配额…...

密码学基础——SM4算法

博客主页&#xff1a;christine-rr-CSDN博客 ​​​​专栏主页&#xff1a;密码学 &#x1f4cc; 【今日更新】&#x1f4cc; 对称密码算法——SM4 目录 一、国密SM系列算法概述 二、SM4算法 2.1算法背景 2.2算法特点 2.3 基本部件 2.3.1 S盒 2.3.2 非线性变换 ​编辑…...

轻量级Docker管理工具Docker Switchboard

简介 什么是 Docker Switchboard &#xff1f; Docker Switchboard 是一个轻量级的 Web 应用程序&#xff0c;用于管理 Docker 容器。它提供了一个干净、用户友好的界面来启动、停止和监控主机上运行的容器&#xff0c;使其成为本地开发、家庭实验室或小型服务器设置的理想选择…...