当前位置: 首页 > news >正文

Flink之RedisSink

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhirapahce的开源项目,是专门给sparkflink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.

  • 下载源码包
    通过下图进入到GitHub
    在这里插入图片描述
    选择clonedownload源码都可以,如下图
    在这里插入图片描述
  • 编译源码包
    下载好源码后,maven会自动下载对应的依赖项
    • 删除不需要的子项目
      因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除
      在这里插入图片描述
    • 修改pom文件
      删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
      <!-- 这里只保留这一个模块就可以了 -->
      <modules><module>flink-connector-redis</module>
      </modules>
      
      修改完成模块配置后,还需要修改对应的flinkscala版本依赖,这个根据自己实际的开发环境进行修改
       <properties><!-- 修改这里的版本就可以 --><!-- Flink version --><flink.version>1.15.3</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.11</scala.version>
      </properties>
      
      这些都完成后就可以通过maven下载对应的依赖了.
  • 编译安装
    依赖下载完成后pom文件中可能会有几处是报错的状态,如下图
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    以上几处错误无需理会,不影响扩展包的编译.
    接下来通过maveninstall将扩展包编译并安装到本地的maven资源库,如下图
    在这里插入图片描述
    编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
        <!-- Redis connector --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis</artifactId><version>1.2-SNAPSHOT</version></dependency>
    
    上面依赖中groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了.
  • 代码
    其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
    这里我们要创建一个类实现RedisMapper
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/
    public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {@Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}@Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}
    }
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/
    public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource.map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1") // redis服务器地址.setPassword("password") // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());env.execute("Redis Sink");}
    }
    
    到这里代码就结束了,具体应用根据实际业务需求进行更改.

相关文章:

Flink之RedisSink

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包. 下…...

STM32CubeMx学习与K210串口通信+识别橘色色块——点亮小灯

K210模块的串口发送代码 引入模块 import sensor, image,time,lcd,utime import KPU as kpu import gc, sys from fpioa_manager import fm from machine import UART 锁定引脚 和 申明串口 fm.register(9, fm.fpioa.UART1_TX, forceTrue) fm.register(10, fm.fpioa.UART1_R…...

睿讯微带你深度了解汽车交流充电桩

这几年随着新能源汽车的普及&#xff0c;充电桩也越来越多的出现在我们的视野中。新能源纯电汽车就好比一种大号的电子产品&#xff0c;而充电桩则是它不可缺少的子系统&#xff0c;是新能源车主们的必要选择。 汽车充电桩分为直流和交流两种&#xff0c;2022年底全国公共充电桩…...

word怎么压缩到10m以下?文件压缩很简单

Word文档是我们工作和学习中一直需要用到的&#xff0c;但有时候Word文档体积过大&#xff0c;给存储和传输带来了不便&#xff0c;这时候我们可以做的就压缩Word。 通常情况下&#xff0c;影响Word文档过大的主要因素主要是图片过多、音视频插入、格式的设置、文字内容的增多以…...

I.MX6ULL_Linux_驱动篇(43)linux通用LED驱动

前面我们都是自己编写 LED 灯驱动&#xff0c;其实像 LED 灯这样非常基础的设备驱动&#xff0c; Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 platform 框架&#xff0c;因此我们只需要按照要求在设备树文件中添加相应的 LED 节点即可&#xff0c;本章我们就来学习如…...

OPTEE之sonarlint静态代码分析实战二——optee_client

ATF(TF-A)/OPTEE之静态代码分析汇总 目录 一、optee_client源码下载及分析 二、扫描类型归类...

c++调用ffmpeg api将视频文件内容进行udp推流

代码及工程见https://download.csdn.net/download/daqinzl/88156926 开发工具&#xff1a;visual studio 2019 播放&#xff0c;采用ffmpeg工具集里的ffplay.exe, 执行命令 ffplay udp://238.1.1.10:6016 主要代码如下: #include "pch.h" #include <iostream&g…...

助力工业物联网,工业大数据之服务域:油站主题分析【二十六】

文章目录 07&#xff1a;服务域&#xff1a;油站主题分析08&#xff1a;服务域&#xff1a;油站主题实现 07&#xff1a;服务域&#xff1a;油站主题分析 目标&#xff1a;掌握油站主题的需求分析 路径 step1&#xff1a;需求step2&#xff1a;分析 实施 需求&#xff1a;统计…...

MySql之索引

MySql之索引 1.索引概述 MySql官方对索引的定义为&#xff1a;索引是帮助MySql高效获取数据的数据结构。在数据之外&#xff0c;数据库系统还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种方式引用数据&#xff0c;这样就可以在这些数据结构上实现高级查找…...

adb调试

连不上 adb 如果还遇到5037端口被占用的问题&#xff0c;就找出进程号用taskkill命令杀死该进程即可 1、查找5037端口对应的进程&#xff1a;netstat -ano|findstr 5037 2、杀死该进程&#xff1a;taskkill /F /PID pid 连接unity profiler 打开发包&#xff0c;并安装在手机…...

ElasticSearch_学习笔记

一、初始elasticsearch 什么是elasticsearch&#xff1f; 一个开源的分布式搜索引擎&#xff0c;可以用来时限搜素、日志统计、分析、系统监控等功能。什么是elasitc stack&#xff08;ELK&#xff09;&#xff1f; 是以elasticsearch为核心的技术栈&#xff0c;包括 beats、L…...

Portraiture 4.0.3 for windows/Mac简体中文版(ps人像磨皮滤镜插件)

Imagenomic Portraiture系列插件作为PS磨皮美白必备插件&#xff0c;可以说是最强&#xff0c;今天它更新到了4.0.3版本。但是全网都没有汉化包&#xff0c;经过几个日夜汉化&#xff0c;终于汉化完成可能是全网首个Portraiture 4的汉化包&#xff0c;请大家体验&#xff0c;有…...

Java精品项目源码第152期火车票预订系统(编号M062)

Java精品项目源码第152期火车票预订系统&#xff08;编号M062&#xff09; 大家好&#xff0c;小辰今天给大家介绍一个基于Spring Springboot MyBatis实现的火车票预订系统&#xff0c;演示视频文章末尾公众号对号查询观看即可 文章目录 Java精品项目源码第152期火车票预订系…...

嵌入式软件C/C++(技术面试题)

一&#xff0c;网络 1&#xff0c;TCP窗口机制 TCP&#xff08;传输控制协议&#xff09;是一种可靠的、面向连接的传输层协议。其中的窗口机制是TCP协议中的一项重要功能&#xff0c;用于控制数据在发送和接收之间的流程。 TCP窗口机制是利用滑动窗口的方式来进行拥塞控制和…...

Idea中侧面栏不见了,如何设置?

一、打开idea点击File然后点击Setting 二、点击Appearance,然后划到最下面&#xff0c;勾选Show tool windows bars和Side-by-side layout on the left 三、侧面栏目正常显示...

构建高效读写分离MySQL主从复制架构,应对高可用挑战!

前言 在现代数据库架构中&#xff0c;MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性&#xff0c;还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制&#xff0c;同时通过一个实际案例&#xff0c;展示其在实际场景中的…...

Stable Diffusion系列课程二:ControlNet

AUTOMATIC1111/stable-diffusion-webui参考B站Nenly视频《零基础学会Stable Diffusion》、视频课件推荐网站&#xff1a;stable-diffusion-art、Civitai&#xff08;魔法&#xff09; 、libilibi、AI艺术天堂推荐Stable Diffusion整合资料&#xff1a; NovelAI资源整合、《AI绘…...

【css】使用float实现水平导航栏

该实例使用float 浮动实现元素浮动在水平方向&#xff0c;从而实现水平导航栏效果。 overflow: hidden&#xff1a;当不给父级元素设置高度的时候&#xff0c;其内部元素浮动后会导致下面的元素顶上去&#xff0c;这是因为子元素浮动后&#xff0c;子元素脱离标准流&#xff0…...

IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer

前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器&#xff0c;转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感&#xff0c;迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…...

Nodejs 第三章(Npm Package json)

npm npm&#xff08;全称 Node Package Manager&#xff09;是 Node.js 的包管理工具&#xff0c;它是一个基于命令行的工具&#xff0c;用于帮助开发者在自己的项目中安装、升级、移除和管理依赖项。 https://www.npmjs.com/ 类似于 PHP 的工具&#xff1a;Composer。它是 …...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

10-Oracle 23 ai Vector Search 概述和参数

一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI&#xff0c;使用客户端或是内部自己搭建集成大模型的终端&#xff0c;加速与大型语言模型&#xff08;LLM&#xff09;的结合&#xff0c;同时使用检索增强生成&#xff08;Retrieval Augmented Generation &#…...