Redis之zset在异步队列上的应用
当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。
一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。
需要提前考虑一个问题:
如何避免一个任务被多次处理?
一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。
环境
- JDK17
- 两个jar包
- Jedis
- fastjson2
代码
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;// 基于Redis实现的延迟队列
public class RedisDelayingQueue<T> {static class TaskItem<T> {public String id;public T msg;}// fastjson序列化对象时如果存在泛型,需要使用TypeReferenceprivate Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();private Jedis jedis;private String queueKey;public RedisDelayingQueue(Jedis jedis, String queueKey) {this.jedis = jedis;this.queueKey = queueKey;}// 将任务添加到 zset 中// 分数是延时的时间public void delay(T msg) {TaskItem<T> task = new TaskItem<T>();task.id = UUID.randomUUID().toString();task.msg = msg;// 序列化任务String s = JSON.toJSONString(task);jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);}public void loop() {while(!Thread.interrupted()) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}public void handleMsg(T msg) {System.out.println(msg);}
}
优化
通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:
- 通过对代码块进行加锁的方式
- 利用redis中lua脚本的原子执行的特点
代码块加锁
这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞
public void synchronizedLoop() {while(!Thread.interrupted()) {synchronized(this) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}}
Lua脚本
local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil thenredis.call('ZREM', key, task[1])return task[1]
elsereturn nil
end
通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。
通过Jedis的eval函数,调用redis执行lua脚本的命令。
public void luaLoop() {while(!Thread.interrupted()) {Object ans = jedis.eval(script, 1, queueKey);if(ans != null) {String task = (String) ans;TaskItem<T> taskItem = JSON.parseObject(task, TaskType);this.handleMsg(taskItem.msg);}else{try{Thread.sleep(500);}catch(Exception e) {break;}}}}
为什么可以优化:
- 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO
测试程序
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;public class Main {public static void main(String[] args) {JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");Jedis jedis = jedisPool.getResource();RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");// 创建一个线程充当生产者,并向redis中存10个异步任务Thread producer = new Thread() {public void run() {for (int i = 0; i < 10; i++) {queue.delay("codehole" + i);}}};// 创建一个线程充当消费者,不断从redis中取任务并执行Thread consumer = new Thread() {public void run() {queue.luaLoop();}};producer.start();consumer.start();try {// 等待生产者线程执行结束producer.join();Thread.sleep(6000);consumer.interrupt();consumer.join();}catch(InterruptedException e) {e.printStackTrace();}}
}
一些问题
这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。
这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。
private Jedis readJedis;private Jedis writeJedis;
总结
本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。
相关文章:
Redis之zset在异步队列上的应用
当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。 一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训…...
day4:Node.js 核心库
day4:Node.js 核心库 文章目录 day4:Node.js 核心库常用工具模块util 模块Moment 模块Lodash 模块web模块文件模块path 模块常用工具模块 Node.js有许多常用的工具,以下是一些常见的: util: 是一个Node.js 核心模块,提供常用函数的集合,用于弥补核心 JavaScript 的功能…...
PHP非对称与对称双向加密解密的方式
目录 RSA非对称加密解密: 什么是RSA非对称加密解密解析: 解析: 为什么使用: 有什么优点: DEMO: AES、DES、3DES等对称加密解密: 解析: 为什么使用: 有什么优点: DEMO: RSA非对称加密解密: 什么是RSA非对称加密解密解析: 解析: RSA非对称加密…...
C++之struct匿名结构体实例(二百四十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…...
npm publish发布到在线仓库时,提示:Scope not found
当npm publish发布时,控制台提示:Scope not found,具体错误信息如下: npm notice npm ERR! code E404 npm ERR! 404 Not Found - PUT https://registry.npmjs.org/xxx%2fxxx - Scope not found npm ERR! 404 npm ERR! 404 xxx/xx…...
AWS Lambda 操作 RDS 示例
实现目标 创建一个 Lambda 接收调用时传入的数据, 写入 RDS 数据库 Post 表存储文章信息. 表结构如下: idtitlecontentcreate_date1我是标题我是正文内容2023-10-21 15:20:00 AWS 资源准备 RDS 控制台创建 MySQL 实例, 不允许 Public access (后面 Lambda 需要通过 VPC 访问…...
【java爬虫】使用selenium获取某交易所公司半年报数据
引言 上市公司的财报数据一般都会进行公开,我们可以在某交易所的官方网站上查看这些数据,由于数据很多,如果只是手动收集的话可能会比较耗时耗力,我们可以采用爬虫的方法进行数据的获取。 本文就介绍采用selenium框架进行公司财…...
MATLAB - 不能使用PYTHON,缺少matplotlib模块的解决办法
matlab缺少python-matplotlib模块的解决办法 1. 前言、概述2. 解决办法3. 可能出现问题4. 结果 1. 前言、概述 起因是我用习惯的colormap函数getPyPlot_cMap不能用了:【这个函数要调用PYTHON】 报错的地方: ModuleNotFoundError: No module named ‘ma…...
mk语法示例
这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…...
英语什么时候加s和es
名词变复数一般情况下加s,以s,x,ch,sh结尾加es。一个名词如果表示一个或一样东西,它取单数形式,如果表示两个或更多的这类东西,则需要用名词复数形式。 1 以s,x,sh,ch结尾的词,加es。 2 以辅音字母(除a/e/…...
unity中方向的两种表示:欧拉角和四元数
欧拉角:简单来说就是你可以选择 0度~360度 的范围 四元数:在计算机图像学中,四元数用于物体的旋转,是一种复杂,但效率较高的旋转方式 Quaternion结构体代表一个四元数,包含一个标量和一个三维向量&#x…...
ViT-L-14.pt下载load checkpoint from xxx
load checkpoint from E:\BaiduNetdiskDownload\sd-webui-aki-v4\models\BLIP\model_base_caption_capfilt_large.pth stable diffusion反推提示词出现此提示时,需安装以下模型至sd-webui-aki-v4.cache\clip\目录 ViT-L-14.pt https://openaipublic.azureedge.net/…...
机械设备经营小程序商城的作用是什么
由于机械设备厂商品牌需要各地招商代理,因此在管理方面也需要工具进行高效管理。如今各个行业都在开展数字化转型解决行业所遇难题或通过线上销售解决传统三公里难题及品牌扩张难题、用户消费渠道少等难题,构建会员体系精细化管理,同时还需要…...
小程序跨页面传递参数的几种方式
当我们在开发小程序时,经常会遇到需要在不同页面之间传递数据的情况。为了实现页面间的数据传递,小程序提供了多种方法。下面将介绍几种常用的传递数据的方法。 URL参数传递:这是一种简单直接的传递数据的方式。在跳转页面时,可以…...
【算法与数据结构】--高级算法和数据结构--高级数据结构
一、堆和优先队列 堆(Heap)是一种特殊的树状数据结构,通常用于实现优先队列。堆有两种主要类型:最大堆和最小堆。最大堆是一棵树,其中每个父节点的值都大于或等于其子节点的值,而最小堆是一棵树࿰…...
小工具 - Python图片转PDF文件
前言 主要整理记载一些python实现的小脚本,网上基本转换要会员,懒得搞了,这个一键生成,可以打包成exe文件使用 单张图片转换成pdf、图片批量转换成pdf # coding UTF-8 import os from io import BytesIO from PIL import Imag…...
bitbucket.org 用法
这个网站需要魔法,注册完成后添加厂库时间2023.10 图1 图2 第二张图 ,不要.gitignore文件 sourcetree 1,创建前端项目 npm create vitelatest 2.打开vscode创建本地Git 看到Git代提交的文件 sourcetree,新建 已存在的本地厂库 提交到Git 添…...
lodash常用方法合集
安装lodash 建议安装lodash-es,lodash-es 是 lodash 的 es modules 版本 ,是着具备 ES6 模块化的版本,体积小。按需引入。 示例 npm i lodash-es import { chunk,compact } from lodash-es; /**按需引入*/ 1.chunk 数组分组 chunk(arra…...
Nginx平滑升级重定向rewrite
文章目录 Nginx平滑升级&重定向rewritenginx平滑升级流程环境查看旧版的配置信息下载新版nginx源码包和功能模块包编译配置新版本平滑升级验证 重定向rewrite配置重定向准发访问测试 Nginx平滑升级&重定向rewrite nginx平滑升级 流程 平滑升级: (升级版本、增加新功…...
Mysql基础与高级汇总
SQL语言分类 DDL:定义 DML:操作 DCL:控制(用于定义访问权限和安全级别) DQL:查询 Sql方言 ->sql:结构化查询语言 mysql:limit oracle:rownum sqlserver:top 但是存储过程:每一种数据库软件一样SQL语法要求: SQL语句可以单行或多行书写&…...
Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
NPOI操作EXCEL文件 ——CAD C# 二次开发
缺点:dll.版本容易加载错误。CAD加载插件时,没有加载所有类库。插件运行过程中用到某个类库,会从CAD的安装目录找,找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库,就用插件程序加载进…...
【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...
