当前位置: 首页 > news >正文

Redis之zset在异步队列上的应用

当遇到并发的客户端请求时,为了缓解服务端的处理压力,当请求对响应的处理的实时性要求不高时,可以实现一个异步的请求消息队列。

一种实现策略是使用redis的zset,将消息的到期处理时间作为score,然后用多个线程去轮训获取zset中的任务并进行处理。

需要提前考虑一个问题:

如何避免一个任务被多次处理?

一种解决方案是当多个线程获取到任务时,调用redis的zrem命令,将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。

环境

  • JDK17
  • 两个jar包
    • Jedis
    • fastjson2

代码

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;// 基于Redis实现的延迟队列
public class RedisDelayingQueue<T> {static class TaskItem<T> {public String id;public T msg;}// fastjson序列化对象时如果存在泛型,需要使用TypeReferenceprivate Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();private Jedis jedis;private String queueKey;public RedisDelayingQueue(Jedis jedis, String queueKey) {this.jedis = jedis;this.queueKey = queueKey;}// 将任务添加到 zset 中// 分数是延时的时间public void delay(T msg) {TaskItem<T> task = new TaskItem<T>();task.id = UUID.randomUUID().toString();task.msg = msg;// 序列化任务String s = JSON.toJSONString(task);jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s);}public void loop() {while(!Thread.interrupted()) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}public void handleMsg(T msg) {System.out.println(msg);}
}

优化

通过上面loop中代码,多个线程获取到values时,可能会被多个线程同时取到,然后再调用zrem命令去竞争的删除该值,所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的,两种实现方案:

  • 通过对代码块进行加锁的方式
  • 利用redis中lua脚本的原子执行的特点

代码块加锁

这种方案不太好,如果两个命令之间发生了网络错误或者延迟,将造成其它线程的阻塞

    public void synchronizedLoop() {while(!Thread.interrupted()) {synchronized(this) {// 从zset中取出一个任务List<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s = values.iterator().next();if(jedis.zrem(queueKey, s) > 0) {TaskItem<T> task = JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}}

Lua脚本

local key = KEYS[1]
local task = redis.call('ZPOPMIN', key)
if task and next(task) != nil thenredis.call('ZREM', key, task[1])return task[1]
elsereturn nil
end

通过查阅文档发现,ZRANGEBYSCORE从6 版本开始已经过时了,所以这里使用ZPOPMIN来获取分数最小的value,可以达到相同的效果。

通过Jedis的eval函数,调用redis执行lua脚本的命令。

    public void luaLoop() {while(!Thread.interrupted()) {Object ans = jedis.eval(script, 1, queueKey);if(ans != null) {String task = (String) ans;TaskItem<T> taskItem = JSON.parseObject(task, TaskType);this.handleMsg(taskItem.msg);}else{try{Thread.sleep(500);}catch(Exception e) {break;}}}}

为什么可以优化

  • 使用lua脚本的方式,使得一个线程如果zset中有任务都会成功获取任务,而不会多个线程同时拿到同一个任务,再去竞争删除,减少了无效的网络IO

测试程序

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;public class Main {public static void main(String[] args) {JedisPool jedisPool = new JedisPool("url-of-redis", 6379, "username", "pass");Jedis jedis = jedisPool.getResource();RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");// 创建一个线程充当生产者,并向redis中存10个异步任务Thread producer = new Thread() {public void run() {for (int i = 0; i < 10; i++) {queue.delay("codehole" + i);}}};// 创建一个线程充当消费者,不断从redis中取任务并执行Thread consumer = new Thread() {public void run() {queue.luaLoop();}};producer.start();consumer.start();try {// 等待生产者线程执行结束producer.join();Thread.sleep(6000);consumer.interrupt();consumer.join();}catch(InterruptedException e) {e.printStackTrace();}}
}

一些问题

这个问题是关于Jedis的问题,因为我通过上面的方式发起redis请求实际上是存在并发问题的,如果将上述代码中的延时去掉,这个问题发生的概率将大大发生,主要是因为Jedis不是线程安全的,换句话说,通过JedisPool获取redis连接的实例,并发访问是是通过同一个socket发送数据的。

这里使用时,最好是每个线程都用有一个Jedis的实例,避免数据竞争问题.这里只是用了两个线程,所以简单手动使用两个redis实例,如果有多个消费者存在的情况下,还是每个线程单独持有一个Jedis才能解决问题。

    private Jedis readJedis;private Jedis writeJedis;

总结

本篇文章记录了使用zset实现一个简单异步队列的过程,然后对于第一次实现存在的一个问题,使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度,所以一般使用lua脚本的方式来实现。

相关文章:

Redis之zset在异步队列上的应用

当遇到并发的客户端请求时&#xff0c;为了缓解服务端的处理压力&#xff0c;当请求对响应的处理的实时性要求不高时&#xff0c;可以实现一个异步的请求消息队列。 一种实现策略是使用redis的zset&#xff0c;将消息的到期处理时间作为score&#xff0c;然后用多个线程去轮训…...

day4:Node.js 核心库

day4:Node.js 核心库 文章目录 day4:Node.js 核心库常用工具模块util 模块Moment 模块Lodash 模块web模块文件模块path 模块常用工具模块 Node.js有许多常用的工具,以下是一些常见的: util: 是一个Node.js 核心模块,提供常用函数的集合,用于弥补核心 JavaScript 的功能…...

PHP非对称与对称双向加密解密的方式

目录 RSA非对称加密解密&#xff1a; 什么是RSA非对称加密解密解析: 解析: 为什么使用: 有什么优点: DEMO: AES、DES、3DES等对称加密解密: 解析: 为什么使用: 有什么优点: DEMO: RSA非对称加密解密&#xff1a; 什么是RSA非对称加密解密解析: 解析: RSA非对称加密…...

C++之struct匿名结构体实例(二百四十四)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…...

npm publish发布到在线仓库时,提示:Scope not found

当npm publish发布时&#xff0c;控制台提示&#xff1a;Scope not found&#xff0c;具体错误信息如下&#xff1a; npm notice npm ERR! code E404 npm ERR! 404 Not Found - PUT https://registry.npmjs.org/xxx%2fxxx - Scope not found npm ERR! 404 npm ERR! 404 xxx/xx…...

AWS Lambda 操作 RDS 示例

实现目标 创建一个 Lambda 接收调用时传入的数据, 写入 RDS 数据库 Post 表存储文章信息. 表结构如下: idtitlecontentcreate_date1我是标题我是正文内容2023-10-21 15:20:00 AWS 资源准备 RDS 控制台创建 MySQL 实例, 不允许 Public access (后面 Lambda 需要通过 VPC 访问…...

【java爬虫】使用selenium获取某交易所公司半年报数据

引言 上市公司的财报数据一般都会进行公开&#xff0c;我们可以在某交易所的官方网站上查看这些数据&#xff0c;由于数据很多&#xff0c;如果只是手动收集的话可能会比较耗时耗力&#xff0c;我们可以采用爬虫的方法进行数据的获取。 本文就介绍采用selenium框架进行公司财…...

MATLAB - 不能使用PYTHON,缺少matplotlib模块的解决办法

matlab缺少python-matplotlib模块的解决办法 1. 前言、概述2. 解决办法3. 可能出现问题4. 结果 1. 前言、概述 起因是我用习惯的colormap函数getPyPlot_cMap不能用了&#xff1a;【这个函数要调用PYTHON】 报错的地方&#xff1a; ModuleNotFoundError: No module named ‘ma…...

mk语法示例

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…...

英语什么时候加s和es

名词变复数一般情况下加s&#xff0c;以s,x,ch,sh结尾加es。一个名词如果表示一个或一样东西&#xff0c;它取单数形式&#xff0c;如果表示两个或更多的这类东西&#xff0c;则需要用名词复数形式。 1 以s,x,sh,ch结尾的词&#xff0c;加es。 2 以辅音字母&#xff08;除a/e/…...

unity中方向的两种表示:欧拉角和四元数

欧拉角&#xff1a;简单来说就是你可以选择 0度~360度 的范围 四元数&#xff1a;在计算机图像学中&#xff0c;四元数用于物体的旋转&#xff0c;是一种复杂&#xff0c;但效率较高的旋转方式 Quaternion结构体代表一个四元数&#xff0c;包含一个标量和一个三维向量&#x…...

ViT-L-14.pt下载load checkpoint from xxx

load checkpoint from E:\BaiduNetdiskDownload\sd-webui-aki-v4\models\BLIP\model_base_caption_capfilt_large.pth stable diffusion反推提示词出现此提示时&#xff0c;需安装以下模型至sd-webui-aki-v4.cache\clip\目录 ViT-L-14.pt https://openaipublic.azureedge.net/…...

机械设备经营小程序商城的作用是什么

由于机械设备厂商品牌需要各地招商代理&#xff0c;因此在管理方面也需要工具进行高效管理。如今各个行业都在开展数字化转型解决行业所遇难题或通过线上销售解决传统三公里难题及品牌扩张难题、用户消费渠道少等难题&#xff0c;构建会员体系精细化管理&#xff0c;同时还需要…...

小程序跨页面传递参数的几种方式

当我们在开发小程序时&#xff0c;经常会遇到需要在不同页面之间传递数据的情况。为了实现页面间的数据传递&#xff0c;小程序提供了多种方法。下面将介绍几种常用的传递数据的方法。 URL参数传递&#xff1a;这是一种简单直接的传递数据的方式。在跳转页面时&#xff0c;可以…...

【算法与数据结构】--高级算法和数据结构--高级数据结构

一、堆和优先队列 堆&#xff08;Heap&#xff09;是一种特殊的树状数据结构&#xff0c;通常用于实现优先队列。堆有两种主要类型&#xff1a;最大堆和最小堆。最大堆是一棵树&#xff0c;其中每个父节点的值都大于或等于其子节点的值&#xff0c;而最小堆是一棵树&#xff0…...

小工具 - Python图片转PDF文件

前言 主要整理记载一些python实现的小脚本&#xff0c;网上基本转换要会员&#xff0c;懒得搞了&#xff0c;这个一键生成&#xff0c;可以打包成exe文件使用 单张图片转换成pdf、图片批量转换成pdf # coding UTF-8 import os from io import BytesIO from PIL import Imag…...

bitbucket.org 用法

这个网站需要魔法&#xff0c;注册完成后添加厂库时间2023.10 图1 图2 第二张图 &#xff0c;不要.gitignore文件 sourcetree 1,创建前端项目 npm create vitelatest 2.打开vscode创建本地Git 看到Git代提交的文件 sourcetree&#xff0c;新建 已存在的本地厂库 提交到Git 添…...

lodash常用方法合集

安装lodash 建议安装lodash-es&#xff0c;lodash-es 是 lodash 的 es modules 版本 &#xff0c;是着具备 ES6 模块化的版本&#xff0c;体积小。按需引入。 示例 npm i lodash-es import { chunk,compact } from lodash-es; /**按需引入*/ 1.chunk 数组分组 chunk(arra…...

Nginx平滑升级重定向rewrite

文章目录 Nginx平滑升级&重定向rewritenginx平滑升级流程环境查看旧版的配置信息下载新版nginx源码包和功能模块包编译配置新版本平滑升级验证 重定向rewrite配置重定向准发访问测试 Nginx平滑升级&重定向rewrite nginx平滑升级 流程 平滑升级: (升级版本、增加新功…...

Mysql基础与高级汇总

SQL语言分类 DDL:定义 DML&#xff1a;操作 DCL:控制(用于定义访问权限和安全级别) DQL:查询 Sql方言 ->sql&#xff1a;结构化查询语言 mysql:limit oracle:rownum sqlserver:top 但是存储过程&#xff1a;每一种数据库软件一样SQL语法要求: SQL语句可以单行或多行书写&…...

电力系统时间同步系统之三

2.6 电力系统时间同步装置 时间同步装置主要完成时间信号和时间信息的同步传递&#xff0c;并提供相应的时间格式和物理接口。时间同步装置主要由三大部分组成&#xff1a;时间输入、内部时钟和时间输出&#xff0c;如图 2-25 所示。输入装置的时间信号和时间信息的精度必须不…...

vue3 + vite实现动态路由,并进行vuex持久化设计

在后台管理系统中&#xff0c;如何根据后端返回的接口&#xff0c;来动态的设计路由呢&#xff0c;今天一片文章带你们解 1、在vuex中设置一个方法 拿到完整的路由数据 const state {routerList: []}; const mutations { dynameicMenu(state, payload) {// 第一步 通过glob…...

从零开始开发纯血鸿蒙应用之网络检测

从零开始开发纯血鸿蒙应用 〇、前言一、认识 connection 模块1、获取默认网络2、获取网络能力信息3、解析网络能力信息3.1、NetCap3.2、NetBearType 二、实现网络检测功能1、申请权限2、获取默认网路的 NetCap 数组 三、总结 〇、前言 在之前的博文里&#xff0c;介绍了如何实…...

Linux进程(中)

目录 进程等待 为什么有进程等待 什么是进程等待 怎么做到进程等待 wait waitpid 进程等待 为什么有进程等待 僵尸进程无法杀死&#xff0c;需要进程等待来消灭他&#xff0c;进而解决内存泄漏问题--必须解决的 我们要通过进程等待&#xff0c;获得子进程退出情况--知…...

运行示例程序和一些基本操作

欢迎 ----> 示例 --> 选择sample CTRL B 编译代码 CTRL R 运行exe 项目 中 Shadow build 表示是否 编译生成文件和 源码是否放一块 勾上不在同一个地方 已有项目情况下怎么打开项目 方法一: 左键双击 xxx.pro 方法二: 文件菜单里面 选择打开项目...

基于Django开发的运动商城系统项目

运动商城系统项目描述 运动商城系统是一个基于现代Web技术构建的电子商务平台&#xff0c;专注于运动类商品的在线销售与管理。该系统采用前后端分离架构&#xff0c;前端使用Vue.js实现动态交互界面&#xff0c;后端基于Django框架提供RESTful API支持&#xff0c;数据库采用…...

嵌入式面试高频!!!C语言(四)(嵌入式八股文,嵌入式面经)

更多嵌入式面试文章见下面连接&#xff0c;会不断更新哦&#xff01;&#xff01;关注一下谢谢&#xff01;&#xff01;&#xff01;&#xff01; ​​​​​​​https://blog.csdn.net/qq_61574541/category_12976911.html?fromshareblogcolumn&sharetypeblogcolumn&…...

Vue3+Vite中lodash-es安装与使用指南

在 Vue 3 Vite 项目中安装和使用 lodash-es 的详细指南如下&#xff1a; 一、为什么选择 lodash-es&#xff1f; ES 模块支持&#xff1a;lodash-es 以原生 ES 模块格式发布&#xff0c;支持现代构建工具的 Tree Shaking 按需加载&#xff1a;只引入需要的函数&#xff0c;显…...

黑龙江云前沿服务器租用:便捷高效的灵活之选​

服务器租用&#xff0c;即企业直接从互联网数据中心&#xff08;IDC&#xff09;提供商处租赁服务器。企业只需按照所选的服务器配置和租赁期限&#xff0c;定期支付租金&#xff0c;即可使用服务器开展业务。​ 便捷快速部署&#xff1a;租用服务器能极大地缩短服务器搭建周期…...

优化电脑的磁盘和驱动器提高电脑性能和延长硬盘寿命?

磁盘优化 磁盘清理&#xff1a; 使用系统自带的磁盘清理工具&#xff08;如Windows的“磁盘清理”&#xff09;删除不必要的文件。清空回收站。删除临时文件和缓存。 磁盘碎片整理&#xff08;针对机械硬盘&#xff09;&#xff1a; 定期进行磁盘碎片整理&#xff0c;以提高文…...