当前位置: 首页 > news >正文

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2. TCP 协议封装
tcp.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;// 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);// 将 IP 地址从点分十进制转换为二进制形式if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 连接服务器if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("connection failed");close(sockfd);return -1;}return sockfd;
}// 发送数据
int tcp_send(int sockfd, const char *data, int length) {return send(sockfd, data, length, 0);
}// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {return recv(sockfd, buffer, buffer_size, 0);
}// 关闭 TCP 连接
void tcp_close(int sockfd) {close(sockfd);
}

tcp.h

#ifndef TCP_H
#define TCP_Hint tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);#endif
  1. MQTT 协议封装

在这里插入图片描述
mqtt.h

#ifndef MQTT_H
#define MQTT_H#include <stdint.h>// 定义主题列表节点结构体
typedef struct TopicNode {char *topic;struct TopicNode *next;
} TopicNode;// 定义主题列表结构体
typedef struct TopicList {TopicNode *head;TopicNode *tail;int count;
} TopicList;int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);#endif

mqtt.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {char buffer[100];int index = 0;// 固定报头buffer[index++] = MQTT_CONNECT << 4;// 剩余长度int remaining_length = 12 + strlen(client_id);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 协议名buffer[index++] = 0;buffer[index++] = 4;memcpy(buffer + index, "MQTT", 4);index += 4;// 协议级别buffer[index++] = 4;// 连接标志buffer[index++] = 0;// 保持活动时间buffer[index++] = 0;buffer[index++] = 60;// 有效载荷// 客户端 IDbuffer[index++] = 0;buffer[index++] = strlen(client_id);memcpy(buffer + index, client_id, strlen(client_id));index += strlen(client_id);// 发送 CONNECT 消息return tcp_send(sockfd, buffer, index);
}// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {char buffer[10];int len = tcp_receive(sockfd, buffer, sizeof(buffer));if (len < 4) {return -1;}if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {return -1;}return buffer[3];
}// 初始化主题列表
void init_topic_list(TopicList *list) {list->head = NULL;list->tail = NULL;list->count = 0;
}// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));if (new_node == NULL) {perror("Memory allocation failed");return;}new_node->topic = strdup(topic);new_node->next = NULL;if (list->head == NULL) {list->head = new_node;list->tail = new_node;} else {list->tail->next = new_node;list->tail = new_node;}list->count++;
}// 释放主题列表内存
void free_topic_list(TopicList *list) {TopicNode *current = list->head;TopicNode *next;while (current != NULL) {next = current->next;free(current->topic);free(current);current = next;}list->head = NULL;list->tail = NULL;list->count = 0;
}// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;// 计算剩余长度int remaining_length = 2; // 报文标识符长度TopicNode *current = topics->head;while (current != NULL) {remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoScurrent = current->next;}// 编码剩余长度do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 报文标识符buffer[index++] = 0;buffer[index++] = 1;// 有效载荷,遍历主题列表current = topics->head;while (current != NULL) {// 主题过滤器buffer[index++] = 0;buffer[index++] = strlen(current->topic);memcpy(buffer + index, current->topic, strlen(current->topic));index += strlen(current->topic);// QoSbuffer[index++] = 0;current = current->next;}// 发送 SUBSCRIBE 消息return tcp_send(sockfd, buffer, index);
}// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_PUBLISH << 4;// 剩余长度int remaining_length = 2 + strlen(topic) + strlen(message);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 主题名buffer[index++] = 0;buffer[index++] = strlen(topic);memcpy(buffer + index, topic, strlen(topic));index += strlen(topic);// 有效载荷memcpy(buffer + index, message, strlen(message));index += strlen(message);// 发送 PUBLISH 消息return tcp_send(sockfd, buffer, index);
}
  1. main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"int main() {int sockfd;TopicList topics;// 建立 TCP 连接sockfd = tcp_connect(SERVER_IP, SERVER_PORT);if (sockfd == -1) {return -1;}// 发送 MQTT CONNECT 消息if (mqtt_connect(sockfd, CLIENT_ID) == -1) {tcp_close(sockfd);return -1;}// 解析 MQTT CONNACK 消息int connack_result = mqtt_parse_connack(sockfd);if (connack_result != 0) {printf("Connection failed with result code %d\n", connack_result);tcp_close(sockfd);return -1;}printf("Connected to MQTT server\n");// 初始化主题列表init_topic_list(&topics);// 添加主题add_topic(&topics, TOPIC1);add_topic(&topics, TOPIC2);// 订阅多个主题if (mqtt_subscribe_topics(sockfd, &topics) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Subscribed to multiple topics\n");// 发布消息if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);// 释放主题列表内存free_topic_list(&topics);// 关闭连接tcp_close(sockfd);return 0;
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;struct timeval timeout;sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 设置连接超时时间timeout.tv_sec = 5;timeout.tv_usec = 0;setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {if (errno == ETIMEDOUT) {printf("Connection timed out\n");} else {perror("connection failed");}close(sockfd);return -1;}return sockfd;
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么 2. TCP 协议封装 tcp.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>// 建立 TCP 连接 int tcp_connect(const char *server_ip, int s…...

多核cpu与时间片多线程的问题

在多核处理器中&#xff0c;每个核心可以独立运行一个线程。操作系统负责管理和调度这些线程&#xff0c;以确保高效利用处理器资源。下面详细解释如何获取时间片以及四个线程如何在四个核心上同时工作。 ### 时间片和调度 #### 1. 时间片&#xff08;Time Slice&#xff09;…...

电脑出现蓝屏英文怎么办?查看修复过程

电脑出现蓝屏英文是一种常见的电脑故障&#xff0c;它通常表示电脑遇到了严重的错误&#xff0c;需要停止运行以防止进一步的损坏。电脑蓝屏英文的原因可能有很多&#xff0c;比如硬件故障、驱动程序错误、系统文件损坏、病毒感染等。那么&#xff0c;当电脑出现蓝屏英文时&…...

安卓基础(第一集)

SharedPreferences&#xff08;本地存储简单数据&#xff09; 在 Android 中&#xff0c;SharedPreferences 用于存储小型数据。 &#xff08;1&#xff09;存储数据 // 获取 SharedPreferences 对象 SharedPreferences sharedPreferences getSharedPreferences("MyPre…...

【从零开始入门unity游戏开发之——C#篇56】C#补充知识点——模式匹配

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…...

【数据可视化-16】珍爱网上海注册者情况分析

&#x1f9d1; 博主简介&#xff1a;曾任某智慧城市类企业算法总监&#xff0c;目前在美国市场的物流公司从事高级算法工程师一职&#xff0c;深耕人工智能领域&#xff0c;精通python数据挖掘、可视化、机器学习等&#xff0c;发表过AI相关的专利并多次在AI类比赛中获奖。CSDN…...

c/c++蓝桥杯经典编程题100道(21)背包问题

背包问题 ->返回c/c蓝桥杯经典编程题100道-目录 目录 背包问题 一、题型解释 二、例题问题描述 三、C语言实现 解法1&#xff1a;0-1背包&#xff08;基础动态规划&#xff0c;难度★&#xff09; 解法2&#xff1a;0-1背包&#xff08;空间优化版&#xff0c;难度★…...

电赛DEEPSEEK

以下是针对竞赛题目的深度优化方案&#xff0c;重点解决频率接近时的滤波难题和相位测量精度问题&#xff1a; 以下是使用NI Multisim 14.3实现本项目的详细解决方案&#xff1a; 一、基础要求实现方案&#xff08;模块化设计&#xff09; 1. 双频信号发生电路 电路结构&…...

VSOMEIP ROUTING应用和CLIENT应用之间交互的消息

#define VSOMEIP_ASSIGN_CLIENT 0x00 // client应用请求分配client_id #define VSOMEIP_ASSIGN_CLIENT_ACK 0x01 // routing应用返回分配的client_id #define VSOMEIP_REGISTER_APPLICATION 0x02 // client应用注册someip应用 #…...

HTML之基本布局div|span

HTML基本布局使用 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"width<device-width>, initial-scale1.0"><title>布局</title> <…...

Linux下学【MySQL】常用函数助你成为数据库大师~(配sql+实操图+案例巩固 通俗易懂版~)

绪论​ 每日激励&#xff1a;“唯有努力&#xff0c;才能进步” 绪论​&#xff1a; 本章是MySQL中常见的函数&#xff0c;利用好函数能很大的帮助我们提高MySQL使用效率&#xff0c;也能很好处理一些情况&#xff0c;如字符串的拼接&#xff0c;字符串的获取&#xff0c;进制…...

【Rabbitmq篇】高级特性----TTL,死信队列,延迟队列

目录 一.TTL ???1.设置消息的TTL 2.设置队列的TTL 3.俩者区别? 二.死信队列 定义&#xff1a; 消息成为死信的原因&#xff1a; 1.消息被拒绝&#xff08;basic.reject 或 basic.nack&#xff09; 2.消息过期&#xff08;TTL&#xff09; 3.队列达到最大长度? …...

机器学习赋能的智能光子学器件系统研究与应用

机器学习赋能的智能光子学器件系统研究与应用 时间&#xff1a; 2025年03月29日-03月30日 2025年04月05日-04月06日 机器学习赋能的光子学器件与系统&#xff1a;从创新设计到前沿应用 课程针对光子学方面的从业科研人员及开发者&#xff0c;希望了解和实践在集成光学/空间…...

尚硅谷课程【笔记】——大数据之Linux【三】

课程视频链接&#xff1a;尚硅谷大数据Linux课程 七、定时任务调度 任务调度&#xff1a;指系统在某个时间执行的特定的命令或程序。 1&#xff09;系统工作&#xff1a;有些重要的工作必须周而复始地执行。 2&#xff09;个别用户工作&#xff1a;用户可能希望在某些特定的时…...

Visual Studio踩过的坑

统计Unity项目代码行数 编辑-查找和替换-在文件中查找 查找内容输入 b*[^:b#/].*$ 勾选“使用正则表达式” 文件类型留空 也有网友做了指定&#xff0c;供参考 !*\bin\*;!*\obj\*;!*\.*\*!*.meta;!*.prefab;!*.unity 打开Unity的项目 注意&#xff1a;只是看&#xff0…...

教程 | MySQL 基本指令指南(附MySQL软件包)

此前已经发布了安装教程安装教程&#xff0c;现在让我们来学习一下MySQL的基本指令。 一、数据库连接与退出 连接本地数据库 mysql -uroot -p # 输入后回车&#xff0c;按提示输入密码&#xff08;密码输入不可见&#xff09;若需隐藏密码显示&#xff0c;可使用&#xff1…...

企业数据集成案例:吉客云销售渠道到MySQL

测试-查询销售渠道信息-dange&#xff1a;吉客云数据集成到MySQL的技术案例分享 在企业的数据管理过程中&#xff0c;如何高效、可靠地实现不同系统之间的数据对接是一个关键问题。本次我们将分享一个具体的技术案例——通过轻易云数据集成平台&#xff0c;将吉客云中的销售渠…...

网络编程 day3

思维导图 以select函数模型为例 思维导图2 对应 epoll模型 应使用的函数 题目 使用epoll函数实现 两个客户端 通过服务器 实现聊天 思路 在原先代码基础上 实现 服务器 发向 客户端 使用客户端在服务器上的 套接字描述符 实现 客户端 接收 服务器…...

Excel 融合 deepseek

效果展示 代码实现 Function QhBaiDuYunAIReq(question, _Optional Authorization "Bearer ", _Optional Qhurl "https://qianfan.baidubce.com/v2/chat/completions")Dim XMLHTTP As ObjectDim url As Stringurl Qhurl 这里替换为你实际的URLDim postD…...

【论文笔记】Are Self-Attentions Effective for Time Series Forecasting? (NeurIPS 2024)

官方代码https://github.com/dongbeank/CATS Abstract 时间序列预测在多领域极为关键&#xff0c;Transformer 虽推进了该领域发展&#xff0c;但有效性尚存争议&#xff0c;有研究表明简单线性模型有时表现更优。本文聚焦于自注意力机制在时间序列预测中的作用&#xff0c;提…...

告别AI对话失忆症:深入LangChain4j的ChatMemoryProvider与InMemoryChatMemoryStore

深入LangChain4j记忆管理&#xff1a;构建高性能会话隔离系统的实践指南 在构建企业级AI对话系统时&#xff0c;会话记忆管理往往成为决定用户体验的关键因素。想象这样一个场景&#xff1a;当用户询问"我上周提到的项目进展如何&#xff1f;"时&#xff0c;系统能否…...

开源游戏工具:Steam Achievement Manager实现跨平台成就管理的全攻略

开源游戏工具&#xff1a;Steam Achievement Manager实现跨平台成就管理的全攻略 【免费下载链接】SteamAchievementManager A manager for game achievements in Steam. 项目地址: https://gitcode.com/gh_mirrors/st/SteamAchievementManager 在游戏世界中&#xff0c…...

cv2.findContours()错误的解决办法ValueError: not enough values to unpack (expected 3, got 2)

方法一&#xff1a;直接去掉一个返回值就即可。 方法二&#xff1a;把OpenCV 安装3.X的版本 具体原因 2、解析差异&#xff1a; OpenCV2和OpenCV4中&#xff1a; findContours这个轮廓提取函数会返回两个值&#xff1a;①轮廓的点集(contours)②各层轮廓的索引(hierarchy) 返回…...

基于PostGIS与SpringBoot构建高性能动态MVT矢量瓦片服务

1. 为什么需要动态矢量瓦片服务 第一次接触矢量瓦片是在2018年做智慧城市项目时&#xff0c;当时前端同事抱怨加载行政区划数据太慢。一个省级行政区划的GeoJSON文件大小超过10MB&#xff0c;每次打开网页都要等半天。后来尝试了Mapbox的矢量瓦片方案&#xff0c;加载速度直接提…...

VideoSrt:零基础视频字幕自动化解决方案

VideoSrt&#xff1a;零基础视频字幕自动化解决方案 【免费下载链接】video-srt-windows 这是一个可以识别视频语音自动生成字幕SRT文件的开源 Windows-GUI 软件工具。 项目地址: https://gitcode.com/gh_mirrors/vi/video-srt-windows 视频创作者的效率痛点&#xff1a…...

MemMA:多智能体驱动的记忆自进化框架

&#x1f4cc; 一句话总结&#xff1a; 本工作提出 MemMA&#xff0c;一个通过多智能体协同与自进化机制统一优化“记忆构建-检索-利用”循环的框架&#xff0c;显著提升长程记忆推理能力。 &#x1f50d; 背景问题&#xff1a; 当前 memory-augmented LLM agent 存在两个核…...

HTML基础教程入门保姆级教学

什么是HTMLHTML全称Hyper Text Markup Language, 翻译成中文就是超文本标记语言&#xff0c;是一种最基础的网页开发语言, 需要注意的是HTML并不是编程语言 HTML 只有核心作用&#xff1a;搭建网页的结构和内容…...

Kandinsky-5.0-I2V-Lite-5s多场景应用:社交头像动效、PPT动态配图、电子相册生成

Kandinsky-5.0-I2V-Lite-5s多场景应用&#xff1a;社交头像动效、PPT动态配图、电子相册生成 1. 认识Kandinsky-5.0-I2V-Lite-5s Kandinsky-5.0-I2V-Lite-5s是一款轻量级图生视频模型&#xff0c;它能将静态图片转化为动态视频。你只需要上传一张首帧图片&#xff0c;再补充一…...

【2026年最新600套毕设项目分享】springboot“优兴趣”家教平台(14298)

有需要的同学&#xff0c;源代码和配套文档领取&#xff0c;加文章最下方的名片哦 一、项目演示 项目演示视频 二、资料介绍 完整源代码&#xff08;前后端源代码SQL脚本&#xff09;配套文档&#xff08;LWPPT开题报告/任务书&#xff09;远程调试控屏包运行一键启动项目&…...

Java 25 FFI与C++ ABI不兼容?GCC 13/Clang 18符号修饰差异导致段错误的逆向工程溯源(含LLVM IR级对比图)

第一章&#xff1a;Java 25 FFI与C ABI不兼容问题的现场复现与现象确认Java 25 引入的 Foreign Function & Memory API&#xff08;FFI&#xff09;在调用 C 原生函数时&#xff0c;因 C ABI&#xff08;Application Binary Interface&#xff09;未被标准化支持&#xff0…...