基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
场景
在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件;
接口模型
这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息,任何消息队列都必须有这两个功能;根据如下代码消息队列中间件是可扩展的,只需在实例化消息队列对象那里添加新消息队列的实现;
// MQer 消息队列接口
type MQer interface {Producer(topic string, data []byte)Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}// NewMQ 实例化消息队列对象
func NewMQ() MQer {switch conf.Conf.Default.Mq { // mq 设置的类型case "nsq":return new(MQNsqService)case "rabbit":return new(MQRabbitService)case "kafka":return new(MQKafkaService)default:return new(MQNsqService)}
}/*
配置文件结构设计mqType: "" # nsq, rabbit, kafka 这三个值然当然了是可扩展的nsq:producer: ""consumer: ""rabbit:addr: ""user: ""password: ""kafka:addr: ""
*/
各个消息队列的实现
1. 依赖库
- nsq : github.com/nsqio/go-nsq
- rabbitmq : github.com/streadway/amqp
- kafka : github.com/Shopify/sarama
2. nsq
nsq结构体
// MQNsqService NSQ消息队列
type MQNsqService struct {
}
生产者
// Producer 生产者
func (m *MQNsqService) Producer(topic string, data []byte) {nsqConf := &nsq.Config{}client, err := nsq.NewProducer(nsqServer, nsqConf)if err != nil {log.Error("[nsq]无法连接到队列")return}log.DebugF(fmt.Sprintf("[生产消息] topic : %s --> %s", topic, string(data)))err = client.Publish(topic, data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}
消费者
var (nsqServer = conf.Conf.Default.Nsq.Producer // nsqServer
)// Consumer 消费者
func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {mh, err := NewMessageHandler(nsqServer, channel)if err != nil {log.Error(err)return}go func() {mh.SetMaxInFlight(1000)mh.Registry(topic, ch)}()go func() {for {select {case s := <-ch:f(s)}}}()log.DebugF("[NSQ] ServerID:%v => %v started", channel, topic)
}// MessageHandler MessageHandler
type MessageHandler struct {msgChan chan *goNsq.Messagestop boolnsqServer stringChannel stringmaxInFlight int
}// NewMessageHandler return new MessageHandler
func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {if nsqServer == "" {err = fmt.Errorf("[NSQ] need nsq server")return}mh = &MessageHandler{msgChan: make(chan *goNsq.Message, 1024),stop: false,nsqServer: nsqServer,Channel: channel,}return
}// Registry register nsq topic
func (m *MessageHandler) Registry(topic string, ch chan []byte) {config := goNsq.NewConfig()if m.maxInFlight > 0 {config.MaxInFlight = m.maxInFlight}consumer, err := goNsq.NewConsumer(topic, m.Channel, config)if err != nil {panic(err)}consumer.SetLogger(nil, 0)consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))err = consumer.ConnectToNSQLookupd(m.nsqServer)if err != nil {panic(err)}m.process(ch)
}
- rabbitmq
结构体
// MQRabbitService Rabbit消息队列
type MQRabbitService struct {
}
生产者
// Producer 生产者
func (m *MQRabbitService) Producer(topic string, data []byte) {mq, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}//defer mq.Destroy()log.DebugF(fmt.Sprintf("[生产消息] topic : %s --> %s", topic, string(data)))err = mq.PublishPub(data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq
消费者
// Consumer 消费者
func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {mh, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}msg := mh.RegistryReceiveSub()go func(m <-chan amqp.Delivery) {for {select {case s := <-m:f(s.Body)}}}(msg)log.DebugF("[Rabbit] ServerID:%v => %v started", serverId, topic)
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例 (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq
- kafka
结构体
// MQKafkaService Kafka消息队列
type MQKafkaService struct {
}
生产者
func (m *MQKafkaService) Producer(topic string, data []byte) {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回// 构造一个消息msg := &sarama.ProducerMessage{}msg.Topic = topicmsg.Value = sarama.ByteEncoder(data)// 连接kafkaclient, err := sarama.NewSyncProducer(kafkaServer, config)if err != nil {log.Error("Producer closed, err:", err)return}defer client.Close()// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {log.Error("send msg failed, err:", err)return}log.InfoF("pid:%v offset:%v\n", pid, offset)
}
消费者
// Consumer 消费者
func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer(kafkaServer, nil)if err != nil {log.ErrorF("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区if err != nil {log.Error("Failed to get the list of partition: ", err)return}log.Info(partitionList)for partition := range partitionList { // 遍历所有的分区pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者if err != nil {log.ErrorF("Failed to start consumer for partition %d: %s\n", partition, err)}wg.Add(1)go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待log.DebugF("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))f(msg.Value)}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}
总结
golang的接口是一种抽象类型,是对其他类型行为的概括与抽象,从语法角度来看,接口是一组方法定义的集合,文本的封装使用了golang接口这一特性,把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法,具体的各个消息队列中间件去实现这两个方法即可,最明显的优点在于扩展性,解耦性,选择性,维护性这几个表象上。
完整代码
https://github.com/mangenotwork/common/tree/main/mq
你的星星是我分享的最大动力 : )
相关文章:
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka
基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个…...

【第一阶段】kotlin的函数
函数头 fun main() {getMethod("zhangsan",22) }//kotlin语言默认是public,kotlin更规范,先有输入( getMethod(name:String,age:Int))再有输出(Int[返回值]) private fun getMethod(name:String,age:Int): Int{println("我叫…...
PAM安全配置-用户密码锁定策略
PAM是一个用于实现身份验证的模块化系统,可以在操作系统中的不同服务和应用程序中使用。 pam_faillock模块 pam_faillock模块用来实现账号锁定功能,它可以在一定的认证失败次数后锁定用户账号,防止暴力破解密码攻击。 常见选项 deny&…...
AndroidManifest.xml日常笔记
1 Bundle介绍 Bundle主要用于传递数据;它保存的数据,是以key-value(键值对)的形式存在的。 我们经常使用Bundle在Activity之间传递数据,传递的数据可以是boolean、byte、int、long、float、double、string等基本类型或它们对应的数组…...

SpringBoot异步框架
参考:解剖SpringBoot异步线程池框架_哔哩哔哩_bilibili 1、 为什么要用异步框架,它解决什么问题? 在SpringBoot的日常开发中,一般都是同步调用的。但经常有特殊业务需要做异步来处理,例如:注册新用户&…...

导出LLaMA ChatGlm2等LLM模型为onnx
通过onnx模型可以在支持onnx推理的推理引擎上进行推理,从而可以将LLM部署在更加广泛的平台上面。此外还可以具有避免pytorch依赖,获得更好的性能等优势。 这篇博客(大模型LLaMa及周边项目(二) - 知乎)进行…...
C++项目:在线五子棋对战网页版--匹配对战模块开发
玩家匹配是根据自己的天梯分数进行匹配的,而服务器中将玩家天梯分数分为三个档次: 1. 普通:天梯分数小于2000分 2. 高手:天梯分数介于2000~3000分之间 3. 大神:天梯分数大于3000分 当玩家进行对战匹配时,服…...

ssh 连接断开,正在执行的shell脚本也被中断了
背景 最近在训练chatGLM,一次训练经常要花掉近2个小时,但是由于网络不稳定,经常ssh莫名的断开,导致训练不得不重新开启,这就很浪费时间了 解决方案 下面教大家一种在后台执行命令的方案,即使你ssh连接断…...
UML 用例图,类图,时序图,活动图
UML之用例图,类图,时序图,活动图_用例图 时序图_siyan985的博客-CSDN博客 https://www.cnblogs.com/GumpYan/p/14734357.html 用例图与类图 - 简书...
Java 面试题2023
Java core JVM 1、JVM内存模型 2、JVM运行时内存分配 3、如何确定当前对象是个垃圾 4、GCrooot 包括哪些? 5、JVM对象头包含哪些部分 6、GC算法有哪些 7、JVM中类的加载机制 8、分代收集算法 9、JDK1.8 和 1.7做了哪些优化 10、内存泄漏和内存溢出有什么区别 11、J…...

【CSS3】CSS3 动画 ④ ( 使用动画制作地图热点图 )
文章目录 一、需求说明二、动画代码分析1、地图背景设置2、热点动画位置测量3、热点动画布局分析4、动画定义5、小圆点实现6、波纹效果盒子实现7、延迟动画设置 三、代码示例 一、需求说明 实现如下效果 , 在一张地图上 , 以某个位置为中心点 , 向四周发散 ; 核心 是实现 向四周…...

命令模式(Command)
命令模式是一种行为设计模式,可将一个请求封装为一个对象,用不同的请求将方法参数化,从而实现延迟请求执行或将其放入队列中或记录请求日志,以及支持可撤销操作。其别名为动作(Action)模式或事务(Transaction)模式。 Command is …...
Dapper 微型orm的光
介绍 Dapper是一个轻量级的ORM(对象关系映射)框架,它可以方便地将数据库查询结果映射到.NET对象上,同时也支持执行原生SQL查询。下面我将详细介绍Dapper的使用方法。 安装Dapper 首先,你需要通过NuGet包管理器将Dap…...
Mysql随心记--第一篇
MylSAM:查询速度快,有较好的索引优化和数据压缩技术,但是它不支持事务 InnoDB:它支持事务,并且提供行级的锁定,应用也相当广泛 docker ps -a --filter "ancestormysql" 查看linux中创建了多少个d…...
使用dockerfile安装各种服务组件
使用dockerfile安装各种服务组件 elasticsearch、minio、mongodb、nacos、redis 一、使用dockerfile安装elasticsearch:7.8.0 1、Dockerfile文件 FROM elasticsearch:7.8.0 #添加分词器 ADD elasticsearch-analysis-ik /usr/share/elasticsearch/plugins/elasticsearch-anal…...
如何简单的无人直播
环境搭建 ffmpeg安装,我这里用的是centos搭建的,其他平台可以自己百度 yum -y install wgetwget --no-check-certificate https://www.johnvansickle.com/ffmpeg/old-releases/ffmpeg-4.0.3-64bit-static.tar.xztar -xJf ffmpeg-4.0.3-64bit-static.ta…...

【基于HBase和ElasticSearch构建大数据实时检索项目】
基于HBase和ElasticSearch构建大数据实时检索项目 一、项目说明二、环境搭建三、编写程序四、测试流程 一、项目说明 利用HBase存储海量数据,解决海量数据存储和实时更新查询的问题;利用ElasticSearch作为HBase索引,加快大数据集中实时查询数…...

ProComponent 用法学习
相信很多同学都用过 Ant Design 这一 react 著名组件库,而 ProComponents 则是在 antd 之上进行封装的页面级组件库(指一个组件就可以搞定一个页面)。它同时也是 Ant Design Pro 中后台框架所用的主要组件库。如果你手上有要用 react 开发的中…...

巨人互动|Google海外户Google Analytics的优缺点是什么?
Google Analytics是一个由谷歌开发的网站分析工具,旨在帮助网站和移动应用程序运营者收集和分析数据,以更好地了解用户行为和改进业务。虽然Google Analytics具有许多优势,但也存在一些缺点。在本文中,我们将探讨Google Analytics…...

MySQL数据库的操作
MySQL 连接服务器 库的操作创建数据库数据库删除查看数据库进入数据库查看所在的数据库修改数据库显示创建语句查看连接情况 表的操作创建表查看数据库所有的表查看表的详细信息查看创建表时的详细信息删除表修改表名向表中插入数据在表结构中新增一列对表结构数据的修改删除表…...

Hardware-Efficient Attention for Fast Decoding
TL;DR 2025 年普林斯顿大学提出的硬件友好 attention 设计,在 MQA/GQA 与 deepseek 提出的 MLA 基础之上继续优化,提出 Grouped-Tied Attention (GTA) 和 Grouped Latent Attention (GLA),实现更高推理效率的同时也能保持较好的模型效果。 …...

Docker 部署 Python 的 Flask项目
文章目录 一、构建运行 Docker 容器1. 查找合适镜像2.本地docker 拉取镜像3.项目配置1. python项目下生成 requirements.txt 依赖文件2. 生成Dockerfile文件3.忽略不必要文件4. 构建镜像 4. 运行容器5.测试 二、常见问题与解决方案 一、构建运行 Docker 容器 1. 查找合适镜像 …...
JDK8新特性之Steam流
这里写目录标题 一、Stream流概述1.1、传统写法1.2、Stream写法1.3、Stream流操作分类 二、Stream流获取方式2.1、根据Collection获取2.2、通过Stream的of方法 三、Stream常用方法介绍3.1、forEach3.2、count3.3、filter3.4、limit3.5、skip3.6、map3.7、sorted3.8、distinct3.…...
Java线程工厂:定制线程的利器
在Java中,线程工厂(Thread Factory)是一个创建新线程的工厂。它提供了一种方式,允许你在创建线程时定制线程的属性,比如设置线程名称、线程的优先级、守护线程属性等。 线程工厂的主要目的是将线程的创建逻辑从使用线…...

96.如何使用C#实现串口发送? C#例子
Nuget包名称 System.IO.Ports 参考代码 using System; using System.IO.Ports; using System.Threading;namespace test {class Program{static void Main(){SerialPort port new SerialPort("COM1", 9600); // 配置串口port.Open();Timer timer new Timer((_) &…...

Qt学习及使用_第1部分_认识Qt---学习目的及技术准备
前言 学以致用,通过QT框架的学习,一边实践,一边探索编程的方方面面. 参考书:<Qt 6 C开发指南>(以下称"本书") 标识说明:概念用粗体倾斜.重点内容用(加粗黑体)---重点内容(红字)---重点内容(加粗红字), 本书原话内容用深蓝色标识,比较重要的内容用加粗倾…...
时间序列预测:LSTM与Prophet对比实验
时间序列预测:LSTM与Prophet对比实验 系统化学习人工智能网站(收藏):https://www.captainbed.cn/flu 文章目录 时间序列预测:LSTM与Prophet对比实验摘要引言实验设计1. 数据集选择2. 实验流程 模型架构对比1. LSTM架…...

dvwa5——File Upload
LOW 在dvwa里建一个testd2.php文件,写入一句话木马,密码password antsword连接 直接上传testd2.php文件,上传成功 MEDIUM 查看源码,发现这一关只能提交jpg和png格式的文件 把testd2.php的后缀改成jpg,上传时用bp抓包…...

VmWare Ubuntu22.04 搭建DPDK 20.11.1
一、开发环境 Ubuntu 版本 二、增加虚拟机的网卡 给虚拟机增加1个网卡,加上原来的网卡,一共2个 网络适配器作为 ssh 连接的网卡,网络适配器2作为 DPDK 运行的网卡。 三、NAT模式简介 这里待补充,网上都是那一张图,看不懂 四、使网卡名称从0开始命名 进入管理员权限 s…...

Ctrl-Crash 助力交通安全:可控生成逼真车祸视频,防患于未然
视频扩散技术虽发展显著,但多数驾驶数据集事故事件少,难以生成逼真车祸图像,而提升交通安全又急需逼真可控的事故模拟。为此,论文提出可控车祸视频生成模型 Ctrl-Crash,它以边界框、碰撞类型、初始图像帧等为条件&…...