当前位置: 首页 > news >正文

golang开源的可嵌入应用程序高性能的MQTT服务

golang开源的可嵌入应用程序高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发,现已成为OASIS标准。
MQTT的设计目标是提供一种简单、轻量、可扩展的协议,适用于各种设备和网络条件。它通常用于物联网(IoT)和传感器网络,其中设备需要以有效的方式进行通信,并且资源(如带宽和电池寿命)可能受到限制。
MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中,提供了一种可靠、高效的消息传输机制。

什么是Mochi-MQTT

源代码地址:https://github.com/mochi-mqtt/server

Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中库来使用,经过提出的设计以实现问题的轻量化和快速部署,同时也非常重视代码的质量和可维护性。

用途

物联网项目开发时,常常需要使用MQTT协议对设备接入,在很多场景中,私有化部署物联网系统时资源比较少,性能要求高,一些大型的MQTT服务不满足要求,而且代码不可控。
还有在边缘场景下,需要在边缘网关,边缘控制器设备上部署物联网系统,但是边缘网关的资源很少,内存大约只有4G,所以使用java开发的物联网系统就很难部署上去;使用C/C++开发效率又很低,所以Go语言是最合适的,
Mochi-MQTT刚好又完全是Go编写的开源的,可以嵌入到自己的程序启动。

Mochi MQTT独立部署

Golang的环境配置这里不做说明,请看我前面的博文说明

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

docker部署

可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server

也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

嵌入自己项目运行和开发

下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2

将Mochi MQTT作为包导入使用, 示例代码如下

import (mqttServer "github.com/mochi-mqtt/server/v2""github.com/mochi-mqtt/server/v2/listeners""github.com/mochi-mqtt/server/v2/packets"
)var Server *mqttServer.Serverfunc ServerMqttInit() {// 创建新的 MQTT 服务器。Server = mqttServer.New(&mqttServer.Options{InlineClient: true, // 启动内联客户端})// 初始化数据库实例edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),productDao:     productDao.NewProductRepository(),}// 添加自定义权限方法err := Server.AddHook(edge, nil)if err != nil {log.Fatal(err)}// 在1883端口上创建一个 TCP 服务端。tcp := listeners.NewTCP("t1", ":1883", nil)err = Server.AddListener(tcp)if err != nil {log.Fatal(err)}// 在1882端口上创建一个 Websocket 服务端。ws := listeners.NewWebsocket("ws1", ":1882", nil)err = server.AddListener(ws)if err != nil {log.Fatal(err)}go func() {err := Server.Serve()if err != nil {log.Fatal(err)}}()
}type edgeHook struct {mqttServer.HookBasedeviceDao      deviceDao.DeviceRepositoryproductDao     productDao.ProductRepository
}func (h *edgeHook) ID() string {return "mqtt-auth"
}func (h *edgeHook) Provides(b byte) bool {// 实现钩子函数return bytes.Contains([]byte{//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。mqttServer.OnConnectAuthenticate,//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。mqttServer.OnACLCheck,//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。mqttServer.OnSessionEstablish,//当客户端因任何原因断开连接时调用。mqttServer.OnDisconnect,//当客户端向订阅者发布消息后调用。mqttServer.OnPublished,}, []byte{b})
}// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {username := string(pk.Connect.Username)password := string(pk.Connect.Password)if username == "" || len(username) == 0 {return false}if password == "" || len(password) == 0 {return false}return true
}// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return false}if topic == "" || len(topic) == 0 {return false}return true
}// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return}//设备连接MQTT成功后保存设备在线状态
}// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return}//设备断开MQTT成功后保存设备离线状态
}// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))//收到客户端消息后做业务逻辑处理
}// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {err := Server.Publish(topic, msg, false, 0)if err != nil {Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)return false}return true
}// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,"topic", pk.TopicName, "payload", string(pk.Payload))callback(pk.TopicName, pk.Payload)}_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {_ = Server.Unsubscribe(topic, subscriptionId)
}func main() {// 创建信号用于等待服务端关闭信号sigs := make(chan os.Signal, 1)done := make(chan bool, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() {<-sigsdone <- true}()<-doneLog.Error("caught signal, stopping...")Server.Close()Log.Error("main.go finished")
}

监控MQTT指标信息

mqttRouters := r.Group("/mqtt", func(context *gin.Context) {}){mqttRouters.GET("stats", func(c *gin.Context) {util.R(c, nil, mqtt.Server.Info)})}

在这里插入图片描述

详情使用指南请看:https://github.com/mochi-mqtt/server

相关文章:

golang开源的可嵌入应用程序高性能的MQTT服务

golang开源的可嵌入应用程序高性能的MQTT服务 什么是MQTT&#xff1f; MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的、开放的消息传输协议&#xff0c;设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发&#xf…...

uniapp微信小程序-请求二次封装(直接可用)

一、请求封装优点 代码重用性&#xff1a;通过封装请求&#xff0c;你可以在整个项目中重用相同的请求逻辑。这样一来&#xff0c;如果 API 发生变化或者需要进行优化&#xff0c;你只需在一个地方修改代码&#xff0c;而不是在每个使用这个请求的地方都进行修改。 可维护性&a…...

UE4 C++ 结构体

先在UCLASS()前写入&#xff1a; USTRUCT(BlueprintType) struct FMyStruct //必须以"F"开头 {GENERATED_BODY() //必须添加“GENERATED_BODY()”UPROPERTY(EditAnywhere, BlueprintReadWrite, Category "MyStruct1")int32 Health;UPROPERTY(EditAnywher…...

软件工程知识梳理0-概述

学好软件工程就必须理解软件工程到底是干什么的&#xff0c;为什么需要软件工程&#xff0c;以及怎么干的&#xff01;只有理解了软件工程的本质&#xff0c;才能更好的理解软件工程中各种工程手段和方法的目的。 个人开发模式 —> 小作坊开发模式 —> 软件工程开发模式 …...

贪吃蛇---C语言---详解

引言 C语言已经学了不短的时间的&#xff0c;这期间已经开始C和Python的学习&#xff0c;想给我的C语言收个尾&#xff0c;想起了小时候见过别人的老人机上的贪吃蛇游戏&#xff0c;自己父母的手机又没有这个游戏&#xff0c;当时成为了我的一大遗憾&#xff0c;这两天发现C语…...

Airflow原理浅析

⭐️ airflow基本原理 Apache Airflow 是一个开源的工作流自动化工具&#xff0c;它用于调度和管理复杂的数据工作流。Airflow 的原理基于有向无环图&#xff08;DAG&#xff09;的概念&#xff0c;它通过编写和组织任务的有向图来描述工作流程。 以下是 Apache Airflow 的一…...

uniapp 使用canvas 画海报,有手粘贴即可用

html部分 <view click"doposter">下载海报</view> <canvas canvas-id"myCanvas" type2d style"width: 370px; height: 550px;opcity:0;position: fixed;z-index:-1;" id"myCanvas" />js 部分 drawBackground() {c…...

Vite+Vue3+TS 引入使用Cesium.js

申请 Cesium Token 进入Cesium 注册账号 cesium 离谱的是 E宝 (Epic) 居然可以快捷登录&#xff1f;&#xff01; 登录后点击导航栏的 Access Token 再右侧即可看到默认Token 安装&引入 # Cesium pnpm pnpm install cesium# 如果项目同时存在Three.js 需避免使用pnpm T…...

Cocos creator 动作系统

动作系统简介 是用于控制物体运动的一套系统&#xff0c;完全依赖代码进行实现&#xff0c;动态调节节点的移动。 移动 cc.moveTo 移动到某个坐标&#xff08;x,y&#xff09; //1秒时间内&#xff0c;移动到0,0let action1 cc.moveTo(1,0,0)this.node.runAction(action1)c…...

对Spring当中AOP的理解

AOP(面向切面编程)全称Aspect Oriented Programminge AOP就是把系统中重复的代码抽取出来&#xff0c;单独开发&#xff0c;在系统需要时&#xff0c;使用动态代理技术&#xff0c;在不修改源码的基础上&#xff0c;将单独开发的功能通知织入(应用)到系统中的过程&#xff0c;完…...

【Vue】2-8、Axios 网络请求

cdn&#xff1a;<script src"https://unpkg.com/axios/dist/axios.min.js"></script> 注&#xff1a;使用 CDN 链接就可以不需要去下载对应的 js 文件到本地&#xff0c;只需要联网即可使用&#xff0c;可以减少项目的体积 <!DOCTYPE html> <…...

Vue中嵌入原生HTML页面

Vue中嵌入html页面并相互通信 需求&#xff1a;b2b支付需要从后获取到数据放到form表单提交跳转&#xff0c;如下&#xff1a; 但是vue目前暂时没找到有类似功能相关文档&#xff0c;所以我采用iframe嵌套的方式 1. Vue中嵌入Html <iframe src"/static/gateway.htm…...

streampark+flink一键整库或多表同步mysql到doris实战

streamparkflink一键整库或多表同步mysql到doris实战&#xff0c;此应用一旦推广起来&#xff0c;那么数据实时异构时&#xff0c;不仅可以减少对数据库的查询压力&#xff0c;还可以减少数据同步时的至少50%的成本&#xff0c;还可以减少30%的存储成本&#xff1b; streampar…...

Vim实战:使用 Vim实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…...

学习MySQL ENUM数据类型

学习MySQL ENUM数据类型 ENUM是MySQL中的一个字符串对象&#xff0c;它允许从预定义的值列表中选择一个值。这种数据类型特别适用于值的数量有限且不太可能变化的情况。 定义ENUM类型 在定义ENUM类型时&#xff0c;你需要明确列出所有可能的字符串值。例如&#xff1a; CRE…...

88.合并两个有序数组

88.合并两个有序数组 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 **注意&#xff1a;**最…...

python查询xml类别

第一章 导包 import os from xml.etree.ElementTree import ElementTree第二章 存储类别 # 定义一个空集合用于存储类别 classes set()第三章 遍历所有XML文件 # 遍历指定目录下的所有XML文件 for filename in os.listdir(/home/li/PycharmProjects/Annotations):if filena…...

nginx配置及性能优化

1. 请简述nginx的工作原理&#xff1f; Nginx的工作原理基于事件驱动模型和异步非阻塞I/O处理机制。 具体来说&#xff0c;Nginx接收到客户端的请求后&#xff0c;会将该请求映射到配置文件中指定的location block。这个过程中&#xff0c;Nginx本身并不执行实际的工作&#…...

阿里云如何找回域名,进行添加或删除?

权威域名管理介绍说明&#xff0c;包含添加域名、删除域名、找回域名、域名分组等操作介绍。 一、添加域名 非阿里云注册域名或子域名如需使用云解析DNS&#xff0c;需要通过添加域名功能&#xff0c;将主域名或子域名添加到云解析控制台&#xff0c;才可以启用域名解析服务。…...

机器学习 低代码 ML:PyCaret 的使用

✅作者简介&#xff1a;人工智能专业本科在读&#xff0c;喜欢计算机与编程&#xff0c;写博客记录自己的学习历程。 &#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&…...

前端入门第二天

目录 一、列表、表格、表单 二、列表&#xff08;布局内容排列整齐的区域&#xff09; 1.无序列表&#xff08;不规定顺序&#xff09; 2.有序列表&#xff08;规定顺序&#xff09; 3.定义列表&#xff08;一个标题多个分类&#xff09; 三、表格 1.表格结构标签 2.合并…...

Django实现富文本编辑器Ckeditor5图片上传功能

上一章我们已经为我们的博客继承了富文本编辑器Ckeditor5,虽然已经可以对文字进行排版处理,虽然已经可以通过插入图片的url地址来插入图片,但还无法通过本地上传图片,那么我们这个富文本编辑器就是不完整的,这一章我们将实现上传图片功能! ​ Ckeditor5图片上传采用的是…...

【C语言】epoll_wait / select

一、epoll_wait和select对比 1. 阻塞和非阻塞 在Linux C语言中进行socket编程时&#xff0c;epoll_wait 和 select 都是用于多路I/O复用的系统调用&#xff0c;但是它们的行为可以设置为阻塞和非阻塞模式&#xff0c;这取决于调用它们时所使用的参数。 让我们分别看看 epoll…...

Java 数据抓取

大家好我是苏麟 , 今天聊聊数据抓取 . 大家合理使用 注意&#xff0c;爬虫技术不能滥用&#xff0c;干万不要给别人的系统造成压力、不要侵犯他人权益! 数据抓取 实质上就是java程序模拟浏览器进行目标网站的访问&#xff0c;无论是请求目标服务器的接口还是请求目标网页内容…...

深度学习之处理多维特征的输入

我们首先来看一个糖尿病的数据集&#xff1a; 在数据集中&#xff0c;我们称每一行叫做sample&#xff0c;表示一个样本&#xff0c;称每一列是feature&#xff0c;也就是特征在数据库里面这就是一个关系表&#xff0c;每一行叫做记录&#xff0c;每一列叫做字段。 每一个样本都…...

西瓜书读书笔记整理(十二) —— 第十二章 计算学习理论(下)

第十二章 计算学习理论&#xff08;下&#xff09; 12.4 VC 维&#xff08;Vapnik-Chervonenkis dimension&#xff09;12.4.1 什么是 VC 维12.4.2 增长函数&#xff08;growth function&#xff09;、对分&#xff08;dichotomy&#xff09;和打散&#xff08;shattering&…...

初探分布式链路追踪

本篇文章&#xff0c;主要介绍应用如何正确使用日志系统&#xff0c;帮助用户从依赖、输出、清理、问题排查、报警等各方面全面掌握。 可观测性 可观察性不单是一套理论框架&#xff0c;而且并不强制具体的技术规格。其核心在于鼓励团队内化可观察性的理念&#xff0c;并确保由…...

闭包的理解?闭包使用场景

说说你对闭包的理解&#xff1f;闭包使用场景 #一、是什么 一个函数和对其周围状态&#xff08;lexical environment&#xff0c;词法环境&#xff09;的引用捆绑在一起&#xff08;或者说函数被引用包围&#xff09;&#xff0c;这样的组合就是闭包&#xff08;closure&#…...

openssl3.2 - 帮助文档的整理

文章目录 openssl3.2 - 帮助文档的整理概述笔记整理后, 非空的文件夹如下整理后, 留下的有点用的文件列表如下备注END openssl3.2 - 帮助文档的整理 概述 openssl3.2源码工程编译安装完, 对于库的使用者, 有用的文档, 远不止安装的那些html. 用everything查找, 配合手工删除,…...

中移(苏州)软件技术有限公司面试问题与解答(5)—— Linux进程调度参数调优是如何通过代码实际完成的1

接前一篇文章&#xff1a;中移&#xff08;苏州&#xff09;软件技术有限公司面试问题与解答&#xff08;0&#xff09;—— 面试感悟与问题记录 本文对于中移&#xff08;苏州&#xff09;软件技术有限公司面试问题中的“&#xff08;11&#xff09;Linux进程调度参数调优是如…...