当前位置: 首页 > news >正文

Flink nc -l -p 监听端口测试

1、9999端口未占用

netstat -apn|grep 9999

2、消息发送端

nc -l -k -p 9999
{"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}{"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}

3、运行

周期性水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPeriodicWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        ArrayList<Event> list = new ArrayList<>();
//        list.add(new Event("ming","www.baidu1.com",1200L));
//        list.add(new Event("xiaohu","www.baidu5.com",1267L));
//        list.add(new Event("ming","www.baidu7.com",4200L));
//        list.add(new Event("xiaohu","www.baidu8.com",5500L));
//
//        DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L;@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

断点式水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPunctuatedWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (event.getUser().equals("Biu")) {output.emitWatermark(new Watermark(event.getTimestamp() - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

4、打印

3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}

参考:

【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客

Flink watermark_nc -lp 9999-CSDN博客

NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub

相关文章:

Flink nc -l -p 监听端口测试

1、9999端口未占用 netstat -apn|grep 99992、消息发送端 nc -l -k -p 9999 {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1} {"user":"xiaohu","url":…...

在IntelliJ IDEA中使用Spring Boot:快速配置

使用IntelliJ IDEA开发Spring Boot应用程序可以极大地提高开发效率&#xff0c;因为IDEA提供了许多便捷的功能&#xff0c;比如自动补全、代码分析、热部署等。以下是一篇可能的CSDN博客文章草稿&#xff0c;介绍如何在IntelliJ IDEA中使用Spring Boot&#xff1a; 在IntelliJ …...

django filter 批量修改

django filter 批量修改 在Django中&#xff0c;如果你想要批量修改记录&#xff0c;可以使用update()方法。这个方法允许你在一个查询集上执行批量更新&#xff0c;而不需要为每条记录生成单独的数据库事务。 以下是一个使用update()方法批量修改记录的例子&#xff1a; fro…...

maven:中央仓库验证方式改变:401 Content access is protected by token

前几天向maven中央仓库发布版本&#xff0c;执行上传命令mvn release:perform时报错了&#xff1a; [ERROR] Failed to execute goal org.sonatype.plugins:nexus-staging-maven-plugin:1.6.13:deploy (injected-nexus-deploy) on project xxxxx: Failed to deploy artifacts: …...

【面试】http

一、定义 HTTP&#xff08;超文本传输协议&#xff09;&#xff0c;是一种用于分布式、协作式、超媒体信息系统的应用层协议&#xff0c;它是万维网数据通信的基础。主要特点是无状态&#xff08;服务器不会保存之前请求的状态&#xff09;、无连接&#xff08;服务器处理完请…...

获取泛型,泛型擦除,TypeReference 原理分析

说明 author blog.jellyfishmix.com / JellyfishMIX - githubLICENSE GPL-2.0 获取泛型&#xff0c;泛型擦除 下图中示例代码是一个工具类用于生成 csv 文件&#xff0c;需要拿到数据的类型&#xff0c;使用反射感知数据类型的字段&#xff0c;来填充表字段名。可以看到泛型…...

springboot 3.x 之 集成rabbitmq实现动态发送消息给不同的队列

背景 实际项目中遇到针对不同类型的消息&#xff0c;发送消息到不同的队列&#xff0c;而且队列可能还不存在&#xff0c;需要动态创建&#xff0c;于是写了如下代码&#xff0c;实践发现没啥问题&#xff0c;这里分享下。 环境 springboot 3.2 JDK 17 rabbitMQ模型介绍 图片…...

C++ 代码实现鼠标右键注册菜单,一级目录和二级目录方法

最近做的一个项目, 在使用windows的时候,我希望在右键菜单中添加一个自定义的选项, 该选项下有我经常使用的多个程序快捷方式, 直接上代码 头文件 #pragma once #include <Windows.h> #include <iostream> #include <string> using namespace std; …...

SQLite 3 优化批量数据存储操作---事务transaction机制

0、事务操作 事务的目的是为了保证数据的一致性和完整性。 事务&#xff08;Transaction&#xff09;具有以下四个标准属性&#xff0c;通常根据首字母缩写为 ACID&#xff1a; 原子性&#xff08;Atomicity&#xff09;&#xff1a;确保工作单位内的所有操作都成功完成&…...

[程序员] 表达的能力

之前看CSDN的问答区&#xff0c;很多时候&#xff0c;感觉问题的描述所要表达的意思非常模糊&#xff0c;或者说描述不清。如果是想回答问题的人想回答问题&#xff0c;首先要搞清楚是什么问题&#xff0c;就需要再问问题主很多细节的东西。三来四去&#xff0c;才能搞清楚具体…...

rknn转换后精度差异很大,失真算子自纠

下面是添加了详细注释的优化代码&#xff1a; import cv2 import numpy as np import onnx import onnxruntime as rt from onnx import helper, shape_inferencedef get_all_node_names(model):"""获取模型中所有节点的名称。参数:model (onnx.ModelProto): O…...

【C语言】解决C语言报错:Stack Overflow

文章目录 简介什么是Stack OverflowStack Overflow的常见原因如何检测和调试Stack Overflow解决Stack Overflow的最佳实践详细实例解析示例1&#xff1a;递归调用过深示例2&#xff1a;分配过大的局部变量示例3&#xff1a;嵌套函数调用过多 进一步阅读和参考资料总结 简介 St…...

【滚动哈希 二分查找】1044. 最长重复子串

本文涉及知识点 滚动哈希 二分查找算法合集 LeetCode 1044. 最长重复子串 给你一个字符串 s &#xff0c;考虑其所有 重复子串 &#xff1a;即 s 的&#xff08;连续&#xff09;子串&#xff0c;在 s 中出现 2 次或更多次。这些出现之间可能存在重叠。 返回 任意一个 可能具…...

webid、sec_poison_id、a1、web_session参数分析与算法实现

文章目录 1. 写在前面2. 参数分析3. 核心算法【🏠作者主页】:吴秋霖 【💼作者介绍】:擅长爬虫与JS加密逆向分析!Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Python与爬虫领域研究与开发工作! 【🌟作者推荐】:对爬…...

Qt|QWebSocket与Web进行通讯,实时接收语音流

实现功能主要思路&#xff1a;在网页端进行语音输入&#xff0c;PC机可以实时接收并播放语音流。 此时&#xff0c;Qt程序做客户端&#xff0c;Web端做服务器&#xff0c;使用QWebSocket进行通讯&#xff0c;实时播放接收的语音流。 功能实现 想要实现该功能&#xff0c;需要…...

「51媒体」电视台媒体邀约采访报道怎么做?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 电视台作为地方主流媒体&#xff0c;对于新闻报道有着严格的选题标准和报道流程。如果您希望电视台对某个会议或活动进行报道&#xff0c;可以按这样的方法来做&#xff1a; 1.明确活动信…...

Python提取PDF文本和图片,以及提前PDF页面中指定矩形区域的文本

前言 从PDF中提取内容能帮助我们获取文件中的信息&#xff0c;以便进行进一步的分析和处理。此外&#xff0c;在遇到类似项目时&#xff0c;提取出来的文本或图片也能再次利用。要在Python中通过代码提取PDF文件中的文本和图片&#xff0c;可以使用 Spire.PDF for Python 这个…...

C#实现边缘锐化(图像处理)

在 C# 中进行图像的边缘锐化&#xff0c;可以通过卷积滤波器实现。边缘锐化的基本思想是通过卷积核&#xff08;也称为滤波器或掩模&#xff09;来增强图像中的边缘。我们可以使用一个简单的锐化核&#xff0c;例如&#xff1a; [ 0, -1, 0][-1, 5, -1][ 0, -1, 0]这个卷积核…...

ffmpeg windows系统详细教程

视频做预览时黑屏&#xff0c;但有声音问题解决方案。 需要将 .mp4编成H.264格式的.mp4 一般上传视频的站点&#xff0c;如YouTube、Vimeo 等&#xff0c;通常会在用户上传视频时自动对视频进行转码&#xff0c;以确保视频能够在各种设备和网络条件下流畅播放。这些网站通常…...

【单片机】MSP430G2553单片机 Could not find MSP-FET430UIF on specified COM port 解决方案

文章目录 MSP430G2553开发板基础知识解决办法如何实施解决办法4步骤一步骤二步骤三 MSP430G2553开发板基础知识 MSP430G2553开发板如下图&#xff0c;上半部分就是UIF程序下载调试区域的硬件。个人觉得MSP430G2553开发板的这个部分没有做好硬件设计&#xff0c;导致很多系统兼…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...