当前位置: 首页 > news >正文

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

相关文章:

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。 import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax…...

Golang 学习(一)基础知识

面向对象 Golang 也支持面向对象编程(OOP),但是和传统的面向对象编程有区别,并不是纯粹的面向对象语言。 Golang 没有类(class),Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位,Golang 是基于 struct 来实现 OOP…...

C++学习:string的了解

1.string的介绍 #include<string> 对于字符串的操作 自动处理内存的分配和释放 2.string的声明与初始化 1.std::string str1;空的 2.string str2 "afhsihsa" 3.string str3 str2 4.string str3 str2.substr(0,5) .substr(位置&#xff0c;长度) 5.c…...

Webpack源码浅析

webpack启动方式 webpack有两种启动方式&#xff1a; 通过webpack-cli脚手架来启动&#xff0c;即可以在Terminal终端直接运行&#xff1b; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行&#xff1b;其实第一种方式最终…...

Hadoop:HDFS学习巩固——基础习题及编程实战

一 HDFS 选择题 1.对HDFS通信协议的理解错误的是&#xff1f; A.客户端与数据节点的交互是通过RPC&#xff08;Remote Procedure Call&#xff09;来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…...

SASS 官方文档速通

前言&#xff1a;参考 Sass 中文网。 一. 特色功能 Sass 是一款强化 CSS 的辅助工具&#xff0c;在 CSS 语法的基础上增加了变量、嵌套、混合、导入等高级功能。有助于组织管理样式文件&#xff0c;更高效地开发项目。 二. 语法格式 .scss 拓展名&#xff1a;在 CSS3 语法的基…...

《动手学深度学习(PyTorch版)》笔记7.4

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…...

关于自动驾驶概念的学习和一些理解

文章目录 对于自动驾驶的认识自动驾驶技术的优势自动驾驶的技术要求自动驾驶技术的挑战自动驾驶技术的潜在影响总结 对于自动驾驶的认识 自动驾驶是指车辆在没有人类驾驶员控制的情况下进行行驶的技术。随着人工智能的快速发展&#xff0c;自动驾驶技术已经成为将来交通行业的…...

C++ dfs搜索枚举(四十八)【第八篇】

曾经我们讲过枚举算法&#xff0c;那假设我们把枚举算法应用到搜索里呢&#xff1f; 1.搜索枚举 以前我们在进行枚举的时候是用了多层循环嵌套&#xff0c;但是当枚举的变量过多或者是输入的数量的时候就很难利用循环完成枚举了&#xff0c;不过我们可以尝试利用搜索进行枚举。…...

【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素

【优先级队列&#xff08;大顶堆 小顶堆&#xff09;】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序&#xff08;默认&#xff09;与降序5.问题解决&#xff1a;找前K个最大的元素 &#xff1a;踢走最小的&…...

Java设计模式-模板方法模式(14)

行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…...

【C++ 二维前缀和】约会

题目描述 从前&#xff0c;小兔发现了一个神秘的花园。 花园是一个 n 行 m 列的矩阵&#xff0c;第 i 行 j 列的花的美丽度为 ai,j&#xff0c;一个合法的约会场所为任意一个正方形子矩阵&#xff0c;定义子矩阵的浪漫度为这个子矩阵的两条对角线上的花的美丽度之和。 现在小兔…...

基于Springboot的社区疫情防控平台

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 一、项目简介 以往的社区疫情防控管理…...

JAVA中的类方法

一、定义 1.类方法也叫静态方法 格式 访问修饰符 static 数据返回类型 方法名(){} 2.类方法的调用 前提&#xff1a;满足访问修饰符的访问权限 使用方式&#xff1a;类名.类方法名或者对象名.类方法名 二、注意事项 1.类方法中没有this的参数 class D{private int n1 …...

rust嵌入式开发之RTICvsEmbassy

RTIC和Embassy是目前rust嵌入式开发中比较热门的两个框架。本来呢&#xff0c;针对RTIC的移植已经完成了一小半&#xff0c;但在移植过程中感受到了RTIC的不足&#xff0c;正好跳出来全面考察下embassy&#xff0c;本文就是根据目前的尝试结果做个对比总结。 RTIC和Embassy是两…...

Bug地狱 #1 突然宕机,企业级应用到底怎么了

Bug地狱 #1 突然宕机&#xff0c;企业级应用到底怎么了 背景 目前就职的企业经营是一家服务小微门店Saas企业&#xff0c;以进销存管理和客户营销为主体提供订阅服务。项目正式上线可以说是从13年&#xff0c;基础架构是Web和后端使用C# .net&#xff0c;数据库使用SQL Serve…...

使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队

作者&#xff1a;来自 Jessica Garson 大约一年前&#xff0c;我经历了一段压力很大的时期&#xff0c;最后参加了一场篮球比赛。 在整个过程中&#xff0c;我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…...

探索C语言结构体:编程中的利器与艺术

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C语言学习 贝蒂的主页&#xff1a;Betty‘s blog 1. 常量与变量 1. 什么是结构体 在C语言中本身就自带了一些数据类型&#x…...

Git介绍与常用命令总结

Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…...

机器学习 | 探索朴素贝叶斯算法的应用

朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域&#xff0c;并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法&#xff1a; 1&#xff09;对于给定的待分类项r…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...