当前位置: 首页 > news >正文

Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker

alt

同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错[6]

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379


➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go

0 directories, 5 files

其中const.go:

package main

const (
 redisAddr   = "127.0.0.1:6379"
 redisPasswd = ""
)

const (
 TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:


package main

import (
 "encoding/json"
 "fmt"
 "log"
 "time"

 "github.com/hibiken/asynq"
)

type ExampleTaskPayload struct {
 UserID string
 Msg    string
 // 业务需要的其他字段
}

func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
 payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
 if err != nil {
  return nil, err
 }
 return asynq.NewTask(TypeExampleTask, payload), nil
}

var client *asynq.Client

func main() {

 client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
 defer client.Close()

 //go startExampleTask()
 startExampleTask()

 //startGithubUpdate() // 定时触发
}

func startExampleTask() {

 fmt.Println("开始执行一次性的任务")
 // 立刻执行
 task1, err := NewExampleTask("10001""mashangzhixing!")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err := client.Enqueue(task1)
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 10秒后执行(定时执行)
 task2, err := NewExampleTask("10002""10s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 30s后执行(定时执行)
 task3, err := NewExampleTask("10003""30s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 theTime := time.Now().Add(30 * time.Second)
 info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "github.com/davecgh/go-spew/spew"
 "github.com/hibiken/asynq"
)

var AsynqServer *asynq.Server // 异步任务server

func initTaskServer() error {
 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,
    "default":  3,
    "low":      1,
   },
   // See the godoc for other configuration options
  },
 )
 return nil
}

func main() {
 initTaskServer()
 mux := asynq.NewServeMux()

 mux.HandleFunc(TypeExampleTask, HandleExampleTask)
 // ...register other handlers...

 if err := AsynqServer.Run(mux); err != nil {
  fmt.Printf("could not run asynq server: %v", err)
 }
}

func HandleExampleTask(ctx context.Context, t *asynq.Task) error {

 res := make(map[string]string)

 spew.Dump("t.Payload() is:", t.Payload())
 err := json.Unmarshal(t.Payload(), &res)
 if err != nil {
  fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
  return nil
 }
 //-----------具体处理逻辑------------
 spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
 fmt.Println()
 // 模拟具体的处理
 time.Sleep(5 * time.Second)
 fmt.Println("--------------处理了5s,处理完成-----------------")

 return nil

}

执行redis-server


清除redis中所有的key:


执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379

alt

执行 go run client.go const.go (生产者,产生消息放入队列)

alt

此时能看到redis中多个几个key

alt

同时管理后台能看到队列的信息

alt

执行 go run server.go const.go (消费者,消费队列中的消息)

alt

可以看到都被处理了

alt

此时redis中的key:

alt

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo[7] push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级[8]

 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,//关键队列中的任务将被处理 60% 的时间
    "default":  3,//默认队列中的任务将被处理 30% 的时间
    "low":      1,//低队列中的任务将被处理 10% 的时间
   },
   // See the godoc for other configuration options
  },
 )

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误[9]

参考资料

[1]

Asynq: https://github.com/hibiken/asynq

[2]

sidekiq: https://github.com/sidekiq/sidekiq

[3]

celery: https://github.com/celery/celery

[4]

machinery: https://blog.csdn.net/weixin_42681866/article/details/123334654

[5]

asynqmon: https://github.com/hibiken/asynqmon

[6]

报错: https://github.com/hibiken/asynqmon/issues/214

[7]

完整Demo: https://github.com/cuishuang/asynq-demo

[8]

asynq队列如何配置队列优先级: https://blog.csdn.net/itopit/article/details/126123626

[9]

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误: https://my.oschina.net/randolphcyg/blog/5539676

本文由 mdnice 多平台发布

相关文章:

Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker 同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus…...

保证率计算公式 正态分布

在正态分布中,如果我们要计算一个给定区间内的保证率,可以使用下面的计算公式: 找到给定保证率对应的标准正态分布的z值。可以使用标准正态分布表或计算器进行查询。例如,对于95%的保证率,对应的z值为1.96。 使用z值和…...

docker容器监控:Cadvisor+InfluxDB+Grafana的安装部署

目录 CadvisorInfluxDBGrafan安装部署 1、安装docker-ce 2、阿里云镜像加速器 3、下载组件镜像 4、创建自定义网络 5、创建influxdb容器 6、创建Cadvisor 容器 7、查看Cadvisor 容器: (1)准备测试镜像 (2)通…...

论文讲解——TPU-MLIR: A Compiler For TPU Using MLIR

论文讲解——TPU-MLIR: A Compiler For TPU Using MLIR https://arxiv.org/pdf/2210.15016.pdf概览模型转换TranslationCanonicalizeLoweringLayerGroup BufferizationCalibration QuantizationCorrectness Check相关资料 https://arxiv.org/pdf/2210.15016.pdf 本文将对TPU…...

基于最新导则下生态环评报告编制技术暨报告篇、制图篇、指数篇、综合应用篇系统性实践技能提升

查看原文>>>基于最新导则下生态环评报告编制技术暨报告篇、制图篇、指数篇、综合应用篇系统性实践技能提升 目录 专题一、生态环评报告编制规范 专题二、土地利用图 专题三、植被类型及植被覆盖度图 专题四、物种适宜生境分布图 专题五、生物多样性测定 专题六…...

NGZORRO:动态表单/模型驱动 的相关问题

官网的demo的[nzFor]"control.controlInstance"&#xff0c;似乎是靠[formControlName]"control.controlInstance"来关联的。 <form nz-form [formGroup]"validateForm" (ngSubmit)"submitForm()"><nz-form-item *ngFor&quo…...

第十七次CCF计算机软件能力认证

第一题&#xff1a;小明种苹果 n , m map(int , input().split()) t , k , p 0 , 0 , -1 for _ in range(n):l list(map(int , input().split()))t sum(l)x -sum(l[i] for i in range(1 , len(l)))if x > p:p xk _ 1 print(t , k , p) 第二题&#xff1a;小明种苹…...

ApplicationContext在Spring Boot中是如何创建的?

一、ApplicationContext在Spring Boot中是如何创建的&#xff1f; 1. SpringApplication ApplicationContextFactory有三个实现类&#xff0c;分别是AnnotationConfigReactiveWebServerApplicationContext.Factory、AnnotationConfigServletWebServerApplicationContext.Facto…...

后端开发7.轮播图模块【mongdb开发】

概述 轮播图模块数据库采用mongdb开发 效果图 数据库设计 创建数据库 use sc; 添加数据 db.banner.insertMany([ {bannerId:"1",bannerName:"商城轮播图1",bannerUrl:"http://xx:8020/img/轮播图/shop1.png"}, {bannerId:"2"…...

Linux常用命令(一):创建文件目录

一、touch&#xff1a; 1、作用&#xff1a; 1). 改变已有文件的时间戳属性&#xff0c;修改文件时间戳时&#xff0c;用户必须的文件的属主&#xff0c;或者拥有写文件的权限 2). 创建新的空文件 2、语法&#xff1a; touch [option] 文件名 ,后面可跟多个文件名3、示例 …...

如何创建一个Vue组件?如何在父组件和子组件之间传递数据?如何在子组件中向父组件发送消息?

1、如何创建一个Vue组件&#xff1f; 要创建一个Vue组件&#xff0c;可以按照以下步骤进行&#xff1a; 安装Vue CLI&#xff08;如果还没有安装&#xff09;&#xff1a; npm install -g vue/cli创建一个新的Vue组件&#xff1a; vue create my-component在 src/component…...

设计模式之适配器模式

一、概述 将一个类的接口转换成客户希望的另外一个接口。Adapter模式使得原本由于接口不兼容而不能一起工作的那些类可以一起工作。 二、适用性 1.你想使用一个已经存在的类&#xff0c;而它的接口不符合你的需求。 2.你想创建一个可以复用的类&#xff0c;该类可以与其他不…...

让ChatGPT介绍一下ChatGPT(ChatGPT的自我介绍)

ChatGPT是这样介绍自己的&#xff1a; ChatGPT是由OpenAI开发的一种基于大规模预训练的语言模型。它是建立在GPT&#xff08;Generative Pre-trained Transformer&#xff09;架构的基础上&#xff0c;经过大量的数据训练而成。 ChatGPT旨在通过对话与用户进行交互&#xff0…...

CentOS 7 构建 LVS-DR 群集

一、LVS-DR集群摘要 LVS&#xff08;Linux Virtual Server&#xff09;是一个用于构建可扩展和高可用性的负载均衡集群的软件。它基于Linux操作系统&#xff0c;并提供了一种将网络流量分发到多个后端服务器的机制。 二、基本工作原理 配置负载均衡器&#xff1a;在LVS集群中…...

MySQL8.0.33二进制包安装与部署

官方文档 https://downloads.mysql.com/archives/community/https://dev.mysql.com/doc/refman/8.1/en/binary-installation.html官方文档操作步骤 # Preconfiguration setup $> groupadd mysql $> useradd -r -g mysql -s /bin/false mysql # Beginning of source-build…...

RocketMQ发送消息失败:error CODE: 14 DESC: service not available now, maybe disk full

在执行业务时&#xff0c;发现MQ控制台没有查询到消息&#xff0c;在日志中发现消息发送失败&#xff0c;报错error CODE: 14 DESC: service not available now, maybe disk full 分析报错应该是磁盘空间不足&#xff0c;导致broker不能进行正常的消息存储刷盘&#xff0c;去查…...

1.Fay-UE5数字人工程导入(UE数字人系统教程)

非常全面的数字人解决方案(含源码) Fay-UE5数字人工程导入 1、工程下载&#xff1a;xszyou/fay-ue5: 可对接fay数字人的ue5工程 (github.com) 2、ue5下载安装&#xff1a;Unreal Engine 5 3、ue5插件安装 依次安装以下几个插件 4、双击运行工程 5、切换中文 6、检…...

Linux 终端操作命令(2)内部命令分类

Linux 终端操作命令 也称Shell命令&#xff0c;是用户与操作系统内核进行交互的命令解释器&#xff0c;它接收用户输入的命令并将其传递给操作系统进行执行&#xff0c;可分为内部命令和外部命令。内部命令是Shell程序的一部分&#xff0c;而外部命令是独立于Shell的可执行程序…...

【数据结构与算法】十大经典排序算法-插入排序

&#x1f31f;个人博客&#xff1a;www.hellocode.top &#x1f3f0;Java知识导航&#xff1a;Java-Navigate &#x1f525;CSDN&#xff1a;HelloCode. &#x1f31e;知乎&#xff1a;HelloCode &#x1f334;掘金&#xff1a;HelloCode ⚡如有问题&#xff0c;欢迎指正&#…...

如何使用PHP Smarty进行条件判断和循环?

欢迎来到PHP Smarty的世界&#xff01;如果你想要在Smarty中执行条件判断和循环&#xff0c;那么你需要了解一些基本的语法和结构。 首先&#xff0c;让我们从条件判断开始吧&#xff01;在Smarty中&#xff0c;你可以使用{if}、{elseif}和{else}语句来进行条件判断。这些语句的…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

【单片机期末】单片机系统设计

主要内容&#xff1a;系统状态机&#xff0c;系统时基&#xff0c;系统需求分析&#xff0c;系统构建&#xff0c;系统状态流图 一、题目要求 二、绘制系统状态流图 题目&#xff1a;根据上述描述绘制系统状态流图&#xff0c;注明状态转移条件及方向。 三、利用定时器产生时…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

自然语言处理——文本分类

文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益&#xff08;IG&#xff09; 分类器设计贝叶斯理论&#xff1a;线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别&#xff0c; 有单标签多类别文本分类和多…...

高抗扰度汽车光耦合器的特性

晶台光电推出的125℃光耦合器系列产品&#xff08;包括KL357NU、KL3H7U和KL817U&#xff09;&#xff0c;专为高温环境下的汽车应用设计&#xff0c;具备以下核心优势和技术特点&#xff1a; 一、技术特性分析 高温稳定性 采用先进的LED技术和优化的IC设计&#xff0c;确保在…...

项目进度管理软件是什么?项目进度管理软件有哪些核心功能?

无论是建筑施工、软件开发&#xff0c;还是市场营销活动&#xff0c;项目往往涉及多个团队、大量资源和严格的时间表。如果没有一个系统化的工具来跟踪和管理这些元素&#xff0c;项目很容易陷入混乱&#xff0c;导致进度延误、成本超支&#xff0c;甚至失败。 项目进度管理软…...