Redis之zset在异步队列上的应用
当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。
一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。
需要提前考虑一个问题:
如何避免一个任务被多次处理?
一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。
环境
- JDK17
- 两个jar包
- Jedis
- fastjson2
代码
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;// 基于Redis实现的延迟队列
public class RedisDelayingQueue<T> {static class TaskItem<T> {public String id;public T msg;}// fastjson序列化对象时如果存在泛型,需要使用TypeReferenceprivate Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();private Jedis jedis;private String queueKey;public RedisDelayingQueue(Jedis jedis, String queueKey) {this.jedis = jedis;this.queueKey = queueKey;}// 将任务添加到 zset 中// 分数是延时的时间public void delay(T msg) {TaskItem<T> task = new TaskItem<T>();task.id = UUID.randomUUID().toString();task.msg = msg;// 序列化任务String s = JSON.toJSONString(task);jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);}public void loop() {while(!Thread.interrupted()) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}public void handleMsg(T msg) {System.out.println(msg);}
}
优化
通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:
- 通过对代码块进行加锁的方式
- 利用redis中lua脚本的原子执行的特点
代码块加锁
这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞
public void synchronizedLoop() {while(!Thread.interrupted()) {synchronized(this) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}}
Lua脚本
local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil thenredis.call('ZREM', key, task[1])return task[1]
elsereturn nil
end
通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。
通过Jedis的eval函数,调用redis执行lua脚本的命令。
public void luaLoop() {while(!Thread.interrupted()) {Object ans = jedis.eval(script, 1, queueKey);if(ans != null) {String task = (String) ans;TaskItem<T> taskItem = JSON.parseObject(task, TaskType);this.handleMsg(taskItem.msg);}else{try{Thread.sleep(500);}catch(Exception e) {break;}}}}
为什么可以优化:
- 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO
测试程序
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;public class Main {public static void main(String[] args) {JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");Jedis jedis = jedisPool.getResource();RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");// 创建一个线程充当生产者,并向redis中存10个异步任务Thread producer = new Thread() {public void run() {for (int i = 0; i < 10; i++) {queue.delay("codehole" + i);}}};// 创建一个线程充当消费者,不断从redis中取任务并执行Thread consumer = new Thread() {public void run() {queue.luaLoop();}};producer.start();consumer.start();try {// 等待生产者线程执行结束producer.join();Thread.sleep(6000);consumer.interrupt();consumer.join();}catch(InterruptedException e) {e.printStackTrace();}}
}
一些问题
这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。
这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。
private Jedis readJedis;private Jedis writeJedis;
总结
本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。
相关文章:
Redis之zset在异步队列上的应用
当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。 一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训…...
day4:Node.js 核心库
day4:Node.js 核心库 文章目录 day4:Node.js 核心库常用工具模块util 模块Moment 模块Lodash 模块web模块文件模块path 模块常用工具模块 Node.js有许多常用的工具,以下是一些常见的: util: 是一个Node.js 核心模块,提供常用函数的集合,用于弥补核心 JavaScript 的功能…...
PHP非对称与对称双向加密解密的方式
目录 RSA非对称加密解密: 什么是RSA非对称加密解密解析: 解析: 为什么使用: 有什么优点: DEMO: AES、DES、3DES等对称加密解密: 解析: 为什么使用: 有什么优点: DEMO: RSA非对称加密解密: 什么是RSA非对称加密解密解析: 解析: RSA非对称加密…...
C++之struct匿名结构体实例(二百四十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…...
npm publish发布到在线仓库时,提示:Scope not found
当npm publish发布时,控制台提示:Scope not found,具体错误信息如下: npm notice npm ERR! code E404 npm ERR! 404 Not Found - PUT https://registry.npmjs.org/xxx%2fxxx - Scope not found npm ERR! 404 npm ERR! 404 xxx/xx…...
AWS Lambda 操作 RDS 示例
实现目标 创建一个 Lambda 接收调用时传入的数据, 写入 RDS 数据库 Post 表存储文章信息. 表结构如下: idtitlecontentcreate_date1我是标题我是正文内容2023-10-21 15:20:00 AWS 资源准备 RDS 控制台创建 MySQL 实例, 不允许 Public access (后面 Lambda 需要通过 VPC 访问…...
【java爬虫】使用selenium获取某交易所公司半年报数据
引言 上市公司的财报数据一般都会进行公开,我们可以在某交易所的官方网站上查看这些数据,由于数据很多,如果只是手动收集的话可能会比较耗时耗力,我们可以采用爬虫的方法进行数据的获取。 本文就介绍采用selenium框架进行公司财…...
MATLAB - 不能使用PYTHON,缺少matplotlib模块的解决办法
matlab缺少python-matplotlib模块的解决办法 1. 前言、概述2. 解决办法3. 可能出现问题4. 结果 1. 前言、概述 起因是我用习惯的colormap函数getPyPlot_cMap不能用了:【这个函数要调用PYTHON】 报错的地方: ModuleNotFoundError: No module named ‘ma…...
mk语法示例
这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…...
英语什么时候加s和es
名词变复数一般情况下加s,以s,x,ch,sh结尾加es。一个名词如果表示一个或一样东西,它取单数形式,如果表示两个或更多的这类东西,则需要用名词复数形式。 1 以s,x,sh,ch结尾的词,加es。 2 以辅音字母(除a/e/…...
unity中方向的两种表示:欧拉角和四元数
欧拉角:简单来说就是你可以选择 0度~360度 的范围 四元数:在计算机图像学中,四元数用于物体的旋转,是一种复杂,但效率较高的旋转方式 Quaternion结构体代表一个四元数,包含一个标量和一个三维向量&#x…...
ViT-L-14.pt下载load checkpoint from xxx
load checkpoint from E:\BaiduNetdiskDownload\sd-webui-aki-v4\models\BLIP\model_base_caption_capfilt_large.pth stable diffusion反推提示词出现此提示时,需安装以下模型至sd-webui-aki-v4.cache\clip\目录 ViT-L-14.pt https://openaipublic.azureedge.net/…...
机械设备经营小程序商城的作用是什么
由于机械设备厂商品牌需要各地招商代理,因此在管理方面也需要工具进行高效管理。如今各个行业都在开展数字化转型解决行业所遇难题或通过线上销售解决传统三公里难题及品牌扩张难题、用户消费渠道少等难题,构建会员体系精细化管理,同时还需要…...
小程序跨页面传递参数的几种方式
当我们在开发小程序时,经常会遇到需要在不同页面之间传递数据的情况。为了实现页面间的数据传递,小程序提供了多种方法。下面将介绍几种常用的传递数据的方法。 URL参数传递:这是一种简单直接的传递数据的方式。在跳转页面时,可以…...
【算法与数据结构】--高级算法和数据结构--高级数据结构
一、堆和优先队列 堆(Heap)是一种特殊的树状数据结构,通常用于实现优先队列。堆有两种主要类型:最大堆和最小堆。最大堆是一棵树,其中每个父节点的值都大于或等于其子节点的值,而最小堆是一棵树࿰…...
小工具 - Python图片转PDF文件
前言 主要整理记载一些python实现的小脚本,网上基本转换要会员,懒得搞了,这个一键生成,可以打包成exe文件使用 单张图片转换成pdf、图片批量转换成pdf # coding UTF-8 import os from io import BytesIO from PIL import Imag…...
bitbucket.org 用法
这个网站需要魔法,注册完成后添加厂库时间2023.10 图1 图2 第二张图 ,不要.gitignore文件 sourcetree 1,创建前端项目 npm create vitelatest 2.打开vscode创建本地Git 看到Git代提交的文件 sourcetree,新建 已存在的本地厂库 提交到Git 添…...
lodash常用方法合集
安装lodash 建议安装lodash-es,lodash-es 是 lodash 的 es modules 版本 ,是着具备 ES6 模块化的版本,体积小。按需引入。 示例 npm i lodash-es import { chunk,compact } from lodash-es; /**按需引入*/ 1.chunk 数组分组 chunk(arra…...
Nginx平滑升级重定向rewrite
文章目录 Nginx平滑升级&重定向rewritenginx平滑升级流程环境查看旧版的配置信息下载新版nginx源码包和功能模块包编译配置新版本平滑升级验证 重定向rewrite配置重定向准发访问测试 Nginx平滑升级&重定向rewrite nginx平滑升级 流程 平滑升级: (升级版本、增加新功…...
Mysql基础与高级汇总
SQL语言分类 DDL:定义 DML:操作 DCL:控制(用于定义访问权限和安全级别) DQL:查询 Sql方言 ->sql:结构化查询语言 mysql:limit oracle:rownum sqlserver:top 但是存储过程:每一种数据库软件一样SQL语法要求: SQL语句可以单行或多行书写&…...
【脚本安装】十分钟配置Claude Code:终端里的AI编程搭档
十分钟上手Claude Code:终端里的AI编程搭档从零开始配置属于你自己的AI编程助手,让代码审查、批量修改、技术问答都在命令行里搞定。为什么写这篇 最近折腾了不少AI编程工具,Claude Code给我的体验最接近「搭档」这个词——不是那种被动等指令…...
TRNSYS模块太多记不住?这份保姆级模块速查手册(附中英文对照)帮你快速定位
TRNSYS模块速查实战指南:从分类逻辑到精准调用 面对TRNSYS中数百个模块编号和复杂的英文命名体系,许多工程师在搭建系统模型时都会陷入"选择困难"。本文将彻底改变你查找模块的方式——我们不再简单罗列中英文对照表,而是从实际建模…...
【代码】基于交替方向乘子法(admm)的微电网分布式低碳优化运行策略matlab-yalmip-cplex/gurobi
✅作者简介:热爱科研的Matlab仿真开发者,擅长毕业设计辅导、数学建模、数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。🍎 往期回顾关注个人主页:Matlab科研工作室👇 关注我领取海量matlab电子书和…...
AGI平民化接入实战手册(SITS2026现场闭门报告首次公开)
第一章:SITS2026专家:AGI的民主化访问 2026奇点智能技术大会(https://ml-summit.org) 从封闭模型到开放协议 AGI能力正加速脱离专有云服务与高门槛API调用范式,转向基于轻量级推理引擎、可验证提示合约和联邦式知识更新的开放基础设施。SIT…...
联想小新Air14 AMD版装Ubuntu 20.04,升级内核到5.11解决触控板和亮度问题(附详细步骤)
联想小新Air14 AMD版Ubuntu 20.04深度优化指南:从内核调优到桌面效率革命 当AMD锐龙5500U遇上Ubuntu 20.04,这本应是开源世界与高性能硬件的完美邂逅,但预装的5.8内核却让触控板和亮度调节成了摆设。这不是个例——2023年硬件兼容性报告显示&…...
Day03:Function Calling 核心
文章目录一、Function Calling 核心概念与定义1.1 技术本质与原理1.2 与传统 AI 推理的区别1.3 主要技术实现框架二、Function Calling 的核心价值与解决的问题2.1 解决知识截止问题2.2 解决实时数据获取需求2.3 解决外部动作执行问题2.4 安全性与可控性设计三、Function Calli…...
告别ESP32环境配置噩梦:用Python虚拟环境一劳永逸管理ESP-IDF依赖
ESP32开发者的Python虚拟环境实战指南:彻底解决依赖冲突难题 每次打开ESP-IDF项目时,那些烦人的Python依赖报错是不是让你血压飙升?不同项目间的包版本冲突是否让你在pip install和pip uninstall之间反复横跳?作为一名长期奋战在E…...
别再凭感觉了!用Excel快速搞定外观检验员一致性(Kappa)分析,附免费模板
用Excel实现外观检验一致性分析的实战指南 在制造业的质量控制环节,外观检验的一致性直接影响产品合格率与客户满意度。传统手工计算Kappa值不仅耗时费力,还容易出错。本文将手把手教你如何用Excel搭建自动化分析模板,让质量工程师在10分钟内…...
避坑指南:RH850 SPI DMA配置中PEG权限和InterDataTime那些事儿,你踩雷了吗?
RH850 SPI DMA实战避坑:PEG权限与InterDataTime的深度解析 实验室里,示波器上的SPI波形突然停滞,工程师盯着屏幕上的异常数据陷入沉思——这已经是本周第三次遇到DMA传输失败的问题了。RH850的SPI DMA配置看似简单,但PEG权限设置不…...
深入PCIe数据包:除了Header和Data,TLP Prefix如何为虚拟化和高性能计算“加戏”?
PCIe TLP Prefix技术解析:从虚拟化到异构计算的底层革新 在数据中心架构持续演进的今天,PCIe总线早已突破传统外设连接的范畴,成为支撑GPU加速、智能网卡、CXL内存池化等前沿技术的核心互连标准。而TLP Prefix作为PCIe协议中一个看似微小的可…...
