当前位置: 首页 > news >正文

goframe开发一个企业网站 rabbitmq队例15

RabbitMQ消息队列封装

在目录internal/pkg/rabbitmq/rabbitmq.go

# 消息队列配置
mq:# 消息队列类型: rocketmq 或 rabbitmqtype: "rabbitmq"# 是否启用消息队列enabled: truerocketmq:nameServer: "127.0.0.1:9876"producerGroup: "myProducerGroup"consumerGroup: "myConsumerGroup"brokerAddress: "127.0.0.1:10911"  # 添加 broker 地址rabbitmq:url: "amqp://wanghaibin:wanghaibin@127.0.0.1:5672/"exchange: "gf_exchange"dlx_exchange: "gf_dlx_exchange"    # 新增:死信交换机queue: "gf_queue"delay_queue: "gf_delay_queue"      # 新增:延迟队列routingKey: "gf_key"vhost: "/"
package rabbitmqimport ("context""fmt""time""github.com/gogf/gf/v2/frame/g"amqp "github.com/rabbitmq/amqp091-go"
)var (// conn RabbitMQ连接实例conn *amqp.Connection// channel RabbitMQ通道实例channel *amqp.Channel
)// Initialize 初始化 RabbitMQ 连接和通道
// 包括:建立连接、创建通道、声明交换机和队列、建立绑定关系
func Initialize() {var err errorctx := context.Background()// 从配置文件获取RabbitMQ连接URLurl := g.Cfg().MustGet(ctx, "rabbitmq.url").String()// 建立RabbitMQ连接conn, err = amqp.Dial(url)if err != nil {g.Log().Fatalf(ctx, "Failed to connect to RabbitMQ: %v", err)}// 创建通道channel, err = conn.Channel()if err != nil {g.Log().Fatalf(ctx, "Failed to open channel: %v", err)}// 1. 声明主交换机// 类型:direct,持久化:true,自动删除:false,内部的:false,非阻塞:falseerr = channel.ExchangeDeclare(g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),"direct", // 交换机类型true,     // 持久化false,    // 自动删除false,    // 内部的false,    // 非阻塞nil,      // 参数)if err != nil {g.Log().Fatalf(ctx, "Failed to declare main exchange: %v", err)}// 2. 声明死信交换机(DLX)// 用于处理无法被正常消费的消息err = channel.ExchangeDeclare(g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),"direct",true,false,false,false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to declare DLX exchange: %v", err)}// 3. 声明主队列// 持久化:true,非自动删除,非排他,非阻塞_, err = channel.QueueDeclare(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),true,  // 持久化false, // 自动删除false, // 排他的false, // 非阻塞nil,   // 参数)if err != nil {g.Log().Fatalf(ctx, "Failed to declare main queue: %v", err)}// 4. 声明延迟队列// 配置死信交换机参数args := amqp.Table{"x-dead-letter-exchange":    g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),"x-dead-letter-routing-key": g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),}_, err = channel.QueueDeclare(g.Cfg().MustGet(ctx, "rabbitmq.delay_queue").String(),true,false,false,false,args,)if err != nil {g.Log().Fatalf(ctx, "Failed to declare delay queue: %v", err)}// 5. 绑定主队列到主交换机err = channel.QueueBind(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),      // 队列名g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(), // 路由键g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),   // 交换机名false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to bind main queue: %v", err)}// 6. 绑定主队列到死信交换机err = channel.QueueBind(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),g.Cfg().MustGet(ctx, "rabbitmq.dlx_exchange").String(),false,nil,)if err != nil {g.Log().Fatalf(ctx, "Failed to bind queue to DLX: %v", err)}g.Log().Info(ctx, "RabbitMQ initialized successfully")
}// PublishMessage 发布消息到RabbitMQ
// 参数:
//   - ctx: 上下文
//   - message: 要发送的消息内容
// 返回:
//   - error: 发送错误,如果成功则为nil
func PublishMessage(ctx context.Context, message string) error {// 创建带超时的上下文ctxTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)defer cancel()// 发布消息到指定的交换机和路由err := channel.PublishWithContext(ctxTimeout,g.Cfg().MustGet(ctx, "rabbitmq.exchange").String(),g.Cfg().MustGet(ctx, "rabbitmq.routingKey").String(),false, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)if err != nil {return fmt.Errorf("failed to publish message: %v", err)}return nil
}// ConsumeMessages 消费队列中的消息
// 参数:
//   - ctx: 上下文
//   - handler: 消息处理函数
// 返回:
//   - error: 消费错误,如果成功则为nil
func ConsumeMessages(ctx context.Context, handler func(string) error) error {messages, err := channel.Consume(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),"",    // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil,   // args)if err != nil {return fmt.Errorf("failed to register a consumer: %v", err)}// 启动goroutine处理消息go func() {for msg := range messages {err := handler(string(msg.Body))if err != nil {g.Log().Errorf(ctx, "Error handling message: %v", err)msg.Nack(false, true) // 处理失败,消息重新入队} else {msg.Ack(false) // 处理成功,确认消息}}}()return nil
}// Cleanup 清理RabbitMQ连接和通道
func Cleanup() {if channel != nil {channel.Close()}if conn != nil {conn.Close()}
}// GetChannel 获取RabbitMQ通道实例
func GetChannel() *amqp.Channel {return channel
}// PurgeQueue 清空指定队列中的所有消息
// 参数:
//   - ctx: 上下文
// 返回:
//   - error: 清空错误,如果成功则为nil
func PurgeQueue(ctx context.Context) error {_, err := channel.QueuePurge(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),false, // no-wait)return err
}// PublishDelayMessage 发送延迟消息
// 参数:
//   - ctx: 上下文
//   - message: 消息内容
//   - delaySeconds: 延迟秒数
// 返回:
//   - error: 发送错误,如果成功则为nil
func PublishDelayMessage(ctx context.Context, message string, delaySeconds int) error {return channel.PublishWithContext(ctx,"",                                                    // 默认交换机g.Cfg().MustGet(ctx, "rabbitmq.delay_queue").String(), // 延迟队列false,false,amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),Expiration:  fmt.Sprintf("%d", delaySeconds*1000), // 转换为毫秒},)
}// GetQueueLength 获取队列中的消息数量
// 参数:
//   - ctx: 上下文
// 返回:
//   - int: 消息数量
//   - error: 获取错误,如果成功则为nil
func GetQueueLength(ctx context.Context) (int, error) {queue, err := channel.QueueInspect(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),)if err != nil {return 0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages, nil
}
logic逻辑的实现
package rabbitmqmsgimport ("context""fmt""gf_new_web/internal/pkg/rabbitmq""gf_new_web/internal/service""github.com/gogf/gf/v2/frame/g"
)// sRabbitmqMsg RabbitMQ消息服务结构体
type sRabbitmqMsg struct{}// New 创建新的RabbitMQ消息服务实例
func New() *sRabbitmqMsg {return &sRabbitmqMsg{}
}// init 初始化函数,在包加载时自动注册RabbitMQ消息服务
func init() {service.RegisterRabbitmqMsg(New())
}// SendMessage 发送普通消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - message: 要发送的消息内容
// 返回:
//   - error: 发送错误,成功则为nil
func (s *sRabbitmqMsg) SendMessage(ctx context.Context, message string) error {return rabbitmq.PublishMessage(ctx, message)
}// SendDelayMessage 发送延迟消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - message: 要发送的消息内容
//   - delaySeconds: 延迟时间(秒)
// 返回:
//   - error: 发送错误,成功则为nil
func (s *sRabbitmqMsg) SendDelayMessage(ctx context.Context, message string, delaySeconds int) error {return rabbitmq.PublishDelayMessage(ctx, message, delaySeconds)
}// SendBatchMessages 批量发送消息到RabbitMQ
// 参数:
//   - ctx: 上下文信息
//   - messages: 消息内容数组
// 返回:
//   - error: 发送错误,成功则为nil
// 注意:任一消息发送失败都会导致整个批次失败
func (s *sRabbitmqMsg) SendBatchMessages(ctx context.Context, messages []string) error {for _, msg := range messages {if err := rabbitmq.PublishMessage(ctx, msg); err != nil {return err}}return nil
}// GetQueueLength 获取队列当前的消息数量
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - int: 队列中的消息数量
//   - error: 获取错误,成功则为nil
func (s *sRabbitmqMsg) GetQueueLength(ctx context.Context) (int, error) {queue, err := rabbitmq.GetChannel().QueueInspect(g.Cfg().MustGet(ctx, "rabbitmq.queue").String(),)if err != nil {return 0, fmt.Errorf("failed to inspect queue: %v", err)}return queue.Messages, nil
}// PurgeQueue 清空队列中的所有消息
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - error: 清空错误,成功则为nil
func (s *sRabbitmqMsg) PurgeQueue(ctx context.Context) error {return rabbitmq.PurgeQueue(ctx)
}// handleMessage 处理接收到的单条消息
// 参数:
//   - message: 消息内容
// 返回:
//   - error: 处理错误,成功则为nil
// 注意:这是内部方法,实现具体的消息处理逻辑
func (s *sRabbitmqMsg) handleMessage(message string) error {// 记录接收到的消息g.Log().Info(context.Background(), "收到消息:", message)// TODO: 在这里添加实际的消息处理逻辑return nil
}// Initialize 初始化消息消费处理
// 参数:
//   - ctx: 上下文信息
// 返回:
//   - error: 初始化错误,成功则为nil
// 功能:启动消息消费者,并设置消息处理函数
func (s *sRabbitmqMsg) Initialize(ctx context.Context) error {return rabbitmq.ConsumeMessages(ctx, func(msg string) error {return s.handleMessage(msg)})
}

生成service ,不再写上

controller代码

package frontimport ("fmt""gf_new_web/internal/service""time""github.com/gogf/gf/v2/frame/g""github.com/gogf/gf/v2/net/ghttp"
)var (RabbitMsg = cRabbitMsg{}
)type cRabbitMsg struct{}// SendMessage 处理发送普通消息的HTTP请求
// 请求参数:
//   - message: 消息内容
// 响应格式:
//   成功:{"code": 0, "msg": "消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendMessage(r *ghttp.Request) {message := r.Get("message").String()err := service.RabbitmqMsg().SendMessage(r.GetCtx(), message)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "消息发送成功",})
}// SendDelayMessage 处理发送延迟消息的HTTP请求
// 请求参数:
//   - message: 消息内容
//   - delay: 延迟时间(秒)
// 响应格式:
//   成功:{"code": 0, "msg": "延迟消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendDelayMessage(r *ghttp.Request) {message := r.Get("message").String()delaySeconds := r.Get("delay").Int()err := service.RabbitmqMsg().SendDelayMessage(r.GetCtx(), message, delaySeconds)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "延迟消息发送成功",})
}// SendBatchMessages 处理批量发送消息的HTTP请求
// 请求参数:
//   - messages: 消息内容数组
// 响应格式:
//   成功:{"code": 0, "msg": "批量消息发送成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) SendBatchMessages(r *ghttp.Request) {messages := r.Get("messages").Strings()err := service.RabbitmqMsg().SendBatchMessages(r.GetCtx(), messages)if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "批量消息发送成功",})
}// GetQueueLength 处理获取队列长度的HTTP请求
// 响应格式:
//   成功:{"code": 0, "msg": "获取队列长度成功", "data": 队列长度}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) GetQueueLength(r *ghttp.Request) {length, err := service.RabbitmqMsg().GetQueueLength(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "获取队列长度成功","data": length,})
}// PurgeQueue 处理清空队列的HTTP请求
// 响应格式:
//   成功:{"code": 0, "msg": "清空队列成功"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) PurgeQueue(r *ghttp.Request) {err := service.RabbitmqMsg().PurgeQueue(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), err)r.Response.WriteJson(g.Map{"code": -1,"msg":  err.Error(),})return}r.Response.WriteJson(g.Map{"code": 0,"msg":  "清空队列成功",})
}// ConsumeMessages 处理消费消息的HTTP请求
// 特点:异步处理,非阻塞
// 响应格式:
//   成功:{"code": 0, "msg": "消息消费已开始,请查看服务器日志获取消费详情"}
//   失败:{"code": -1, "msg": "错误信息"}
func (c *cRabbitMsg) ConsumeMessages(r *ghttp.Request) {g.Log().Info(r.GetCtx(), "开始消费消息...")done := make(chan bool)go func() {err := service.RabbitmqMsg().Initialize(r.GetCtx())if err != nil {g.Log().Error(r.GetCtx(), "消费消息出错:", err)r.Response.WriteJson(g.Map{"code": -1,"msg":  fmt.Sprintf("消费消息失败: %v", err),})done <- truereturn}}()select {case <-done:returncase <-time.After(5 * time.Second):g.Log().Info(r.GetCtx(), "消息消费进行中...")r.Response.WriteJson(g.Map{"code": 0,"msg":  "消息消费已开始,请查看服务器日志获取消费详情",})}
}

相关文章:

goframe开发一个企业网站 rabbitmq队例15

RabbitMQ消息队列封装 在目录internal/pkg/rabbitmq/rabbitmq.go # 消息队列配置 mq:# 消息队列类型: rocketmq 或 rabbitmqtype: "rabbitmq"# 是否启用消息队列enabled: truerocketmq:nameServer: "127.0.0.1:9876"producerGroup: "myProducerGrou…...

设计模式-七个基本原则之一-迪米特法则 + 案例

迪米特法则:(LoD) 面向对象七个基本原则之一 只与直接的朋友通信&#xff1a;对象应只与自己直接关联的对象通信&#xff0c;例如&#xff1a;方法参数、返回值、创建的对象。避免“链式调用”&#xff1a;尽量避免通过多个对象链进行调用。例如&#xff0c;a.getB().getC().do…...

【数学二】线性代数-二次型

考试要求 1、了解二次型的概念, 会用矩阵形式表示二次型,了解合同变换与合同矩阵的概念. 2、了解二次型的秩的概念,了解二次型的标准形、规范形等概念,了解惯性定理,会用正交变换和配方法化二次型为标准形。 3、理解正定二次型、正定矩阵的概念,并掌握其判别法. 二次型…...

320页PDF | 集团IT蓝图总体规划报告-德勤(限免下载)

一、前言 这份报告是集团IT蓝图总体规划报告-德勤。在报告中详细阐述了德勤为某集团制定的全面IT蓝图总体规划&#xff0c;包括了集团信息化目标蓝图、IT应用规划、数据规划、IT集成架构、IT基础设施规划以及IT治理体系规划等关键领域&#xff0c;旨在为集团未来的信息化发展提…...

HTB:Sea[WriteUP]

目录 连接至HTB服务器并启动靶机 使用nmap对靶机TCP端口进行开放扫描 使用curl访问靶机80端口 使用ffuf对靶机进行了一顿FUZZ 尝试在Github上搜索版权拥有者 除了LICENSE还FUZZ出了version文件尝试访问 尝试直接在Github搜索该符合该版本的EXP 横向移动 使用john对该哈…...

Java 网络编程(一)—— UDP数据报套接字编程

概念 在网络编程中主要的对象有两个&#xff1a;客户端和服务器。客户端是提供请求的&#xff0c;归用户使用&#xff0c;发送的请求会被服务器接收&#xff0c;服务器根据请求做出响应&#xff0c;然后再将响应的数据包返回给客户端。 作为程序员&#xff0c;我们主要关心应…...

ECharts图表图例8

用eclipse软件制作动态单仪表图 用java知识点 代码截图&#xff1a;...

Redis中的线程模型

Redis 的单线程模型详解 Redis 的“单线程”模型主要指的是其 主线程&#xff0c;这个主线程负责从客户端接收请求、解析命令、处理数据和返回响应。为了深入了解 Redis 单线程的具体工作流程&#xff0c;我们可以将其分为以下几个步骤&#xff1a; 接收客户端请求 Redis 的主线…...

[产品管理-77]:技术人需要了解的常见概念:科学、技术、技能、产品、市场、商业模式、运营

目录 一、概念定义 科学 技术 技能 产品 市场 商业模式 运营 二、上述概念在产品创新中的作用 一、概念定义 对于技术人来说&#xff0c;了解并掌握科学、技术、技能、产品、市场、商业模式、运营等常见概念的定义至关重要。以下是这些概念的详细解释&#xff1a; 科…...

鼠标点击(一)与3D视口窗口的交互

(1) (2) (3)...

线程-2-线程概念与控制

main 线程常见寄存器&#xff08;CR3 EIP IR MMU TLB&#xff09; CR3是当前进程页表物理内存地址&#xff08;包不能虚拟地址&#xff0c;不然套娃了&#xff09; CPU中有寄存器指向task_struct* current EIP&#xff1a;入口虚拟地址 IR&#xff1a;当前命令地址系统总线&a…...

TortoiseSVN提示服务器凭证检核错误:站点名称不符

电脑重装了系统&#xff0c;下载了新版本SVN软件&#xff0c;一切准备就绪&#xff0c;准备大干一场。 打开SVN&#xff0c;一遍一遍的提示【TortoiseSVN提示服务器凭证检核错误:站点名称不符】&#xff0c;一次次的让我接受&#xff0c;终于忍受不了了。 TortoiseSVN提示服务…...

Diffusion Policy——斯坦福机器人UMI所用的扩散策略:从原理到其编码实现(含Diff-Control、ControlNet详解)

前言 本文一开始是属于此文《UMI——斯坦福刷盘机器人&#xff1a;从手持夹持器到动作预测Diffusion Policy(含代码解读)》的第三部分&#xff0c;考虑后Diffusion Policy的重要性很高&#xff0c;加之后续还有一系列基于其的改进工作 故独立成本文&#xff0c;且写的过程中 …...

(动画版)排序算法 -希尔排序

文章目录 1. 希尔排序&#xff08;Shellsort&#xff09;1.1 简介1.2 希尔排序的步骤1.3 希尔排序的C实现1.4 时间复杂度1.5 空间复杂度1.6 希尔排序动画 1. 希尔排序&#xff08;Shellsort&#xff09; 1.1 简介 希尔排序&#xff08;Shells Sort&#xff09;&#xff0c;又…...

delphi fmx android 自动更新(二)

自己写了一个升级的类,支持android与windows 1,下载升级包,可以设置进度条 我这里用的fmxui的进度条,你也可以用原生的 http下载我用的nethttpclient, 进度条设置是比较方便的 首先获取下载文件的大小 用nethttpclient.head函数请求文件地址,得到contentlength 接着…...

蓝队知识浅谈(中)

声明&#xff1a;学习视频来自b站up主 泷羽sec&#xff0c;如涉及侵权马上删除文章 感谢泷羽sec 团队的教学 视频地址&#xff1a;蓝队基础之网络七层杀伤链_哔哩哔哩_bilibili 本文主要分享一些蓝队相关的知识。 一、网络杀伤链 网络杀伤链&#xff08;Cyber Kill Chain&…...

解决vue3+ts打包项目时会生成map文件

在正常未配置的情况下使用npm run build 命令打包&#xff0c;会生成很多的js和map文件,map文件是为了方便我们在生产环境进行更友好的代码调试&#xff0c;但是这样就存一个安全问题&#xff1b;容易被攻击&#xff1b; 解决方法&#xff1a;在package.json文件&#xff0c;重…...

webpack指南

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;webpack篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack-指南 概念 中文&#xff1a; webpack | webpack中文文档 | webpack中文网 英文&…...

关于QUERY_ALL_PACKAGES权限导致Google下架apk

谷歌商店被下架,原因是第三方使用了 QUERY_ALL_PACKAGES 权限&#xff1b; Google在高版本上限制了此权限的使用。当然&#xff0c;并不是 QUERY_ALL_PACKAGES 这个权限没有了&#xff0c;而是被列为敏感权限&#xff0c;必须有充分的理由说明&#xff0c;才允许上架 GP&#…...

优化时钟网络之时钟抖动

Note&#xff1a;文章内容以Xilinx 7系列FPGA进行讲解 1、什么是时钟抖动 时钟抖动就是时钟周期之间出现的偏差。比如一个时钟周期为10ns的时钟&#xff0c;理想情况下&#xff0c;其上升沿会出现在0ns&#xff0c;10ns&#xff0c;20ns时刻&#xff0c;假设某个上升沿出现的时…...

[特殊字符] Windows 下 OpenClaw 快速安装与功能使用

✨ 适配系统&#xff1a;Windows10/11 64 位 &#xff5c; 当前版本&#xff1a;OpenClaw v2.7.5 &#xff1a; &#x1f517; 下载 OpenClaw 2.7.5 ✨ 核心亮点&#xff1a;零代码门槛&#xff5c;全程可视化&#xff5c;内置运行依赖&#xff5c;快速部署上手 &#x1f4e2…...

Cadence 17.4 CIS配置踩坑实录:MySQL元件数据库连接失败与中文乱码全解决

Cadence 17.4 CIS配置实战&#xff1a;MySQL元件数据库连接与中文乱码终极解决方案 当工程师尝试将Cadence CIS与MySQL数据库集成时&#xff0c;往往会遇到两个令人头疼的问题&#xff1a;连接失败和中文乱码。这两个问题看似简单&#xff0c;却可能耗费大量调试时间。本文将深…...

对比直接调用与通过 Taotoken 调用的稳定性体验差异

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 对比直接调用与通过 Taotoken 调用的稳定性体验差异 作为一名长期使用各类大模型 API 的开发者&#xff0c;我在构建和运维应用时&…...

【Flink学习】(五)Flink 并行度与任务链,任务运行核心原理

本文主要整理Flink 底层任务运行机制&#xff0c;学会合理设置并行度&#xff0c;初步具备任务调优思维。 一、并行度概念 并行度代表 Flink 任务运行的线程数量&#xff0c;决定任务处理速度&#xff0c;分为全局并行度、算子并行度、客户端并行度。 二、并行度设置 分为三种方…...

MCP电路设计:从门电路到CPLD的优先级仲裁硬件实现

1. 项目概述&#xff1a;从“命令打架”到“有序排队”的电路设计在嵌入式系统、工业控制或者任何需要处理多路信号的数字电路里&#xff0c;我们经常会遇到一个头疼的问题&#xff1a;当多个输入信号同时要求一个输出设备执行不同动作时&#xff0c;系统该听谁的&#xff1f;比…...

ag-psd:重构JavaScript生态中的PSD文件处理范式

ag-psd&#xff1a;重构JavaScript生态中的PSD文件处理范式 【免费下载链接】ag-psd Javascript library for reading and writing PSD files 项目地址: https://gitcode.com/gh_mirrors/ag/ag-psd 在前端工程化与设计系统协同演进的技术浪潮中&#xff0c;PSD文件处理一…...

MCP协议技术架构深度解析:构建AI工具生态系统的标准化方案

MCP协议技术架构深度解析&#xff1a;构建AI工具生态系统的标准化方案 【免费下载链接】Awesome-MCP-ZH MCP 资源精选&#xff0c; MCP指南&#xff0c;Claude MCP&#xff0c;MCP Servers, MCP Clients 项目地址: https://gitcode.com/gh_mirrors/aw/Awesome-MCP-ZH MC…...

YOLO综合训练工具X(免环境版 手动/自动标注、一键训练、模型验证、分类器训练、自动截图、批量处理

yolo免环境训练工具 yolo8标注工具 yolo训练工具 yolo8 yolo4 yolo3yolo无需搭建环境训练工具 免环境标注、训练的工具支持版本 yolo3 yolo4 yolo8(电脑显卡必须N卡) [火]可训练模型 cfg weights bin param pt yolo8l.pt yolo8m.pt yolo8n.pt yolo8s.pt yolo8x.pt 一、YOLO免环…...

SGLang 未来演进与生态集成:从推理到 Agent 与多模态

系列导读 你现在看到的是《SGLang 推理加速与生产级服务化部署实战》的第 10/10 篇,当前这篇会重点解决:帮助读者建立对 SGLang 生态的全局视野,并规划后续深入方向,完成从入门到精通的闭环。 上一篇回顾:第 9 篇《SGLang 生产级部署排错指南:10 个常见问题与解决方案》…...

IOT-Tree支持[子站-中心]数据同步功能-轻松支持你的物联网平台

在版本1.9.0开始&#xff0c;IOT-Tree内部移植并开源了中心-子站点的数据同步功能&#xff0c;这个功能已经在我们开发团队的企业用户系统中使用了很长一段时间&#xff0c;足够稳定和可靠。 当前很多物联网系统中&#xff0c;经常有如下需求&#xff1a; 1&#xff09;一些工…...