当前位置: 首页 > news >正文

Flink之RedisSink

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhirapahce的开源项目,是专门给sparkflink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.

  • 下载源码包
    通过下图进入到GitHub
    在这里插入图片描述
    选择clonedownload源码都可以,如下图
    在这里插入图片描述
  • 编译源码包
    下载好源码后,maven会自动下载对应的依赖项
    • 删除不需要的子项目
      因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除
      在这里插入图片描述
    • 修改pom文件
      删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
      <!-- 这里只保留这一个模块就可以了 -->
      <modules><module>flink-connector-redis</module>
      </modules>
      
      修改完成模块配置后,还需要修改对应的flinkscala版本依赖,这个根据自己实际的开发环境进行修改
       <properties><!-- 修改这里的版本就可以 --><!-- Flink version --><flink.version>1.15.3</flink.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.11</scala.version>
      </properties>
      
      这些都完成后就可以通过maven下载对应的依赖了.
  • 编译安装
    依赖下载完成后pom文件中可能会有几处是报错的状态,如下图
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    以上几处错误无需理会,不影响扩展包的编译.
    接下来通过maveninstall将扩展包编译并安装到本地的maven资源库,如下图
    在这里插入图片描述
    编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
        <!-- Redis connector --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis</artifactId><version>1.2-SNAPSHOT</version></dependency>
    
    上面依赖中groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了.
  • 代码
    其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
    这里我们要创建一个类实现RedisMapper
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/
    public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {@Override// 这个方法是选择使用哪种命令插入数据到Redispublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");}@Override// 这个方法是选择哪个作为Keypublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}@Override// 这个方法是选择哪个作为Valuepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}
    }
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/4* @Description: 测试**/
    public class FlinkRedisSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源为了方便测试DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 将数据转换成Tuple的形式SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource.map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString())).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码// 配置RedisFlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1") // redis服务器地址.setPassword("password") // redis密码.build();// 添加Sinktuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());env.execute("Redis Sink");}
    }
    
    到这里代码就结束了,具体应用根据实际业务需求进行更改.

相关文章:

Flink之RedisSink

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhir是apahce的开源项目,是专门给spark和flink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包. 下…...

STM32CubeMx学习与K210串口通信+识别橘色色块——点亮小灯

K210模块的串口发送代码 引入模块 import sensor, image,time,lcd,utime import KPU as kpu import gc, sys from fpioa_manager import fm from machine import UART 锁定引脚 和 申明串口 fm.register(9, fm.fpioa.UART1_TX, forceTrue) fm.register(10, fm.fpioa.UART1_R…...

睿讯微带你深度了解汽车交流充电桩

这几年随着新能源汽车的普及&#xff0c;充电桩也越来越多的出现在我们的视野中。新能源纯电汽车就好比一种大号的电子产品&#xff0c;而充电桩则是它不可缺少的子系统&#xff0c;是新能源车主们的必要选择。 汽车充电桩分为直流和交流两种&#xff0c;2022年底全国公共充电桩…...

word怎么压缩到10m以下?文件压缩很简单

Word文档是我们工作和学习中一直需要用到的&#xff0c;但有时候Word文档体积过大&#xff0c;给存储和传输带来了不便&#xff0c;这时候我们可以做的就压缩Word。 通常情况下&#xff0c;影响Word文档过大的主要因素主要是图片过多、音视频插入、格式的设置、文字内容的增多以…...

I.MX6ULL_Linux_驱动篇(43)linux通用LED驱动

前面我们都是自己编写 LED 灯驱动&#xff0c;其实像 LED 灯这样非常基础的设备驱动&#xff0c; Linux 内核已经集成了。 Linux 内核的 LED 灯驱动采用 platform 框架&#xff0c;因此我们只需要按照要求在设备树文件中添加相应的 LED 节点即可&#xff0c;本章我们就来学习如…...

OPTEE之sonarlint静态代码分析实战二——optee_client

ATF(TF-A)/OPTEE之静态代码分析汇总 目录 一、optee_client源码下载及分析 二、扫描类型归类...

c++调用ffmpeg api将视频文件内容进行udp推流

代码及工程见https://download.csdn.net/download/daqinzl/88156926 开发工具&#xff1a;visual studio 2019 播放&#xff0c;采用ffmpeg工具集里的ffplay.exe, 执行命令 ffplay udp://238.1.1.10:6016 主要代码如下: #include "pch.h" #include <iostream&g…...

助力工业物联网,工业大数据之服务域:油站主题分析【二十六】

文章目录 07&#xff1a;服务域&#xff1a;油站主题分析08&#xff1a;服务域&#xff1a;油站主题实现 07&#xff1a;服务域&#xff1a;油站主题分析 目标&#xff1a;掌握油站主题的需求分析 路径 step1&#xff1a;需求step2&#xff1a;分析 实施 需求&#xff1a;统计…...

MySql之索引

MySql之索引 1.索引概述 MySql官方对索引的定义为&#xff1a;索引是帮助MySql高效获取数据的数据结构。在数据之外&#xff0c;数据库系统还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种方式引用数据&#xff0c;这样就可以在这些数据结构上实现高级查找…...

adb调试

连不上 adb 如果还遇到5037端口被占用的问题&#xff0c;就找出进程号用taskkill命令杀死该进程即可 1、查找5037端口对应的进程&#xff1a;netstat -ano|findstr 5037 2、杀死该进程&#xff1a;taskkill /F /PID pid 连接unity profiler 打开发包&#xff0c;并安装在手机…...

ElasticSearch_学习笔记

一、初始elasticsearch 什么是elasticsearch&#xff1f; 一个开源的分布式搜索引擎&#xff0c;可以用来时限搜素、日志统计、分析、系统监控等功能。什么是elasitc stack&#xff08;ELK&#xff09;&#xff1f; 是以elasticsearch为核心的技术栈&#xff0c;包括 beats、L…...

Portraiture 4.0.3 for windows/Mac简体中文版(ps人像磨皮滤镜插件)

Imagenomic Portraiture系列插件作为PS磨皮美白必备插件&#xff0c;可以说是最强&#xff0c;今天它更新到了4.0.3版本。但是全网都没有汉化包&#xff0c;经过几个日夜汉化&#xff0c;终于汉化完成可能是全网首个Portraiture 4的汉化包&#xff0c;请大家体验&#xff0c;有…...

Java精品项目源码第152期火车票预订系统(编号M062)

Java精品项目源码第152期火车票预订系统&#xff08;编号M062&#xff09; 大家好&#xff0c;小辰今天给大家介绍一个基于Spring Springboot MyBatis实现的火车票预订系统&#xff0c;演示视频文章末尾公众号对号查询观看即可 文章目录 Java精品项目源码第152期火车票预订系…...

嵌入式软件C/C++(技术面试题)

一&#xff0c;网络 1&#xff0c;TCP窗口机制 TCP&#xff08;传输控制协议&#xff09;是一种可靠的、面向连接的传输层协议。其中的窗口机制是TCP协议中的一项重要功能&#xff0c;用于控制数据在发送和接收之间的流程。 TCP窗口机制是利用滑动窗口的方式来进行拥塞控制和…...

Idea中侧面栏不见了,如何设置?

一、打开idea点击File然后点击Setting 二、点击Appearance,然后划到最下面&#xff0c;勾选Show tool windows bars和Side-by-side layout on the left 三、侧面栏目正常显示...

构建高效读写分离MySQL主从复制架构,应对高可用挑战!

前言 在现代数据库架构中&#xff0c;MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性&#xff0c;还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制&#xff0c;同时通过一个实际案例&#xff0c;展示其在实际场景中的…...

Stable Diffusion系列课程二:ControlNet

AUTOMATIC1111/stable-diffusion-webui参考B站Nenly视频《零基础学会Stable Diffusion》、视频课件推荐网站&#xff1a;stable-diffusion-art、Civitai&#xff08;魔法&#xff09; 、libilibi、AI艺术天堂推荐Stable Diffusion整合资料&#xff1a; NovelAI资源整合、《AI绘…...

【css】使用float实现水平导航栏

该实例使用float 浮动实现元素浮动在水平方向&#xff0c;从而实现水平导航栏效果。 overflow: hidden&#xff1a;当不给父级元素设置高度的时候&#xff0c;其内部元素浮动后会导致下面的元素顶上去&#xff0c;这是因为子元素浮动后&#xff0c;子元素脱离标准流&#xff0…...

IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer

前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器&#xff0c;转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感&#xff0c;迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…...

Nodejs 第三章(Npm Package json)

npm npm&#xff08;全称 Node Package Manager&#xff09;是 Node.js 的包管理工具&#xff0c;它是一个基于命令行的工具&#xff0c;用于帮助开发者在自己的项目中安装、升级、移除和管理依赖项。 https://www.npmjs.com/ 类似于 PHP 的工具&#xff1a;Composer。它是 …...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...