【flink】之如何消费kafka数据并读写入redis?
背景:
最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。
准备:
<!-- 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency></dependencies>
代码:
package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
@Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool = new JedisPool("127.0.0.0",8423);//创建redis客户端static final Jedis jedis = pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");stringDataStreamSource.map(new RichMapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {//读redisSystem.out.println("test="+jedis.get("test"));//写redisjedis.setex("test",60,s);return s;}@Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute("test");}
}
相关文章:
【flink】之如何消费kafka数据并读写入redis?
背景: 最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。 准备: <!-- 依赖 --><dependency><groupId>org.apache.fl…...
搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引
场景 首先介绍测试的场景,本系列schema定义 pdm文档索引,包括nested,作为文档扩展属性字段,_content字段是组件保留字段,支持文本内容,字段属性还有其他属性,如boost,getter&#x…...
离散化算法
离散化 在C中,离散化通常指的是将连续的数值或数据转化为离散的形式。这在数值分析、信号处理、图像处理和机器学习等领域都非常常见。以下是一些离散化的基本概念和方法: 1.区间划分: 将连续变量的值域分成多个区间,每个区间对…...
基于ollama的本地RAG实践
先放参考的原文链接大语言模型实战——搭建纯本地迷你版RAG_本地rag-CSDN博客 一、大模型选择 在我之前的文章中有讲到,我用的是ollama中的llama3.1 Ollama在Windows安装,使用,简单调用API_ollama如何对外提供api-CSDN博客 二、嵌入模型 …...
安卓开发板_MTK开发板_联发科开发评估套件Demo板接口介绍
开发板是一种功能丰富的电路平台,专为开发人员设计,集成了多种传感器、扩展接口和通信模块。这使得开发者能够高效进行原型设计和功能验证,极大地简化了软硬件开发的过程。 此次介绍的安卓开发板由MT8788核心板与底板构成,特别之处…...
代码随想录冲冲冲 Day58 图论Part9
47. 参加科学大会(第六期模拟笔试) 根据昨天的dijkstra进行堆优化 使用的原因是点多但边少 所以直接对于边进行操作 1.对于priority_queue来说 这是最小堆, 小于的话就是最大堆 之后由于是根据边来说的 所以新建一个Edge并且初始化一下 之后由于使用…...
UnityHub下载任意版本的Unity包
1)先打开 // 也可以采用2直接打开 2)也可以直接打开 下载存档 (unity.com) 3)关联起来UnityHub即可...
网站服务器怎么计算同时在线人数?
网站服务器计算同时在线人数通常涉及跟踪和记录当前活跃会话的数量。以下是几种常用的方法来估算或计算网站的同时在线人数: 1. 会话跟踪 - 基于会话(Session):服务器可以为每个访问者创建一个会话,并跟踪这些会话。当访问者首次访问网站时&a…...
[spring]MyBatis介绍 及 用MyBatis注解操作简单数据库
文章目录 一. 什么是MyBatis二. MyBatis操作数据库步骤(使用注解)创建工程创建数据库创建对应实体类配置数据库连接字符串写持久层代码单元测试 三. MyBatis基础操作 使用注解打印日志参数传递增删改查 一. 什么是MyBatis 简单来说 MyBatis 是更简单完成程序和数据库交互的框架…...
Ks渲染做汽车动画吗?汽车本地渲染与云渲染成本分析
Keyshot是一款强大的实时光线追踪和全域光渲染软件,它确实可以用于制作汽车动画,包括汽车模型的渲染和动画展示。Keyshot的动画功能允许用户创建相机移动、物体变化等动态效果,非常适合用于汽车动画的制作。 至于汽车动画的渲染成本ÿ…...
AI智能时代:哪款编程工具让你的工作效率翻倍?
引言 在日益繁忙的工作环境中,选择合适的编程工具已成为提升开发者工作效率的关键。不同的工具能够帮助我们简化代码编写、自动化任务、提升调试速度,甚至让团队协作更加顺畅。那么,哪款编程工具让你的工作效率翻倍?是智能的代码编…...
这五本大模型书籍,让你从大模型零基础到精通,非常详细收藏我这一篇就够了
大模型(Large Language Models, LLMs)是近年来人工智能领域的一大热点,它们在自然语言处理、对话系统、内容生成等多个方面展现出了强大的能力。随着技术的发展,市面上出现了许多介绍大模型理论与实践的书籍,为研究人员…...
面试经典150题 堆
215.数组中的第K个最大元素 建堆算法实现-CSDN博客 215. 数组中的第K个最大元素 中等 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。 请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。 你必…...
day-62 每种字符至少取 K 个
思路 滑动窗口:改变思路,从左右两边取字符,是a b c三个字符至少被取k次,那么意味着如果我们知道字符串中a b c的出现个数,那么可以知道取走后剩下子串a b c的个数,问题转化为了求最长子串 解题过程 如果a …...
免费好用!AI声音克隆神器,超级简单,10秒就能克隆任何声音!(附保姆级教程)
今天下午还有读者问: 有没有能克隆声音的 AI 工具? 其实剪映很早就上了克隆声音的功能。 只需要按要求朗读例句,或者上传本地的音视频文件,就可以克隆声音了。 操作非常简单,效果也不错,可以试试。 除了…...
LeetCode146 LRU缓存
请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类: LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中,则返回关键字的值,否则返回 -1 …...
【Java】包装类【主线学习笔记】
文章目录 前言包装类基本数据类型与包装类之间的转换基本数据类型转换为包装类可以通过以下几种方式:包装类转换为基本数据类型可以通过以下几种方式:初始化值不同与String之间的转换 前言 Java是一门功能强大且广泛应用的编程语言,具有跨平台…...
华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?
场景介绍 本章节将向您介绍如何在地图的指定位置添加点注释以标识位置、商家、建筑等,并可以通过信息窗口展示详细信息。 点注释支持功能: 支持设置图标、文字、碰撞规则等。支持添加点击事件。 PointAnnotation有默认风格,同时也支持自定…...
uniapp js怎么根据map需要显示的点位,计算自适应的缩放scale
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...
Mysql 架构
目录 1.1 Mysql 逻辑架构图 1.2 SQL 的执行流程 1.3 SQL 书写顺序和执行顺序 1.4 Mysql 日志文件 1.4.1. 二进制日志(Binary Log) 1.4.2. 错误日志(Error Log) 1.4.3. 慢查询日志(Slow Query Log) 1.…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
06 Deep learning神经网络编程基础 激活函数 --吴恩达
深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
