当前位置: 首页 > news >正文

redis实现分布式延时队列

文章目录

  • 延时队列简介
  • 应用场景
  • 案例:
  • 考虑:
  • 实现:
    • 整体思路:
    • 具体实现
      • 生产者
      • 消费者
    • 运行结果
  • redis分布式延时队列优势
  • redis分布式延时队列劣势

延时队列简介

延时队列是一种特殊的消息队列,它允许将消息在一定的延迟时间后再进行消费。延时队列的主要特点是可以延迟消息的处理时间,以满足定时任务或者定时事件的需求。

总之,延时队列通过延迟消息的消费时间,提供了一种方便、可靠的方式来处理定时任务和定时事件。它在分布式系统中具有重要的作用,能够提高系统的可靠性和性能。

延时队列的实现方式可以有多种,本文介绍一种redis实现的分布式延时队列。

应用场景

  • 定时任务:可以将需要在特定时间执行的任务封装为延时消息,通过延时队列来触发任务的执行。

  • 订单超时处理:可以将订单消息发送到延时队列中,并设置订单的超时时间,超过时间后,消费者从队列中获取到超时的订单消息,进行相应的处理。

  • 消息重试机制:当某个消息处理失败时,可以将该消息发送到延时队列中,并设置一定的重试时间,超过时间后再次尝试处理。

案例:

12306火车票购买,抢了订单后,45分钟没有支付,自动取消订单

考虑:

数据持久化:redis是支持的,可以使用rdb,也可以使用aof

有序存储:因为只要最小的没过期,后面的肯定就没过期,这样的话检查最小的节点就行了,考虑使用redis中的zset结构

高可用:考虑哨兵或者cluster

高伸缩:因为12306用户量非常大,可能导致redis中存储的任务空间非常大,所以考虑扩展节点,从这个角度来说,使用cluster集群模式,哨兵只有一个节点即主节点写数据。

实现:

整体思路:

  • 生产消费者模型:因为12306的用户量非常大,所以考虑生产者和消费者有多个节点;
  • 采用cluster模式实现高可用以及高伸缩性
  • 采用zset存储延时任务(zadd key score member,score表示时间);
  • 为了让数据均匀分布在cluster集群中的多个主节点中:构建多个zset,每个zset对应一个消费者,生产者随机向某个zset中生产数据。

具体实现

生产者

需要安装hiredis-cluster集群,安装编译如下:

git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -
DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig

需要安装libevent库,最后编译时执行gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl编译生产者可执行程序

#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>int64_t g_taskid = 0;#define MAX_KEY 10static int64_t hi_msec_now() {int64_t msec;struct timeval now;int status;status = gettimeofday(&now, NULL);if (status < 0) {return -1;}msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL);return msec;
}static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) {int n;n = vsnprintf(buf, size, fmt, args);if (n <= 0) {return 0;}if (n <= (int)size) {return n;}return (int)(size-1);
}static int _scnprintf(char *buf, size_t size, const char *fmt, ...) {va_list args;int n;va_start(args, fmt);n = _vscnprintf(buf, size, fmt, args);va_end(args);return n;
}void connectCallback(const redisAsyncContext *ac, int status) {if (status != REDIS_OK) {printf("Error: %s\n", ac->errstr);return;}printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}void disconnectCallback(const redisAsyncContext *ac, int status) {if (status != REDIS_OK) {printf("Error: %s\n", ac->errstr);return;}printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {redisReply *reply = (redisReply *)r;if (reply == NULL) {if (cc->errstr) {printf("errstr: %s\n", cc->errstr);}return;}int64_t now = hi_msec_now() / 10;printf("add task success reply: %lld now=%ld\n", reply->integer, now);
}int addTask(redisClusterAsyncContext *cc, char *desc) {/* 转化为厘米秒 */int64_t now = hi_msec_now() / 10;g_taskid++;/* key */char key[256] = {0};// 为了让数据均匀分布在cluster集群中的多个主节点中:// 构建多个zset,每个zset对应一个消费者,生产者随机向某个zset中生产数据,// 生产者可以有很多个,只需要保证向task_group:0-task_group:9中均匀的生产数据即可int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY);key[len] = '\0';/* member */char mem[1024] = {0};len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc);mem[len] = '\0';int status;// 为每一个任务延时5秒中去处理status = redisClusterAsyncCommand(cc, addTaskCallback, "","zadd %s %ld %s", key, now+500, mem);printf("redisClusterAsyncCommand:zadd %s %ld %s\n", key, now+500, mem);if (status != REDIS_OK) {printf("error: err=%d errstr=%s\n", cc->err, cc->errstr);}return 0;
}void stdio_callback(struct bufferevent *bev, void *arg) {redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg;struct evbuffer *evbuf = bufferevent_get_input(bev);char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF);if (!msg) return;if (strcmp(msg, "quit") == 0) {printf("safe exit!!!\n");exit(0);return;}if (strlen(msg) > 1024-5-13-1) {printf("[err]msg is too long, try again...\n");return;}addTask(cc, msg);printf("stdio read the data: %s\n", msg);
}int main(int argc, char **argv) {printf("Connecting...\n");// 连接cluster集群,可以从cluster集群中任意一个节点出发连接集群redisClusterAsyncContext *cc =redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL);printf("redisClusterAsyncContext...\n");if (cc && cc->err) {printf("Error: %s\n", cc->errstr);return 1;}struct event_base *base = event_base_new();redisClusterLibeventAttach(cc, base);redisClusterAsyncSetConnectCallback(cc, connectCallback);redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback);// nodeIterator ni;// initNodeIterator(&ni, cc->cc);// cluster_node *node;// while ((node = nodeNext(&ni)) != NULL) {//     printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad);// }struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc);bufferevent_enable(ioev, EV_READ | EV_PERSIST);printf("Dispatch..\n");event_base_dispatch(base);printf("Done..\n");redisClusterAsyncFree(cc);event_base_free(base);return 0;
}// 需要安装 hiredis-cluster libevent
// gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl

说明:

这里构建了10个zset,分别是task_group:0,task_group:1,…,task_group:9作为10个zset的key,zset的数据其实就代表着消费者的数量,通常消费者的功能是一摸一样的,生产者就不管你有多少个了,只需要将任务均匀的打散在不同的zset中就行了(具体实现可以搞一个全局的id,每一次添加任务时id++,然后再对zset个数10取模,最终可以得到0-9之间的一个数,然后再与task_group拼接,这样就可以将任务均匀的打散在不同的zset中)。

消费者

消费者是采用skynet+lua脚本实现的,每个消费者会不断的去检查redis中的任务有没有过期,如果过期,就取出来删除(这里只是demo,只是打印之后删除任务)

local skynet = require "skynet"local function table_dump( object )if type(object) == 'table' thenlocal s = '{ 'for k,v in pairs(object) doif type(k) ~= 'number' then k = string.format("%q", k) ends = s .. '['..k..'] = ' .. table_dump(v) .. ','endreturn s .. '} 'elseif type(object) == 'function' thenreturn tostring(object)elseif type(object) == 'string' thenreturn string.format("%q", object)elsereturn tostring(object)end
endlocal mode, key = ...
if mode == "slave" thenlocal rediscluster = require "skynet.db.redis.cluster"local function onmessage(data,channel,pchannel)print("onmessage",data,channel,pchannel)endskynet.start(function ()local db = rediscluster.new({{host="127.0.0.1",port=7001},},{read_slave=true,auth=nil,db=0,},onmessage)assert(db, "redis-cluster startup error")skynet.fork(function ()while true dolocal res = db:zrange(key, 0, 0, "withscores")if not next(res) thenskynet.sleep(50)elselocal expire = tonumber(res[2])local now = skynet.time()*100if now >= expire thenprint(("%s is comsumed:expire_time:%d"):format(res[1], expire))db:zrem(key, res[1])elseskynet.sleep(10)endendendend)end)elseskynet.start(function ()	-- // 启动10个程序,并把"slave"传入mode,task_group:i传入到key中,即每个程序只消费一个for i=0,9 doskynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)

运行结果

在这里插入图片描述

redis分布式延时队列优势

1.Redis zset支持高性能的 score 排序。

2.Redis是在内存上进行操作的,速度非常快。

3.Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

4.Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

redis分布式延时队列劣势

使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题:

  • 没有重试机制 - 处理消息出现异常没有重试机制, 这些需要自己去实现, 包括重试次数的实现等;
  • 没有 ACK 机制 - 例如在获取消息并已经删除了消息情况下, 正在处理消息的时候客户端崩溃了, 这条正在处理的这些消息就会丢失, MQ 是需要明确的返回一个值给 MQ 才会认为这个消息是被正确的消费了。

总结:如果对消息可靠性要求较高, 推荐使用 MQ 来实现

相关文章:

redis实现分布式延时队列

文章目录 延时队列简介应用场景案例&#xff1a;考虑&#xff1a;实现&#xff1a;整体思路&#xff1a;具体实现生产者消费者 运行结果 redis分布式延时队列优势redis分布式延时队列劣势 延时队列简介 延时队列是一种特殊的消息队列&#xff0c;它允许将消息在一定的延迟时间…...

Spring AOP源码解读

今天我们来分析Spring中AOP的源码&#xff0c;主要是关于SpringAOP是如何发挥作用的。 前期准备 首先我们需要有一个Spring AOP项目&#xff0c;添加好了SpringAOP的依赖。 <dependency><groupId>org.springframework</groupId><artifactId>spring-co…...

JavaScript基础入门01

目录 1.初识 JavaScript 1.1JavaScript 是什么 1.2发展历史 1.3JavaScript 和 HTML 和 CSS 之间的关系 2.JavaScript 的组成 3.前置知识 3.1第一个程序 4.JavaScript 的书写形式 4.1 行内式 4.2. 内嵌式 4.3.外部式 5.注释 6.输入输出 6.1输入: prompt 6.2输出: …...

yum 命令

基本语法 yum [选项] [参数] 选项说明 -y 对所有提问都回答“yes” 参数说明 实操 yum list | grep firefox yum -y remove firefox yum -y install firefox...

Nginx 部署多个安全域名,多个服务【工作记录】

以下是本人通过Docker 部署的Nginx挂载出来的文件目录 先看下 nginx.conf 配置文件内容&#xff1a;如下 ps&#xff1a;当前文件就是安装后的初始内容&#xff0c;无修改。主要关注最后一行 include /etc/nginx/conf.d/*.conf;表示引入其他目录下的.conf配置文件&#xff1b;…...

性能测试QPS+TPS+事务基础知识分析

本篇文章是性能测试基础篇&#xff0c;主要介绍了性能测试中对QPSTPS事务的基础知识分析&#xff0c;有需要的朋友可以借鉴参考下&#xff0c;希望可以对广大读者有所帮助 事务 就是用户某一步或几步操作的集合。不过&#xff0c;我们要保证它有一个完整意义。比如用户对某一…...

PSP - 蛋白质复合物 AlphaFold2 Multimer MSA Pairing 逻辑与优化

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/134144591 在蛋白质复合物结构预测中&#xff0c;当序列 (Sequence) 是异源多链时&#xff0c;无论是AB&#xff0c;还是AABB&#xff0c;都需要 …...

C++中vec.size()-1的坑

问题描述&#xff1a;如下代码&#xff0c; #include <iostream> #include <vector>using namespace std;int main() {vector<int> vec {};for (int i 0; i < vec.size() - 1; i) {cout << "i " << i << ", vec[i] …...

Flask Shell 操作 SQLite

一、前言 这段时间在玩Flask Web&#xff0c;发现用Flask Shell去操作SQLite还是比较方便的。今天简单地介绍一下。 二、SQLite SQLite是一种嵌入式数据库&#xff0c;它的数据库就是一个文件&#xff0c;处理速度快&#xff0c;经常被集成在各种应用程序中&#xff0c;在IO…...

Mybatis—XML配置文件、动态SQL

学习完Mybatis的基本操作之后&#xff0c;继续学习Mybatis—XML配置文件、动态SQL。 目录 Mybatis的XML配置文件XML配置文件规范XML配置文件实现MybatisX的使用 Mybatis动态SQL动态SQL-if条件查询 \<if\>与\<where\>更新员工 \<set\>小结 动态SQL-\<forea…...

excel求差公式怎么使用?

利用excel求差&#xff0c;可能有许多的小伙伴已经会了&#xff0c;不过还是存在一些不太熟悉的朋友们&#xff0c;所以这里有必要讲解一下。其实求差的实现主要就是一个公式&#xff0c;就是用一个单元格中的数字“减去”另一个单元格中的数字“等于”第三个单元格。此公式掌握…...

高效分割分段视频:提升您的视频剪辑能力

在数字媒体时代&#xff0c;视频剪辑已经成为一项重要的技能。无论是制作个人影片、广告还是其他类型的视频内容&#xff0c;掌握高效的视频剪辑技巧都是必不可少的。本文将介绍如何引用云炫AI智剪高效地分割和分段视频&#xff0c;以提升您的视频剪辑能力。以下是详细的操作步…...

【c++|opencv】二、灰度变换和空间滤波---2.直方图和均衡化

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 图像直方图、直方图均衡化 1. 图像直方图 #include <iostream> #include <opencv2/opencv.hpp>using namespace cv; using namespace std;…...

【Windows】线程同步之信号量(Semaphores)

概述&#xff1a; semaphores 的说明和使用 微软官方文档&#xff1a; Semaphore Objects - Win32 apps | Microsoft Learn Semaphores是解决各种 producer/consumer问题的关键要素。这种问题会存有一个缓冲区&#xff0c;可能在同一时间内被读出数据或被写入数据。 理论可以证…...

二叉树问题——前中后遍历数组构建二叉树

摘要 利用二叉树的前序&#xff0c;中序&#xff0c;后序&#xff0c;有序数组来构建相关二叉树的问题。 一、构建二叉树题目 105. 从前序与中序遍历序列构造二叉树 106. 从中序与后序遍历序列构造二叉树 889. 根据前序和后序遍历构造二叉树 617. 合并二叉树 226. 翻转二…...

Java保留n位小数的方法(超简洁)

要输出double类型保留n位小数的几种方法如下&#xff1a; 我们以保留6位小数为例 方法一&#xff1a;使用DecimalFormat类 import java.text.DecimalFormat;public class Main {public static void main(String[] args) {double number 3.141592653589793;DecimalFormat df …...

JavaEE-博客系统1(数据库和后端的交互)

本部分内容包括网站设计总述&#xff0c;数据库和后端的交互&#xff1b; 数据库操作代码如下&#xff1a; -- 编写SQL完成建库建表操作 create database if not exists java_blog_system charset utf8; use java_blog_system; -- 建立两张表&#xff0c;一个存储博客信息&am…...

【unity/vufornia】Duplicate virtual buttons with name.../同一个ImageTarget上多个按钮失灵

问题&#xff1a;在同一个ImageTarget上添加多个按钮时无法触发对应按钮的事件。 解决过程&#xff1a; 1.查看报错&#xff1a;“Duplicate virtual buttons with name...”这一行&#xff0c;顾名思义&#xff0c;命名重复。 2.英文搜索到以下文章&#xff0c;应该在inspe…...

Apache ActiveMQ 远程代码执行漏洞复现(CNVD-2023-69477)

Apache ActiveMQ 远程代码执行RCE漏洞复现&#xff08;CNVD-2023-69477&#xff09; 上周爆出来的漏洞&#xff0c;正好做一下漏洞复现&#xff0c;记录一下 1.漏洞描述 ​ Apache ActiveMQ 中存在远程代码执行漏洞&#xff0c;具有 Apache ActiveMQ 服务器TCP端口&#xff…...

项目管理-科学管理基础-线性规划介绍及例题

项目管理中的线性规划是什么? 在项目管理中,线性规划是一种数学建模和优化技术,用于解决资源分配和进度规划的问题。线性规划的目标是在给定的资源限制下,找到最佳的资源分配方案,以满足项目的需求并优化特定的目标,如成本最小化或时间最短化。 线性规划的基本元素包括…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...