Flink之RedisSink
在Flink开发中经常会有将数据写入到redis
的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar
资源,这个时候就用到了bahir
,barhir
是apahce
的开源项目,是专门给spark
和flink
提供扩展包使用的,bahir
官网,这篇文章就介绍下如何自己编译RedisSink
扩展包.
- 下载源码包
通过下图进入到GitHub
选择clone
或download
源码都可以,如下图
- 编译源码包
下载好源码后,maven
会自动下载对应的依赖项- 删除不需要的子项目
因为我们这里需要编译redis
对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除
- 修改
pom
文件
删除掉不需要的子项目后,在pom
文件中也要删除对应的子项目配置
修改完成模块配置后,还需要修改对应的<!-- 这里只保留这一个模块就可以了 --> <modules><module>flink-connector-redis</module> </modules>
flink
和scala
版本依赖,这个根据自己实际的开发环境进行修改
这些都完成后就可以通过<properties><!-- 修改这里的版本就可以 --><!-- Flink version --><flink.version>1.15.3</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.11</scala.version> </properties>
maven
下载对应的依赖了.
- 删除不需要的子项目
- 编译安装
依赖下载完成后pom
文件中可能会有几处是报错的状态,如下图
以上几处错误无需理会,不影响扩展包的编译.
接下来通过maven
的install
将扩展包编译并安装到本地的maven
资源库,如下图
编译完成后我们就可以在自己的flink
项目中引入对应的扩展包了
上面依赖中<!-- Redis connector --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis</artifactId><version>1.2-SNAPSHOT</version></dependency>
groupId
是固定的,artifactId
要根据flink-connector-redis
项目中的pom
文件中artifactId
来拿,同样version
也是一样,到这里扩展包的问题就已经解决了. - 代码
其实在GitHub
上已经给了代码示例单机(java
,scala
)、集群(java
,scala
)的代码模板都是有的,下面就以单机redis
作为示例.
这里我们要创建一个类实现RedisMapper
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {@Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}@Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;} }
到这里代码就结束了,具体应用根据实际业务需求进行更改.import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/ public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource.map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1") // redis服务器地址.setPassword("password") // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());env.execute("Redis Sink");} }
相关文章:

Flink之RedisSink
在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包. 下…...
STM32CubeMx学习与K210串口通信+识别橘色色块——点亮小灯
K210模块的串口发送代码 引入模块 import sensor, image,time,lcd,utime import KPU as kpu import gc, sys from fpioa_manager import fm from machine import UART 锁定引脚 和 申明串口 fm.register(9, fm.fpioa.UART1_TX, forceTrue) fm.register(10, fm.fpioa.UART1_R…...

睿讯微带你深度了解汽车交流充电桩
这几年随着新能源汽车的普及,充电桩也越来越多的出现在我们的视野中。新能源纯电汽车就好比一种大号的电子产品,而充电桩则是它不可缺少的子系统,是新能源车主们的必要选择。 汽车充电桩分为直流和交流两种,2022年底全国公共充电桩…...

word怎么压缩到10m以下?文件压缩很简单
Word文档是我们工作和学习中一直需要用到的,但有时候Word文档体积过大,给存储和传输带来了不便,这时候我们可以做的就压缩Word。 通常情况下,影响Word文档过大的主要因素主要是图片过多、音视频插入、格式的设置、文字内容的增多以…...

I.MX6ULL_Linux_驱动篇(43)linux通用LED驱动
前面我们都是自己编写 LED 灯驱动,其实像 LED 灯这样非常基础的设备驱动, Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 platform 框架,因此我们只需要按照要求在设备树文件中添加相应的 LED 节点即可,本章我们就来学习如…...
OPTEE之sonarlint静态代码分析实战二——optee_client
ATF(TF-A)/OPTEE之静态代码分析汇总 目录 一、optee_client源码下载及分析 二、扫描类型归类...
c++调用ffmpeg api将视频文件内容进行udp推流
代码及工程见https://download.csdn.net/download/daqinzl/88156926 开发工具:visual studio 2019 播放,采用ffmpeg工具集里的ffplay.exe, 执行命令 ffplay udp://238.1.1.10:6016 主要代码如下: #include "pch.h" #include <iostream&g…...

助力工业物联网,工业大数据之服务域:油站主题分析【二十六】
文章目录 07:服务域:油站主题分析08:服务域:油站主题实现 07:服务域:油站主题分析 目标:掌握油站主题的需求分析 路径 step1:需求step2:分析 实施 需求:统计…...

MySql之索引
MySql之索引 1.索引概述 MySql官方对索引的定义为:索引是帮助MySql高效获取数据的数据结构。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用数据,这样就可以在这些数据结构上实现高级查找…...
adb调试
连不上 adb 如果还遇到5037端口被占用的问题,就找出进程号用taskkill命令杀死该进程即可 1、查找5037端口对应的进程:netstat -ano|findstr 5037 2、杀死该进程:taskkill /F /PID pid 连接unity profiler 打开发包,并安装在手机…...

ElasticSearch_学习笔记
一、初始elasticsearch 什么是elasticsearch? 一个开源的分布式搜索引擎,可以用来时限搜素、日志统计、分析、系统监控等功能。什么是elasitc stack(ELK)? 是以elasticsearch为核心的技术栈,包括 beats、L…...

Portraiture 4.0.3 for windows/Mac简体中文版(ps人像磨皮滤镜插件)
Imagenomic Portraiture系列插件作为PS磨皮美白必备插件,可以说是最强,今天它更新到了4.0.3版本。但是全网都没有汉化包,经过几个日夜汉化,终于汉化完成可能是全网首个Portraiture 4的汉化包,请大家体验,有…...

Java精品项目源码第152期火车票预订系统(编号M062)
Java精品项目源码第152期火车票预订系统(编号M062) 大家好,小辰今天给大家介绍一个基于Spring Springboot MyBatis实现的火车票预订系统,演示视频文章末尾公众号对号查询观看即可 文章目录 Java精品项目源码第152期火车票预订系…...
嵌入式软件C/C++(技术面试题)
一,网络 1,TCP窗口机制 TCP(传输控制协议)是一种可靠的、面向连接的传输层协议。其中的窗口机制是TCP协议中的一项重要功能,用于控制数据在发送和接收之间的流程。 TCP窗口机制是利用滑动窗口的方式来进行拥塞控制和…...

Idea中侧面栏不见了,如何设置?
一、打开idea点击File然后点击Setting 二、点击Appearance,然后划到最下面,勾选Show tool windows bars和Side-by-side layout on the left 三、侧面栏目正常显示...
构建高效读写分离MySQL主从复制架构,应对高可用挑战!
前言 在现代数据库架构中,MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性,还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制,同时通过一个实际案例,展示其在实际场景中的…...

Stable Diffusion系列课程二:ControlNet
AUTOMATIC1111/stable-diffusion-webui参考B站Nenly视频《零基础学会Stable Diffusion》、视频课件推荐网站:stable-diffusion-art、Civitai(魔法) 、libilibi、AI艺术天堂推荐Stable Diffusion整合资料: NovelAI资源整合、《AI绘…...

【css】使用float实现水平导航栏
该实例使用float 浮动实现元素浮动在水平方向,从而实现水平导航栏效果。 overflow: hidden:当不给父级元素设置高度的时候,其内部元素浮动后会导致下面的元素顶上去,这是因为子元素浮动后,子元素脱离标准流࿰…...

IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer
前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器,转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感,迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…...
Nodejs 第三章(Npm Package json)
npm npm(全称 Node Package Manager)是 Node.js 的包管理工具,它是一个基于命令行的工具,用于帮助开发者在自己的项目中安装、升级、移除和管理依赖项。 https://www.npmjs.com/ 类似于 PHP 的工具:Composer。它是 …...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...