flinkjar开发 自定义函数
编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。
import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}
FlinkCDC mysql到mysql 业务代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}
运行结果,加密成功

相关文章:
flinkjar开发 自定义函数
编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。 import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax…...
Golang 学习(一)基础知识
面向对象 Golang 也支持面向对象编程(OOP),但是和传统的面向对象编程有区别,并不是纯粹的面向对象语言。 Golang 没有类(class),Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位,Golang 是基于 struct 来实现 OOP…...
C++学习:string的了解
1.string的介绍 #include<string> 对于字符串的操作 自动处理内存的分配和释放 2.string的声明与初始化 1.std::string str1;空的 2.string str2 "afhsihsa" 3.string str3 str2 4.string str3 str2.substr(0,5) .substr(位置,长度) 5.c…...
Webpack源码浅析
webpack启动方式 webpack有两种启动方式: 通过webpack-cli脚手架来启动,即可以在Terminal终端直接运行; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行;其实第一种方式最终…...
Hadoop:HDFS学习巩固——基础习题及编程实战
一 HDFS 选择题 1.对HDFS通信协议的理解错误的是? A.客户端与数据节点的交互是通过RPC(Remote Procedure Call)来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…...
SASS 官方文档速通
前言:参考 Sass 中文网。 一. 特色功能 Sass 是一款强化 CSS 的辅助工具,在 CSS 语法的基础上增加了变量、嵌套、混合、导入等高级功能。有助于组织管理样式文件,更高效地开发项目。 二. 语法格式 .scss 拓展名:在 CSS3 语法的基…...
《动手学深度学习(PyTorch版)》笔记7.4
注:书中对代码的讲解并不详细,本文对很多细节做了详细注释。另外,书上的源代码是在Jupyter Notebook上运行的,较为分散,本文将代码集中起来,并加以完善,全部用vscode在python 3.9.18下测试通过&…...
关于自动驾驶概念的学习和一些理解
文章目录 对于自动驾驶的认识自动驾驶技术的优势自动驾驶的技术要求自动驾驶技术的挑战自动驾驶技术的潜在影响总结 对于自动驾驶的认识 自动驾驶是指车辆在没有人类驾驶员控制的情况下进行行驶的技术。随着人工智能的快速发展,自动驾驶技术已经成为将来交通行业的…...
C++ dfs搜索枚举(四十八)【第八篇】
曾经我们讲过枚举算法,那假设我们把枚举算法应用到搜索里呢? 1.搜索枚举 以前我们在进行枚举的时候是用了多层循环嵌套,但是当枚举的变量过多或者是输入的数量的时候就很难利用循环完成枚举了,不过我们可以尝试利用搜索进行枚举。…...
【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素
【优先级队列(大顶堆 小顶堆)】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序(默认)与降序5.问题解决:找前K个最大的元素 :踢走最小的&…...
Java设计模式-模板方法模式(14)
行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…...
【C++ 二维前缀和】约会
题目描述 从前,小兔发现了一个神秘的花园。 花园是一个 n 行 m 列的矩阵,第 i 行 j 列的花的美丽度为 ai,j,一个合法的约会场所为任意一个正方形子矩阵,定义子矩阵的浪漫度为这个子矩阵的两条对角线上的花的美丽度之和。 现在小兔…...
基于Springboot的社区疫情防控平台
末尾获取源码作者介绍:大家好,我是墨韵,本人4年开发经验,专注定制项目开发 更多项目:CSDN主页YAML墨韵 学如逆水行舟,不进则退。学习如赶路,不能慢一步。 一、项目简介 以往的社区疫情防控管理…...
JAVA中的类方法
一、定义 1.类方法也叫静态方法 格式 访问修饰符 static 数据返回类型 方法名(){} 2.类方法的调用 前提:满足访问修饰符的访问权限 使用方式:类名.类方法名或者对象名.类方法名 二、注意事项 1.类方法中没有this的参数 class D{private int n1 …...
rust嵌入式开发之RTICvsEmbassy
RTIC和Embassy是目前rust嵌入式开发中比较热门的两个框架。本来呢,针对RTIC的移植已经完成了一小半,但在移植过程中感受到了RTIC的不足,正好跳出来全面考察下embassy,本文就是根据目前的尝试结果做个对比总结。 RTIC和Embassy是两…...
Bug地狱 #1 突然宕机,企业级应用到底怎么了
Bug地狱 #1 突然宕机,企业级应用到底怎么了 背景 目前就职的企业经营是一家服务小微门店Saas企业,以进销存管理和客户营销为主体提供订阅服务。项目正式上线可以说是从13年,基础架构是Web和后端使用C# .net,数据库使用SQL Serve…...
使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队
作者:来自 Jessica Garson 大约一年前,我经历了一段压力很大的时期,最后参加了一场篮球比赛。 在整个过程中,我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…...
探索C语言结构体:编程中的利器与艺术
✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C语言学习 贝蒂的主页:Betty‘s blog 1. 常量与变量 1. 什么是结构体 在C语言中本身就自带了一些数据类型&#x…...
Git介绍与常用命令总结
Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…...
机器学习 | 探索朴素贝叶斯算法的应用
朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域,并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法: 1)对于给定的待分类项r…...
iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
