Docker Compose 构建 EMQX 集群 实现mqqt 和websocket
EMQX 集群化管理mqqt真香
目录
#目录 /usr/emqx
容器构建
vim docker-compose.yml
version: '3'services:emqx1:image: emqx:5.8.3container_name: emqx1environment:- "EMQX_NODE_NAME=emqx@node1.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node1.emqx.ioports:- 1883:1883- 8083:8083- 8084:8084- 8883:8883- 18083:18083 # volumes:# - $PWD/emqx1_data:/opt/emqx/dataemqx2:image: emqx:5.8.3container_name: emqx2environment:- "EMQX_NODE_NAME=emqx@node2.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"healthcheck:test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]interval: 5stimeout: 25sretries: 5networks:emqx-bridge:aliases:- node2.emqx.io# volumes:# - $PWD/emqx2_data:/opt/emqx/datanetworks:emqx-bridge:driver: bridge
启动
docker-compose up -d
集群状态
#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"#验证
telnet 192.168.0.15 1883
#内网
nc -zv 192.168.0.15 1883 #账户
admin
#默认密码
public
服务开放端口
1883,8083,8084,8883,18083
端口占用
EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。
| 端口 | 协议 | 描述 |
| 1883 | TCP | MQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。 |
| 8883 | TCP | MQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。 |
| 8083 | TCP | MQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。 |
| 8084 | TCP | MQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。 |
| 18083 | HTTP | EMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。 |
| 4370 | TCP | Erlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。 |
| 5370 | TCP | 集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。 |
前端js示
<!DOCTYPE html>
<html>
<head><title>MQTT WebSocket Test</title><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body><script>// 使用提供的客户端 ID 或生成一个唯一 IDconst clientId = 'emqx_NjI4MT2';// 配置 WebSocket MQTT broker 地址const host = 'ws://127.0.0.1:8083/mqtt';// MQTT 连接选项const options = {keepalive: 60, // 心跳时间间隔clientId: clientId,protocolId: 'MQTT', // 协议 IDprotocolVersion: 5, // 使用 MQTT 5 协议clean: true, // 是否清除会话reconnectPeriod: 1000, // 重连间隔时间 (ms)connectTimeout: 30 * 1000, // 连接超时时间 (ms)username: 'admin', // 设置用户名password: 'public', // 设置密码will: {topic: 'pushRanking/1',payload: 'Connection Closed abnormally..!',qos: 0,retain: false},};console.log('Connecting mqtt client');// 连接到 MQTT Brokerconst client = mqtt.connect(host, options);// 连接成功回调client.on('connect', () => {console.log('Connected to MQTT broker');// 订阅主题 pushRanking/#,支持通配符client.subscribe('pushRanking/1', { qos: 0 }, (err) => {if (!err) {console.log('Subscribed to topic: pushRanking/1');} else {console.error('Failed to subscribe:', err);}});});// 处理接收到的消息client.on('message', (topic, message) => {console.log(`Received message from topic "${topic}": ${message.toString()}`);});// 连接错误回调client.on('error', (err) => {console.log('Connection error:', err);client.end();});// 重新连接回调client.on('reconnect', () => {console.log('Reconnecting...');});// 连接关闭回调client.on('close', () => {console.log('Connection closed');});// 模拟消息发布以测试接收setTimeout(() => {client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });}, 5000);</script>
</body>
</html>
后端
package emqximport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""testing""time"
)func TestMQTT(t *testing.T) {// 创建 EMQX 客户端实例client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")// 连接到 EMQXif err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}defer client.Disconnect()// 订阅主题client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Message received: %s\n", msg.Payload())})// 发布消息client.Publish("testtopic/1", 1, false, "Hello from Golang!")// 保持连接一段时间以接收消息time.Sleep(10 * time.Second)
}/*长连接的场景 DEMOfunc main() {client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")if err := client.Connect(); err != nil {fmt.Printf("Failed to connect: %v\n", err)return}// 使用 defer 确保程序退出时断开连接defer client.Disconnect()// 订阅主题client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s\n", msg.Payload())})// 发布消息client.Publish("test/topic", 1, false, "Hello from Golang!")// 捕获退出信号signalChan := make(chan os.Signal, 1)signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)fmt.Println("Running... Press Ctrl+C to exit.")<-signalChanfmt.Println("Exiting...")
}*//*JS 调用如下
import mqtt from 'mqtt';const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;// 创建客户端
const client = mqtt.connect(brokerURL, {clientId: clientID,username: '', // 如需要认证,填入用户名password: '', // 如需要认证,填入密码
});// 连接事件
client.on('connect', () => {console.log('Connected to EMQX');// 订阅主题client.subscribe('test/topic', (err) => {if (!err) {console.log('Subscribed to topic: test/topic');} else {console.error('Failed to subscribe:', err);}});// 发布消息client.publish('test/topic', 'Hello from JavaScript!');
});// 接收消息事件
client.on('message', (topic, message) => {console.log(`Received message on topic "${topic}": ${message.toString()}`);
});// 错误事件
client.on('error', (err) => {console.error('Connection error:', err);
});*/
common封装调用
package emqximport ("fmt""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)type EMQXClient struct {client mqtt.Client
}// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID).SetUsername(username).SetPassword(password).SetKeepAlive(60 * time.Second).SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())}).SetPingTimeout(1 * time.Second)client := mqtt.NewClient(opts)return &EMQXClient{client: client}
}// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {token := c.client.Connect()if token.Wait() && token.Error() != nil {return token.Error()}fmt.Println("Connected to EMQX broker")return nil
}// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {token := c.client.Publish(topic, qos, retained, payload)token.Wait()return token.Error()
}// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {token := c.client.Subscribe(topic, qos, callback)token.Wait()return token.Error()
}// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {token := c.client.Unsubscribe(topics...)token.Wait()return token.Error()
}// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {c.client.Disconnect(250)fmt.Println("Disconnected from EMQX broker")
}

相关文章:
Docker Compose 构建 EMQX 集群 实现mqqt 和websocket
EMQX 集群化管理mqqt真香 目录 #目录 /usr/emqx 容器构建 vim docker-compose.yml version: 3services:emqx1:image: emqx:5.8.3container_name: emqx1environment:- "EMQX_NODE_NAMEemqxnode1.emqx.io"- "EMQX_CLUSTER__DISCOVERY_STRATEGYstatic"- …...
Spring 过滤器:OncePerRequestFilter 应用详解
在Web应用中,过滤器(Filter)是一个强大的工具,它可以在请求到达目标资源之前或响应返回客户端之前对请求或响应进行拦截和处理。然而,在某些情况下,我们可能希望确保过滤器逻辑在一次完整的HTTP请求中仅执行…...
3.CSS字体属性
3.1字体系列 CSS使用font-family属性定义文本的字体系列。 p{font-family:"微软雅黑"} div{font-family:Arial,"Microsoft Yahei",微软雅黑} 3.2字体大小 css使用font-size属性定义字体大小 p{ font-size:20px; } px(像素)大小是我们网页的最常用的单…...
微信小程序 单选多选radio/checkbox 纯代码分享
单选按钮 <radio-group class"radiogroup" bindchange"radioChange"> <label class"radio" wx:for"{{items}}"> <radio value"{{item.name}}" checked"{{item.checked}}" /> {{item.value}} &…...
k8s 部署meilisearch UI
https://github.com/riccox/meilisearch-ui 拉取镜像 sudo docker pull riccoxie/meilisearch-ui:latestk8s 部署 apiVersion: v1 kind: Service metadata:name: meilisearch-uinamespace: meilisearch spec:type: NodePortselector:app: meilisearch-uiports:- port: 24900…...
gitlab 还原合并请求
事情是这样的: 菜鸡从 test 分支切了个名为 pref-art 的分支出来,发布后一机灵,发现错了,于是在本地用 git branch -d pref-art 将该分支删掉了。之后切到了 prod 分支,再切出了一个相同名称的 pref-art 分支出来&…...
ChatGPT最新版本“o3”的概要
o3简介 o3于2024年12月20日发布——也就是OpenAI 12天直播的最后一天。目前处于安全性测试阶段。它是o1的继任者,旨在处理更复杂的推理任务。o3特别针对数学、科学和编程等领域进行了优化。 o3在多项基准测试中表现出色。例如,在ARC-AGI基准测试中&…...
uniapp——App下载文件,保存、打开文件(二)
uniapp如何下载文件、保存、打开文件 时光荏苒,2024即将过去! 迈向2025,祝大家新的一年工作顺利、万事如意,少一点BUG,涨一点工资…↖(ω)↗ 文章目录 uniapp如何下载文件、保存、打开文件下载文件保存并打开文件处理 …...
Postman接口测试05|实战项目笔记
目录 一、项目接口概况 二、单接口测试-登录接口:POST 1、正例 2、反例 ①姓名未注册 ②密码错误 ③姓名为空 ④多参 ⑤少参 ⑥无参 三、批量运行测试用例 四、生成测试报告 1、Postman界面生成 2、Newman命令行生成 五、token鉴权(“…...
【paddle】初次尝试
张量 张量是 paddlepaddle, torch, tensorflow 等 python 主流机器学习包中唯一通货变量,因此应当了解其基本的功能。 张量 paddle.Tensor 与 numpy.array 的转化 import paddle as paddle import matplotlib.pyplot as plt apaddle.to_t…...
01-2023年上半年软件设计师考试java真题解析
1.真题内容 在某系统中,类 Interval(间隔) 代表由下界(lower bound(边界))上界(upper bound )定义的区间。 要求采用不同的格式显示区间范围。 如[lower bound , upper bound ]、[ lower bound … upper bound ]、[ lower bou nd - upper bound &#x…...
一文讲清楚CSS3新特性
文章目录 一文讲清楚CSS3新特性1. 新增选择器特性2. 新增的样式3. 新增布局方式 一文讲清楚CSS3新特性 1. 新增选择器特性 层次选择器(div~p)选择前面有div的p元素伪类选择器 :first-of-type 表示⼀组同级元素中其类型的第⼀个元素:last-of-type 表示⼀组同级元素中其类型的最…...
系统设计案例:设计 Spotify
https://levelup.gitconnected.com/system-design-interview-question-design-spotify-4a8a79697dda 这是一道系统设计面试题,即设计 Spotify。在真正的面试中,你通常会关注应用程序的一两个主要功能,但在本文中,我想从高层次概述…...
太速科技-633-4通道2Gsps 14bit AD采集PCie卡
4通道2Gsps 14bit AD采集PCie卡 一、板卡概述 二、性能指标 板卡功能 参数 内容 ADC 芯片型号 AD9689 路数 4路ADC, 采样率 2Gsps 数据位 14bit 数字接口 JESD204B 模拟接口 交流耦合 模拟输入 1V 连接器 6路 SMA 输入阻抗 50Ω 模拟指…...
图片叠加拖拽对比展示效果实现——Vue版
图片叠加拖拽对比展示效果实现——Vue版 项目中遇见一个需求:2张图片按竖线分割,左右两侧分别展示对应图片,通过滚动条拖动对应展示图片区域;; 网上搜索了下,没有找到直接可用的组件,这里自己封装了一个次功…...
结合长短期记忆网络(LSTM)和无迹卡尔曼滤波器(UKF)的技术在机器人导航和状态估计中的应用前景
结合长短期记忆网络(LSTM)和无迹卡尔曼滤波器(UKF)的技术在机器人导航和状态估计中具有广泛的应用前景。如有滤波、导航方面的代码定制需求,可通过文末卡片联系作者获得帮助 文章目录 结合LSTM和UKF的背景结合LSTM和UKF的优势应用实例研究现状MATLAB代码示例结论结合LSTM和…...
【MATLAB APP Designer】小波阈值去噪(第一期)
代码原理及流程 小波阈值去噪是一种信号处理方法,用于从信号中去除噪声。这种方法基于小波变换,它通过将信号分解到不同的尺度和频率上来实现。其基本原理可以分为以下几个步骤: (1)小波变换:首先对含噪信…...
ClickHouse副本搭建
一. 副本概述 副本的目的主要是保障数据的高可用性,ClickHouse中的副本没有主从之分。所有的副本都是平等的。 副本写入流程: 二. 副本搭建 1. 实验环境 hadoop1(192.168.47.128) hadoop2(192.168.47.129)2. 修改配置文件 修改两台主机/etc/click…...
K3知识点
提示:文章 文章目录 前言一、顺序队列和链式队列题目 顺序队列和链式队列的定义和特性实际应用场景顺序表题目 链式队列 二、AVL树三、红黑树四、二叉排序树五、树的概念题目1左子树右子树前序遍历、中序遍历,后序遍历先根遍历、中根遍历左孩子右孩子题目…...
cocos creator 3.x版本如何添加打开游戏时首屏加载进度条
前言 项目有一个打开游戏时添加载入进度条的需求。这个功能2.X版本是自带的,不知为何在3.X版本中移除了。 实现 先说一下解决思路,就是在引擎源码加载场景的位置插入一个方法,然后在游戏入口HTML处监听即可。 1.找到对应源码脚本 在coco…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
破解路内监管盲区:免布线低位视频桩重塑停车管理新标准
城市路内停车管理常因行道树遮挡、高位设备盲区等问题,导致车牌识别率低、逃费率高,传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法,正成为破局关键。该设备安装于车位侧方0.5-0.7米高度,直接规避树枝遮…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么?它的作用是什么? Spring框架的核心容器是IoC(控制反转)容器。它的主要作用是管理对…...
