当前位置: 首页 > news >正文

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

场景

在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件;

接口模型

这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息,任何消息队列都必须有这两个功能;根据如下代码消息队列中间件是可扩展的,只需在实例化消息队列对象那里添加新消息队列的实现;

// MQer 消息队列接口
type MQer interface {Producer(topic string, data []byte)Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}// NewMQ 实例化消息队列对象
func NewMQ() MQer {switch conf.Conf.Default.Mq { // mq 设置的类型case "nsq":return new(MQNsqService)case "rabbit":return new(MQRabbitService)case "kafka":return new(MQKafkaService)default:return new(MQNsqService)}
}/*
配置文件结构设计mqType: "" # nsq, rabbit, kafka  这三个值然当然了是可扩展的nsq:producer: ""consumer: ""rabbit:addr: ""user: ""password: ""kafka:addr: ""
*/

各个消息队列的实现

1. 依赖库
  • nsq : github.com/nsqio/go-nsq
  • rabbitmq : github.com/streadway/amqp
  • kafka : github.com/Shopify/sarama
2. nsq

nsq结构体

// MQNsqService NSQ消息队列
type MQNsqService struct {
}

生产者

// Producer 生产者
func (m *MQNsqService) Producer(topic string, data []byte) {nsqConf := &nsq.Config{}client, err := nsq.NewProducer(nsqServer, nsqConf)if err != nil {log.Error("[nsq]无法连接到队列")return}log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))err = client.Publish(topic, data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}

消费者

var (nsqServer   = conf.Conf.Default.Nsq.Producer // nsqServer
)// Consumer 消费者
func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {mh, err := NewMessageHandler(nsqServer, channel)if err != nil {log.Error(err)return}go func() {mh.SetMaxInFlight(1000)mh.Registry(topic, ch)}()go func() {for {select {case s := <-ch:f(s)}}}()log.DebugF("[NSQ] ServerID:%v => %v started", channel, topic)
}// MessageHandler MessageHandler
type MessageHandler struct {msgChan     chan *goNsq.Messagestop        boolnsqServer   stringChannel     stringmaxInFlight int
}// NewMessageHandler return new MessageHandler
func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {if nsqServer == "" {err = fmt.Errorf("[NSQ] need nsq server")return}mh = &MessageHandler{msgChan:   make(chan *goNsq.Message, 1024),stop:      false,nsqServer: nsqServer,Channel:   channel,}return
}// Registry register nsq topic
func (m *MessageHandler) Registry(topic string, ch chan []byte) {config := goNsq.NewConfig()if m.maxInFlight > 0 {config.MaxInFlight = m.maxInFlight}consumer, err := goNsq.NewConsumer(topic, m.Channel, config)if err != nil {panic(err)}consumer.SetLogger(nil, 0)consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))err = consumer.ConnectToNSQLookupd(m.nsqServer)if err != nil {panic(err)}m.process(ch)
}
  1. rabbitmq
    结构体
// MQRabbitService Rabbit消息队列
type MQRabbitService struct {
}

生产者

// Producer 生产者
func (m *MQRabbitService) Producer(topic string, data []byte) {mq, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}//defer mq.Destroy()log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))err = mq.PublishPub(data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 

消费者

// Consumer 消费者
func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {mh, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}msg := mh.RegistryReceiveSub()go func(m <-chan amqp.Delivery) {for {select {case s := <-m:f(s.Body)}}}(msg)log.DebugF("[Rabbit] ServerID:%v => %v started", serverId, topic)
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 
  1. kafka
    结构体
// MQKafkaService Kafka消息队列
type MQKafkaService struct {
}

生产者

func (m *MQKafkaService) Producer(topic string, data []byte) {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follower都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回// 构造一个消息msg := &sarama.ProducerMessage{}msg.Topic = topicmsg.Value = sarama.ByteEncoder(data)// 连接kafkaclient, err := sarama.NewSyncProducer(kafkaServer, config)if err != nil {log.Error("Producer closed, err:", err)return}defer client.Close()// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {log.Error("send msg failed, err:", err)return}log.InfoF("pid:%v offset:%v\n", pid, offset)
}

消费者

// Consumer 消费者
func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer(kafkaServer, nil)if err != nil {log.ErrorF("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区if err != nil {log.Error("Failed to get the list of partition: ", err)return}log.Info(partitionList)for partition := range partitionList { // 遍历所有的分区pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者if err != nil {log.ErrorF("Failed to start consumer for partition %d: %s\n", partition, err)}wg.Add(1)go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待log.DebugF("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))f(msg.Value)}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}

总结

golang的接口是一种抽象类型,是对其他类型行为的概括与抽象,从语法角度来看,接口是一组方法定义的集合,文本的封装使用了golang接口这一特性,把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法,具体的各个消息队列中间件去实现这两个方法即可,最明显的优点在于扩展性,解耦性,选择性,维护性这几个表象上。

完整代码

https://github.com/mangenotwork/common/tree/main/mq

你的星星是我分享的最大动力 : )

相关文章:

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求&#xff0c;就是不同的项目会用到不同的消息队列中间件&#xff0c;我的思路把所有的消息队列中间件进行封装一个消息队列接口&#xff08;MQer&#xff09;有两个方法一个…...

【第一阶段】kotlin的函数

函数头 fun main() {getMethod("zhangsan",22) }//kotlin语言默认是public,kotlin更规范&#xff0c;先有输入&#xff08; getMethod(name:String,age:Int)&#xff09;再有输出(Int[返回值]) private fun getMethod(name:String,age:Int): Int{println("我叫…...

PAM安全配置-用户密码锁定策略

PAM是一个用于实现身份验证的模块化系统&#xff0c;可以在操作系统中的不同服务和应用程序中使用。 pam_faillock模块 pam_faillock模块用来实现账号锁定功能&#xff0c;它可以在一定的认证失败次数后锁定用户账号&#xff0c;防止暴力破解密码攻击。 常见选项 deny&…...

AndroidManifest.xml日常笔记

1 Bundle介绍 Bundle主要用于传递数据&#xff1b;它保存的数据&#xff0c;是以key-value(键值对)的形式存在的。 我们经常使用Bundle在Activity之间传递数据&#xff0c;传递的数据可以是boolean、byte、int、long、float、double、string等基本类型或它们对应的数组&#xf…...

SpringBoot异步框架

参考&#xff1a;解剖SpringBoot异步线程池框架_哔哩哔哩_bilibili 1、 为什么要用异步框架&#xff0c;它解决什么问题&#xff1f; 在SpringBoot的日常开发中&#xff0c;一般都是同步调用的。但经常有特殊业务需要做异步来处理&#xff0c;例如&#xff1a;注册新用户&…...

导出LLaMA ChatGlm2等LLM模型为onnx

通过onnx模型可以在支持onnx推理的推理引擎上进行推理&#xff0c;从而可以将LLM部署在更加广泛的平台上面。此外还可以具有避免pytorch依赖&#xff0c;获得更好的性能等优势。 这篇博客&#xff08;大模型LLaMa及周边项目&#xff08;二&#xff09; - 知乎&#xff09;进行…...

C++项目:在线五子棋对战网页版--匹配对战模块开发

玩家匹配是根据自己的天梯分数进行匹配的&#xff0c;而服务器中将玩家天梯分数分为三个档次&#xff1a; 1. 普通&#xff1a;天梯分数小于2000分 2. 高手&#xff1a;天梯分数介于2000~3000分之间 3. 大神&#xff1a;天梯分数大于3000分 当玩家进行对战匹配时&#xff0c;服…...

ssh 连接断开,正在执行的shell脚本也被中断了

背景 最近在训练chatGLM&#xff0c;一次训练经常要花掉近2个小时&#xff0c;但是由于网络不稳定&#xff0c;经常ssh莫名的断开&#xff0c;导致训练不得不重新开启&#xff0c;这就很浪费时间了 解决方案 下面教大家一种在后台执行命令的方案&#xff0c;即使你ssh连接断…...

UML 用例图,类图,时序图,活动图

UML之用例图&#xff0c;类图&#xff0c;时序图&#xff0c;活动图_用例图 时序图_siyan985的博客-CSDN博客 https://www.cnblogs.com/GumpYan/p/14734357.html 用例图与类图 - 简书...

Java 面试题2023

Java core JVM 1、JVM内存模型 2、JVM运行时内存分配 3、如何确定当前对象是个垃圾 4、GCrooot 包括哪些? 5、JVM对象头包含哪些部分 6、GC算法有哪些 7、JVM中类的加载机制 8、分代收集算法 9、JDK1.8 和 1.7做了哪些优化 10、内存泄漏和内存溢出有什么区别 11、J…...

【CSS3】CSS3 动画 ④ ( 使用动画制作地图热点图 )

文章目录 一、需求说明二、动画代码分析1、地图背景设置2、热点动画位置测量3、热点动画布局分析4、动画定义5、小圆点实现6、波纹效果盒子实现7、延迟动画设置 三、代码示例 一、需求说明 实现如下效果 , 在一张地图上 , 以某个位置为中心点 , 向四周发散 ; 核心 是实现 向四周…...

命令模式(Command)

命令模式是一种行为设计模式&#xff0c;可将一个请求封装为一个对象&#xff0c;用不同的请求将方法参数化&#xff0c;从而实现延迟请求执行或将其放入队列中或记录请求日志&#xff0c;以及支持可撤销操作。其别名为动作(Action)模式或事务(Transaction)模式。 Command is …...

Dapper 微型orm的光

介绍 Dapper是一个轻量级的ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;它可以方便地将数据库查询结果映射到.NET对象上&#xff0c;同时也支持执行原生SQL查询。下面我将详细介绍Dapper的使用方法。 安装Dapper 首先&#xff0c;你需要通过NuGet包管理器将Dap…...

Mysql随心记--第一篇

MylSAM&#xff1a;查询速度快&#xff0c;有较好的索引优化和数据压缩技术&#xff0c;但是它不支持事务 InnoDB&#xff1a;它支持事务&#xff0c;并且提供行级的锁定&#xff0c;应用也相当广泛 docker ps -a --filter "ancestormysql" 查看linux中创建了多少个d…...

使用dockerfile安装各种服务组件

使用dockerfile安装各种服务组件 elasticsearch、minio、mongodb、nacos、redis 一、使用dockerfile安装elasticsearch:7.8.0 1、Dockerfile文件 FROM elasticsearch:7.8.0 #添加分词器 ADD elasticsearch-analysis-ik /usr/share/elasticsearch/plugins/elasticsearch-anal…...

如何简单的无人直播

环境搭建 ffmpeg安装&#xff0c;我这里用的是centos搭建的&#xff0c;其他平台可以自己百度 yum -y install wgetwget --no-check-certificate https://www.johnvansickle.com/ffmpeg/old-releases/ffmpeg-4.0.3-64bit-static.tar.xztar -xJf ffmpeg-4.0.3-64bit-static.ta…...

【基于HBase和ElasticSearch构建大数据实时检索项目】

基于HBase和ElasticSearch构建大数据实时检索项目 一、项目说明二、环境搭建三、编写程序四、测试流程 一、项目说明 利用HBase存储海量数据&#xff0c;解决海量数据存储和实时更新查询的问题&#xff1b;利用ElasticSearch作为HBase索引&#xff0c;加快大数据集中实时查询数…...

ProComponent 用法学习

相信很多同学都用过 Ant Design 这一 react 著名组件库&#xff0c;而 ProComponents 则是在 antd 之上进行封装的页面级组件库&#xff08;指一个组件就可以搞定一个页面&#xff09;。它同时也是 Ant Design Pro 中后台框架所用的主要组件库。如果你手上有要用 react 开发的中…...

巨人互动|Google海外户Google Analytics的优缺点是什么?

Google Analytics是一个由谷歌开发的网站分析工具&#xff0c;旨在帮助网站和移动应用程序运营者收集和分析数据&#xff0c;以更好地了解用户行为和改进业务。虽然Google Analytics具有许多优势&#xff0c;但也存在一些缺点。在本文中&#xff0c;我们将探讨Google Analytics…...

MySQL数据库的操作

MySQL 连接服务器 库的操作创建数据库数据库删除查看数据库进入数据库查看所在的数据库修改数据库显示创建语句查看连接情况 表的操作创建表查看数据库所有的表查看表的详细信息查看创建表时的详细信息删除表修改表名向表中插入数据在表结构中新增一列对表结构数据的修改删除表…...

Linux 驱动开发流程(带最小可运行代码 + 通俗类比)

Linux 驱动开发流程&#xff08;带最小可运行代码 通俗类比&#xff09; 很多人学 Linux 驱动都会卡在这里&#xff1a;API 都看过&#xff0c;但完全不知道它们是怎么串起来工作的这篇文章目标很明确&#xff1a; ✅ 用一条主线讲清流程 ✅ 用类比帮你记住 ✅ 给你一个最小可…...

IDM激活工具使用指南:长期使用IDM的技术方案详解

IDM激活工具使用指南&#xff1a;长期使用IDM的技术方案详解 【免费下载链接】IDM-Activation-Script IDM Activation & Trail Reset Script 项目地址: https://gitcode.com/gh_mirrors/id/IDM-Activation-Script Internet Download Manager&#xff08;IDM&#xf…...

SMUDebugTool终极指南:如何深度调试AMD Ryzen处理器底层硬件

SMUDebugTool终极指南&#xff1a;如何深度调试AMD Ryzen处理器底层硬件 【免费下载链接】SMUDebugTool A dedicated tool to help write/read various parameters of Ryzen-based systems, such as manual overclock, SMU, PCI, CPUID, MSR and Power Table. 项目地址: http…...

LeetCode //C - 1002. Find Common Characters

1002. Find Common Characters Given a string array words, return an array of all characters that show up in all strings within the words (including duplicates). You may return the answer in any order. Example 1: Input: words [“bella”,“label”,“roll…...

PyTorch 3.0静态图分布式训练全链路解析(含NCCL拓扑感知、Graph Partitioning与梯度同步优化)

第一章&#xff1a;PyTorch 3.0静态图分布式训练概览与演进脉络PyTorch 3.0标志着框架在可扩展性与编译优化方向的重大跃迁——其核心变化之一是将TorchDynamo Inductor后端深度整合为默认的静态图编译通道&#xff0c;并原生支持跨设备、跨节点的分布式静态图训练。这一演进并…...

提升c语言编码效率:用快马智能生成可复用的基础工具函数库

提升C语言编码效率&#xff1a;用快马智能生成可复用的基础工具函数库 最近在写C语言项目时&#xff0c;发现很多基础功能需要反复实现&#xff0c;比如字符串处理、动态数组管理这些轮子。每次从零开始写不仅耗时&#xff0c;还容易引入边界条件错误。后来尝试用InsCode(快马…...

告别B站缓存碎片化:3步智能合并视频的终极解决方案

告别B站缓存碎片化&#xff1a;3步智能合并视频的终极解决方案 【免费下载链接】BilibiliCacheVideoMerge 项目地址: https://gitcode.com/gh_mirrors/bi/BilibiliCacheVideoMerge 你是否曾在高铁上打开B站缓存视频准备消遣时光&#xff0c;却发现播放器卡在开头几秒后…...

设备独立滚动控制:让macOS输入设备各得其所的开源解决方案

设备独立滚动控制&#xff1a;让macOS输入设备各得其所的开源解决方案 【免费下载链接】Scroll-Reverser Per-device scrolling prefs on macOS. 项目地址: https://gitcode.com/gh_mirrors/sc/Scroll-Reverser 问题溯源&#xff1a;当滚动方向成为效率隐形杀手 在数字…...

ViTConvMAE-B(NeurIPS 2022)目标检测、实例分割模型环境配置ViTConvMAE-B(NeurIPS 2022)目标检测、实例分割模型数据集调整ViTConvMAE-B(Ne

ViTConvMAE-B&#xff08;NeurIPS 2022&#xff09;目标检测、实例分割模型环境配置 ViTConvMAE-B&#xff08;NeurIPS 2022&#xff09;目标检测、实例分割模型数据集调整 ViTConvMAE-B&#xff08;NeurIPS 2022&#xff09;目标检测、实例分割模型代跑训练 ViTConvMAE-B&…...

FUTURE POLICE惊艳效果:毫秒级语音字幕对齐实战演示

FUTURE POLICE惊艳效果&#xff1a;毫秒级语音字幕对齐实战演示 1. 为什么需要精准的字幕对齐&#xff1f; 在视频制作和多媒体处理中&#xff0c;字幕与语音的同步问题一直是个痛点。传统字幕制作往往需要人工逐句校对&#xff0c;耗时耗力。而普通语音识别技术虽然能生成文…...