当前位置: 首页 > news >正文

【flink】之如何消费kafka数据并读写入redis?

背景: 

最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。

准备: 

    <!-- 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency></dependencies>

代码:

package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
@Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool = new JedisPool("127.0.0.0",8423);//创建redis客户端static final Jedis jedis = pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");stringDataStreamSource.map(new RichMapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {//读redisSystem.out.println("test="+jedis.get("test"));//写redisjedis.setex("test",60,s);return s;}@Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute("test");}
}

相关文章:

【flink】之如何消费kafka数据并读写入redis?

背景&#xff1a; 最近公司出现做了一个新需求&#xff0c;需求内容是加工一个营销时机&#xff0c;但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。 准备&#xff1a; <!-- 依赖 --><dependency><groupId>org.apache.fl…...

搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引

场景 首先介绍测试的场景&#xff0c;本系列schema定义 pdm文档索引&#xff0c;包括nested&#xff0c;作为文档扩展属性字段&#xff0c;_content字段是组件保留字段&#xff0c;支持文本内容&#xff0c;字段属性还有其他属性&#xff0c;如boost&#xff0c;getter&#x…...

离散化算法

离散化 在C中&#xff0c;离散化通常指的是将连续的数值或数据转化为离散的形式。这在数值分析、信号处理、图像处理和机器学习等领域都非常常见。以下是一些离散化的基本概念和方法&#xff1a; 1.区间划分&#xff1a; 将连续变量的值域分成多个区间&#xff0c;每个区间对…...

基于ollama的本地RAG实践

先放参考的原文链接大语言模型实战——搭建纯本地迷你版RAG_本地rag-CSDN博客 一、大模型选择 在我之前的文章中有讲到&#xff0c;我用的是ollama中的llama3.1 Ollama在Windows安装&#xff0c;使用&#xff0c;简单调用API_ollama如何对外提供api-CSDN博客 二、嵌入模型 …...

安卓开发板_MTK开发板_联发科开发评估套件Demo板接口介绍

开发板是一种功能丰富的电路平台&#xff0c;专为开发人员设计&#xff0c;集成了多种传感器、扩展接口和通信模块。这使得开发者能够高效进行原型设计和功能验证&#xff0c;极大地简化了软硬件开发的过程。 此次介绍的安卓开发板由MT8788核心板与底板构成&#xff0c;特别之处…...

代码随想录冲冲冲 Day58 图论Part9

47. 参加科学大会&#xff08;第六期模拟笔试&#xff09; 根据昨天的dijkstra进行堆优化 使用的原因是点多但边少 所以直接对于边进行操作 1.对于priority_queue来说 这是最小堆, 小于的话就是最大堆 之后由于是根据边来说的 所以新建一个Edge并且初始化一下 之后由于使用…...

UnityHub下载任意版本的Unity包

1)先打开 // 也可以采用2直接打开 2)也可以直接打开 下载存档 (unity.com) 3)关联起来UnityHub即可...

网站服务器怎么计算同时在线人数?

网站服务器计算同时在线人数通常涉及跟踪和记录当前活跃会话的数量。以下是几种常用的方法来估算或计算网站的同时在线人数&#xff1a; 1. 会话跟踪 - 基于会话(Session)&#xff1a;服务器可以为每个访问者创建一个会话&#xff0c;并跟踪这些会话。当访问者首次访问网站时&a…...

[spring]MyBatis介绍 及 用MyBatis注解操作简单数据库

文章目录 一. 什么是MyBatis二. MyBatis操作数据库步骤(使用注解)创建工程创建数据库创建对应实体类配置数据库连接字符串写持久层代码单元测试 三. MyBatis基础操作 使用注解打印日志参数传递增删改查 一. 什么是MyBatis 简单来说 MyBatis 是更简单完成程序和数据库交互的框架…...

Ks渲染做汽车动画吗?汽车本地渲染与云渲染成本分析

Keyshot是一款强大的实时光线追踪和全域光渲染软件&#xff0c;它确实可以用于制作汽车动画&#xff0c;包括汽车模型的渲染和动画展示。Keyshot的动画功能允许用户创建相机移动、物体变化等动态效果&#xff0c;非常适合用于汽车动画的制作。 至于汽车动画的渲染成本&#xff…...

AI智能时代:哪款编程工具让你的工作效率翻倍?

引言 在日益繁忙的工作环境中&#xff0c;选择合适的编程工具已成为提升开发者工作效率的关键。不同的工具能够帮助我们简化代码编写、自动化任务、提升调试速度&#xff0c;甚至让团队协作更加顺畅。那么&#xff0c;哪款编程工具让你的工作效率翻倍&#xff1f;是智能的代码编…...

这五本大模型书籍,让你从大模型零基础到精通,非常详细收藏我这一篇就够了

大模型&#xff08;Large Language Models, LLMs&#xff09;是近年来人工智能领域的一大热点&#xff0c;它们在自然语言处理、对话系统、内容生成等多个方面展现出了强大的能力。随着技术的发展&#xff0c;市面上出现了许多介绍大模型理论与实践的书籍&#xff0c;为研究人员…...

面试经典150题 堆

215.数组中的第K个最大元素 建堆算法实现-CSDN博客 215. 数组中的第K个最大元素 中等 给定整数数组 nums 和整数 k&#xff0c;请返回数组中第 k 个最大的元素。 请注意&#xff0c;你需要找的是数组排序后的第 k 个最大的元素&#xff0c;而不是第 k 个不同的元素。 你必…...

day-62 每种字符至少取 K 个

思路 滑动窗口&#xff1a;改变思路&#xff0c;从左右两边取字符&#xff0c;是a b c三个字符至少被取k次&#xff0c;那么意味着如果我们知道字符串中a b c的出现个数&#xff0c;那么可以知道取走后剩下子串a b c的个数&#xff0c;问题转化为了求最长子串 解题过程 如果a …...

免费好用!AI声音克隆神器,超级简单,10秒就能克隆任何声音!(附保姆级教程)

今天下午还有读者问&#xff1a; 有没有能克隆声音的 AI 工具&#xff1f; 其实剪映很早就上了克隆声音的功能。 只需要按要求朗读例句&#xff0c;或者上传本地的音视频文件&#xff0c;就可以克隆声音了。 操作非常简单&#xff0c;效果也不错&#xff0c;可以试试。 除了…...

LeetCode146 LRU缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -1 …...

【Java】包装类【主线学习笔记】

文章目录 前言包装类基本数据类型与包装类之间的转换基本数据类型转换为包装类可以通过以下几种方式&#xff1a;包装类转换为基本数据类型可以通过以下几种方式&#xff1a;初始化值不同与String之间的转换 前言 Java是一门功能强大且广泛应用的编程语言&#xff0c;具有跨平台…...

华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?

场景介绍 本章节将向您介绍如何在地图的指定位置添加点注释以标识位置、商家、建筑等&#xff0c;并可以通过信息窗口展示详细信息。 点注释支持功能&#xff1a; 支持设置图标、文字、碰撞规则等。支持添加点击事件。 PointAnnotation有默认风格&#xff0c;同时也支持自定…...

uniapp js怎么根据map需要显示的点位,计算自适应的缩放scale

推荐学习文档 golang应用级os框架&#xff0c;欢迎stargolang应用级os框架使用案例&#xff0c;欢迎star案例&#xff1a;基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识&#xff0c;这里有免费的golang学习笔…...

Mysql 架构

目录 1.1 Mysql 逻辑架构图 1.2 SQL 的执行流程 1.3 SQL 书写顺序和执行顺序 1.4 Mysql 日志文件 1.4.1. 二进制日志&#xff08;Binary Log&#xff09; 1.4.2. 错误日志&#xff08;Error Log&#xff09; 1.4.3. 慢查询日志&#xff08;Slow Query Log&#xff09; 1.…...

TMI8260SP的替代品7889直流双向电机驱动芯片详解

在直流电机驱动领域&#xff0c;TMI8260SP作为一款经典的双向马达驱动芯片&#xff0c;曾广泛应用于各类中低功率电机控制场景&#xff0c;其稳定的性能积累了良好的市场口碑。但随着市场对电机驱动芯片的性能、功耗及性价比要求不断提升&#xff0c;7889直流双向电机驱动芯片凭…...

5步打造高效音乐体验:Listen1扩展的智能选择与效率提升指南

5步打造高效音乐体验&#xff1a;Listen1扩展的智能选择与效率提升指南 【免费下载链接】listen1_chrome_extension one for all free music in china (chrome extension, also works for firefox) 项目地址: https://gitcode.com/gh_mirrors/li/listen1_chrome_extension …...

MySQL存储图片旋转元数据的最佳实践

MySQL存储图片旋转元数据的最佳实践 1. 引言 在日常应用中&#xff0c;我们经常遇到这样的场景&#xff1a;用户上传的图片在显示时方向不正确&#xff0c;需要根据EXIF信息中的旋转角度进行自动校正。比如手机拍摄的照片&#xff0c;由于设备方向不同&#xff0c;可能包含90…...

英飞凌IPOSIM在线仿真平台保姆级入门:从注册到生成第一份功率损耗报告

英飞凌IPOSIM在线仿真平台零基础实战指南&#xff1a;三步完成功率模块热评估 在电力电子设计领域&#xff0c;精确的功率损耗计算往往决定着系统可靠性。我曾见过一个光伏逆变器项目因热设计失误导致批量返修&#xff0c;仅仅因为工程师低估了IGBT模块在高温环境下的导通损耗。…...

【部署】windows下虚拟机OpenClaw Ubuntu 24.04.4 安装指南

未来已来,只需一句指令,养龙虾专栏导航,持续更新ing… 概述 前置环境:win10/11、vmware等虚拟机(安装时注意勾选VMware Tools、cpu可以分配2C,内存建议4G,硬盘空间建议给40G) 系统要求 Node.js 22+:安装脚本可自动检测并安装(下文补充手动安装方案); Ubuntu 24.0…...

Xilinx Video IP实战:如何将HDMI输入转换为AXI4-Stream(附仿真+上板测试)

Xilinx Video IP实战&#xff1a;HDMI转AXI4-Stream全流程开发指南 在FPGA视频处理系统中&#xff0c;将HDMI等视频输入接口转换为标准化的AXI4-Stream协议是构建复杂视频处理流水线的关键第一步。不同于简单的接口转换&#xff0c;这一过程涉及视频时序解析、数据位宽适配、时…...

ESP32轻量级18650电池电量估算库设计与实现

1. 项目概述Battery_18650_Stats是一款专为 ESP32 平台设计的轻量级嵌入式电池状态计算库&#xff0c;核心目标是在 Arduino IDE 环境下&#xff0c;以最小资源开销、最高工程鲁棒性&#xff0c;实现对单节 18650 锂离子电池&#xff08;Li-ion&#xff09;荷电状态&#xff08…...

保姆级教程:手把手教你安装并激活DevExpress 20.1.3(附资源与注册机使用避坑指南)

深度指南&#xff1a;DevExpress 20.1.3开发环境高效配置与资源管理 在.NET生态系统中&#xff0c;DevExpress始终以其强大的控件库和高效的开发工具占据重要地位。对于刚接触这个工具集的开发者来说&#xff0c;如何快速搭建一个稳定的开发环境往往成为项目启动的第一道门槛。…...

SVPWM/AZSPWM的simulink仿真 AZSPWM(Advanced Zero Se...

SVPWM/AZSPWM的simulink仿真 AZSPWM&#xff08;Advanced Zero Sequence Pulse Width Modulation&#xff0c;先进零序脉宽调制&#xff09;是一种改进的脉宽调制技术&#xff0c;主要应用于三相逆变器中&#xff0c;通过引入零序分量来优化输出电压的波形和性能。 AZSPWM的目标…...

AI Agent 的动态知识更新:保持 LLM 知识的实时性

AI Agent 的动态知识更新:保持 LLM 知识的实时性 关键词:AI Agent、动态知识更新、大语言模型(LLM)、实时性、知识图谱 摘要:本文聚焦于 AI Agent 的动态知识更新,旨在探讨如何保持大语言模型(LLM)知识的实时性。首先介绍了相关背景,包括目的、预期读者等。接着阐述了…...