执行flink sql连接clickhouse库
手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。
| 组件 | 版本 |
| jdk | 1.8 |
| flink | 1.17.2 |
| clickhouse | 23.12.2.59 |
1.背景
flink官方不支持clickhouse连接器,工作中难免会用到。
2.方案
利用GitHub大佬提供的源代码,我用的是release-1.16:https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16
3.编译
导入IDEA,maven编译即可,生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar

4.将此依赖包,导入flink工程
spring boot工程
4.1)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion>
<!-- <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.13</version><relativePath/> <!– lookup parent from repository –></parent>--><parent><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid</artifactId><version>${project.build.version}</version></parent><artifactId>mit-microgrid-flink</artifactId><name>mit-microgrid-flink</name><description>flink connector clickhouse</description><properties><java.version>1.8</java.version><flink.version>1.17.2</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><!-- 排除SpringBoot自带的日志依赖 --><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!--flink--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><!--flink connector--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!--flink connector clickhouse--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.16.0-SNAPSHOT</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId>
<!-- <artifactId>flink-clients_2.12</artifactId>--><version>${flink.version}</version>
<!-- <scope>provided</scope>--></dependency><!-- flink sql --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><!-- Flink JDBC Connector -->
<!-- <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.6</version> <!– 与您的Flink版本匹配 –></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.17</version></dependency><!-- ClickHouse JDBC Driver --><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.2</version> <!-- 请根据实际情况选择最新稳定版本 --></dependency><!-- 添加clickhouse-maven依赖--><dependency><groupId>ru.ivi.opensource</groupId><artifactId>flink-clickhouse-sink</artifactId><version>1.2.0</version></dependency><!--module--><dependency><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid-common-core</artifactId><version>${project.build.version}</version></dependency><dependency><groupId>com.mit.microgrid</groupId><artifactId>mit-microgrid-api-history</artifactId><version>${project.build.version}</version></dependency><!--sql parse--><dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.37.0</version></dependency>
<!-- <dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-server</artifactId><version>1.37.0</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-parser</artifactId><version>${flink.version}</version></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-text</artifactId><version>1.12.0</version></dependency></dependencies><build><!--<plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin>-->
<!-- <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin>-->
<!-- </plugins>--><finalName>${project.artifactId}</finalName><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.7.3</version><configuration><mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass><fork>true</fork><layout>ZIP</layout><includeSystemScope>true</includeSystemScope></configuration><executions><execution><goals><goal>repackage</goal></goals><configuration><classifier>-with-dependencies</classifier></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.3.0</version><configuration><archive><addMavenDescriptor>false</addMavenDescriptor><manifest><mainClass>com.mit.microgrid.flink.MitMicrogridFlinkApplication</mainClass><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><descriptors><descriptor>src/main/resources/assembly/assembly.xml</descriptor></descriptors><outputDirectory>./../out</outputDirectory></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>
4.2)核心方法:
/*** multiple sql execute** @param sqlList*/public static JobClient flinkSqlJobClientMultiple(List<String> sqlList) {log.info("参数sqlList: {}", sqlList);
// StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);EnvironmentSettings setting = EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(setting);if (CollectionUtil.isNullOrEmpty(sqlList)) {log.warn("sqlList参数为空");return null;}for (String s : sqlList) {TableResult tableResult = tEnv.executeSql(s);Optional<JobClient> jobClientOptional = tableResult.getJobClient();if (jobClientOptional.isPresent()) {JobClient jobClient = jobClientOptional.get();log.info("jobClient: " + jobClient);return jobClient;}}log.error("没有可执行的job");return null;}
5.源码地址
https://github.com/genghongsheng0/mit-microgrid-flink
相关文章:
执行flink sql连接clickhouse库
手把手教学,flink connector打通clickhouse大数据库,通过下发flink sql,来使用ck。 组件版本jdk1.8flink1.17.2clickhouse23.12.2.59 1.背景 flink官方不支持clickhouse连接器,工作中难免会用到。 2.方案 利用GitHub大佬提供…...
什么是C++中的友元函数和友元类?
友元函数(Friend Function)和 友元类(Friend Class)是用于控制类的访问权限的机制。这允许特定的函数或类访问另一个类的私有成员和保护成员,打破了 C 的封装性规则。 友元函数 定义 友元提供了不同类的成员函数之间…...
基于Spring Boot+Vue的多媒体素材管理系统的设计与实现
一.系统开发工具与环境搭建 1.系统设计开发工具 后端使用Java编程语言的Spring boot框架 项目架构:B/S架构 运行环境:win10/win11、jdk17 前端: 技术:框架Vue.js;UI库:ElementUI; 开发工具&…...
Inpaint-Web:纯浏览器端实现的开源图像处理工具
之前在刷短视频的时候,经常看到一些情侣在景区拍照,结果被路人“抢镜”。有时男朋友会拿出手机,帮忙把那些路人“P”掉,简直是既贴心又有趣。最近我在逛 GitHub 时,发现了一个可以在浏览器端删除照片中部分内容的纯前端…...
商业物联网详细指南:优势与挑战
物联网是信息技术行业最具前景的领域之一。为什么它如此热门呢?原因在于全球连接性。设备可以像人群一样相互协作。正如我们所知,协作能显著提高生产力。 物联网对普通用户和企业都有益处。许多日常流程可以通过传感器、扫描仪、摄像头和其他设备实现自…...
如何在项目中用elementui实现分页器功能
1.在结构部分复制官网代码: <template> 标签: 这是 Vue 模板的根标签,包含所有的 HTML 元素和 Vue 组件。 <div> 标签: 这是一个普通的 HTML 元素,包裹了 el-pagination 组件。它没有特别的意义,只是为了确保 el-pagi…...
Nginx参数配置-笔记
文章目录 upstream实现后台应用服务负载均衡&高可用proxy_set_header参数 upstream实现后台应用服务负载均衡&高可用 角色IPnginx172.168.110.2后端应用服务1172.168.110.3后端应用服务2172.168.110.4后端应用服务3(备用)172.168.110.5 示例如下: upstre…...
衡量神经网络表征相似度
目录 1.中心核对齐技术(CKA)2.Hilbert-Schmidt independence criterion(HSIC)HSIC的计算步骤:HSIC的性质:应用:矩阵中心化操作对于单个数据集的中心化对于两个数据集的中心化(例如,用于HSIC)Python代码示例1.中心核对齐技术(CKA) CKA通过计算两个表征的Gram矩阵(即…...
Javascript高级:深度解析与多种实现方式数组扁平化
数组扁平化:深度解析与多种实现方式 在JavaScript编程中,数组扁平化是一个常见的操作,指的是将一个多维数组转换成一个一维数组。这个过程中,所有嵌套的数组元素都会被“拉平”到同一个层级。数组扁平化在处理嵌套数据结构时非常…...
SpringBoot Data Redis连接Redis-Cluster集群
使用SpringBoot Data Redis无法连接Redis-Cluster集群 最近在研究系统高并发下的缓存架构,因此自己在自己买的云服务器上搭建好Redis 5.0 版本的集群后,使用springboot的 RedisTemplate连接是发现总是访问不到集群节点。上网百度了发现没有好的解决办法&…...
计算机网络——TCP篇
TCP篇 基本认知 TCP和UDP的区别? TCP 和 UDP 可以使用同一个端口吗? 可以的 传输层中 TCP 和 UDP在内核中是两个完全独立的软件模块。可以根据协议字段来选择不同的模块来处理。 TCP 连接建立 TCP 三次握手过程是怎样的? 一次握手:客户端发送带有 …...
【网络安全面经】技术性问题3
11. 一次完整的 HTTP 请求过程 域名解析:通过 DNS 将域名转换为 IP 地址,如上述 DNS 的工作原理。建立 TCP 连接:客户端向服务器发送 SYN 报文段,经过三次握手建立 TCP 连接。发送 HTTP 请求:客户端向服务器发送 HTTP…...
前后端交互之动态列
一. 情景 在做项目时,有时候后会遇到后端使用了聚合函数,导致生成的对象的属性数量或数量不固定,因此无法建立一个与之对应的对象来向前端传递数据,这时可以采用NameDataListVO向前端传递数据。 Data Builder AllArgsConstructo…...
递归(3)----力扣40组合数2,力扣473火柴拼正方形
给定一个候选人编号的集合 candidates 和一个目标数 target ,找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用 一次 。 注意:解集不能包含重复的组合。 示例 1: 输入: candidates [10,1,2,7,6,1…...
十一:HTTP 状态码详解:解读每一个响应背后的意义
HTTP(超文本传输协议)是网络通信的基石之一,主要用于客户端(例如浏览器)和服务器之间的通信。为了让服务器能准确地向客户端反馈请求的处理状态,HTTP设计了一套标准的状态码。每一个状态码代表了特定的含义,指示了请求的状态、潜在的问题或成功的信息。 1. 信息响应 (1…...
《译文》2024年11月数维杯国际大学生数学建模挑战赛题目
# 赛题正式发布 2024年第十届数维杯国际大学生数学建模挑战赛顺利开赛,竞赛开始时间为北京时间2024年11月15日09:00至北京时间2024年11月19日09:00,共计4天,竞赛题目正式发布,快来一起围观,你认为今年的哪个题目更具有…...
shell命令统计文件行数之和
你可以使用以下 shell 命令来统计每个 .txt 文件的行数,并将其加和在一起: find . -name "*.txt" -not -name "*.json" -exec wc -l {} + | awk {sum += $1} END {print sum} 解释: find . -name "*.txt" -not -name "*.json": f…...
第02章 CentOS基本操作
2.文件基本操作【文件操作(一)】 目标 理解Linux下路径的表示方法能够使用命令(mkdir和touch)在指定位置创建目录和文件能够使用命令(rm)删除指定的目录和文件能够使用命令(ls)列出目录里的文件能够使用命令(cat,head,tail,less,more)查看文件内容理解标…...
241113.学习日志——[CSDIY] [ByteDance] 后端训练营 [02]
CSDIY:这是一个非科班学生的努力之路,从今天开始这个系列会长期更新,(最好做到日更),我会慢慢把自己目前对CS的努力逐一上传,帮助那些和我一样有着梦想的玩家取得胜利!!&…...
【HOT100第三天】和为K的子数组,最大子数组和,合并区间,轮转数组
今天练的是子串和子数组专题 ~ (前缀和那里差点学死了) 560.和为K的子数组 给你一个整数数组 nums 和一个整数 k ,请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续非空序列。 先写个暴力法,用昨天刚学…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
