当前位置: 首页 > news >正文

【flink】之如何消费kafka数据并读写入redis?

背景: 

最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。

准备: 

    <!-- 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency></dependencies>

代码:

package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
@Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool = new JedisPool("127.0.0.0",8423);//创建redis客户端static final Jedis jedis = pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");stringDataStreamSource.map(new RichMapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {//读redisSystem.out.println("test="+jedis.get("test"));//写redisjedis.setex("test",60,s);return s;}@Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute("test");}
}

相关文章:

【flink】之如何消费kafka数据并读写入redis?

背景&#xff1a; 最近公司出现做了一个新需求&#xff0c;需求内容是加工一个营销时机&#xff0c;但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。 准备&#xff1a; <!-- 依赖 --><dependency><groupId>org.apache.fl…...

搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引

场景 首先介绍测试的场景&#xff0c;本系列schema定义 pdm文档索引&#xff0c;包括nested&#xff0c;作为文档扩展属性字段&#xff0c;_content字段是组件保留字段&#xff0c;支持文本内容&#xff0c;字段属性还有其他属性&#xff0c;如boost&#xff0c;getter&#x…...

离散化算法

离散化 在C中&#xff0c;离散化通常指的是将连续的数值或数据转化为离散的形式。这在数值分析、信号处理、图像处理和机器学习等领域都非常常见。以下是一些离散化的基本概念和方法&#xff1a; 1.区间划分&#xff1a; 将连续变量的值域分成多个区间&#xff0c;每个区间对…...

基于ollama的本地RAG实践

先放参考的原文链接大语言模型实战——搭建纯本地迷你版RAG_本地rag-CSDN博客 一、大模型选择 在我之前的文章中有讲到&#xff0c;我用的是ollama中的llama3.1 Ollama在Windows安装&#xff0c;使用&#xff0c;简单调用API_ollama如何对外提供api-CSDN博客 二、嵌入模型 …...

安卓开发板_MTK开发板_联发科开发评估套件Demo板接口介绍

开发板是一种功能丰富的电路平台&#xff0c;专为开发人员设计&#xff0c;集成了多种传感器、扩展接口和通信模块。这使得开发者能够高效进行原型设计和功能验证&#xff0c;极大地简化了软硬件开发的过程。 此次介绍的安卓开发板由MT8788核心板与底板构成&#xff0c;特别之处…...

代码随想录冲冲冲 Day58 图论Part9

47. 参加科学大会&#xff08;第六期模拟笔试&#xff09; 根据昨天的dijkstra进行堆优化 使用的原因是点多但边少 所以直接对于边进行操作 1.对于priority_queue来说 这是最小堆, 小于的话就是最大堆 之后由于是根据边来说的 所以新建一个Edge并且初始化一下 之后由于使用…...

UnityHub下载任意版本的Unity包

1)先打开 // 也可以采用2直接打开 2)也可以直接打开 下载存档 (unity.com) 3)关联起来UnityHub即可...

网站服务器怎么计算同时在线人数?

网站服务器计算同时在线人数通常涉及跟踪和记录当前活跃会话的数量。以下是几种常用的方法来估算或计算网站的同时在线人数&#xff1a; 1. 会话跟踪 - 基于会话(Session)&#xff1a;服务器可以为每个访问者创建一个会话&#xff0c;并跟踪这些会话。当访问者首次访问网站时&a…...

[spring]MyBatis介绍 及 用MyBatis注解操作简单数据库

文章目录 一. 什么是MyBatis二. MyBatis操作数据库步骤(使用注解)创建工程创建数据库创建对应实体类配置数据库连接字符串写持久层代码单元测试 三. MyBatis基础操作 使用注解打印日志参数传递增删改查 一. 什么是MyBatis 简单来说 MyBatis 是更简单完成程序和数据库交互的框架…...

Ks渲染做汽车动画吗?汽车本地渲染与云渲染成本分析

Keyshot是一款强大的实时光线追踪和全域光渲染软件&#xff0c;它确实可以用于制作汽车动画&#xff0c;包括汽车模型的渲染和动画展示。Keyshot的动画功能允许用户创建相机移动、物体变化等动态效果&#xff0c;非常适合用于汽车动画的制作。 至于汽车动画的渲染成本&#xff…...

AI智能时代:哪款编程工具让你的工作效率翻倍?

引言 在日益繁忙的工作环境中&#xff0c;选择合适的编程工具已成为提升开发者工作效率的关键。不同的工具能够帮助我们简化代码编写、自动化任务、提升调试速度&#xff0c;甚至让团队协作更加顺畅。那么&#xff0c;哪款编程工具让你的工作效率翻倍&#xff1f;是智能的代码编…...

这五本大模型书籍,让你从大模型零基础到精通,非常详细收藏我这一篇就够了

大模型&#xff08;Large Language Models, LLMs&#xff09;是近年来人工智能领域的一大热点&#xff0c;它们在自然语言处理、对话系统、内容生成等多个方面展现出了强大的能力。随着技术的发展&#xff0c;市面上出现了许多介绍大模型理论与实践的书籍&#xff0c;为研究人员…...

面试经典150题 堆

215.数组中的第K个最大元素 建堆算法实现-CSDN博客 215. 数组中的第K个最大元素 中等 给定整数数组 nums 和整数 k&#xff0c;请返回数组中第 k 个最大的元素。 请注意&#xff0c;你需要找的是数组排序后的第 k 个最大的元素&#xff0c;而不是第 k 个不同的元素。 你必…...

day-62 每种字符至少取 K 个

思路 滑动窗口&#xff1a;改变思路&#xff0c;从左右两边取字符&#xff0c;是a b c三个字符至少被取k次&#xff0c;那么意味着如果我们知道字符串中a b c的出现个数&#xff0c;那么可以知道取走后剩下子串a b c的个数&#xff0c;问题转化为了求最长子串 解题过程 如果a …...

免费好用!AI声音克隆神器,超级简单,10秒就能克隆任何声音!(附保姆级教程)

今天下午还有读者问&#xff1a; 有没有能克隆声音的 AI 工具&#xff1f; 其实剪映很早就上了克隆声音的功能。 只需要按要求朗读例句&#xff0c;或者上传本地的音视频文件&#xff0c;就可以克隆声音了。 操作非常简单&#xff0c;效果也不错&#xff0c;可以试试。 除了…...

LeetCode146 LRU缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -1 …...

【Java】包装类【主线学习笔记】

文章目录 前言包装类基本数据类型与包装类之间的转换基本数据类型转换为包装类可以通过以下几种方式&#xff1a;包装类转换为基本数据类型可以通过以下几种方式&#xff1a;初始化值不同与String之间的转换 前言 Java是一门功能强大且广泛应用的编程语言&#xff0c;具有跨平台…...

华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?

场景介绍 本章节将向您介绍如何在地图的指定位置添加点注释以标识位置、商家、建筑等&#xff0c;并可以通过信息窗口展示详细信息。 点注释支持功能&#xff1a; 支持设置图标、文字、碰撞规则等。支持添加点击事件。 PointAnnotation有默认风格&#xff0c;同时也支持自定…...

uniapp js怎么根据map需要显示的点位,计算自适应的缩放scale

推荐学习文档 golang应用级os框架&#xff0c;欢迎stargolang应用级os框架使用案例&#xff0c;欢迎star案例&#xff1a;基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识&#xff0c;这里有免费的golang学习笔…...

Mysql 架构

目录 1.1 Mysql 逻辑架构图 1.2 SQL 的执行流程 1.3 SQL 书写顺序和执行顺序 1.4 Mysql 日志文件 1.4.1. 二进制日志&#xff08;Binary Log&#xff09; 1.4.2. 错误日志&#xff08;Error Log&#xff09; 1.4.3. 慢查询日志&#xff08;Slow Query Log&#xff09; 1.…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

.Net框架,除了EF还有很多很多......

文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...