当前位置: 首页 > news >正文

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

相关文章:

flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。 import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax…...

Golang 学习(一)基础知识

面向对象 Golang 也支持面向对象编程(OOP),但是和传统的面向对象编程有区别,并不是纯粹的面向对象语言。 Golang 没有类(class),Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位,Golang 是基于 struct 来实现 OOP…...

C++学习:string的了解

1.string的介绍 #include<string> 对于字符串的操作 自动处理内存的分配和释放 2.string的声明与初始化 1.std::string str1;空的 2.string str2 "afhsihsa" 3.string str3 str2 4.string str3 str2.substr(0,5) .substr(位置&#xff0c;长度) 5.c…...

Webpack源码浅析

webpack启动方式 webpack有两种启动方式&#xff1a; 通过webpack-cli脚手架来启动&#xff0c;即可以在Terminal终端直接运行&#xff1b; webpack ./debug/index.js --config ./debug/webpack.config.js通过require(webpack)引入包的方式执行&#xff1b;其实第一种方式最终…...

Hadoop:HDFS学习巩固——基础习题及编程实战

一 HDFS 选择题 1.对HDFS通信协议的理解错误的是&#xff1f; A.客户端与数据节点的交互是通过RPC&#xff08;Remote Procedure Call&#xff09;来实现的 B.HDFS通信协议都是构建在IoT协议基础之上的 C.名称节点和数据节点之间则使用数据节点协议进行交互 D.客户端通过一…...

SASS 官方文档速通

前言&#xff1a;参考 Sass 中文网。 一. 特色功能 Sass 是一款强化 CSS 的辅助工具&#xff0c;在 CSS 语法的基础上增加了变量、嵌套、混合、导入等高级功能。有助于组织管理样式文件&#xff0c;更高效地开发项目。 二. 语法格式 .scss 拓展名&#xff1a;在 CSS3 语法的基…...

《动手学深度学习(PyTorch版)》笔记7.4

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…...

关于自动驾驶概念的学习和一些理解

文章目录 对于自动驾驶的认识自动驾驶技术的优势自动驾驶的技术要求自动驾驶技术的挑战自动驾驶技术的潜在影响总结 对于自动驾驶的认识 自动驾驶是指车辆在没有人类驾驶员控制的情况下进行行驶的技术。随着人工智能的快速发展&#xff0c;自动驾驶技术已经成为将来交通行业的…...

C++ dfs搜索枚举(四十八)【第八篇】

曾经我们讲过枚举算法&#xff0c;那假设我们把枚举算法应用到搜索里呢&#xff1f; 1.搜索枚举 以前我们在进行枚举的时候是用了多层循环嵌套&#xff0c;但是当枚举的变量过多或者是输入的数量的时候就很难利用循环完成枚举了&#xff0c;不过我们可以尝试利用搜索进行枚举。…...

【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素

【优先级队列&#xff08;大顶堆 小顶堆&#xff09;】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序&#xff08;默认&#xff09;与降序5.问题解决&#xff1a;找前K个最大的元素 &#xff1a;踢走最小的&…...

Java设计模式-模板方法模式(14)

行为型模式 行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对…...

【C++ 二维前缀和】约会

题目描述 从前&#xff0c;小兔发现了一个神秘的花园。 花园是一个 n 行 m 列的矩阵&#xff0c;第 i 行 j 列的花的美丽度为 ai,j&#xff0c;一个合法的约会场所为任意一个正方形子矩阵&#xff0c;定义子矩阵的浪漫度为这个子矩阵的两条对角线上的花的美丽度之和。 现在小兔…...

基于Springboot的社区疫情防控平台

末尾获取源码作者介绍&#xff1a;大家好&#xff0c;我是墨韵&#xff0c;本人4年开发经验&#xff0c;专注定制项目开发 更多项目&#xff1a;CSDN主页YAML墨韵 学如逆水行舟&#xff0c;不进则退。学习如赶路&#xff0c;不能慢一步。 一、项目简介 以往的社区疫情防控管理…...

JAVA中的类方法

一、定义 1.类方法也叫静态方法 格式 访问修饰符 static 数据返回类型 方法名(){} 2.类方法的调用 前提&#xff1a;满足访问修饰符的访问权限 使用方式&#xff1a;类名.类方法名或者对象名.类方法名 二、注意事项 1.类方法中没有this的参数 class D{private int n1 …...

rust嵌入式开发之RTICvsEmbassy

RTIC和Embassy是目前rust嵌入式开发中比较热门的两个框架。本来呢&#xff0c;针对RTIC的移植已经完成了一小半&#xff0c;但在移植过程中感受到了RTIC的不足&#xff0c;正好跳出来全面考察下embassy&#xff0c;本文就是根据目前的尝试结果做个对比总结。 RTIC和Embassy是两…...

Bug地狱 #1 突然宕机,企业级应用到底怎么了

Bug地狱 #1 突然宕机&#xff0c;企业级应用到底怎么了 背景 目前就职的企业经营是一家服务小微门店Saas企业&#xff0c;以进销存管理和客户营销为主体提供订阅服务。项目正式上线可以说是从13年&#xff0c;基础架构是Web和后端使用C# .net&#xff0c;数据库使用SQL Serve…...

使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队

作者&#xff1a;来自 Jessica Garson 大约一年前&#xff0c;我经历了一段压力很大的时期&#xff0c;最后参加了一场篮球比赛。 在整个过程中&#xff0c;我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…...

探索C语言结构体:编程中的利器与艺术

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C语言学习 贝蒂的主页&#xff1a;Betty‘s blog 1. 常量与变量 1. 什么是结构体 在C语言中本身就自带了一些数据类型&#x…...

Git介绍与常用命令总结

Git介绍与其常用命令总结 1、Git介绍2、Git的使用3、Git常用命令3.1 初始化仓库3.2 克隆仓库3.3 配置用户信息3.4 提交代码(Commit)3.5 推送代码(Push)3.6 拉取代码(Pull)3.7 分支(Branch)3.8 远程仓库(Remote)3.9 撤销回退本地改动3.10 更新本地仓库与远程仓库 1、Git介绍 Gi…...

机器学习 | 探索朴素贝叶斯算法的应用

朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域&#xff0c;并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法&#xff1a; 1&#xff09;对于给定的待分类项r…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving

地址&#xff1a;LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂&#xff0c;正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...

jdbc查询mysql数据库时,出现id顺序错误的情况

我在repository中的查询语句如下所示&#xff0c;即传入一个List<intager>的数据&#xff0c;返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致&#xff0c;会导致返回的id是从小到大排列的&#xff0c;但我不希望这样。 Query("SELECT NEW com…...