当前位置: 首页 > news >正文

Flink之输出算子Redis Sink

Redis Sink

  • Redis Sink
  • jedis实现
    • 添加依赖
    • 自定义Redis Sink
    • 使用Sink
    • 验证
  • 开源 Redis Connector
    • 添加依赖
    • 自定义Redis Sink
      • RedisCommand
      • String数据类型示例
      • Hash数据类型示例
    • 使用Sink
      • RedisStringSink
      • RedisHashSink
    • 验证

Redis Sink

在新版Flink的文档中,并没有发现Redis Sink的具体使用,但可通过 Apache Bahir了解到其具体使用

Redis具有其极高的写入读取性能,因此也是经常使用的Sink之一。可以使用Java Redis客户端Jedis手动实现,也可以使用Flink和Bahir提供的实现来实现。

开源实现的Redis Connector使用非常方便,但是无法使用一些Jedis中的高级功能,如设置过期时间等

jedis实现

添加依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.0.0</version>
</dependency>

自定义Redis Sink

自定义一个 RedisSink 函数,继承 RichSinkFunction,重写其中的open、invoke和close方法

open:用于新建Redis客户端invoke:将数据存储到Redis中,这里将数据以字符串的形式存储到Redis中close:使用完毕后关闭Redis客户端
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;public class RedisSink extends RichSinkFunction {private transient Jedis jedis;@Overridepublic void open(Configuration config) {jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(Object value, Context context) throws Exception {Tuple2<String, String> val = (Tuple2<String, String>) value;if (!jedis.isConnected()) {jedis.connect();}jedis.set(val.f0, val.f1);}@Overridepublic void close() throws Exception {jedis.close();}
}

使用Sink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink());// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

本地:0>keys *
1) "Flink"
2) "Storm"
3) "Spark"

开源 Redis Connector

可以使用Flink和Bahir提供的实现,其内部都是使用Java Redis客户端Jedis实现Redis Sink。

Flink:https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis

Bahir:https://bahir.apache.org/#home

添加依赖

Flink与Bahir的使用方法类似,这里以使用Flink提供的依赖Jar使用为例,引入flink-connector-redis。

Flink提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.12</artifactId><version>1.1.0</version>
</dependency>

Bahir提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

自定义Redis Sink

可以通过实现 RedisMapper 来自定义Redis Sink

自定义的MyRedisSink实现了RedisMapper并覆写了其中的getCommandDescription、getKeyFromData、getValueFromData。

getCommandDescription:定义存储到Redis中的数据格式,这里定义RedisCommandSET,将数据以字符串的形式存储getKeyFromData:定义SETKeygetValueFromData:定义SET的值

RedisCommand

Redis的所有可用命令,每个命令属于RedisDataType。

public enum RedisCommand {/*** 将指定值插入到存储在键上的列表的头部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/LPUSH(RedisDataType.LIST),/*** 将指定值插入到存储在键上的列表的尾部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/RPUSH(RedisDataType.LIST),/*** 将指定成员添加到存储在键上的集合中。* 忽略已经是该集合成员的指定成员。*/SADD(RedisDataType.SET),/*** 设置键的字符串值。如果键已经持有一个值,* 则无论其类型如何,都会被覆盖。*/SET(RedisDataType.STRING),/*** 设置键的字符串值,并设置生存时间(TTL)。* 如果键已经持有一个值,则无论其类型如何,都会被覆盖。*/SETEX(RedisDataType.STRING),/*** 将元素添加到以第一个参数指定的变量名存储的HyperLogLog数据结构中。*/PFADD(RedisDataType.HYPER_LOG_LOG),/*** 将消息发布到给定的频道。*/PUBLISH(RedisDataType.PUBSUB),/*** 将具有指定分数的指定成员添加到存储在键上的有序集合中。*/ZADD(RedisDataType.SORTED_SET),ZINCRBY(RedisDataType.SORTED_SET),/*** 从存储在键上的有序集合中删除指定的成员。*/ZREM(RedisDataType.SORTED_SET),/*** 在键上的哈希中设置字段的值。如果键不存在,* 则创建一个持有哈希的新键。如果字段已经存在于哈希中,则会覆盖它。*/HSET(RedisDataType.HASH),HINCRBY(RedisDataType.HINCRBY),/*** 为指定的键进行加法操作。*/INCRBY(RedisDataType.STRING),/*** 为指定的键进行加法操作,并在固定时间后过期。*/INCRBY_EX(RedisDataType.STRING),/*** 为指定的键进行减法操作。*/DECRBY(RedisDataType.STRING),/*** 为指定的键进行减法操作,并在固定时间后过期。*/DESCRBY_EX(RedisDataType.STRING);/*** 此命令所属的{@link RedisDataType}。*/private RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}/*** 获取此命令所属的{@link RedisDataType}。** @return {@link RedisDataType}*/public RedisDataType getRedisDataType() {return redisDataType;}
}

String数据类型示例

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisStringSink implements RedisMapper<Tuple2<String, String>> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}/*** 设置Key*/@Overridepublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}/*** 设置value*/@Overridepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}
}

Hash数据类型示例

创建一个Order对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {private Integer id;private String name;private BigDecimal price;
}

编码示例:

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisHashSink implements RedisMapper<Order> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {/*** 第二个参数是Hash数据类型, 第二个参数是外面的key** @redisCommand: Hash数据类型* @additionalKey: 哈希和排序集数据类型时使用(RedisDataType.HASH组或RedisDataType.SORTED_SET), 其他类型忽略additionalKey*/return new RedisCommandDescription(RedisCommand.HSET, "redis");}/*** 设置Key*/@Overridepublic String getKeyFromData(Order oder) {return oder.getId() + "";}/*** 设置value*/@Overridepublic String getValueFromData(Order oder) {// 从数据中获取Value: Hash的valuereturn JSON.toJSONString(oder);}
}

使用Sink

RedisStringSink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink<>(conf, new RedisStringSink()));// 执行env.execute("redis sink");}

RedisHashSink

public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<Order> dataStreamSource = env.fromElements(new Order(1,"Flink", BigDecimal.valueOf(100)),new Order(2,"Spark", BigDecimal.valueOf(200)),new Order(3,"Storm", BigDecimal.valueOf(300)));// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379)// 池可分配的最大对象数,默认值为8.setMaxTotal(100)// 设置超时,默认值为2000.setTimeout(1000 * 10).build();// 关联sink:将给定接收器添加到此数据流dataStreamSource.addSink(new RedisSink<>(conf, new RedisHashSink()));// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

查看String数据类型

本地:0>get Flink
"Flink"
本地:0>get Spark
"Spark"

查看Hash数据类型

Redis:0>keys *
1) "redis"
Redis:0>HGETALL redis
1) "3"
2) "{"id":3,"name":"Storm","price":300}"
3) "2"
4) "{"id":2,"name":"Spark","price":200}"
5) "1"
6) "{"id":1,"name":"Flink","price":100}"

相关文章:

Flink之输出算子Redis Sink

Redis Sink Redis Sinkjedis实现添加依赖自定义Redis Sink使用Sink验证 开源 Redis Connector添加依赖自定义Redis SinkRedisCommandString数据类型示例Hash数据类型示例 使用SinkRedisStringSinkRedisHashSink 验证 Redis Sink 在新版Flink的文档中&#xff0c;并没有发现Redi…...

【数据结构】顺序表实现通讯录

前言 在上一节中我们实现了顺序表&#xff0c;现在我们将使用顺序表完成通讯录的实现。&#xff08;注&#xff1a;本人水平有限&#xff0c;“小屎山”有些许bug&#xff0c;代码冗余且语无伦次&#xff0c;望谅解&#xff01;&#x1f605;&#xff09; 文章目录 一、数据结构…...

JMeter 随机数生成器简介:使用 Random 和 UUID 算法

在压力测试中&#xff0c;经常需要生成随机值来模拟用户行为。JMeter 提供了多种方式来生成随机值&#xff0c;本文来具体介绍一下。 随机数函数 JMeter 提供了多个用于生成随机数的函数&#xff0c;其中最常用的是 __Random 函数。该函数可以生成一个指定范围内的随机整数或…...

vue3 更换 elemnt-ui / element-plus 版本npm命令

1. 安装 / 更换 element-ui 版本 [ 在 后面指定想要安装的版本 ] //卸载当前版本 npm uninstall element-ui //安装指定版本 npm i element-ui2.4.8 -S --legacy-peer-deps 2. 安装 / 更换 element-plus 版本 [ 在 后面指定想要安装的版本 ] npm install element-plus2.3…...

react.js 手写响应式 reactive

Redux 太繁琐&#xff0c;Mbox 很酷但我们可能没必要引入新的包&#xff0c;那就让我们亲自在 react.js 中通过代理实现一套钩子来达到类似 vue 的响应式状态&#xff1a; 实现 reactive hooks 代理类声明 代理状态的类应当提供可访问的状态&#xff0c;和订阅变化的接口。 …...

代码随想录打卡第四十六天|完全背包 ● 518. 零钱兑换 II ● 377. 组合总和 Ⅳ

完全背包理论 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 完全背包和01背包问题唯一…...

【BP-Adaboost预测】基于BP神经网络的Adaboost的单维时间序列预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

Origami Studio for Mac:塑造未来,掌握原型设计之巅

在当今高度竞争的设计领域&#xff0c;原型设计的重要性不言而喻。它不仅是沟通想法&#xff0c;也是测试和改进设计的关键环节。而现在&#xff0c;一款强大的原型设计工具——Origami Studio for Mac&#xff0c;正在席卷设计界&#xff0c;以其独特的功能和卓越的性能&#…...

UML类图中各箭头表示总结

UML类图中各箭头表示总结 1、泛化2、实现3、依赖4、关联5、聚合6、组合 在UML类图中&#xff0c;箭头关系是用来表示类之间的关系的。箭头关系的种类有以下几种&#xff1a; 1、泛化 泛化&#xff1a;表示类之间的继承关系。箭头从子类指向父类。箭头&#xff1a;实线空心三角…...

神经网络量化----为了部署而特别设计

引言:一般神经网络量化有两个目的: 为了加速,在某些平台上浮点数计算比较耗费时间,替换为整形可以加快运算为了部署,某些平台上只支持整形运算,比如在芯片中如果是第1个目的,则使用常规的量化手段就可以满足,将浮点数运算变成整形运算+较少的浮点运算 但是如果是第2个目…...

代码随想录算法训练营Day60|单调栈01

代码随想录算法训练营Day60|单调栈01 文章目录 代码随想录算法训练营Day60|单调栈01一、739. 每日温度二、496.下一个更大元素 I 一、739. 每日温度 class Solution {public int[] dailyTemperatures(int[] temperatures) {//单调栈int lenstemperatures.length;int result[]n…...

openMP学习笔记 -编程模型

OpenMP模型 gcc编译openmp指令&#xff1a;gcc test.cpp -o test -fopenmp 定积分计算 函数面积 给定一个定积分&#xff0c;计算其面积&#xff1a; ∫ 0 1 4.0 ( 1 x 2 ) d x \int^{1}_{0}{\frac{4.0}{(1x^2)}dx} ∫01​(1x2)4.0​dx omp 概念 并行区域 并行区域用于…...

【Hive SQL 每日一题】环比增长率、环比增长率、复合增长率

文章目录 环比增长率同比增长率复合增长率测试数据需求说明需求实现 环比增长率 环比增长率是指两个相邻时段之间某种指标的增长率。通常来说&#xff0c;环比增长率是比较两个连续时间段内某项数据的增长量大小的百分比。 环比增长率反映了两个相邻时间段内某种经济指标的变…...

Java设计模式之外观模式(Facade Pattern)

外观模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它提供了一个统一的接口&#xff0c;用于访问子系统中的一组接口。外观模式通过隐藏子系统的复杂性&#xff0c;简化了客户端与子系统之间的交互&#xff0c;提供了一个更简单、更直观的接口。…...

【大疆智图】大疆智图(DJI Terra 3.0.0)安装及使用教程

大疆智图是一款以二维正射影像与三维模型重建为主的软件,同时提供二维多光谱重建、激光雷达点云处理、精细化巡检等功能。它能够将无人机采集的数据可视化,实时生成高精度、高质量三维模型,满足事故现场、工程监测、电力巡线等场景的展示与精确测量需求。 文章目录 1. 安装D…...

腾讯地图基本使用(撒点位,点位点击,弹框等...功能) 搭配Vue3

腾讯地图的基础注册账号 展示地图等基础功能在专栏的上一篇内容 大家有兴趣可以去看一看 今天说的是腾讯地图的在稍微一点的基础操作 话不多说 直接上代码 var marker ref(null) var map var center ref(null) // 地图初始化 const initMap () > {//定义地图中心点坐标…...

散列表:Word文档中的单词拼写检查功能是如何实现的?

文章来源于极客时间前google工程师−王争专栏。 一旦我们在Word里输入一个错误的英文单词&#xff0c;它就会用标红的方式提示“编写错误”。Word的这个单词拼写检查功能&#xff0c;虽然很小但却非常实用。这个功能是如何实现的&#xff1f; 散列别&#xff08;Hash Table&am…...

智慧公厕蜕变多功能城市智慧驿站公厕的创新

随着城市发展的不断推进&#xff0c;对公共设施的便利性和智能化要求也日益提高。为满足市民对高品质、便捷、舒适的公共厕所的需求&#xff0c;智慧公厕行业的领航厂家广州中期科技有限公司&#xff0c;全新推出了一体化智慧公厕驿站。凭借着“高科技碳中和物联网创意设计新经…...

R语言清洗与处理数据常用代码段

去掉数据框df的某一列&#xff1a; # 删除不必要的变量 data$unnecessary_var <- NULL 选择需要的列进行读入数据框&#xff1a; # 选择需要的列 selected_cols <- c("col1", "col2", "col3") data <- fread("data.csv", s…...

centos 7.9 安装python 3.10的tls问题,

本地开发升级成了py3.10.6,服务器测试时安装py3.10.4 发现无法正常使用pip3 pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available. 印象中py3的高版本依赖高版本的openssl,centos 7下默认的openssl为1.0.x, 问题很简…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java&#xff08;供 Kotlin 使用&#xff09; 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...