当前位置: 首页 > news >正文

Flink之输出算子Redis Sink

Redis Sink

  • Redis Sink
  • jedis实现
    • 添加依赖
    • 自定义Redis Sink
    • 使用Sink
    • 验证
  • 开源 Redis Connector
    • 添加依赖
    • 自定义Redis Sink
      • RedisCommand
      • String数据类型示例
      • Hash数据类型示例
    • 使用Sink
      • RedisStringSink
      • RedisHashSink
    • 验证

Redis Sink

在新版Flink的文档中,并没有发现Redis Sink的具体使用,但可通过 Apache Bahir了解到其具体使用

Redis具有其极高的写入读取性能,因此也是经常使用的Sink之一。可以使用Java Redis客户端Jedis手动实现,也可以使用Flink和Bahir提供的实现来实现。

开源实现的Redis Connector使用非常方便,但是无法使用一些Jedis中的高级功能,如设置过期时间等

jedis实现

添加依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.0.0</version>
</dependency>

自定义Redis Sink

自定义一个 RedisSink 函数,继承 RichSinkFunction,重写其中的open、invoke和close方法

open:用于新建Redis客户端invoke:将数据存储到Redis中,这里将数据以字符串的形式存储到Redis中close:使用完毕后关闭Redis客户端
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;public class RedisSink extends RichSinkFunction {private transient Jedis jedis;@Overridepublic void open(Configuration config) {jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(Object value, Context context) throws Exception {Tuple2<String, String> val = (Tuple2<String, String>) value;if (!jedis.isConnected()) {jedis.connect();}jedis.set(val.f0, val.f1);}@Overridepublic void close() throws Exception {jedis.close();}
}

使用Sink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink());// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

本地:0>keys *
1) "Flink"
2) "Storm"
3) "Spark"

开源 Redis Connector

可以使用Flink和Bahir提供的实现,其内部都是使用Java Redis客户端Jedis实现Redis Sink。

Flink:https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis

Bahir:https://bahir.apache.org/#home

添加依赖

Flink与Bahir的使用方法类似,这里以使用Flink提供的依赖Jar使用为例,引入flink-connector-redis。

Flink提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.12</artifactId><version>1.1.0</version>
</dependency>

Bahir提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

自定义Redis Sink

可以通过实现 RedisMapper 来自定义Redis Sink

自定义的MyRedisSink实现了RedisMapper并覆写了其中的getCommandDescription、getKeyFromData、getValueFromData。

getCommandDescription:定义存储到Redis中的数据格式,这里定义RedisCommandSET,将数据以字符串的形式存储getKeyFromData:定义SETKeygetValueFromData:定义SET的值

RedisCommand

Redis的所有可用命令,每个命令属于RedisDataType。

public enum RedisCommand {/*** 将指定值插入到存储在键上的列表的头部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/LPUSH(RedisDataType.LIST),/*** 将指定值插入到存储在键上的列表的尾部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/RPUSH(RedisDataType.LIST),/*** 将指定成员添加到存储在键上的集合中。* 忽略已经是该集合成员的指定成员。*/SADD(RedisDataType.SET),/*** 设置键的字符串值。如果键已经持有一个值,* 则无论其类型如何,都会被覆盖。*/SET(RedisDataType.STRING),/*** 设置键的字符串值,并设置生存时间(TTL)。* 如果键已经持有一个值,则无论其类型如何,都会被覆盖。*/SETEX(RedisDataType.STRING),/*** 将元素添加到以第一个参数指定的变量名存储的HyperLogLog数据结构中。*/PFADD(RedisDataType.HYPER_LOG_LOG),/*** 将消息发布到给定的频道。*/PUBLISH(RedisDataType.PUBSUB),/*** 将具有指定分数的指定成员添加到存储在键上的有序集合中。*/ZADD(RedisDataType.SORTED_SET),ZINCRBY(RedisDataType.SORTED_SET),/*** 从存储在键上的有序集合中删除指定的成员。*/ZREM(RedisDataType.SORTED_SET),/*** 在键上的哈希中设置字段的值。如果键不存在,* 则创建一个持有哈希的新键。如果字段已经存在于哈希中,则会覆盖它。*/HSET(RedisDataType.HASH),HINCRBY(RedisDataType.HINCRBY),/*** 为指定的键进行加法操作。*/INCRBY(RedisDataType.STRING),/*** 为指定的键进行加法操作,并在固定时间后过期。*/INCRBY_EX(RedisDataType.STRING),/*** 为指定的键进行减法操作。*/DECRBY(RedisDataType.STRING),/*** 为指定的键进行减法操作,并在固定时间后过期。*/DESCRBY_EX(RedisDataType.STRING);/*** 此命令所属的{@link RedisDataType}。*/private RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}/*** 获取此命令所属的{@link RedisDataType}。** @return {@link RedisDataType}*/public RedisDataType getRedisDataType() {return redisDataType;}
}

String数据类型示例

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisStringSink implements RedisMapper<Tuple2<String, String>> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}/*** 设置Key*/@Overridepublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}/*** 设置value*/@Overridepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}
}

Hash数据类型示例

创建一个Order对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {private Integer id;private String name;private BigDecimal price;
}

编码示例:

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisHashSink implements RedisMapper<Order> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {/*** 第二个参数是Hash数据类型, 第二个参数是外面的key** @redisCommand: Hash数据类型* @additionalKey: 哈希和排序集数据类型时使用(RedisDataType.HASH组或RedisDataType.SORTED_SET), 其他类型忽略additionalKey*/return new RedisCommandDescription(RedisCommand.HSET, "redis");}/*** 设置Key*/@Overridepublic String getKeyFromData(Order oder) {return oder.getId() + "";}/*** 设置value*/@Overridepublic String getValueFromData(Order oder) {// 从数据中获取Value: Hash的valuereturn JSON.toJSONString(oder);}
}

使用Sink

RedisStringSink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink<>(conf, new RedisStringSink()));// 执行env.execute("redis sink");}

RedisHashSink

public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<Order> dataStreamSource = env.fromElements(new Order(1,"Flink", BigDecimal.valueOf(100)),new Order(2,"Spark", BigDecimal.valueOf(200)),new Order(3,"Storm", BigDecimal.valueOf(300)));// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379)// 池可分配的最大对象数,默认值为8.setMaxTotal(100)// 设置超时,默认值为2000.setTimeout(1000 * 10).build();// 关联sink:将给定接收器添加到此数据流dataStreamSource.addSink(new RedisSink<>(conf, new RedisHashSink()));// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

查看String数据类型

本地:0>get Flink
"Flink"
本地:0>get Spark
"Spark"

查看Hash数据类型

Redis:0>keys *
1) "redis"
Redis:0>HGETALL redis
1) "3"
2) "{"id":3,"name":"Storm","price":300}"
3) "2"
4) "{"id":2,"name":"Spark","price":200}"
5) "1"
6) "{"id":1,"name":"Flink","price":100}"

相关文章:

Flink之输出算子Redis Sink

Redis Sink Redis Sinkjedis实现添加依赖自定义Redis Sink使用Sink验证 开源 Redis Connector添加依赖自定义Redis SinkRedisCommandString数据类型示例Hash数据类型示例 使用SinkRedisStringSinkRedisHashSink 验证 Redis Sink 在新版Flink的文档中&#xff0c;并没有发现Redi…...

【数据结构】顺序表实现通讯录

前言 在上一节中我们实现了顺序表&#xff0c;现在我们将使用顺序表完成通讯录的实现。&#xff08;注&#xff1a;本人水平有限&#xff0c;“小屎山”有些许bug&#xff0c;代码冗余且语无伦次&#xff0c;望谅解&#xff01;&#x1f605;&#xff09; 文章目录 一、数据结构…...

JMeter 随机数生成器简介:使用 Random 和 UUID 算法

在压力测试中&#xff0c;经常需要生成随机值来模拟用户行为。JMeter 提供了多种方式来生成随机值&#xff0c;本文来具体介绍一下。 随机数函数 JMeter 提供了多个用于生成随机数的函数&#xff0c;其中最常用的是 __Random 函数。该函数可以生成一个指定范围内的随机整数或…...

vue3 更换 elemnt-ui / element-plus 版本npm命令

1. 安装 / 更换 element-ui 版本 [ 在 后面指定想要安装的版本 ] //卸载当前版本 npm uninstall element-ui //安装指定版本 npm i element-ui2.4.8 -S --legacy-peer-deps 2. 安装 / 更换 element-plus 版本 [ 在 后面指定想要安装的版本 ] npm install element-plus2.3…...

react.js 手写响应式 reactive

Redux 太繁琐&#xff0c;Mbox 很酷但我们可能没必要引入新的包&#xff0c;那就让我们亲自在 react.js 中通过代理实现一套钩子来达到类似 vue 的响应式状态&#xff1a; 实现 reactive hooks 代理类声明 代理状态的类应当提供可访问的状态&#xff0c;和订阅变化的接口。 …...

代码随想录打卡第四十六天|完全背包 ● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

完全背包理论 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 完全背包和01背包问题唯一…...

【BP-Adaboost预测】基于BP神经网络的Adaboost的单维时间序列预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

Origami Studio for Mac:塑造未来,掌握原型设计之巅

在当今高度竞争的设计领域&#xff0c;原型设计的重要性不言而喻。它不仅是沟通想法&#xff0c;也是测试和改进设计的关键环节。而现在&#xff0c;一款强大的原型设计工具——Origami Studio for Mac&#xff0c;正在席卷设计界&#xff0c;以其独特的功能和卓越的性能&#…...

UML类图中各箭头表示总结

UML类图中各箭头表示总结 1、泛化2、实现3、依赖4、关联5、聚合6、组合 在UML类图中&#xff0c;箭头关系是用来表示类之间的关系的。箭头关系的种类有以下几种&#xff1a; 1、泛化 泛化&#xff1a;表示类之间的继承关系。箭头从子类指向父类。箭头&#xff1a;实线空心三角…...

神经网络量化----为了部署而特别设计

引言:一般神经网络量化有两个目的: 为了加速,在某些平台上浮点数计算比较耗费时间,替换为整形可以加快运算为了部署,某些平台上只支持整形运算,比如在芯片中如果是第1个目的,则使用常规的量化手段就可以满足,将浮点数运算变成整形运算+较少的浮点运算 但是如果是第2个目…...

代码随想录算法训练营Day60|单调栈01

代码随想录算法训练营Day60|单调栈01 文章目录 代码随想录算法训练营Day60|单调栈01一、739. 每日温度二、496.下一个更大元素 I 一、739. 每日温度 class Solution {public int[] dailyTemperatures(int[] temperatures) {//单调栈int lenstemperatures.length;int result[]n…...

openMP学习笔记 -编程模型

OpenMP模型 gcc编译openmp指令&#xff1a;gcc test.cpp -o test -fopenmp 定积分计算 函数面积 给定一个定积分&#xff0c;计算其面积&#xff1a; ∫ 0 1 4.0 ( 1 x 2 ) d x \int^{1}_{0}{\frac{4.0}{(1x^2)}dx} ∫01​(1x2)4.0​dx omp 概念 并行区域 并行区域用于…...

【Hive SQL 每日一题】环比增长率、环比增长率、复合增长率

文章目录 环比增长率同比增长率复合增长率测试数据需求说明需求实现 环比增长率 环比增长率是指两个相邻时段之间某种指标的增长率。通常来说&#xff0c;环比增长率是比较两个连续时间段内某项数据的增长量大小的百分比。 环比增长率反映了两个相邻时间段内某种经济指标的变…...

Java设计模式之外观模式(Facade Pattern)

外观模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它提供了一个统一的接口&#xff0c;用于访问子系统中的一组接口。外观模式通过隐藏子系统的复杂性&#xff0c;简化了客户端与子系统之间的交互&#xff0c;提供了一个更简单、更直观的接口。…...

【大疆智图】大疆智图(DJI Terra 3.0.0)安装及使用教程

大疆智图是一款以二维正射影像与三维模型重建为主的软件,同时提供二维多光谱重建、激光雷达点云处理、精细化巡检等功能。它能够将无人机采集的数据可视化,实时生成高精度、高质量三维模型,满足事故现场、工程监测、电力巡线等场景的展示与精确测量需求。 文章目录 1. 安装D…...

腾讯地图基本使用(撒点位,点位点击,弹框等...功能) 搭配Vue3

腾讯地图的基础注册账号 展示地图等基础功能在专栏的上一篇内容 大家有兴趣可以去看一看 今天说的是腾讯地图的在稍微一点的基础操作 话不多说 直接上代码 var marker ref(null) var map var center ref(null) // 地图初始化 const initMap () > {//定义地图中心点坐标…...

散列表:Word文档中的单词拼写检查功能是如何实现的?

文章来源于极客时间前google工程师−王争专栏。 一旦我们在Word里输入一个错误的英文单词&#xff0c;它就会用标红的方式提示“编写错误”。Word的这个单词拼写检查功能&#xff0c;虽然很小但却非常实用。这个功能是如何实现的&#xff1f; 散列别&#xff08;Hash Table&am…...

智慧公厕蜕变多功能城市智慧驿站公厕的创新

随着城市发展的不断推进&#xff0c;对公共设施的便利性和智能化要求也日益提高。为满足市民对高品质、便捷、舒适的公共厕所的需求&#xff0c;智慧公厕行业的领航厂家广州中期科技有限公司&#xff0c;全新推出了一体化智慧公厕驿站。凭借着“高科技碳中和物联网创意设计新经…...

R语言清洗与处理数据常用代码段

去掉数据框df的某一列&#xff1a; # 删除不必要的变量 data$unnecessary_var <- NULL 选择需要的列进行读入数据框&#xff1a; # 选择需要的列 selected_cols <- c("col1", "col2", "col3") data <- fread("data.csv", s…...

centos 7.9 安装python 3.10的tls问题,

本地开发升级成了py3.10.6,服务器测试时安装py3.10.4 发现无法正常使用pip3 pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available. 印象中py3的高版本依赖高版本的openssl,centos 7下默认的openssl为1.0.x, 问题很简…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

CppCon 2015 学习:Reactive Stream Processing in Industrial IoT using DDS and Rx

“Reactive Stream Processing in Industrial IoT using DDS and Rx” 是指在工业物联网&#xff08;IIoT&#xff09;场景中&#xff0c;结合 DDS&#xff08;Data Distribution Service&#xff09; 和 Rx&#xff08;Reactive Extensions&#xff09; 技术&#xff0c;实现 …...