RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表
RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表
文章目录
- RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表
- 1. MQTT概览
- 2. MQTT 5.0 特性
- 1. 特性概要
- 2. Docker中安装RabbitMQ及启用MQTT5.0协议
- 3. MQTT 5.0 功能列表
- 1. 消息过期
- 1. 描述
- 2. 举例
- 3. 实现
- 2. 订阅标识符
- 1.描述
- 2. 举例
- 3. 实现
- 3. 订阅选项
- 1. 描述
- 2. 举例例
- 4 所有ACK上的原因代码
- 1. 描述
- 2. 实现
- 5. 用户属性
- 1. 描述
- 2. 示例 PUBLISH 数据包
- 3. 示例 CONNECT 数据包
- 6. 有效负载格式和内容类型
- 1. 描述
- 2. 举例
- 7. 请求/响应
- 1. 描述
- 2. 举例
- 8. 分配的客户端标识符
- 1. 描述
- 2. 实现
- 9. 主题别名
- 1. 描述
- 2. 实现
- 10. 流量控制
- 1. 描述
- 2. 实现
- 11. 最大数据包大小
- 1. 描述
- 2. 举例
- 12. 服务器启动的断开连接
- 1. 描述
- 2. 实现
- 13. 会话到期
- 1. 描述
- 2. 实现
- 3. 举例
- 14. 会延迟
- 1. 描述
- 2. 实现
- 3. 举例
- 15. 可选的服务器功能可用性
- 1. 描述
- 2. 实现
- 4. 局限性
- 1. MQTT 5.0 特定限制
- 1. 共享订阅
- 1. 延迟和保留的遗嘱消息
- 2. 非 MQTT 5.0 特定限制
- 1. 保留的消息
- 5. 总结
- 6. 相关链接
RabbitMQ 3.12 中发布的原生 MQTT 为物联网用例提供了显著的可扩展性和性能改进。
RabbitMQ 3.13 将支持 MQTT 5.0,因此将成为我们使 RabbitMQ 成为领先的 MQTT 代理之一的下一个重要步骤。
这篇博文解释了如何在 RabbitMQ 中使用新的 MQTT 5.0 功能。
1. MQTT概览
MQTT 是物联网 (IoT) 的标准协议。
物联网远程设备在连接到代理时网络质量可能较差。 因此,MQTT是轻量级的:MQTT协议头很小,可以节省网络带宽。
由于物联网设备可能经常断开连接并重新连接(想象一下一辆汽车驶过隧道),MQTT 也很高效:与其他消息传递协议相比,客户端通过更短的握手进行连接和身份验证。
MQTT协议已经存在了很多年。 如下表所示,最新的 MQTT 协议版本为 5.0。
MQTT 版本 | CONNECT 数据包中的协议版本 | MQTT 规范发布年份 | 自 Year 以来的 RabbitMQ 支持(版本) |
---|---|---|---|
3.1 | 3 | 2010 | 2012 (3.0) |
3.1.1 | 4 | 2014 | 2014 (3.3) |
5.0 | 5 | 2019 | 2024 (3.13) |
值得一提的是,面向用户的协议版本和“内部”协议版本(也称为协议级别)是有区别的。 后者在 CONNECT 数据包中从客户端发送到服务器。 由于面向用户的协议版本 3.1.1 映射到内部协议版本 4,为了避免进一步的混淆,MQTT 委员会决定跳过面向用户的版本 4.0,以便面向用户的版本 5.0 映射到内部协议版本 5。
2. MQTT 5.0 特性
1. 特性概要
附录 C. MQTT v5.0 中的新功能摘要提供了 MQTT 5.0 新功能的完整列表。
由于您在 Web 上找到了很棒的 MQTT 5.0 资源,包括说明性图表和使用模式,因此这篇博文仅关注 RabbitMQ 的细节。 本节介绍 PR #7263 中实现的最重要功能。 对于每个功能,我们提供了一个如何将其与 RabbitMQ 一起使用的示例,或者概述了如何在 RabbitMQ 中实现它的高级描述。
2. Docker中安装RabbitMQ及启用MQTT5.0协议
要自己运行示例,请启动 RabbitMQ 服务器 3.13, 例如,使用以下 Docker 镜像标记:
docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 rabbitmq:3.13.0-management
在另一个终端窗口中,启用 MQTT 插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
由于 MQTT 插件是动态启用的,因此 MQTT 插件定义的功能标志被禁用。 启用所有功能标志,包括功能标志:mqtt_v5
docker exec rabbitmq rabbitmqctl enable_feature_flag all
现在,列出功能标志应显示所有功能标志都已启用:
docker exec rabbitmq rabbitmqctl list_feature_flags --formatter=pretty_table
以下示例使用 MQTTX CLI V1.9.4。 我们使用 CLI 而不是图形 UI,以便您可以通过复制粘贴命令轻松运行示例。
所有新功能也适用于 RabbitMQ Web MQTT 插件。
3. MQTT 5.0 功能列表
1. 消息过期
1. 描述
可以为发布到代理的每条消息设置以秒为单位的到期间隔。 如果在该过期时间间隔内未使用邮件,则该邮件将被丢弃或死信。
2. 举例
为 主题 创建订阅。 这将在 RabbitMQ 中创建一个队列。 通过键入终端断开客户端连接。 由于我们使用 600 秒的会话到期间隔,因此此队列将再存在 10 分钟。t/1``Ctrl+C
mqttx sub --client-id sub-1 --topic t/1 --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/1...
✔ Subscribed to t/1
^C
将消息发布到同一主题,消息过期间隔为 30 秒:
mqttx pub --topic t/1 --message m1 --message-expiry-interval 30 --qos 1
… Connecting...
✔ Connected
… Message publishing...
✔ Message published
在接下来的 30 秒内,列出队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 1 │
└─────────────────────────────┴─────────┴──────────┘
等待 30 秒,然后再次列出队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues
┌─────────────────────────────┬─────────┬──────────┐
│ name │ type │ messages │
├─────────────────────────────┼─────────┼──────────┤
│ mqtt-subscription-sub-1qos1 │ classic │ 0 │
└─────────────────────────────┴─────────┴──────────┘
该消息已过期,因为客户端尚未连接到代理以使用该消息。 如果设置了死字策略,则邮件将死信发送到交易所。 在我们的例子中,死字被禁用。 查询 Prometheus 端点可证明经典队列中有 1 条消息已过期。sub-1
curl --silent localhost:15692/metrics | grep rabbitmq_global_messages_dead_lettered_expired_total
# TYPE rabbitmq_global_messages_dead_lettered_expired_total counter
# HELP rabbitmq_global_messages_dead_lettered_expired_total Total number of messages dead-lettered due to message TTL exceeded
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_classic_queue",dead_letter_strategy="disabled"} 1
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_least_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="at_most_once"} 0
rabbitmq_global_messages_dead_lettered_expired_total{queue_type="rabbit_quorum_queue",dead_letter_strategy="disabled"} 0
另一个有趣的功能是以下要求:
服务器发送到客户端的 PUBLISH 数据包必须包含设置为接收值减去应用程序消息在服务器中等待的时间的消息到期间隔。
向代理发送第二条消息,消息到期间隔为 60 秒:
mqttx pub --topic t/1 --message m2 --message-expiry-interval 60 --qos 1
等待 20 秒,然后重新连接订阅客户端:
mqttx sub --client-id sub-1 --topic t/1 --no-clean --session-expiry-interval 0 --qos 1 --output-mode clean
{"topic": "t/1","payload": "m2","packet": {..."properties": {"messageExpiryInterval": 40}}
}
根据 MQTT 5.0 协议规范的规定,客户端接收第二条消息,消息到期间隔设置为 40 秒: 代理接收的 60 秒减去消息在代理中等待的 20 秒。
3. 实现
MQTT 5.0 消息到期是在 RabbitMQ 中使用每条消息 TTL 实现的,类似于 AMQP 0.9.1 发布者中的字段。expiration
2. 订阅标识符
1.描述
客户端可以在 SUBSCRIBE 数据包中设置订阅标识符。 如果客户端因该订阅而收到消息,则代理会将该订阅标识符包含在 PUBLISH 数据包中。
订阅标识符的用例列在 SUBSCRIBE 操作部分。
2. 举例
从同一客户端向服务器发送 3 个单独的 SUBSCRIBE 数据包,每个数据包具有不同的主题过滤器和不同的订阅标识符:
mqttx sub --client-id sub-2 --topic t/1 --subscription-identifier 1 --session-expiry-interval 600
^C
mqttx sub --client-id sub-2 --topic t/2 --subscription-identifier 2 --session-expiry-interval 600 --no-clean
^C
mqttx sub --client-id sub-2 --topic "t/#" --subscription-identifier 3 --session-expiry-interval 0 --no-clean --output-mode clean
在第二个终端窗口中,我们看到从同一队列到同一主题交换的 3 个绑定,每个绑定都具有不同的路由键:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings \source_name source_kind destination_name destination_kind routing_key
┌─────────────┬─────────────┬─────────────────────────────┬──────────────────┬─────────────────────────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ mqtt-subscription-sub-2qos0 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.# │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.1 │
├─────────────┼─────────────┼─────────────────────────────┼──────────────────┼─────────────────────────────┤
│ amq.topic │ exchange │ mqtt-subscription-sub-2qos0 │ queue │ t.2 │
└─────────────┴─────────────┴─────────────────────────────┴──────────────────┴─────────────────────────────┘
第一个条目是与默认交换的隐式绑定。
每个具有 MQTT 主题筛选器的 MQTT 订阅对应一个带有绑定键的 AMQP 0.9.1 绑定。 准确地说,表列的名称错误:应该改为调用它。 MQTT 中的主题级分隔符是 “” 字符,而 AMQP 0.9.1 主题交换中的主题级分隔符是 “” 字符。routing_key``binding_key``/``.
再次在第二个终端窗口中,向主题发送消息:t/1
mqttx pub --topic t/1 --message m1
(订阅客户端的)第一个终端窗口接收以下 PUBLISH 数据包:
{"topic": "t/1","payload": "m1","packet": {..."properties": {"subscriptionIdentifier": [1,3]}}
}
它包含订阅标识符 1 和 3,因为 topic filters 和 match topic .t/1``t/#``t/1
同样,如果向主题发送第二条消息,订阅客户端将收到包含订阅标识符 2 和 3 的 PUBLISH 数据包。t/2
3. 实现
订阅标识符是 MQTT 会话状态的一部分。 因此,在客户端断开连接时,订阅标识符必须保留在服务器的数据库中,直到 MQTT 会话结束。 RabbitMQ 将订阅标识符存储在绑定参数中:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_bindings routing_key arguments
┌─────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────┐
│ routing_key │ arguments │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-2qos0 │ │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.# │ {mqtt_subscription_opts,0,false,false,0,3}{<<"x-binding-key">>,longstr,<<"t.#">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.1 │ {mqtt_subscription_opts,0,false,false,0,1}{<<"x-binding-key">>,longstr,<<"t.1">>} │
├─────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────┤
│ t.2 │ {mqtt_subscription_opts,0,false,false,0,2}{<<"x-binding-key">>,longstr,<<"t.2">>} │
└─────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────┘
绑定参数的确切结构并不重要,并且可能会在将来的 RabbitMQ 版本中更改。 但是,可以在绑定参数中看到整数 1、2 和 3,这些参数对应于订阅标识符。
当主题交换路由消息时,发布 Erlang 进程会将所有匹配的绑定键包含在消息中。 订阅 MQTT 客户端的 Erlang 进程将匹配的绑定密钥与其知道的 MQTT 主题过滤器进行比较,并将订阅标识符包含在发送到 MQTT 客户端的 PUBLISH 数据包中。
发布 Erlang 进程可以是 MQTT 连接进程,也可以是 AMQP 0.9.1 通道进程。 一如既往,RabbitMQ 在跨协议互操作性方面表现出色:当 AMQP 0.9.1(或 STOMP 或 AMQP 1.0)客户端向主题交换发送消息时, 正确的订阅标识符将包含在发送到 MQTT 客户端的 PUBLISH 数据包中。
3. 订阅选项
1. 描述
MQTT 5.0 提供了 3 个新的订阅选项:
- 无本地
- 保留为已发布
- 保留处理
所有订阅选项均由 RabbitMQ 实现。 在这里,我们只关注“保留处理”选项:
此选项指定在建立订阅时是否发送保留的消息。
这些值为:
0 = 在订阅
时发送保留消息 1 = 仅在订阅当前不存在
时在订阅时发送保留消息 2 = 在订阅时不发送保留的消息
2. 举例例
发送保留的消息:
mqttx pub --topic mytopic --message m --retain
保留处理值 0 将接收保留的消息,而值 2 不会:
mqttx sub --topic mytopic --retain-handling 0
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
payload: m
retain: true
^Cmqttx sub --topic mytopic --retain-handling 2
… Connecting...
✔ Connected
… Subscribing to mytopic...
✔ Subscribed to mytopic
4 所有ACK上的原因代码
1. 描述
数据包 CONNACK、PUBACK、SUBACK、UNSUBACK 和 DISCONNECT 包含原因码。
2. 实现
一个实现示例是,如果消息未路由到任何队列,则 RabbitMQ 将在 PUBACK 数据包中使用原因代码进行回复。 MQTT 5.0 原因代码在概念上对应于 AMQP 0.9.1 中的强制消息属性和处理程序。No matching subscribers``No matching subscribers``BasicReturn
5. 用户属性
1. 描述
大多数 MQTT 数据包可以包含用户属性。 用户属性的含义不是由 MQTT 规范定义的。
2. 示例 PUBLISH 数据包
PUBLISH 数据包中的用户属性由客户端应用程序定义,并由服务器原封不动地转发。
在第一个终端窗口中订阅:
mqttx sub --topic t/5
在第二个终端窗口中发布包含用户属性的消息:
mqttx pub --topic t/5 --message m --user-properties "key1: value1"
第一个终端窗口将接收用户属性,原封不动:
payload: m
userProperties: [ { key: 'key1', value: 'value1' } ]
MQTT 5.0 PUBLISH 数据包中的用户属性类似于 AMQP 0.9.1 中的消息属性。headers
3. 示例 CONNECT 数据包
使用用户属性进行连接:
mqttx conn --client-id myclient --user-properties "connecting-from: London"
在浏览器中打开管理 UI http://localhost:15672/#/connections(用户名和密码都是 ),然后单击 MQTT 连接:guest
RabbitMQ 将在管理 UI 中显示 CONNECT 数据包中的用户属性。
6. 有效负载格式和内容类型
1. 描述
发布者可以指定 MIME 内容类型。 它还可以设置有效负载格式指示器,指示有效负载是由 UTF-8 编码的字符数据还是未指定的二进制数据组成。
2. 举例
在第一个终端窗口中,订阅一个主题:
mqttx sub --topic t/6 --output-mode clean
在第二个终端窗口中,发送一条带有内容类型和有效负载格式指示符的消息:
mqttx pub --topic t/6 --message "my UTF-8 encoded data 🙂" --content-type text/plain --payload-format-indicator
第一个终端窗口将原封不动地接收内容类型和有效负载格式指示器:
{"topic": "t/6","payload": "my UTF-8 encoded data 🙂","packet": {..."properties": {"payloadFormatIndicator": true,"contentType": "text/plain"}}
}
7. 请求/响应
1. 描述
MQTT 5.0 正式化了请求/响应模式。
在发布消息之前,MQTT 客户端(请求者)订阅响应主题。 请求者将响应主题和一些关联数据包含在请求消息中。
另一个 MQTT 客户端(响应者)接收到请求消息,执行一些操作,并将具有相同关联数据的响应消息发布到响应主题。
MQTT 5.0 请求/响应功能对应于 AMQP 0.9.1 中的远程过程调用。 但是,在 AMQP 0.9.1 中,请求者将在 AMQP 0.9.1 消息属性中包含回调队列的名称。 MQTT 协议没有定义队列的概念。因此,在 MQTT 中,被回复的“地址”是一个主题名称。reply_to
尽管协议规范之间存在不兼容性,但 RabbitMQ 在协议互操作性方面大放异彩: 因此,RabbitMQ 支持跨协议的请求/响应交互。
例如,MQTT 客户端可以在请求消息中包含响应主题和关联数据。 如果 AMQP 0.9.1 客户端创建了一个绑定到主题交换的队列,该队列的绑定密钥与请求消息的主题匹配,它将收到一条 AMQP 0.9.1 消息,其属性设置为 MQTT 客户端发送的关联数据和名为 . 然后,AMQP 0.9.1 客户端可以使用相同的 MQTT 5.0 客户端响应,并将响应消息发布到具有标头中存在的主题的主题交换。amq.topic``correlation_id``x-opt-reply-to-topic``correlation_id``amq.topic``x-opt-reply-to-topic
2. 举例
此示例仅关注 MQTT 客户端。
在第一个终端窗口中,响应的 MQTT 客户端订阅主题t/7
;
mqttx sub --client-id responder --topic t/7 --session-expiry-interval 600 --output-mode clean --qos 1
在第二个终端窗口中,请求的 MQTT 客户端订阅了一个名为 :my/response/topic
mqttx sub --client-id requester --topic my/response/topic --session-expiry-interval 600 --qos 1
… Connecting...
✔ Connected
… Subscribing to my/response/topic...
✔ Subscribed to my/response/topic
^C
在第二个终端窗口中,请求者随后发布一条请求消息:
mqttx pub --client-id requester --topic t/7 --message "my request" \--correlation-data abc-123 --response-topic my/response/topic \--session-expiry-interval 600 --no-clean
在第 1 个终端窗口中,响应方收到请求消息:
{"topic": "t/7","payload": "my request","packet": {..."properties": {"responseTopic": "my/response/topic","correlationData": {"type": "Buffer","data": [97,98,99,45,49,50,51]}}}
}
^C
在第一个终端窗口中,响应方通过复制关联数据并发布到响应主题来响应请求者:
mqttx pub --client-id responder --topic my/response/topic --message "my response" --correlation-data abc-123
在第 2 个终端窗口中,请求者收到响应。
mqttx sub --client-id requester --topic my/response/topic --no-clean --qos 1 --output-mode clean
{"topic": "my/response/topic","payload": "my response","packet": {..."properties": {"correlationData": {"type": "Buffer","data": [97,98,99,45,49,50,51]}}}
}
关联数据可用于将响应与请求相关联。 请求者通常为其发布的每个请求选取唯一的关联数据。
8. 分配的客户端标识符
1. 描述
如果客户端使用零长度的客户端标识符进行连接,则服务器必须使用包含分配的客户端标识符的 CONNACK 进行响应。
与 MQTT 3.1.1 相比,这解除了服务器分配的客户端 ID 只能用于连接的限制。Clean Session = 1
2. 实现
RabbitMQ 将生成一些随机的客户端 ID(例如 ),并在 CONNACK 数据包中返回它。dcGB2kSwS0JlXnaBa1A6QA
9. 主题别名
1. 描述
主题别名是一个整数值,用于标识主题,而不是使用主题名称。 这减小了 PUBLISH 数据包的大小,并且在主题名称很长且在网络连接中重复使用相同的主题名称时非常有用。
2. 实现
RabbitMQ 中的默认主题别名最大值为 16。 您可以在 中配置此值,例如:rabbitmq.conf
mqtt.topic_alias_maximum = 32
此配置值映射到从 RabbitMQ 发送到客户端的 CONNACK 数据包中的 Topic Alias Maximum。 它限制了任一方向的主题别名数,即从客户端到 RabbitMQ 和 RabbitMQ 到客户端。 如果客户端发送到许多不同的主题或从许多不同的主题接收,则设置更高的值将需要更多的内存使用量。
RabbitMQ 运算符可以通过设置以下设置来禁止使用主题别名:
mqtt.topic_alias_maximum = 0
10. 流量控制
1. 描述
MQTT 5.0 属性 Receive Maximum 定义了未确认的 QoS 1 PUBLISH 数据包的上限。
2. 实现
从 RabbitMQ 发送到客户端的未确认 QoS 1 PUBLISH 数据包的最大数量由 CONNECT 数据包中从客户端发送到 RabbitMQ 的 Receive Maximum 和配置值:mqtt.prefetch
mqtt.prefetch = 10
默认值为 10。mqtt.prefetch
该值在 MQTT 3.1 和 3.1.1 的 RabbitMQ 3.13 之前已存在。 它映射到 RabbitMQ 中的使用者预取。 换句话说,它定义队列发送到其 MQTT 连接进程的动态消息数量。mqtt.prefetch
11. 最大数据包大小
1. 描述
客户端和服务器可以独立指定它们支持的最大数据包大小。
2. 举例
此示例演示如何限制从客户端发送到 RabbitMQ 的最大 MQTT 数据包大小。
假设身份验证成功后,RabbitMQ 操作员不希望 RabbitMQ 接受任何大于 1 KiB 的 MQTT 数据包。 将以下配置写入 rabbitmq.conf(在当前工作目录中):
mqtt.max_packet_size_authenticated = 1024
停止 RabbitMQ 服务器后,启动 RabbitMQ 服务器并应用新配置:
docker run -it --rm --name rabbitmq -p 1883:1883 -p 15672:15672 -p 15692:15692 \--mount type=bind,source="$(pwd)"/rabbitmq.conf,target=/etc/rabbitmq/conf.d/11-blog-post.conf \rabbitmq:3.13.0-beta.2-management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmqctl enable_feature_flag all
在第一个终端窗口中,订阅一个主题:
mqttx sub --topic t/11
在第二个终端窗口中,向该主题发送有效负载为 3 字节的消息:
payload=$(head --bytes 3 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"
第一行从特殊文件中读取 3 个字节(3 个 null 字符),将每个 null 字符转换为 ASCII 字符并将结果保存在变量中。/dev/zero``x``xxx``payload
第一个终端窗口将收到该消息:
payload: xxx
接下来,在第 2 个终端窗口中,发送有效负载为 2,000 字节的消息:
payload=$(head --bytes 2000 < /dev/zero | tr '\0' x)
mqttx pub --topic t/11 -m "$payload"
这一次,第一个终端窗口不会收到消息,因为从客户端发送到 RabbitMQ 的 PUBLISH 数据包大于配置的最大数据包大小 1024 字节。
相反,RabbitMQ 会记录一条描述性错误消息:
[error] <0.836.0> MQTT packet size (2007 bytes, type 3) exceeds mqtt.max_packet_size_authenticated (1024 bytes)
日志消息声明 2,007 字节,因为 PUBLISH 数据包的固定和可变标头需要 7 个字节(其中 4 个字节用于主题名称)。t/11
12. 服务器启动的断开连接
1. 描述
在 MQTT 5.0 中,DISCONNECT 数据包不仅可以从客户端发送到服务器,还可以从服务器发送到客户端。
2. 实现
在终止连接之前,RabbitMQ 会在以下情况下向客户端发送 DISCONNECT 数据包:
DISCONNECT 原因代码名称 | 情况 |
---|---|
接管的会话 | 使用同一客户端 ID 连接的另一个客户端。 |
服务器关闭 | RabbitMQ 进入维护模式。 |
保持活动超时 | 客户端无法在“保持活动”时间内进行通信。 |
数据包太大 | RabbitMQ 收到大小超过mqtt.max_packet_size_authenticated |
13. 会话到期
1. 描述
在 MQTT 5.0 中,客户端可以在 CONNECT 数据包中向服务器建议会话到期间隔。 服务器可以接受建议的会话到期间隔,也可以在 CONNACK 数据包中强制要求不同的会话到期间隔。
会话可以跨一系列网络连接继续进行。它的持续时间与最新的网络连接加上会话到期间隔一样长。
当会话到期间隔到期时,客户端和服务器都将删除任何会话状态。
2. 实现
只要会话持续,客户端和服务器就会保持会话状态。
服务器中的会话状态包括已发送到客户端但尚未确认的消息、待发送到客户端的消息以及客户端的订阅。 RabbitMQ 以队列和绑定的形式对这种 MQTT 会话状态进行建模。
因此,会话到期间隔映射到 RabbitMQ 中的队列 TTL。 当 MQTT 会话过期时,队列及其消息和绑定将被删除。
3. 举例
默认情况下,服务器允许的最大会话到期间隔为 1 天。 如果 MQTT 客户端在 1 天内没有重新连接,则其会话状态将在 RabbitMQ 中删除。
此值是可配置的。 出于此示例的目的,让我们在以下情况下设置一个非常低的会话到期间隔 1 分钟:rabbitmq.conf
mqtt.max_session_expiry_interval_seconds = 60
设置名称包含前缀,因为 MQTT 5.0 客户端可以通过在 CONNECT 数据包中发送会话到期间隔来选择较低的值。 如最大数据包大小示例中所做的那样,重新启动 RabbitMQ 节点,以便应用新设置。max
以 20 秒的会话到期间隔连接到 RabbitMQ 并创建订阅:
mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 20 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C
键入终端以断开客户端连接。Ctrl+C
在接下来的 20 秒内,列出队列和绑定:
docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name
mqtt-subscription-sub-13qos1docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
┌─────────────┬──────────────────────────────┬──────────────────────────────┐
│ source_name │ destination_name │ routing_key │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ │ mqtt-subscription-sub-13qos1 │ mqtt-subscription-sub-13qos1 │
├─────────────┼──────────────────────────────┼──────────────────────────────┤
│ amq.topic │ mqtt-subscription-sub-13qos1 │ t.13 │
└─────────────┴──────────────────────────────┴──────────────────────────────┘
20 秒后,再次列出队列和绑定:
docker exec rabbitmq rabbitmqctl list_queues name
Timeout: 60.0 seconds ...
Listing queues for vhost / ...docker exec rabbitmq rabbitmqctl list_bindings source_name destination_name routing_key --formatter=pretty_table
Listing bindings for vhost /...
RabbitMQ 删除了队列及其绑定,因为我们的客户端未在 20 秒的会话到期间隔内连接到 RabbitMQ。Clean Session = 0
接下来,执行相同的测试,但会话到期间隔较长,例如 1 小时:
mqttx sub --client-id sub-13 --topic t/13 --session-expiry-interval 3600 --qos 1
… Connecting...
✔ Connected
… Subscribing to t/13...
✔ Subscribed to t/13
^C
您应该注意到,队列及其绑定将在 1 分钟后被删除,因为有效的会话到期间隔 是客户端请求的最小值(1 小时)和 RabbitMQ 中配置的值(1 分钟)。mqtt.max_session_expiry_interval_seconds
14. 会延迟
1. 描述
客户端可以在 CONNECT 数据包中定义 Will Delay Interval。
服务器会延迟发布客户的遗嘱消息,直到遗嘱延迟间隔过去或会话结束,以先发生者为准。 如果在将延迟间隔过去之前与此会话建立了新的网络连接,则服务器不得发送将消息。 这样做的一个用途是,如果存在临时网络断开连接,并且客户端在发布遗嘱消息之前成功重新连接并继续其会话,则避免发布遗嘱消息。
Will Delay Interval 的另一个用例是通知会话到期:
客户端可以通过将 Will Delay Interval 设置为 Session Expiry Interval 长并发送带有原因0x04代码的 DISCONNECT(Disconnect with Will Message),来安排 Will Message 通知会话到期已发生。
2. 实现
尽管 will 消息有效负载通常很小,但 MQTT 规范允许 will 消息有效负载大小高达 64 KiB。
为了避免在 Khepri(RabbitMQ 未来的元数据存储)中存储大型二进制数据,RabbitMQ 创建了一个包含此单个遗嘱消息的经典队列。 我们称此队列为 Will 队列。 此消息具有每条消息的 TTL 集,该集以毫秒为单位定义,对应于以秒为单位的 Will Delay Interval。 此外,Will 队列还设置了一个队列 TTL,该队列以毫秒为单位定义,对应于以秒为单位的会话到期间隔。 每条消息的有效 TTL 至少比队列 TTL 低几毫秒,以便消息将在队列(会话)过期前不久发布。
Will 队列还定义(MQTT 插件使用的默认主题交换)为死信交换,将 will 主题定义为死信路由键。amq.topic
如果 MQTT 客户端未在其 Will 延迟间隔内重新连接,则 Will 队列中的消息将死信发送到主题交换。
让我们用一个例子来说明这一点。
3. 举例
在第一个终端窗口中,创建一个将使用遗嘱消息的订阅:
mqttx sub --client-id sub-14 --topic t/14
在第二个终端窗口中,创建一个 Will Delay Interval 为 20 秒的连接:
mqttx conn --client-id conn-14 --will-topic t/14 --will-message my-will-message --will-delay-interval 20 --session-expiry-interval 40
在第 3 个终端窗口中,我们看到到目前为止,订阅 MQTT 客户端创建了一个队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬───────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼───────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 0 │ 0 │ │
└──────────────────────────────┴────────────┴──────────┴───────────┘
在第二个终端窗口中,键入 to disconnect the MQTT connection with client ID。Ctrl+C``conn-14
这一次,列出队列显示已创建 Will 队列:
docker exec rabbitmq rabbitmqctl --quiet --formatter=pretty_table list_queues name type messages arguments
┌──────────────────────────────┬────────────┬──────────┬────────────────────────────────────────────────────────────┐
│ name │ type │ messages │ arguments │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-subscription-sub-14qos0 │ MQTT QoS 0 │ 0 │ │
├──────────────────────────────┼────────────┼──────────┼────────────────────────────────────────────────────────────┤
│ mqtt-will-conn-14 │ classic │ 1 │ {<<"x-expires">>,long,40000} │
│ │ │ │ {<<"x-dead-letter-exchange">>,longstr,<<"amq.topic">>} │
│ │ │ │ {<<"x-dead-letter-routing-key">>,longstr,<<"t.14">>} │
└──────────────────────────────┴────────────┴──────────┴────────────────────────────────────────────────────────────┘
Will 队列的命名模式为 。 它包含一条消息:遗嘱消息。mqtt-will-<MQTT Client ID>
如上一节所述,队列 TTL () 为 40,000 毫秒,因此与上面命令中的 40 秒会话到期间隔匹配。 如果您等待 20 秒,您的第一个终端窗口应该会收到遗嘱消息,因为我们的客户没有在遗嘱延迟间隔内重新连接:x-expires
› payload: my-will-message
15. 可选的服务器功能可用性
1. 描述
定义一组服务器不允许的功能,并为服务器提供一种机制,以便将其指定给客户端。 可以通过这种方式指定的功能包括:
- 最大 QoS
- 保留可用
- 提供通配符订阅
- 可用的订阅标识符
- 提供共享订阅
客户端使用服务器声明不可用的功能是错误的。
2. 实现
RabbitMQ 3.13 在 CONNACK 属性中包括 Maximum QoS = 1 和 Shared Subscription Available = 0。
RabbitMQ 不支持 QoS 2。
如下一节所述,将来的 RabbitMQ 版本将支持共享订阅。
4. 局限性
本节列出了 RabbitMQ MQTT 实现的限制。
1. MQTT 5.0 特定限制
1. 共享订阅
共享订阅将在将来的 RabbitMQ 版本中添加。 尽管此功能很好地映射到 RabbitMQ 中的队列,但共享订阅是会话状态的一部分,并且需要进行某些 RabbitMQ 数据库迁移才能有效地查询给定 MQTT 客户端 ID 的共享订阅。
1. 延迟和保留的遗嘱消息
延迟和保留的遗嘱信息将不会被保留。 这是因为延迟的遗嘱消息将死信到主题交换,但保留进程当前不会从队列中使用。 将来可以通过保留邮件的新存储来解决此限制。
2. 非 MQTT 5.0 特定限制
为了完整起见,本节列出了在 RabbitMQ 3.13 中支持 MQTT 5.0 之前和 RabbitMQ 3.12 中提供原生 MQTT 之前存在的限制。
1. 保留的消息
保留消息的功能在 RabbitMQ 中受到限制。
保留的消息仅在本地节点上存储和查询。
一个有效的示例如下: MQTT 客户端向节点 A 发布一条保留的消息,主题为 。此后,另一个客户端在节点 A 上使用主题过滤器进行订阅。新订阅者将收到保留的消息。topic/1``topic/1
但是,如果主题筛选器包含通配符(多级通配符 “”或单级通配符 “”),则不会发送保留的消息 (问题 #8824)。#``+
此外,如果客户端在节点 A 上发布了保留的消息,而另一个客户端随后在节点 B 上订阅,则该订阅客户端将不会收到存储在节点 A 上的任何保留消息(问题 #8096)。
将来的 RabbitMQ 版本将复制集群中保留的消息,并发送与包含通配符的主题筛选器匹配的保留消息。
5. 总结
综上所述,RabbitMQ
- 是领先的 AMQP 0.9.1 代理
- 是一个流是代理
- 擅长跨协议互操作性
- 由于支持 3.13 中发布的 MQTT 5.0 和 3.12 中发布的原生 MQTT,它正在成为领先的 MQTT 代理之一
我们将 RabbitMQ 转变为成熟的物联网代理的旅程尚未完成,并计划在未来几个月和几年内进行更多的开发工作。 敬请关注!
6. 相关链接
MQTT 5.0 support is coming in RabbitMQ 3.13 | RabbitMQ
相关文章:

RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表
RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表 文章目录 RabbitMQ3.13.0起支持MQTT5.0协议及MQTT5.0特性功能列表1. MQTT概览2. MQTT 5.0 特性1. 特性概要2. Docker中安装RabbitMQ及启用MQTT5.0协议 3. MQTT 5.0 功能列表1. 消息过期1. 描述2. 举例3. 实现 2. 订阅标识…...

常用脚本01 - 生成证书
1 生成证书 第一步、准备脚本文件 [rootharbor-01 ssl]# vim gencert.sh #!/usr/bin/env bash set -eDOMAIN"$1" IP"$2" WORK_DIR"$(mktemp -d)"if [ -z "$DOMAIN" ]; thenecho "Domain name needed."exit 1 fiecho "…...

【jQuery】jQuery框架
目录 1.jQuery基本用法 1.1选择器 1.2jQuery对象 1.3事件绑定 1.4链式编程 1.5过滤方法 1.6样式操纵 1.6属性操纵 1.7操作value 1.8查找方法 1.9类名操纵 1.10事件进阶 1.11触发事件 1.12window事件绑定 2.节点操作与动画 2.1获取位置 2.2滚动距离 2.3显示/隐…...

使用OMP复原一维信号(MATLAB)
参考文献 https://github.com/aresmiki/CS-Recovery-Algorithms/tree/master MATLAB代码 %% 含有噪声 % minimize ||x||_1 % subject to: (||Ax-y||_2)^2<eps; % minimize : (||Ax-y||_2)^2lambda*||x||_1 % y传输中可能含噪 yyw % %% clc;clearvars; close all; %% 1.构…...

Linux安装最新版Docker完整教程
参考官网地址:Install Docker Engine on CentOS | Docker Docs 一、安装前准备工作 1.1 查看服务器系统版本以及内核版本 cat /etc/redhat-release1.2 查看服务器内核版本 uname -r这里我们使用的是CentOS 7.6 系统,内核版本为3.10 1.3 安装依赖包 …...

iOS object-c self关键字总结
在Objective-C中,self 关键字是一个指向当前对象的指针。它是对象自身实例的别名,通常在对象内部的方法中使用,以提供一个指向当前对象的引用。使用 self 可以帮助你访问对象的属性和方法,特别是在处理消息传递和方法调用时。 以…...

京东云16核64G云服务器租用优惠价格500元1个月、5168元一年,35M带宽
京东云16核64G云服务器租用优惠价格500元1个月、5168元一年,35M带宽,配置为:16C64G-450G SSD系统盘-35M带宽-8000G月流量 华北-北京,京东云活动页面 yunfuwuqiba.com/go/jd 活动链接打开如下图: 京东云16核64G云服务器…...

hive管理之ctl方式
hive管理之ctl方式 hivehive --service clictl命令行的命令 #清屏 Ctrl L #或者 ! clear #查看数据仓库中的表 show tabls; #查看数据仓库中的内置函数 show functions;#查看表的结构 desc表名 #查看hdfs上的文件 dfs -ls 目录 #执行操作系统的命令 !命令…...

cpp 内存分区模型
c程序在执行前,将内存大方向划分为4个区域。 1.代码区:存放函数的二进制代码,有操作系统进行管理 2.全局区:存放全局变量和静态变量以及常量 3.栈区:由编译器自动分配释放,存放的函数参数和局部变量 4.…...

44.网络游戏逆向分析与漏洞攻防-角色管理功能通信分析-角色创建服务器反馈数据包分析
免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 如果看不懂、不知道现在做的什么,那就跟着做完看效果 现在的代码都是依据数据包来写的,如果看不懂代码,就说明没看懂数据包…...

web安全学习笔记(6)
记一下第十节课的内容。 一.PHP语言中的if else判断 语法和c语言中非常类似,不再赘述,也可以使用if...elseif...elseif...else 1.True和False 2.,和 一个等号是赋值 两个等号是比较 三个等号是全等(内容相等,数…...

揭秘“二次放号查询接口”:为您的通信安全保驾护航
在信息化社会中,手机号码已成为我们日常生活中不可或缺的身份标识。然而,您是否了解过“二次放号”这一现象,以及它可能对您的信息安全带来的影响?今天,我们将为您揭开“二次放号查询接口”的神秘面纱,揭示…...

字节8年经验之谈 —— 如何实现高效的自动化渗透测试?
随着当前网络安全威胁的不断扩展与升级,开展渗透测试工作已经成为广大企业组织主动识别安全漏洞与潜在风险的关键过程。然而,传统的人工渗透测试模式对测试人员的专业能力和经验水平有很高的要求,企业需要投入较大的时间和资源才能完成。在此…...

ElasticSearch分词检索
1. 倒排索引:表示一种数据结构,分词词条与文档id集合的隐射关系 2. 它跟关系型数据库是一种互补的关系,因为关系型数据库支持事务操作,满足ACID原则 3. 索引库的文档字段只允许新增不允许修改 1.创建索引库 put /索引库名称2.1 …...

每日三道面试题之 Java并发编程 (四)
1.什么是线程死锁 线程死锁是并发编程中一个常见问题,它发生在两个或多个线程永久性地阻塞彼此,等待对方释放锁,但没有任何一方先行释放锁的情况下。简单来说,每个线程都持有对方需要的资源而等待对方释放资源,导致所…...

ubuntu20.04.6将虚拟机用户目录映射为磁盘Z
文章目录 linux虚拟机设置为NAT模式安装sshd服务映射目录到windows磁盘安装samba套件修改配置文件smb.conf重启smbd并设置用户名和密码 windows映射遇到的问题1、设置好之后映射不成功2、smbd下载失败3、smbd密码配置问题4、当有改动时候,最好重启一下smbd服务 linu…...

TCP挥手中TIME_WAIT存在的原因
四次挥手的一般过程如图所示: 在客户端收到FIN结束报文的时候不是立刻进入CLOSED状态,而是进入TIME_WAIT状态,一般等2MLS后进入关闭状态。 原因: 1.可靠地终止 TCP 连接。 2.保证让迟来的 TCP报文段有足够的时间被识别并丢弃。 …...

使用Docker部署jar包
vi DockerfileDockerfile内容 FROM java:8 ADD chery5G-admin.jar chery5G-admin.jar ENTRYPOINT ["java","-jar","chery5G-admin.jar"]上传jar包到Dockerfile文件同级目录 使用Dockerfile文件,将jar包制作为镜像 docker build -t…...

深入了解WebKit:结构简介
随着互联网的发展,网页浏览器已经成为我们日常生活中不可或缺的工具之一。而在众多浏览器中,WebKit引擎作为其中之一的重要角色,驱动着一系列流行的浏览器,例如Safari和一些移动端浏览器。那么,WebKit究竟是如何构建的…...

Pgsql怎样找到表中某个字段值重复的记录并删除冗余记录,只保留一条
背景 今天发现某个黄页爬取的数据有部分重复了,原本我用的公司详情页的url进行md5来作为主键做upsert入,但后面在核验数据时发现有些详情url虽是同一间公司的,但路由上有细微差别导致写入了重复的公司数据,所以要想办法清理掉重复…...

如何在HarmonyOS(鸿蒙操作系统)上进行应用开发
文章中提到的关键点包括: 学习ArkTS:作者建议初学者首先学习使用ArkTS编写Hello World程序,并可以通过TypeScript教程来快速掌握基础语法。对于有Flutter或React Native开发经验的开发者来说,页面布局会比较容易上手。 页面布局&…...

C++ typeid运算符介绍
在 C++ 中,typeid() 是一个运算符,用于获取表达式的类型信息。typeid() 运算符在 C++ 中是一个强大的工具,可以用于获取对象的类型信息、类型比较、多态类型判断、异常处理以及类型转换安全检查等场景中。 1. 类型比较: 可以使用 typeid() 来比较两个类型是否相同。 if …...

Android适配平板屏幕尺寸
一、划分手机和平板 人为判断方法: 大于6英寸的就是平板。小于6英寸的都是手机 平板尺寸: 6英寸、7英寸、10英寸、14英寸… Android系统支持多配置资源文件,我们可以追加新的资源目录到你的Android项目中。命名规范: 资源名字-限制符 l…...

汽车充电桩主板在出厂前需要做哪些检测?
充电桩主板作为核心组件承载着充电桩的关键功能,其性能和稳定性直接影响着用户充电体验、桩企产品合规和市场竞争力,以及主板厂商的品牌知名度。因此,对充电桩主板进行全面的测试尤为重要。 下面将详细介绍充电桩主板检测的内容,包…...

关于Renesas R7 的选项字节开关看门狗
Renesas看门狗的模式是在选项字节中进行配置的,OPBT0的寄存器说明如下, 关于看门狗模式 : 和看门狗喂狗方式: 我们选择关闭看门狗(也就是配置31位为软件触发看门狗开始,然后不启动就相当于关闭)…...

redis bigKey问题
bigKey的产生 1、使用String存储了大文件的二进制。 2、使用集合没有考虑到数据的规模,或者规模的增长。 3、哈希中冗余了大量键值对。 bigKey问题 1、操作大key时会阻塞线程:redis是单线程。 2、网络阻塞:在网络中占用大量网络流量。 …...

二手车商的套路
https://www.dongchedi.com/article/7126394624675578405 https://www.dongchedi.com/article/7126394624675578405 现在,有越来越多的人去了解二手车,二手车相对于新车来说,更加的亲民划算。很多新车需要四五十万,而二手车有可…...

c++ 根据ip主机号和子网掩码随机生成ip
在C中,可以使用以下方法根据给定的IP地址和子网掩码来随机生成IP地址。这里使用了库来生成随机数,以及<arpa/inet.h>库来处理IP地址。 #include <iostream> #include <random> #include <arpa/inet.h>std::string random_ip(co…...

事务的隔离级别
事务由哪些特性 原子性(Atomicity):一个事务中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节,而且事务在执行过程中发生错误,会被回滚到事务开始前的状态…...

性能优化角度
1.启动角度 2.数据缓存角度比如历史信息联想 3.内存释放角度 4.配合并要求后端进行接口API整合 5.耗时较多需求,根据业务情况进行线程异步处理 6.通过开源的loop监控sdk日志埋点,可监控线程对应环路的休眠/唤醒时间间隔,优化卡顿 7.尽量避免离…...