当前位置: 首页 > news >正文

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

相关文章:

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。 import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax…...

Golang 学习(一)基础知识

面向对象 Golang 也支持面向对象编程(OOP),但是和传统的面向对象编程有区别,并不是纯粹的面向对象语言。 Golang 没有类(class),Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位,Golang 是基于 struct 来实现 OOP…...

C++学习:string的了解

1.string的介绍 #include<string> 对于字符串的操作 自动处理内存的分配和释放 2.string的声明与初始化 1.std::string str1;空的 2.string str2 "afhsihsa" 3.string str3 str2 4.string str3 str2.substr(0,5) .substr(位置&#xff0c;长度) 5.c…...

Webpack源码浅析

webpack启动方式 webpack有两种启动方式&#xff1a; 通过webpack-cli脚手架来启动&#xff0c;即可以在Terminal终端直接运行&#xff1b; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行&#xff1b;其实第一种方式最终…...

Hadoop:HDFS学习巩固——基础习题及编程实战

一 HDFS 选择题 1.对HDFS通信协议的理解错误的是&#xff1f; A.客户端与数据节点的交互是通过RPC&#xff08;Remote Procedure Call&#xff09;来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…...

SASS 官方文档速通

前言&#xff1a;参考 Sass 中文网。 一. 特色功能 Sass 是一款强化 CSS 的辅助工具&#xff0c;在 CSS 语法的基础上增加了变量、嵌套、混合、导入等高级功能。有助于组织管理样式文件&#xff0c;更高效地开发项目。 二. 语法格式 .scss 拓展名&#xff1a;在 CSS3 语法的基…...

《动手学深度学习(PyTorch版)》笔记7.4

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…...

关于自动驾驶概念的学习和一些理解

文章目录 对于自动驾驶的认识自动驾驶技术的优势自动驾驶的技术要求自动驾驶技术的挑战自动驾驶技术的潜在影响总结 对于自动驾驶的认识 自动驾驶是指车辆在没有人类驾驶员控制的情况下进行行驶的技术。随着人工智能的快速发展&#xff0c;自动驾驶技术已经成为将来交通行业的…...

C++ dfs搜索枚举(四十八)【第八篇】

曾经我们讲过枚举算法&#xff0c;那假设我们把枚举算法应用到搜索里呢&#xff1f; 1.搜索枚举 以前我们在进行枚举的时候是用了多层循环嵌套&#xff0c;但是当枚举的变量过多或者是输入的数量的时候就很难利用循环完成枚举了&#xff0c;不过我们可以尝试利用搜索进行枚举。…...

【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素

【优先级队列&#xff08;大顶堆 小顶堆&#xff09;】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序&#xff08;默认&#xff09;与降序5.问题解决&#xff1a;找前K个最大的元素 &#xff1a;踢走最小的&…...

Java设计模式-模板方法模式(14)

行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…...

【C++ 二维前缀和】约会

题目描述 从前&#xff0c;小兔发现了一个神秘的花园。 花园是一个 n 行 m 列的矩阵&#xff0c;第 i 行 j 列的花的美丽度为 ai,j&#xff0c;一个合法的约会场所为任意一个正方形子矩阵&#xff0c;定义子矩阵的浪漫度为这个子矩阵的两条对角线上的花的美丽度之和。 现在小兔…...

基于Springboot的社区疫情防控平台

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 一、项目简介 以往的社区疫情防控管理…...

JAVA中的类方法

一、定义 1.类方法也叫静态方法 格式 访问修饰符 static 数据返回类型 方法名(){} 2.类方法的调用 前提&#xff1a;满足访问修饰符的访问权限 使用方式&#xff1a;类名.类方法名或者对象名.类方法名 二、注意事项 1.类方法中没有this的参数 class D{private int n1 …...

rust嵌入式开发之RTICvsEmbassy

RTIC和Embassy是目前rust嵌入式开发中比较热门的两个框架。本来呢&#xff0c;针对RTIC的移植已经完成了一小半&#xff0c;但在移植过程中感受到了RTIC的不足&#xff0c;正好跳出来全面考察下embassy&#xff0c;本文就是根据目前的尝试结果做个对比总结。 RTIC和Embassy是两…...

Bug地狱 #1 突然宕机,企业级应用到底怎么了

Bug地狱 #1 突然宕机&#xff0c;企业级应用到底怎么了 背景 目前就职的企业经营是一家服务小微门店Saas企业&#xff0c;以进销存管理和客户营销为主体提供订阅服务。项目正式上线可以说是从13年&#xff0c;基础架构是Web和后端使用C# .net&#xff0c;数据库使用SQL Serve…...

使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队

作者&#xff1a;来自 Jessica Garson 大约一年前&#xff0c;我经历了一段压力很大的时期&#xff0c;最后参加了一场篮球比赛。 在整个过程中&#xff0c;我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…...

探索C语言结构体:编程中的利器与艺术

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C语言学习 贝蒂的主页&#xff1a;Betty‘s blog 1. 常量与变量 1. 什么是结构体 在C语言中本身就自带了一些数据类型&#x…...

Git介绍与常用命令总结

Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…...

机器学习 | 探索朴素贝叶斯算法的应用

朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域&#xff0c;并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法&#xff1a; 1&#xff09;对于给定的待分类项r…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行&#xff0c;YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID&#xff1a; YW3…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...