当前位置: 首页 > news >正文

【flink】之如何消费kafka数据并读写入redis?

背景: 

最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。

准备: 

    <!-- 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency></dependencies>

代码:

package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
@Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool = new JedisPool("127.0.0.0",8423);//创建redis客户端static final Jedis jedis = pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");stringDataStreamSource.map(new RichMapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {//读redisSystem.out.println("test="+jedis.get("test"));//写redisjedis.setex("test",60,s);return s;}@Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute("test");}
}

相关文章:

【flink】之如何消费kafka数据并读写入redis?

背景&#xff1a; 最近公司出现做了一个新需求&#xff0c;需求内容是加工一个营销时机&#xff0c;但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。 准备&#xff1a; <!-- 依赖 --><dependency><groupId>org.apache.fl…...

搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引

场景 首先介绍测试的场景&#xff0c;本系列schema定义 pdm文档索引&#xff0c;包括nested&#xff0c;作为文档扩展属性字段&#xff0c;_content字段是组件保留字段&#xff0c;支持文本内容&#xff0c;字段属性还有其他属性&#xff0c;如boost&#xff0c;getter&#x…...

离散化算法

离散化 在C中&#xff0c;离散化通常指的是将连续的数值或数据转化为离散的形式。这在数值分析、信号处理、图像处理和机器学习等领域都非常常见。以下是一些离散化的基本概念和方法&#xff1a; 1.区间划分&#xff1a; 将连续变量的值域分成多个区间&#xff0c;每个区间对…...

基于ollama的本地RAG实践

先放参考的原文链接大语言模型实战——搭建纯本地迷你版RAG_本地rag-CSDN博客 一、大模型选择 在我之前的文章中有讲到&#xff0c;我用的是ollama中的llama3.1 Ollama在Windows安装&#xff0c;使用&#xff0c;简单调用API_ollama如何对外提供api-CSDN博客 二、嵌入模型 …...

安卓开发板_MTK开发板_联发科开发评估套件Demo板接口介绍

开发板是一种功能丰富的电路平台&#xff0c;专为开发人员设计&#xff0c;集成了多种传感器、扩展接口和通信模块。这使得开发者能够高效进行原型设计和功能验证&#xff0c;极大地简化了软硬件开发的过程。 此次介绍的安卓开发板由MT8788核心板与底板构成&#xff0c;特别之处…...

代码随想录冲冲冲 Day58 图论Part9

47. 参加科学大会&#xff08;第六期模拟笔试&#xff09; 根据昨天的dijkstra进行堆优化 使用的原因是点多但边少 所以直接对于边进行操作 1.对于priority_queue来说 这是最小堆, 小于的话就是最大堆 之后由于是根据边来说的 所以新建一个Edge并且初始化一下 之后由于使用…...

UnityHub下载任意版本的Unity包

1)先打开 // 也可以采用2直接打开 2)也可以直接打开 下载存档 (unity.com) 3)关联起来UnityHub即可...

网站服务器怎么计算同时在线人数?

网站服务器计算同时在线人数通常涉及跟踪和记录当前活跃会话的数量。以下是几种常用的方法来估算或计算网站的同时在线人数&#xff1a; 1. 会话跟踪 - 基于会话(Session)&#xff1a;服务器可以为每个访问者创建一个会话&#xff0c;并跟踪这些会话。当访问者首次访问网站时&a…...

[spring]MyBatis介绍 及 用MyBatis注解操作简单数据库

文章目录 一. 什么是MyBatis二. MyBatis操作数据库步骤(使用注解)创建工程创建数据库创建对应实体类配置数据库连接字符串写持久层代码单元测试 三. MyBatis基础操作 使用注解打印日志参数传递增删改查 一. 什么是MyBatis 简单来说 MyBatis 是更简单完成程序和数据库交互的框架…...

Ks渲染做汽车动画吗?汽车本地渲染与云渲染成本分析

Keyshot是一款强大的实时光线追踪和全域光渲染软件&#xff0c;它确实可以用于制作汽车动画&#xff0c;包括汽车模型的渲染和动画展示。Keyshot的动画功能允许用户创建相机移动、物体变化等动态效果&#xff0c;非常适合用于汽车动画的制作。 至于汽车动画的渲染成本&#xff…...

AI智能时代:哪款编程工具让你的工作效率翻倍?

引言 在日益繁忙的工作环境中&#xff0c;选择合适的编程工具已成为提升开发者工作效率的关键。不同的工具能够帮助我们简化代码编写、自动化任务、提升调试速度&#xff0c;甚至让团队协作更加顺畅。那么&#xff0c;哪款编程工具让你的工作效率翻倍&#xff1f;是智能的代码编…...

这五本大模型书籍,让你从大模型零基础到精通,非常详细收藏我这一篇就够了

大模型&#xff08;Large Language Models, LLMs&#xff09;是近年来人工智能领域的一大热点&#xff0c;它们在自然语言处理、对话系统、内容生成等多个方面展现出了强大的能力。随着技术的发展&#xff0c;市面上出现了许多介绍大模型理论与实践的书籍&#xff0c;为研究人员…...

面试经典150题 堆

215.数组中的第K个最大元素 建堆算法实现-CSDN博客 215. 数组中的第K个最大元素 中等 给定整数数组 nums 和整数 k&#xff0c;请返回数组中第 k 个最大的元素。 请注意&#xff0c;你需要找的是数组排序后的第 k 个最大的元素&#xff0c;而不是第 k 个不同的元素。 你必…...

day-62 每种字符至少取 K 个

思路 滑动窗口&#xff1a;改变思路&#xff0c;从左右两边取字符&#xff0c;是a b c三个字符至少被取k次&#xff0c;那么意味着如果我们知道字符串中a b c的出现个数&#xff0c;那么可以知道取走后剩下子串a b c的个数&#xff0c;问题转化为了求最长子串 解题过程 如果a …...

免费好用!AI声音克隆神器,超级简单,10秒就能克隆任何声音!(附保姆级教程)

今天下午还有读者问&#xff1a; 有没有能克隆声音的 AI 工具&#xff1f; 其实剪映很早就上了克隆声音的功能。 只需要按要求朗读例句&#xff0c;或者上传本地的音视频文件&#xff0c;就可以克隆声音了。 操作非常简单&#xff0c;效果也不错&#xff0c;可以试试。 除了…...

LeetCode146 LRU缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -1 …...

【Java】包装类【主线学习笔记】

文章目录 前言包装类基本数据类型与包装类之间的转换基本数据类型转换为包装类可以通过以下几种方式&#xff1a;包装类转换为基本数据类型可以通过以下几种方式&#xff1a;初始化值不同与String之间的转换 前言 Java是一门功能强大且广泛应用的编程语言&#xff0c;具有跨平台…...

华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?

场景介绍 本章节将向您介绍如何在地图的指定位置添加点注释以标识位置、商家、建筑等&#xff0c;并可以通过信息窗口展示详细信息。 点注释支持功能&#xff1a; 支持设置图标、文字、碰撞规则等。支持添加点击事件。 PointAnnotation有默认风格&#xff0c;同时也支持自定…...

uniapp js怎么根据map需要显示的点位,计算自适应的缩放scale

推荐学习文档 golang应用级os框架&#xff0c;欢迎stargolang应用级os框架使用案例&#xff0c;欢迎star案例&#xff1a;基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识&#xff0c;这里有免费的golang学习笔…...

Mysql 架构

目录 1.1 Mysql 逻辑架构图 1.2 SQL 的执行流程 1.3 SQL 书写顺序和执行顺序 1.4 Mysql 日志文件 1.4.1. 二进制日志&#xff08;Binary Log&#xff09; 1.4.2. 错误日志&#xff08;Error Log&#xff09; 1.4.3. 慢查询日志&#xff08;Slow Query Log&#xff09; 1.…...

Vircadia Native Core:开源虚拟世界服务器核心架构与部署实战

1. 项目概述&#xff1a;一个开源虚拟世界的“引擎心脏”如果你对构建一个属于自己的、去中心化的虚拟世界&#xff08;Metaverse&#xff09;感兴趣&#xff0c;或者你正在寻找一个能支撑起大规模、高自由度社交与协作应用的底层平台&#xff0c;那么Vircadia Native Core绝对…...

dotai:将AI大模型无缝集成到Shell终端的智能助手工具

1. 项目概述&#xff1a;当AI遇上你的终端如果你是一个重度命令行用户&#xff0c;每天在终端里敲击着ls、cd、git commit这些命令&#xff0c;有没有那么一瞬间&#xff0c;希望有个助手能帮你自动补全、解释命令&#xff0c;甚至直接帮你写出复杂的管道操作&#xff1f;dotai…...

三维重建实时映射技术在智慧水利中的核心应用

三维重建实时映射技术在智慧水利中的核心应用在国家大力推进数字孪生水利建设、实现水安全精准保障的背景下&#xff0c;智慧水利已从传统监测、调度向全域感知、智能预判、协同处置、一屏统管升级。智慧水利的核心目标&#xff0c;是实现对江河湖库、灌区、泵站、堤坝、闸站等…...

Windows Android子系统深度优化:WSABuilds项目架构解析与实战部署指南

Windows Android子系统深度优化&#xff1a;WSABuilds项目架构解析与实战部署指南 【免费下载链接】WSABuilds Run Windows Subsystem For Android on your Windows 10 and Windows 11 PC using prebuilt binaries with Google Play Store (MindTheGapps) and/or Magisk or Ker…...

别再只会Commit了!用Git Desktop搞定分支合并与冲突解决(附真实开发场景)

别再只会Commit了&#xff01;用Git Desktop搞定分支合并与冲突解决&#xff08;附真实开发场景&#xff09; 当你第一次接触Git时&#xff0c;可能觉得它就是个"保存按钮"——每次改完代码就commit一下。但随着项目规模扩大&#xff0c;特别是多人协作时&#xff0c…...

开源技能安全仪表盘:从架构解析到CI/CD集成的DevSecOps实践

1. 项目概述&#xff1a;一个面向技能开发者的安全仪表盘最近在折腾一些智能设备上的技能开发&#xff0c;发现一个挺普遍但容易被忽视的问题&#xff1a;我们花大量时间在功能实现和用户体验上&#xff0c;但技能本身的安全性评估&#xff0c;往往只能等到上线后&#xff0c;通…...

Pixel Framebuf库:图形化编程驱动LED矩阵,告别底层坐标换算

1. 项目概述&#xff1a;告别点灯&#xff0c;拥抱图形化LED矩阵编程如果你玩过Arduino或者树莓派&#xff0c;大概率接触过WS2812B这类可寻址LED&#xff0c;也就是大家常说的NeoPixel。单个灯珠的控制很简单&#xff0c;setPixelColor一下就能亮。但当你面对一个8x8、16x16甚…...

嵌入式动画优化:DMA驱动位图渲染在SAMD21上的实现

1. 项目概述与核心思路如果你玩过嵌入式开发&#xff0c;尤其是想在小小的微控制器屏幕上搞点流畅的动画&#xff0c;大概率会被“卡顿”和“闪屏”折磨过。传统的逐像素绘制&#xff0c;在需要全屏更新时&#xff0c;CPU时间几乎全耗在了等待屏幕刷新上&#xff0c;用户体验大…...

像素风格技能图标自动生成:Python+Pillow实现模板化设计

1. 项目概述与核心价值最近在和一些做独立开发者和内容创作者的朋友聊天时&#xff0c;发现一个普遍痛点&#xff1a;大家手头都有不少好想法&#xff0c;但一到具体执行&#xff0c;尤其是需要制作宣传素材时&#xff0c;就卡住了。比如&#xff0c;想给自己的新App做个宣传图…...

ComfyUI ControlNet Aux 终极指南:30+种预处理器让AI图像生成更精准

ComfyUI ControlNet Aux 终极指南&#xff1a;30种预处理器让AI图像生成更精准 【免费下载链接】comfyui_controlnet_aux ComfyUIs ControlNet Auxiliary Preprocessors 项目地址: https://gitcode.com/gh_mirrors/co/comfyui_controlnet_aux 想让您的AI图像生成具备真实…...