当前位置: 首页 > news >正文

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2. TCP 协议封装
tcp.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;// 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);// 将 IP 地址从点分十进制转换为二进制形式if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 连接服务器if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("connection failed");close(sockfd);return -1;}return sockfd;
}// 发送数据
int tcp_send(int sockfd, const char *data, int length) {return send(sockfd, data, length, 0);
}// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {return recv(sockfd, buffer, buffer_size, 0);
}// 关闭 TCP 连接
void tcp_close(int sockfd) {close(sockfd);
}

tcp.h

#ifndef TCP_H
#define TCP_Hint tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);#endif
  1. MQTT 协议封装

在这里插入图片描述
mqtt.h

#ifndef MQTT_H
#define MQTT_H#include <stdint.h>// 定义主题列表节点结构体
typedef struct TopicNode {char *topic;struct TopicNode *next;
} TopicNode;// 定义主题列表结构体
typedef struct TopicList {TopicNode *head;TopicNode *tail;int count;
} TopicList;int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);#endif

mqtt.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {char buffer[100];int index = 0;// 固定报头buffer[index++] = MQTT_CONNECT << 4;// 剩余长度int remaining_length = 12 + strlen(client_id);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 协议名buffer[index++] = 0;buffer[index++] = 4;memcpy(buffer + index, "MQTT", 4);index += 4;// 协议级别buffer[index++] = 4;// 连接标志buffer[index++] = 0;// 保持活动时间buffer[index++] = 0;buffer[index++] = 60;// 有效载荷// 客户端 IDbuffer[index++] = 0;buffer[index++] = strlen(client_id);memcpy(buffer + index, client_id, strlen(client_id));index += strlen(client_id);// 发送 CONNECT 消息return tcp_send(sockfd, buffer, index);
}// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {char buffer[10];int len = tcp_receive(sockfd, buffer, sizeof(buffer));if (len < 4) {return -1;}if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {return -1;}return buffer[3];
}// 初始化主题列表
void init_topic_list(TopicList *list) {list->head = NULL;list->tail = NULL;list->count = 0;
}// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));if (new_node == NULL) {perror("Memory allocation failed");return;}new_node->topic = strdup(topic);new_node->next = NULL;if (list->head == NULL) {list->head = new_node;list->tail = new_node;} else {list->tail->next = new_node;list->tail = new_node;}list->count++;
}// 释放主题列表内存
void free_topic_list(TopicList *list) {TopicNode *current = list->head;TopicNode *next;while (current != NULL) {next = current->next;free(current->topic);free(current);current = next;}list->head = NULL;list->tail = NULL;list->count = 0;
}// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;// 计算剩余长度int remaining_length = 2; // 报文标识符长度TopicNode *current = topics->head;while (current != NULL) {remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoScurrent = current->next;}// 编码剩余长度do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 报文标识符buffer[index++] = 0;buffer[index++] = 1;// 有效载荷,遍历主题列表current = topics->head;while (current != NULL) {// 主题过滤器buffer[index++] = 0;buffer[index++] = strlen(current->topic);memcpy(buffer + index, current->topic, strlen(current->topic));index += strlen(current->topic);// QoSbuffer[index++] = 0;current = current->next;}// 发送 SUBSCRIBE 消息return tcp_send(sockfd, buffer, index);
}// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_PUBLISH << 4;// 剩余长度int remaining_length = 2 + strlen(topic) + strlen(message);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 主题名buffer[index++] = 0;buffer[index++] = strlen(topic);memcpy(buffer + index, topic, strlen(topic));index += strlen(topic);// 有效载荷memcpy(buffer + index, message, strlen(message));index += strlen(message);// 发送 PUBLISH 消息return tcp_send(sockfd, buffer, index);
}
  1. main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"int main() {int sockfd;TopicList topics;// 建立 TCP 连接sockfd = tcp_connect(SERVER_IP, SERVER_PORT);if (sockfd == -1) {return -1;}// 发送 MQTT CONNECT 消息if (mqtt_connect(sockfd, CLIENT_ID) == -1) {tcp_close(sockfd);return -1;}// 解析 MQTT CONNACK 消息int connack_result = mqtt_parse_connack(sockfd);if (connack_result != 0) {printf("Connection failed with result code %d\n", connack_result);tcp_close(sockfd);return -1;}printf("Connected to MQTT server\n");// 初始化主题列表init_topic_list(&topics);// 添加主题add_topic(&topics, TOPIC1);add_topic(&topics, TOPIC2);// 订阅多个主题if (mqtt_subscribe_topics(sockfd, &topics) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Subscribed to multiple topics\n");// 发布消息if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);// 释放主题列表内存free_topic_list(&topics);// 关闭连接tcp_close(sockfd);return 0;
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;struct timeval timeout;sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 设置连接超时时间timeout.tv_sec = 5;timeout.tv_usec = 0;setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {if (errno == ETIMEDOUT) {printf("Connection timed out\n");} else {perror("connection failed");}close(sockfd);return -1;}return sockfd;
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么 2. TCP 协议封装 tcp.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>// 建立 TCP 连接 int tcp_connect(const char *server_ip, int s…...

多核cpu与时间片多线程的问题

在多核处理器中&#xff0c;每个核心可以独立运行一个线程。操作系统负责管理和调度这些线程&#xff0c;以确保高效利用处理器资源。下面详细解释如何获取时间片以及四个线程如何在四个核心上同时工作。 ### 时间片和调度 #### 1. 时间片&#xff08;Time Slice&#xff09;…...

电脑出现蓝屏英文怎么办?查看修复过程

电脑出现蓝屏英文是一种常见的电脑故障&#xff0c;它通常表示电脑遇到了严重的错误&#xff0c;需要停止运行以防止进一步的损坏。电脑蓝屏英文的原因可能有很多&#xff0c;比如硬件故障、驱动程序错误、系统文件损坏、病毒感染等。那么&#xff0c;当电脑出现蓝屏英文时&…...

安卓基础(第一集)

SharedPreferences&#xff08;本地存储简单数据&#xff09; 在 Android 中&#xff0c;SharedPreferences 用于存储小型数据。 &#xff08;1&#xff09;存储数据 // 获取 SharedPreferences 对象 SharedPreferences sharedPreferences getSharedPreferences("MyPre…...

【从零开始入门unity游戏开发之——C#篇56】C#补充知识点——模式匹配

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…...

【数据可视化-16】珍爱网上海注册者情况分析

&#x1f9d1; 博主简介&#xff1a;曾任某智慧城市类企业算法总监&#xff0c;目前在美国市场的物流公司从事高级算法工程师一职&#xff0c;深耕人工智能领域&#xff0c;精通python数据挖掘、可视化、机器学习等&#xff0c;发表过AI相关的专利并多次在AI类比赛中获奖。CSDN…...

c/c++蓝桥杯经典编程题100道(21)背包问题

背包问题 ->返回c/c蓝桥杯经典编程题100道-目录 目录 背包问题 一、题型解释 二、例题问题描述 三、C语言实现 解法1&#xff1a;0-1背包&#xff08;基础动态规划&#xff0c;难度★&#xff09; 解法2&#xff1a;0-1背包&#xff08;空间优化版&#xff0c;难度★…...

电赛DEEPSEEK

以下是针对竞赛题目的深度优化方案&#xff0c;重点解决频率接近时的滤波难题和相位测量精度问题&#xff1a; 以下是使用NI Multisim 14.3实现本项目的详细解决方案&#xff1a; 一、基础要求实现方案&#xff08;模块化设计&#xff09; 1. 双频信号发生电路 电路结构&…...

VSOMEIP ROUTING应用和CLIENT应用之间交互的消息

#define VSOMEIP_ASSIGN_CLIENT 0x00 // client应用请求分配client_id #define VSOMEIP_ASSIGN_CLIENT_ACK 0x01 // routing应用返回分配的client_id #define VSOMEIP_REGISTER_APPLICATION 0x02 // client应用注册someip应用 #…...

HTML之基本布局div|span

HTML基本布局使用 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"width<device-width>, initial-scale1.0"><title>布局</title> <…...

Linux下学【MySQL】常用函数助你成为数据库大师~(配sql+实操图+案例巩固 通俗易懂版~)

绪论​ 每日激励&#xff1a;“唯有努力&#xff0c;才能进步” 绪论​&#xff1a; 本章是MySQL中常见的函数&#xff0c;利用好函数能很大的帮助我们提高MySQL使用效率&#xff0c;也能很好处理一些情况&#xff0c;如字符串的拼接&#xff0c;字符串的获取&#xff0c;进制…...

【Rabbitmq篇】高级特性----TTL,死信队列,延迟队列

目录 一.TTL ???1.设置消息的TTL 2.设置队列的TTL 3.俩者区别? 二.死信队列 定义&#xff1a; 消息成为死信的原因&#xff1a; 1.消息被拒绝&#xff08;basic.reject 或 basic.nack&#xff09; 2.消息过期&#xff08;TTL&#xff09; 3.队列达到最大长度? …...

机器学习赋能的智能光子学器件系统研究与应用

机器学习赋能的智能光子学器件系统研究与应用 时间&#xff1a; 2025年03月29日-03月30日 2025年04月05日-04月06日 机器学习赋能的光子学器件与系统&#xff1a;从创新设计到前沿应用 课程针对光子学方面的从业科研人员及开发者&#xff0c;希望了解和实践在集成光学/空间…...

尚硅谷课程【笔记】——大数据之Linux【三】

课程视频链接&#xff1a;尚硅谷大数据Linux课程 七、定时任务调度 任务调度&#xff1a;指系统在某个时间执行的特定的命令或程序。 1&#xff09;系统工作&#xff1a;有些重要的工作必须周而复始地执行。 2&#xff09;个别用户工作&#xff1a;用户可能希望在某些特定的时…...

Visual Studio踩过的坑

统计Unity项目代码行数 编辑-查找和替换-在文件中查找 查找内容输入 b*[^:b#/].*$ 勾选“使用正则表达式” 文件类型留空 也有网友做了指定&#xff0c;供参考 !*\bin\*;!*\obj\*;!*\.*\*!*.meta;!*.prefab;!*.unity 打开Unity的项目 注意&#xff1a;只是看&#xff0…...

教程 | MySQL 基本指令指南(附MySQL软件包)

此前已经发布了安装教程安装教程&#xff0c;现在让我们来学习一下MySQL的基本指令。 一、数据库连接与退出 连接本地数据库 mysql -uroot -p # 输入后回车&#xff0c;按提示输入密码&#xff08;密码输入不可见&#xff09;若需隐藏密码显示&#xff0c;可使用&#xff1…...

企业数据集成案例:吉客云销售渠道到MySQL

测试-查询销售渠道信息-dange&#xff1a;吉客云数据集成到MySQL的技术案例分享 在企业的数据管理过程中&#xff0c;如何高效、可靠地实现不同系统之间的数据对接是一个关键问题。本次我们将分享一个具体的技术案例——通过轻易云数据集成平台&#xff0c;将吉客云中的销售渠…...

网络编程 day3

思维导图 以select函数模型为例 思维导图2 对应 epoll模型 应使用的函数 题目 使用epoll函数实现 两个客户端 通过服务器 实现聊天 思路 在原先代码基础上 实现 服务器 发向 客户端 使用客户端在服务器上的 套接字描述符 实现 客户端 接收 服务器…...

Excel 融合 deepseek

效果展示 代码实现 Function QhBaiDuYunAIReq(question, _Optional Authorization "Bearer ", _Optional Qhurl "https://qianfan.baidubce.com/v2/chat/completions")Dim XMLHTTP As ObjectDim url As Stringurl Qhurl 这里替换为你实际的URLDim postD…...

【论文笔记】Are Self-Attentions Effective for Time Series Forecasting? (NeurIPS 2024)

官方代码https://github.com/dongbeank/CATS Abstract 时间序列预测在多领域极为关键&#xff0c;Transformer 虽推进了该领域发展&#xff0c;但有效性尚存争议&#xff0c;有研究表明简单线性模型有时表现更优。本文聚焦于自注意力机制在时间序列预测中的作用&#xff0c;提…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...