flink学习(6)——自定义source和kafka
概述
SourceFunction:非并行数据源(并行度只能=1) --接口
RichSourceFunction:多功能非并行数据源(并行度只能=1) --类
ParallelSourceFunction:并行数据源(并行度能够>=1) --接口
RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】
——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。
基础案例
package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}
cancel+open+close的调用时机
package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}
kafkaSource
package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}
相关文章:
flink学习(6)——自定义source和kafka
概述 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够>1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够>1) --类 【建议使用的】 ——…...
开发常见问题及解决
1.DBeaver 报Public Key Retrieval is not allowed 在使用DBeaver连接数据库时出现“Public Key Retrieval is not allowed”错误,主要是因为数据库连接配置的安全策略导致的。以下是详细的解释和解决方法: 错误原因 这个错误通常出现在连接MySQL数据…...
python excel接口自动化测试框架!
今天采用Excel继续写一个接口自动化测试框架。 设计流程图 这张图是我的excel接口测试框架的一些设计思路。 首先读取excel文件,得到测试信息,然后通过封装的requests方法,用unittest进行测试。 其中,接口关联的参数通过正则进…...
mybatis:You have an error in your SQL syntax;
完整报错You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near false, false, false, false, false, false, false, false, false, false, false, at line 1 SQL: INSERT INTO user …...
使用 Maven 开发 IntelliJ IDEA 插件
使用 Maven 开发 IntelliJ IDEA 插件的完整流程 1. 创建 Maven 项目 1.1 使用 IntelliJ 创建 Maven 项目 打开 IntelliJ IDEA,点击 File > New > Project。选择 Maven,填写项目名称和 GroupId,例如: GroupId: com.exampl…...
Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测
漏洞说明: 打开链接:https://docs.microsoft.com/zh-cn/troubleshoot/windows-server/windows-security/restrict-cryptographic-algorithms-protocols-schannel 可以看到: 找到:应通过配置密码套件顺序来控制 TLS/SSL 密码 我们…...
uniapp生命周期:应用生命周期和页面生命周期
文章目录 1.应用的生命周期2.页面的生命周期 1.应用的生命周期 生命周期的概念:一个对象从创建、运行、销毁的整个过程被称为生命周期 生命周期函数:在生命周期中每个阶段会伴随着每一个函数的出发,这些函数被称为生命周期函数 所有页面都…...
基于SSM的婴幼儿用品商城系统+LW示例参考
1.项目介绍 功能模块:管理员(产品管理、产品分类、会员管理、订单管理、秒杀活动、文章管理、数据统计等)、普通用户(登录注册、个人中心、购物车、我的收藏、各类信息查看等)技术选型:SSM,jsp…...
【工具变量】城市供应链创新试点数据(2007-2023年)
一、测算方式:参考C刊《经济管理》沈坤荣和乔刚老师(2024)的做法,使用“供应链创新与应用试点”的政策虚拟变量(TreatPost)表征。若样本城市为试点城市,则赋值为 1,否则为 0…...
【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反
项目需要重新运行了一下generate_car.py的脚本,发现死活生成不了,研究了半天,发现脚本里面生成车辆的坐标值y和carla_ros_bridge_with_example_ego_vehicle.launch脚本打开的驾驶操控界面里面的y值正好是相反数! y1-y2 因为,我运行…...
Jira使用笔记二 ScriptRunner 验证问题创建角色
背景 最近在对公司Jira工作流改造,收到这么一个要求:某些问题类型只有某些角色可以创建。本来是想通过Jira内建的权限控制来处理的。结果点到权限页面,心都凉透了。 好吧,那只能上脚本了。最终使用ScriptRunner的Simple scripte…...
Java线程的使用
Java中的线程是用来实现多任务并发执行的机制。在Java中,主要有两种方式来创建和使用线程:实现Runnable接口和继承Thread类。 实现Runnable接口: 创建一个类,实现Runnable接口,并重写run()方法。在run()方法中定义线程…...
自动化测试工具Ranorex Studio(四十三)-RANOREXPATH编辑器5
代码示例 下面的代码示例将讲解如何使用Ranorex API来编写代码模块,或者是使用用户代码来扩展录制的模块。 在代码中使用对象库 使用对象库等待UI元素 建立Adapter来访问更多的属性和方法 为对象库元素建立一组Adapter 使用Validate类 强制一个测试用例失败 设置aut…...
超高流量多级缓存架构设计!
文章内容已经收录在《面试进阶之路》,从原理出发,直击面试难点,实现更高维度的降维打击! 文章目录 电商-多级缓存架构设计多级缓存架构介绍多级缓存请求流程负载均衡算法的选择轮询负载均衡一致性哈希负载均衡算法选择 应用层 Ngi…...
数据结构(Java)—— ArrayList
1.线性表 线性表( linear list)是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构,常见的线性表:顺序表、链表、栈、队列... 线性表在逻辑上是线性结构,也就说是连续的一条直线。但是在…...
实习冲刺第三十三天
102.二叉树的层序遍历 给你二叉树的根节点 root ,返回其节点值的 层序遍历 。 (即逐层地,从左到右访问所有节点)。 示例 1: 输入:root [3,9,20,null,null,15,7] 输出:[[3],[9,20],[15,7]]示例…...
Uniapp开发下拉刷新功能onPullDownRefresh/onReachBottom
文章目录 1.onPullDownRefresh2.onReachBottom 1.onPullDownRefresh 在 js 中定义 onPullDownRefresh 处理函数(和onLoad等生命周期函数同级),监听该页面用户下拉刷新事件。 需要在 pages.json 里,找到的当前页面的pages节点&am…...
什么是 C++ 中的函数对象?函数对象与普通函数有什么区别?如何定义和使用函数对象?
1) 什么是 C 中的函数对象?它有什么特点? 在 C 中,函数对象(也称为仿函数或 functor)是一种重载了 operator() 的对象。这意味着这些对象可以像函数一样被调用。函数对象通常用于需要传递行为(即代码&…...
PointNet++论文复现
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...
【VUE】el-table表格内输入框或者其他控件规则校验实现
1、封装组件 1、规则校验一般基于form表单实现,因此需要给具体控件套一层form表单 新建组件input-required.vue,内容如下 <template><div><el-form ref"formRef" :model"form" :rules"formRules" label-…...
[260326] x-cmd v0.8.10:跨 Shell 统一配置命令短名;自动装好依赖运行 WhisperLiveKit 实时语音转写
[260326] x-cmd v0.8.10:跨 Shell 统一配置命令短名;自动装好依赖运行 WhisperLiveKit 实时语音转写 开放 shortcut 内部模块,配置命令短名,支持跨 Shell 统一使用whisper 模块新增 livekit 命令,自动装好依赖&#x…...
零基础玩转CTFShow Web1-7:手把手教你用Burp Suite和蚁剑拿flag
零基础玩转CTFShow Web1-7:从工具配置到实战渗透全指南 第一次接触CTF比赛时,看着其他选手在终端里敲出神秘代码就能拿到flag,总觉得这是黑客才能掌握的"黑魔法"。直到自己动手尝试才发现,只要掌握正确的工具和方法&…...
AutoDock Vina特殊金属元素对接技术指南:从问题诊断到方案落地
AutoDock Vina特殊金属元素对接技术指南:从问题诊断到方案落地 【免费下载链接】AutoDock-Vina AutoDock Vina 项目地址: https://gitcode.com/gh_mirrors/au/AutoDock-Vina 问题溯源:金属元素对接的技术瓶颈 在分子对接实践中,科研人…...
OpenClaw安全实践:私有化Qwen3-VL:30B保障敏感数据不出境
OpenClaw安全实践:私有化Qwen3-VL:30B保障敏感数据不出境 1. 为什么我们需要私有化部署 去年处理一份法律合同时,我犯了一个至今心有余悸的错误——把客户保密协议上传到某公有云AI进行条款分析。虽然及时删除了文件,但那种"数据已脱离…...
Agent Skill 从使用到原理,一次讲清
目录前言1. 本期内容概览2. Agent Skill 是什么3. Agent Skill 的基本用法4. 高级用法(Reference)5. 高级用法(Script)6. 渐进式披露机制7. Agent Skill vs MCP结语参考前言 学习 UP 主 马克的技术工作坊 的 Agent Skill 从使用到…...
拒了一个只要1.8万的45岁大佬
因公众号更改推送规则,请点“在看”并加“星标”第一时间获取精彩技术分享点击关注#互联网架构师公众号,领取架构师全套资料 都在这里0、2T架构师学习资料干货分上一篇:2T架构师学习资料干货分享大家好,我是互联网架构师ÿ…...
esp-hosted 方案深度解析:从架构选型到性能调优实战
1. 为什么选择esp-hosted方案? 如果你正在为嵌入式系统寻找稳定可靠的无线连接方案,esp-hosted绝对值得考虑。这个由乐鑫推出的开源方案,本质上是通过ESP32系列芯片为Linux主机或MCU设备提供Wi-Fi和蓝牙连接能力。我曾在多个工业物联网项目中…...
深入理解Matplotlib中的plt、fig、axes与axis:从基础到高级应用
1. Matplotlib绘图基础:从plt到figure的认知跃迁 第一次接触Matplotlib时,最让人困惑的就是plt.plot()和ax.plot()到底有什么区别。这就像学做菜时,有人告诉你"用锅炒菜"和"先用电磁炉加热再放锅炒菜"两种方式都能做出青…...
**Modbus协议深度解析:基于Python的TCP通信实战与发散创新应用**在工业自动化领域,**Modbus协议
Modbus协议深度解析:基于Python的TCP通信实战与发散创新应用 在工业自动化领域,Modbus协议因其简单、稳定和开放性成为最广泛使用的串行通信标准之一。本文将从底层原理出发,深入剖析 Modbus TCP 的数据帧结构,并结合 Python 实现…...
3种方法让加密音乐重获自由:Unlock Music浏览器解密工具详解
3种方法让加密音乐重获自由:Unlock Music浏览器解密工具详解 【免费下载链接】unlock-music 在浏览器中解锁加密的音乐文件。原仓库: 1. https://github.com/unlock-music/unlock-music ;2. https://git.unlock-music.dev/um/web 项目地址:…...
