Flink之RedisSink
在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.
- 下载源码包
通过下图进入到GitHub

选择clone或download源码都可以,如下图

- 编译源码包
下载好源码后,maven会自动下载对应的依赖项- 删除不需要的子项目
因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除

- 修改
pom文件
删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
修改完成模块配置后,还需要修改对应的<!-- 这里只保留这一个模块就可以了 --> <modules><module>flink-connector-redis</module> </modules>flink和scala版本依赖,这个根据自己实际的开发环境进行修改
这些都完成后就可以通过<properties><!-- 修改这里的版本就可以 --><!-- Flink version --><flink.version>1.15.3</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.11</scala.version> </properties>maven下载对应的依赖了.
- 删除不需要的子项目
- 编译安装
依赖下载完成后pom文件中可能会有几处是报错的状态,如下图



以上几处错误无需理会,不影响扩展包的编译.
接下来通过maven的install将扩展包编译并安装到本地的maven资源库,如下图

编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
上面依赖中<!-- Redis connector --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis</artifactId><version>1.2-SNAPSHOT</version></dependency>groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了. - 代码
其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
这里我们要创建一个类实现RedisMapperimport org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {@Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}@Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;} }
到这里代码就结束了,具体应用根据实际业务需求进行更改.import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource.map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1") // redis服务器地址.setPassword("password") // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());env.execute("Redis Sink");} }
相关文章:
Flink之RedisSink
在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包. 下…...
STM32CubeMx学习与K210串口通信+识别橘色色块——点亮小灯
K210模块的串口发送代码 引入模块 import sensor, image,time,lcd,utime import KPU as kpu import gc, sys from fpioa_manager import fm from machine import UART 锁定引脚 和 申明串口 fm.register(9, fm.fpioa.UART1_TX, forceTrue) fm.register(10, fm.fpioa.UART1_R…...
睿讯微带你深度了解汽车交流充电桩
这几年随着新能源汽车的普及,充电桩也越来越多的出现在我们的视野中。新能源纯电汽车就好比一种大号的电子产品,而充电桩则是它不可缺少的子系统,是新能源车主们的必要选择。 汽车充电桩分为直流和交流两种,2022年底全国公共充电桩…...
word怎么压缩到10m以下?文件压缩很简单
Word文档是我们工作和学习中一直需要用到的,但有时候Word文档体积过大,给存储和传输带来了不便,这时候我们可以做的就压缩Word。 通常情况下,影响Word文档过大的主要因素主要是图片过多、音视频插入、格式的设置、文字内容的增多以…...
I.MX6ULL_Linux_驱动篇(43)linux通用LED驱动
前面我们都是自己编写 LED 灯驱动,其实像 LED 灯这样非常基础的设备驱动, Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 platform 框架,因此我们只需要按照要求在设备树文件中添加相应的 LED 节点即可,本章我们就来学习如…...
OPTEE之sonarlint静态代码分析实战二——optee_client
ATF(TF-A)/OPTEE之静态代码分析汇总 目录 一、optee_client源码下载及分析 二、扫描类型归类...
c++调用ffmpeg api将视频文件内容进行udp推流
代码及工程见https://download.csdn.net/download/daqinzl/88156926 开发工具:visual studio 2019 播放,采用ffmpeg工具集里的ffplay.exe, 执行命令 ffplay udp://238.1.1.10:6016 主要代码如下: #include "pch.h" #include <iostream&g…...
助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
文章目录 07:服务域:油站主题分析08:服务域:油站主题实现 07:服务域:油站主题分析 目标:掌握油站主题的需求分析 路径 step1:需求step2:分析 实施 需求:统计…...
MySql之索引
MySql之索引 1.索引概述 MySql官方对索引的定义为:索引是帮助MySql高效获取数据的数据结构。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用数据,这样就可以在这些数据结构上实现高级查找…...
adb调试
连不上 adb 如果还遇到5037端口被占用的问题,就找出进程号用taskkill命令杀死该进程即可 1、查找5037端口对应的进程:netstat -ano|findstr 5037 2、杀死该进程:taskkill /F /PID pid 连接unity profiler 打开发包,并安装在手机…...
ElasticSearch_学习笔记
一、初始elasticsearch 什么是elasticsearch? 一个开源的分布式搜索引擎,可以用来时限搜素、日志统计、分析、系统监控等功能。什么是elasitc stack(ELK)? 是以elasticsearch为核心的技术栈,包括 beats、L…...
Portraiture 4.0.3 for windows/Mac简体中文版(ps人像磨皮滤镜插件)
Imagenomic Portraiture系列插件作为PS磨皮美白必备插件,可以说是最强,今天它更新到了4.0.3版本。但是全网都没有汉化包,经过几个日夜汉化,终于汉化完成可能是全网首个Portraiture 4的汉化包,请大家体验,有…...
Java精品项目源码第152期火车票预订系统(编号M062)
Java精品项目源码第152期火车票预订系统(编号M062) 大家好,小辰今天给大家介绍一个基于Spring Springboot MyBatis实现的火车票预订系统,演示视频文章末尾公众号对号查询观看即可 文章目录 Java精品项目源码第152期火车票预订系…...
嵌入式软件C/C++(技术面试题)
一,网络 1,TCP窗口机制 TCP(传输控制协议)是一种可靠的、面向连接的传输层协议。其中的窗口机制是TCP协议中的一项重要功能,用于控制数据在发送和接收之间的流程。 TCP窗口机制是利用滑动窗口的方式来进行拥塞控制和…...
Idea中侧面栏不见了,如何设置?
一、打开idea点击File然后点击Setting 二、点击Appearance,然后划到最下面,勾选Show tool windows bars和Side-by-side layout on the left 三、侧面栏目正常显示...
构建高效读写分离MySQL主从复制架构,应对高可用挑战!
前言 在现代数据库架构中,MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性,还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制,同时通过一个实际案例,展示其在实际场景中的…...
Stable Diffusion系列课程二:ControlNet
AUTOMATIC1111/stable-diffusion-webui参考B站Nenly视频《零基础学会Stable Diffusion》、视频课件推荐网站:stable-diffusion-art、Civitai(魔法) 、libilibi、AI艺术天堂推荐Stable Diffusion整合资料: NovelAI资源整合、《AI绘…...
【css】使用float实现水平导航栏
该实例使用float 浮动实现元素浮动在水平方向,从而实现水平导航栏效果。 overflow: hidden:当不给父级元素设置高度的时候,其内部元素浮动后会导致下面的元素顶上去,这是因为子元素浮动后,子元素脱离标准流࿰…...
IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer
前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器,转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感,迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…...
Nodejs 第三章(Npm Package json)
npm npm(全称 Node Package Manager)是 Node.js 的包管理工具,它是一个基于命令行的工具,用于帮助开发者在自己的项目中安装、升级、移除和管理依赖项。 https://www.npmjs.com/ 类似于 PHP 的工具:Composer。它是 …...
避坑指南:SAP冲销原因配置常见错误及解决方案(附SPRO操作截图)
SAP FI模块冲销原因配置实战避坑指南 刚接触SAP FI模块的财务顾问们,在配置冲销原因时往往会遇到各种"坑"。这些看似简单的后台配置,一旦出错可能导致整个月结流程卡壳。本文将结合真实项目案例,带你避开那些教科书上不会写的配置陷…...
突破端侧极限!让 Gemma 4 在手机不仅能跑,还能“用中文张口说话” —— 安卓端侧大模型
2026 年 4 月初,Google 抛下了一枚重磅炸弹:Gemma 4 终于来了!更令人震撼的是,他们真的把多模态大模型完完整整塞进了手机里 —— 这一次,完全不需要联网、不需要传数据到云端,真正的零延迟隐私拉满的端侧离…...
知识图谱嵌入评估实战:从MRR到HITS@n的指标解析与应用
1. 知识图谱嵌入评估指标入门指南 第一次接触知识图谱嵌入评估时,我被各种缩写搞得晕头转向。MRR、MR、HITSn这些指标就像天书一样,直到我在实际项目中踩了几个坑才真正理解它们的意义。现在我就用最直白的语言,带你快速掌握这些核心指标。 …...
实用高效:socat-windows网络数据转发实战配置与性能优化指南
实用高效:socat-windows网络数据转发实战配置与性能优化指南 【免费下载链接】socat-windows unofficial windows build of socat http://www.dest-unreach.org/socat/ 项目地址: https://gitcode.com/gh_mirrors/so/socat-windows socat-windows是Windows平…...
C/C++ Socket网络编程 介绍
前言:对于C/C初学者来说,网络编程似乎是一道"门槛",而Socket就是打开这扇门的钥匙。今天我们一起来看看如何入门Socket网络编程。 目录 一、什么是Socket 二、Socket编程流程 三、TCP Socket编程示例 四、一些注意事项 一、什么…...
5个高效技巧:downkyi批量下载完全指南
5个高效技巧:downkyi批量下载完全指南 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等)。 项目…...
如何通过Everything Claude Code实现Next.js Turbopack的AI驱动性能优化:终极指南
如何通过Everything Claude Code实现Next.js Turbopack的AI驱动性能优化:终极指南 【免费下载链接】everything-claude-code The agent harness performance optimization system. Skills, instincts, memory, security, and research-first development for Claude…...
抖音下载器:告别录屏时代,3步打造你的专属内容库
抖音下载器:告别录屏时代,3步打造你的专属内容库 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback…...
OpenCore Legacy Patcher终极指南:5步让旧Mac重获新生
OpenCore Legacy Patcher终极指南:5步让旧Mac重获新生 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 还在为Mac无法升级到最新macOS而烦恼吗&…...
告别手动翻页!用幻影联动+DLL调用,5分钟搞定通达信分时指标自动选股
通达信分时指标自动化选股实战:幻影联动DLL调用的高效解决方案 在瞬息万变的股票市场中,分时级别的交易信号往往转瞬即逝。传统的手动翻页监控方式不仅效率低下,还容易错过最佳买卖时机。本文将详细介绍如何通过幻影联动软件结合DLL调用技术&…...
