Flink之RedisSink
在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.
- 下载源码包
通过下图进入到GitHub

选择clone或download源码都可以,如下图

- 编译源码包
下载好源码后,maven会自动下载对应的依赖项- 删除不需要的子项目
因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除

- 修改
pom文件
删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
修改完成模块配置后,还需要修改对应的<!-- 这里只保留这一个模块就可以了 --> <modules><module>flink-connector-redis</module> </modules>flink和scala版本依赖,这个根据自己实际的开发环境进行修改
这些都完成后就可以通过<properties><!-- 修改这里的版本就可以 --><!-- Flink version --><flink.version>1.15.3</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.11</scala.version> </properties>maven下载对应的依赖了.
- 删除不需要的子项目
- 编译安装
依赖下载完成后pom文件中可能会有几处是报错的状态,如下图



以上几处错误无需理会,不影响扩展包的编译.
接下来通过maven的install将扩展包编译并安装到本地的maven资源库,如下图

编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
上面依赖中<!-- Redis connector --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis</artifactId><version>1.2-SNAPSHOT</version></dependency>groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了. - 代码
其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
这里我们要创建一个类实现RedisMapperimport org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {@Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}@Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;} }
到这里代码就结束了,具体应用根据实际业务需求进行更改.import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource.map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1") // redis服务器地址.setPassword("password") // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());env.execute("Redis Sink");} }
相关文章:
Flink之RedisSink
在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包. 下…...
STM32CubeMx学习与K210串口通信+识别橘色色块——点亮小灯
K210模块的串口发送代码 引入模块 import sensor, image,time,lcd,utime import KPU as kpu import gc, sys from fpioa_manager import fm from machine import UART 锁定引脚 和 申明串口 fm.register(9, fm.fpioa.UART1_TX, forceTrue) fm.register(10, fm.fpioa.UART1_R…...
睿讯微带你深度了解汽车交流充电桩
这几年随着新能源汽车的普及,充电桩也越来越多的出现在我们的视野中。新能源纯电汽车就好比一种大号的电子产品,而充电桩则是它不可缺少的子系统,是新能源车主们的必要选择。 汽车充电桩分为直流和交流两种,2022年底全国公共充电桩…...
word怎么压缩到10m以下?文件压缩很简单
Word文档是我们工作和学习中一直需要用到的,但有时候Word文档体积过大,给存储和传输带来了不便,这时候我们可以做的就压缩Word。 通常情况下,影响Word文档过大的主要因素主要是图片过多、音视频插入、格式的设置、文字内容的增多以…...
I.MX6ULL_Linux_驱动篇(43)linux通用LED驱动
前面我们都是自己编写 LED 灯驱动,其实像 LED 灯这样非常基础的设备驱动, Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 platform 框架,因此我们只需要按照要求在设备树文件中添加相应的 LED 节点即可,本章我们就来学习如…...
OPTEE之sonarlint静态代码分析实战二——optee_client
ATF(TF-A)/OPTEE之静态代码分析汇总 目录 一、optee_client源码下载及分析 二、扫描类型归类...
c++调用ffmpeg api将视频文件内容进行udp推流
代码及工程见https://download.csdn.net/download/daqinzl/88156926 开发工具:visual studio 2019 播放,采用ffmpeg工具集里的ffplay.exe, 执行命令 ffplay udp://238.1.1.10:6016 主要代码如下: #include "pch.h" #include <iostream&g…...
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
文章目录 07:服务域:油站主题分析08:服务域:油站主题实现 07:服务域:油站主题分析 目标:掌握油站主题的需求分析 路径 step1:需求step2:分析 实施 需求:统计…...
MySql之索引
MySql之索引 1.索引概述 MySql官方对索引的定义为:索引是帮助MySql高效获取数据的数据结构。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用数据,这样就可以在这些数据结构上实现高级查找…...
adb调试
连不上 adb 如果还遇到5037端口被占用的问题,就找出进程号用taskkill命令杀死该进程即可 1、查找5037端口对应的进程:netstat -ano|findstr 5037 2、杀死该进程:taskkill /F /PID pid 连接unity profiler 打开发包,并安装在手机…...
ElasticSearch_学习笔记
一、初始elasticsearch 什么是elasticsearch? 一个开源的分布式搜索引擎,可以用来时限搜素、日志统计、分析、系统监控等功能。什么是elasitc stack(ELK)? 是以elasticsearch为核心的技术栈,包括 beats、L…...
Portraiture 4.0.3 for windows/Mac简体中文版(ps人像磨皮滤镜插件)
Imagenomic Portraiture系列插件作为PS磨皮美白必备插件,可以说是最强,今天它更新到了4.0.3版本。但是全网都没有汉化包,经过几个日夜汉化,终于汉化完成可能是全网首个Portraiture 4的汉化包,请大家体验,有…...
Java精品项目源码第152期火车票预订系统(编号M062)
Java精品项目源码第152期火车票预订系统(编号M062) 大家好,小辰今天给大家介绍一个基于Spring Springboot MyBatis实现的火车票预订系统,演示视频文章末尾公众号对号查询观看即可 文章目录 Java精品项目源码第152期火车票预订系…...
嵌入式软件C/C++(技术面试题)
一,网络 1,TCP窗口机制 TCP(传输控制协议)是一种可靠的、面向连接的传输层协议。其中的窗口机制是TCP协议中的一项重要功能,用于控制数据在发送和接收之间的流程。 TCP窗口机制是利用滑动窗口的方式来进行拥塞控制和…...
Idea中侧面栏不见了,如何设置?
一、打开idea点击File然后点击Setting 二、点击Appearance,然后划到最下面,勾选Show tool windows bars和Side-by-side layout on the left 三、侧面栏目正常显示...
构建高效读写分离MySQL主从复制架构,应对高可用挑战!
前言 在现代数据库架构中,MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性,还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制,同时通过一个实际案例,展示其在实际场景中的…...
Stable Diffusion系列课程二:ControlNet
AUTOMATIC1111/stable-diffusion-webui参考B站Nenly视频《零基础学会Stable Diffusion》、视频课件推荐网站:stable-diffusion-art、Civitai(魔法) 、libilibi、AI艺术天堂推荐Stable Diffusion整合资料: NovelAI资源整合、《AI绘…...
【css】使用float实现水平导航栏
该实例使用float 浮动实现元素浮动在水平方向,从而实现水平导航栏效果。 overflow: hidden:当不给父级元素设置高度的时候,其内部元素浮动后会导致下面的元素顶上去,这是因为子元素浮动后,子元素脱离标准流࿰…...
IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer
前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器,转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感,迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…...
Nodejs 第三章(Npm Package json)
npm npm(全称 Node Package Manager)是 Node.js 的包管理工具,它是一个基于命令行的工具,用于帮助开发者在自己的项目中安装、升级、移除和管理依赖项。 https://www.npmjs.com/ 类似于 PHP 的工具:Composer。它是 …...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
OpenLayers 分屏对比(地图联动)
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能,和卷帘图层不一样的是,分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...
10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...
