flinkjar开发 自定义函数
编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。
import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}
FlinkCDC mysql到mysql 业务代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}
运行结果,加密成功

相关文章:
flinkjar开发 自定义函数
编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。 import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax…...
Golang 学习(一)基础知识
面向对象 Golang 也支持面向对象编程(OOP),但是和传统的面向对象编程有区别,并不是纯粹的面向对象语言。 Golang 没有类(class),Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位,Golang 是基于 struct 来实现 OOP…...
C++学习:string的了解
1.string的介绍 #include<string> 对于字符串的操作 自动处理内存的分配和释放 2.string的声明与初始化 1.std::string str1;空的 2.string str2 "afhsihsa" 3.string str3 str2 4.string str3 str2.substr(0,5) .substr(位置,长度) 5.c…...
Webpack源码浅析
webpack启动方式 webpack有两种启动方式: 通过webpack-cli脚手架来启动,即可以在Terminal终端直接运行; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行;其实第一种方式最终…...
Hadoop:HDFS学习巩固——基础习题及编程实战
一 HDFS 选择题 1.对HDFS通信协议的理解错误的是? A.客户端与数据节点的交互是通过RPC(Remote Procedure Call)来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…...
SASS 官方文档速通
前言:参考 Sass 中文网。 一. 特色功能 Sass 是一款强化 CSS 的辅助工具,在 CSS 语法的基础上增加了变量、嵌套、混合、导入等高级功能。有助于组织管理样式文件,更高效地开发项目。 二. 语法格式 .scss 拓展名:在 CSS3 语法的基…...
《动手学深度学习(PyTorch版)》笔记7.4
注:书中对代码的讲解并不详细,本文对很多细节做了详细注释。另外,书上的源代码是在Jupyter Notebook上运行的,较为分散,本文将代码集中起来,并加以完善,全部用vscode在python 3.9.18下测试通过&…...
关于自动驾驶概念的学习和一些理解
文章目录 对于自动驾驶的认识自动驾驶技术的优势自动驾驶的技术要求自动驾驶技术的挑战自动驾驶技术的潜在影响总结 对于自动驾驶的认识 自动驾驶是指车辆在没有人类驾驶员控制的情况下进行行驶的技术。随着人工智能的快速发展,自动驾驶技术已经成为将来交通行业的…...
C++ dfs搜索枚举(四十八)【第八篇】
曾经我们讲过枚举算法,那假设我们把枚举算法应用到搜索里呢? 1.搜索枚举 以前我们在进行枚举的时候是用了多层循环嵌套,但是当枚举的变量过多或者是输入的数量的时候就很难利用循环完成枚举了,不过我们可以尝试利用搜索进行枚举。…...
【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素
【优先级队列(大顶堆 小顶堆)】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序(默认)与降序5.问题解决:找前K个最大的元素 :踢走最小的&…...
Java设计模式-模板方法模式(14)
行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…...
【C++ 二维前缀和】约会
题目描述 从前,小兔发现了一个神秘的花园。 花园是一个 n 行 m 列的矩阵,第 i 行 j 列的花的美丽度为 ai,j,一个合法的约会场所为任意一个正方形子矩阵,定义子矩阵的浪漫度为这个子矩阵的两条对角线上的花的美丽度之和。 现在小兔…...
基于Springboot的社区疫情防控平台
末尾获取源码作者介绍:大家好,我是墨韵,本人4年开发经验,专注定制项目开发 更多项目:CSDN主页YAML墨韵 学如逆水行舟,不进则退。学习如赶路,不能慢一步。 一、项目简介 以往的社区疫情防控管理…...
JAVA中的类方法
一、定义 1.类方法也叫静态方法 格式 访问修饰符 static 数据返回类型 方法名(){} 2.类方法的调用 前提:满足访问修饰符的访问权限 使用方式:类名.类方法名或者对象名.类方法名 二、注意事项 1.类方法中没有this的参数 class D{private int n1 …...
rust嵌入式开发之RTICvsEmbassy
RTIC和Embassy是目前rust嵌入式开发中比较热门的两个框架。本来呢,针对RTIC的移植已经完成了一小半,但在移植过程中感受到了RTIC的不足,正好跳出来全面考察下embassy,本文就是根据目前的尝试结果做个对比总结。 RTIC和Embassy是两…...
Bug地狱 #1 突然宕机,企业级应用到底怎么了
Bug地狱 #1 突然宕机,企业级应用到底怎么了 背景 目前就职的企业经营是一家服务小微门店Saas企业,以进销存管理和客户营销为主体提供订阅服务。项目正式上线可以说是从13年,基础架构是Web和后端使用C# .net,数据库使用SQL Serve…...
使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队
作者:来自 Jessica Garson 大约一年前,我经历了一段压力很大的时期,最后参加了一场篮球比赛。 在整个过程中,我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…...
探索C语言结构体:编程中的利器与艺术
✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C语言学习 贝蒂的主页:Betty‘s blog 1. 常量与变量 1. 什么是结构体 在C语言中本身就自带了一些数据类型&#x…...
Git介绍与常用命令总结
Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…...
机器学习 | 探索朴素贝叶斯算法的应用
朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域,并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法: 1)对于给定的待分类项r…...
RMBG-2.0异常处理指南:解决常见部署与运行问题
RMBG-2.0异常处理指南:解决常见部署与运行问题 抠图工具用得好好的,突然给你来个报错,或者生成的结果莫名其妙,是不是特别让人头疼?尤其是像RMBG-2.0这样效果出色的工具,一旦出问题,很多人就不…...
玩转西门子S7-1200气力输送仿真系统
气力输送系统管道气力输送系统 (21)采用西门子S7-1200博图WinCC画面组态,博图V16及以上版本都可以仿真运行,无需硬件。 系统带有手动/自动模式,运行数据动态实时显示,带压力实时曲线显示&#x…...
MedGemma-X镜像轻量化:去除冗余依赖+精简日志+压缩缓存的体积优化实践
MedGemma-X镜像轻量化:去除冗余依赖精简日志压缩缓存的体积优化实践 1. 引言:为什么需要优化MedGemma-X镜像? 如果你已经体验过MedGemma-X的强大功能——那种像专业医生一样“对话式”阅片的智能体验,可能会发现一个现实问题&am…...
BGE-Reranker-v2-m3企业部署:高并发请求压力测试案例
BGE-Reranker-v2-m3企业部署:高并发请求压力测试案例 1. 项目背景与价值 在企业级RAG(检索增强生成)系统中,检索精度直接影响最终的回答质量。传统向量检索虽然快速,但容易受到关键词相似性的干扰,返回大…...
WPF实战:用LiveCharts打造实时监控曲线(附动态数据刷新技巧)
WPF实战:用LiveCharts打造高性能实时监控曲线 在工业自动化、物联网监控等场景中,实时数据可视化是核心需求之一。想象一下,当数百个传感器数据以毫秒级频率涌向系统时,如何让曲线图既流畅又精准?传统WPF图表在高频数…...
开源工具MelonLoader:Unity游戏模组开发的3大突破与零基础上手指南
开源工具MelonLoader:Unity游戏模组开发的3大突破与零基础上手指南 【免费下载链接】MelonLoader The Worlds First Universal Mod Loader for Unity Games compatible with both Il2Cpp and Mono 项目地址: https://gitcode.com/gh_mirrors/me/MelonLoader …...
Splitting.js终极指南:深度解析网页文本动画的魔法引擎
Splitting.js终极指南:深度解析网页文本动画的魔法引擎 【免费下载链接】Splitting JavaScript microlibrary to split an element by words, characters, children and more, populated with CSS variables! 项目地址: https://gitcode.com/gh_mirrors/sp/Splitt…...
解锁图像标注效率:LabelImg亮度调节功能提升标注准确性全指南
解锁图像标注效率:LabelImg亮度调节功能提升标注准确性全指南 【免费下载链接】labelImg LabelImg is now part of the Label Studio community. The popular image annotation tool created by Tzutalin is no longer actively being developed, but you can check…...
Artichoke 未来展望:这个创新 Ruby 实现的路线图和愿景 [特殊字符]
Artichoke 未来展望:这个创新 Ruby 实现的路线图和愿景 🚀 【免费下载链接】artichoke 💎 Artichoke is a Ruby made with Rust 项目地址: https://gitcode.com/gh_mirrors/ar/artichoke Artichoke 是一个用 Rust 编写的创新 Ruby 实现…...
WhisperX语音识别:如何实现70倍实时转录精度与词级时间戳?
WhisperX语音识别:如何实现70倍实时转录精度与词级时间戳? 【免费下载链接】whisperX m-bain/whisperX: 是一个用于实现语音识别和语音合成的 JavaScript 库。适合在需要进行语音识别和语音合成的网页中使用。特点是提供了一种简单、易用的 APIÿ…...
