当前位置: 首页 > news >正文

Flink-DataStream API

一、什么样的数据可以用于流式传输

Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于

  • 基本类型:即字符串、长、整数、布尔值、数组
  • 复合类型:元组、POJO和Scala样例类

基本类型我们已经很熟悉了,下面我们看下复合类型。

1、元组

对于java,Flink定义了Tuple0Tuple25类型,例如:

Tuple2<String, Integer> person = Tuple2.apply("Fred",35);String name = person._1;
Integer age = person._2;

2、POJO

如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“按名称”字段引用)

  • 该类是公共且独立的(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(和非最终的),要么具有遵循getter和setterJavabean命名约定的公共getter和setter方法。

示例:

public class Person {public String name;  public Integer age;  public Person() {}public Person(String name, Integer age) {  . . .}
}  Person person = new Person("Fred Flintstone", 35);

3、样例类

样例类(Case classes)和普通类差不多,只有几点关键差别。样例类非常适合用于不可变的数据,多用于模式匹配。

case class Book(isbn: String)val frankenstein = Book("978-0486282114")

注意在实例化样例类Book时,并没有使用关键字new,这是因为样例类有一个默认的apply方法来负责对象的创建。

二、完整示例

该示例来自官方网站,是将有关人员的记录流作为输入,并对其进行过滤以仅包含成年人

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}

1、执行环境

每个Flink应用程序都需要一个执行环境,在该例中为env。流应用程序需要使用StreamExecutionEnvironment

在应用程序中进行的DataStream API调用会构建一个附加到StreamExecutionEnvironment的作业图。当调用env.execute()时,此图会打包并发送到JobManager,JobManager会并行化作业并将其切片分发给TaskManager执行。作业的每个并行切片都将在一个任务槽中执行。

如果不调用execute(), 应用程序则不会执行。

此分布式运行时取决于您的应用程序是否可序列化。它还要求集群中的每个节点都可以使用所有依赖项。

2、source

上面的示例使用env.fromElements(...)构造DataStream<Person>。这是一种将简单流组合在一起以用于原型或测试的便捷方法。StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。因此,也可以这样做:

List<Person> people = new ArrayList<Person>();people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));DataStream<Person> flintstones = env.fromCollection(people);

在原型设计时将一些数据导入流的另一种方便方法是使用socket或文件

DataStream<String> lines = env.socketTextStream("localhost", 9999);
DataStream<String> lines = env.readTextFile("file:///path");

在实际应用中,最常用的数据源是那些支持低延迟、高吞吐量并行读取以及倒带和重放的数据源——这是高性能和容错的先决条件——例如Apache Kafka、Kinesis和各种文件系统。REST API和数据库也经常使用。

3、sink

上面的示例使用adults.print()将其结果打印到任务管理器日志(在IDE中运行时将显示在IDE的控制台中)。这将在流的每个元素上调用toString()

例如输出如下:

1> Fred: age 35
2> Wilma: age 35

其中1>和2>表示哪个子任务(即线程)产生了输出。

在生产中,常用的接收器包括FileSink、各种数据库和几个发布子系统。

---------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

 第八届大数据与应用统计国际学术研讨会(ISBDAS 2025)

https://ais.cn/u/fEzmy2

第二届生成式人工智能与信息安全国际学术会议(GAIIS 2025)

https://ais.cn/u/uAbENn

第四届电子技术与人工智能国际学术会议(ETAI 2025)

https://ais.cn/u/vqM7Nj

第四届网络安全、人工智能与数字经济国际学术会议(CSAIDE 2025)

https://ais.cn/u/ZrERn2

相关文章:

Flink-DataStream API

一、什么样的数据可以用于流式传输 Flink的DataStream API 允许流式传输他们可以序列化的任何内容。Flink自己的序列化程序用于 基本类型&#xff1a;即字符串、长、整数、布尔值、数组复合类型&#xff1a;元组、POJO和Scala样例类 基本类型我们已经很熟悉了&#xff0c;下…...

2.4 构建模块化应用

第4章&#xff1a;构建模块化应用 模块化应用是 JDK 9 的核心特性之一&#xff0c;通过模块化系统&#xff08;Project Jigsaw&#xff09;实现代码的强封装和显式依赖管理。本章详细讲解如何从零构建一个模块化应用&#xff0c;包括模块定义、编译、打包、运行及调试。 4.1 模…...

DeepSeek:从入门到精通

在人工智能飞速发展的今天&#xff0c;DeepSeek作为一款备受瞩目的AI工具&#xff0c;正以其强大的功能和开源理念改变着我们的生活和工作方式。本文将带你深入了解DeepSeek&#xff0c;从基础入门到进阶应用&#xff0c;助你快速掌握这一前沿工具。 文末有详细资料可下载 文末…...

JAVA学习第二天

ArryList的构造方法和添加方法 01。构造方法的<>里面可以放数据类型 02. add&#xff08;&#xff09;可以直接在后面加入数据&#xff0c;也可以指定下标的插入元素。 ArrayList的常用方法 ArrayList存储对象 在Java中&#xff0c;System.out.println()可以打印基本数据…...

DevOps工具链概述

1. DevOps工具链概述 1.1 DevOps工具链的定义 DevOps工具链是支持DevOps实践的一系列工具的集合&#xff0c;这些工具覆盖了软件开发的整个生命周期&#xff0c;包括需求管理、开发、测试、部署和运维等各个环节。它旨在通过工具的集成和自动化&#xff0c;打破开发与运维之间…...

windows系统远程桌面连接ubuntu18.04

记录一下自己在配置过程中遇到的问题&#xff0c;记录遇到的两大坑&#xff1a; windows系统通过xrdp远程桌面连接ubuntu18.04的蓝屏问题。参考以下第一章解决。 同一局域网内网段不同的连接问题。参考以下第三章解决&#xff0c;前提是SSH可连。 1. 在ubuntu上安装xrdp 参考&…...

kafka动态监听主题

简单版本 import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.Containe…...

Python虚拟环境管理工具 pyenv

情景 我现在在部署一个python 项目&#xff0c;需要用到Python 3.10。但是我本地已经有了一个3.12解释器&#xff0c;有没有一种方法&#xff0c;可以管理python 环境&#xff0c;还可以随意切换。怎么做&#xff1f; window 安装pyenv-win 使用 PowerShell&#xff08;以管…...

网络安全产品架构图 网络安全相关产品

一、信息安全产品分类 背景 美国将网络和信息安全产品分了9类&#xff1a;鉴别、访问控制、入侵检测、防火墙、公钥基础设施、恶意程序代码防护、漏洞扫描、取证、介质清理或擦除。中国公安部将网络和信息安全产品分了7类&#xff1a;操作系统安全、数据库安全、网络安全、病毒…...

C++ 实践扩展(Qt Creator 联动 Visual Studio 2022)

​ 这里我们将在 VS 上实现 QT 编程&#xff0c;实现如下&#xff1a; 一、Vs 2022 配置&#xff08;若已安装&#xff0c;可直接跳过&#xff09; 点击链接&#xff1a;​​​​​Visual Studio 2022 我们先去 Vs 官网下载&#xff0c;如下&#xff1a; 等待程序安装完成之…...

如何实现Deepseek的本地部署并集成本地知识库?

1、下载并配置Deepseek环境 https://blog.csdn.net/kxg6666/article/details/145593346?spm1001.2014.3001.5501 2、安装AnythingLLM AnythingLLM | The all-in-one AI application for everyone 如官网下载较慢&#xff0c;本文最后提供夸克离线下载链接。下载后默认安装…...

vue学习笔记8

Pinia基础使用 - 计数器案例 定义Store&#xff08;state action&#xff09; 组件使用Store getters实现 Pinia中的 getters 直接使用 computed函数 进行模拟, 组件中需要使用需要把 getters return出去 action异步实现 编写方式&#xff1a;异步action函数的写法和组件…...

【自学笔记】Vue基础知识点总览-持续更新

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Vue重点知识点总览一、Vue基础1. Vue简介2. MVVM设计思想3. 响应式数据绑定4. 组件化开发 二、Vue核心特性1. 虚拟DOM2. 模板语法3. 计算属性与监听属性 三、Vue高级…...

ETL的使用(sqoop):数据导入,导出

ETL ETL: 是数据抽取&#xff08;Extract&#xff09;、数据转换&#xff08;Transform&#xff09;和数据加载&#xff08;Load&#xff09;的整个过程 常用的ETL工具 sqoop 1.Apache Sqoop 是 Apache 软件基金会旗下的一个开源项目&#xff0c;旨在帮助用户高效地在 Hado…...

【核心特性】从鸭子类型到Go的io.Writer设计哲学

在编程语言的设计中&#xff0c;鸭子类型和接口设计是两种非常重要的理念。它们都强调了对象的行为和能力&#xff0c;而非其具体的类型或继承关系。Go 语言的io.Writer 接口是这种设计理念的典型代表&#xff0c;它通过简洁的接口定义&#xff0c;实现了强大的功能和灵活性。 …...

多模态模型详解

多模态模型是什么 多模态模型是一种能够处理和理解多种数据类型&#xff08;如文本、图像、音频、视频等&#xff09;的机器学习模型&#xff0c;通过融合不同模态的信息来提升任务的性能。其核心在于利用不同模态之间的互补性&#xff0c;增强模型的鲁棒性和准确性。 如何融合…...

Go 语言里中的堆与栈

在 Go 语言里&#xff0c;堆和栈是内存管理的两个重要概念&#xff0c;它们在多个方面存在明显差异&#xff1a; 1. 内存分配与回收方式 栈 分配&#xff1a;Go 语言中&#xff0c;栈内存主要用于存储函数的局部变量和调用信息。当一个函数被调用时&#xff0c;Go 会自动为其…...

八、OSG学习笔记-

前一章节&#xff1a; 七、OSG学习笔记-碰撞检测-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145558132?spm1001.2014.3001.5501 一、了解OSG图元加载显示流程 本章节代码&#xff1a; OsgStudy/wids CuiQingCheng/OsgStudy - 码云 - 开源中国https:…...

本地部署【LLM-deepseek】大模型 ollama+deepseek/conda(python)+openwebui/docker+openwebui

通过ollama本地部署deepseek 总共两步 1.模型部署 2.[web页面] 参考官网 ollama:模型部署 https://ollama.com/ open-webui:web页面 https://github.com/open-webui/open-webui 设备参考 Mac M 芯片 windows未知 蒸馏模型版本:deepseek-r1:14b 运行情况macminim2 24256 本地…...

网络分析工具—WireShark的安装及使用

Wireshark 是一个广泛使用的网络协议分析工具&#xff0c;常被网络管理员、开发人员和安全专家用来捕获和分析网络数据包。它支持多种网络协议&#xff0c;能够帮助用户深入理解网络流量、诊断网络问题以及进行安全分析。 Wireshark 的主要功能 数据包捕获与分析&#xff1a; …...

caffeine+redis实现多级缓存解决缓存雪崩

废话不多说直接上代码&#xff1a;1.依赖<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>2.9.3</version></dependency>这里版本java8所以用的2.9.32.配置类&#…...

ProxyClaw住宅代理实战:破解反爬虫,赋能AI智能体与数据工程

1. 项目概述&#xff1a;ProxyClaw&#xff0c;一个为AI与数据工程而生的住宅代理网络 如果你正在构建一个需要从互联网上大规模、稳定抓取数据的AI智能体、自动化机器人或者数据管道&#xff0c;那么“被目标网站封禁”这件事&#xff0c;大概率是你最头疼的日常。无论是电商平…...

基于ASR与NLP的法庭音频智能分析系统:架构、微调与法律场景实践

1. 项目概述&#xff1a;当法庭记录“开口说话” 在司法与法律科技领域&#xff0c;数据正以前所未有的方式重塑工作流程。传统的法庭记录&#xff0c;无论是书记员手写的笔录&#xff0c;还是后来普及的录音录像&#xff0c;其核心价值在于“记录”本身——它们是静态的、被动…...

ARM架构TLB失效指令VALE1IS/VALE1ISNXS详解

1. ARM TLB失效指令基础解析在ARMv8/v9架构中&#xff0c;TLB&#xff08;Translation Lookaside Buffer&#xff09;作为内存管理单元&#xff08;MMU&#xff09;的核心组件&#xff0c;缓存了虚拟地址到物理地址的转换结果。当操作系统修改页表后&#xff0c;必须通过TLB失效…...

收藏!小白程序员必看:AI时代如何从执行者变身价值创造者?

本文指出&#xff0c;85%的知识工作者使用AI&#xff0c;但仅16%真正获得突破性价值。这些"前沿专业人士"并非更会使用工具&#xff0c;而是懂得重新定义工作。他们通过保持核心技能敏锐度、判断AI输出质量、构建人机协作系统等方式&#xff0c;创造80%的新价值。文章…...

QFN封装芯片手工焊接实战:从焊盘处理到拖焊技巧

1. QFN封装芯片手工焊接前的准备工作 QFN&#xff08;Quad Flat No-lead&#xff09;封装芯片因其体积小、散热好、电气性能优异等特点&#xff0c;在现代电子设备中越来越常见。但0.5mm甚至更小的引脚间距&#xff0c;让很多工程师和DIY爱好者在手工焊接时望而却步。其实只要掌…...

[HFSS] 从零到一:Floquet Port与主从边界在波导阵列建模中的实战解析

1. 初识Floquet Port与主从边界 第一次接触HFSS的周期性结构仿真时&#xff0c;我被Floquet Port和主从边界这两个概念搞得一头雾水。直到实际建模了一个波导阵列天线&#xff0c;才真正理解它们的妙用。简单来说&#xff0c;Floquet Port是专门为周期性结构设计的特殊端口&…...

碧蓝航线Live2D模型提取:3步快速获取游戏角色资源的完整指南

碧蓝航线Live2D模型提取&#xff1a;3步快速获取游戏角色资源的完整指南 【免费下载链接】AzurLaneLive2DExtract OBSOLETE - see readme / 碧蓝航线Live2D提取 项目地址: https://gitcode.com/gh_mirrors/az/AzurLaneLive2DExtract 你是否曾经想提取碧蓝航线中精美的Li…...

Gemini3.1Pro透明化指南:模型卡与数据卡入口解析

在 2026 年&#xff0c;越来越多的团队开始把“模型怎么用”升级为“模型用得是否可控、可追溯”。尤其是涉及合规审计、数据治理与风险评估时&#xff0c;工程侧最需要的往往是&#xff1a;能快速找到模型信息与数据来源的透明化页面入口&#xff0c;确保链路清晰、记录完整、…...

编写程序统计行业招聘薪资行情数据,智能比对企业薪资标准,优化薪资体系,减少企业人才流失问题。

一、实际应用场景描述在中型及以上企业的人力资源管理中&#xff0c;经常出现&#xff1a;- 企业需制定或调整岗位薪资标准&#xff08;Salary Band&#xff09;- 市场上同岗位薪资随城市、行业、经验年限波动明显- 企业内部薪资数据分散在 HR 系统 / Excel 中&#xff0c;缺乏…...