MQTT(Message Queuing Telemetry Transport)协议(三)
主题是什么




2. TCP 协议封装
tcp.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;// 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);// 将 IP 地址从点分十进制转换为二进制形式if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 连接服务器if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("connection failed");close(sockfd);return -1;}return sockfd;
}// 发送数据
int tcp_send(int sockfd, const char *data, int length) {return send(sockfd, data, length, 0);
}// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {return recv(sockfd, buffer, buffer_size, 0);
}// 关闭 TCP 连接
void tcp_close(int sockfd) {close(sockfd);
}
tcp.h
#ifndef TCP_H
#define TCP_Hint tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);#endif
- MQTT 协议封装

mqtt.h
#ifndef MQTT_H
#define MQTT_H#include <stdint.h>// 定义主题列表节点结构体
typedef struct TopicNode {char *topic;struct TopicNode *next;
} TopicNode;// 定义主题列表结构体
typedef struct TopicList {TopicNode *head;TopicNode *tail;int count;
} TopicList;int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);#endif
mqtt.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {char buffer[100];int index = 0;// 固定报头buffer[index++] = MQTT_CONNECT << 4;// 剩余长度int remaining_length = 12 + strlen(client_id);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 协议名buffer[index++] = 0;buffer[index++] = 4;memcpy(buffer + index, "MQTT", 4);index += 4;// 协议级别buffer[index++] = 4;// 连接标志buffer[index++] = 0;// 保持活动时间buffer[index++] = 0;buffer[index++] = 60;// 有效载荷// 客户端 IDbuffer[index++] = 0;buffer[index++] = strlen(client_id);memcpy(buffer + index, client_id, strlen(client_id));index += strlen(client_id);// 发送 CONNECT 消息return tcp_send(sockfd, buffer, index);
}// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {char buffer[10];int len = tcp_receive(sockfd, buffer, sizeof(buffer));if (len < 4) {return -1;}if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {return -1;}return buffer[3];
}// 初始化主题列表
void init_topic_list(TopicList *list) {list->head = NULL;list->tail = NULL;list->count = 0;
}// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));if (new_node == NULL) {perror("Memory allocation failed");return;}new_node->topic = strdup(topic);new_node->next = NULL;if (list->head == NULL) {list->head = new_node;list->tail = new_node;} else {list->tail->next = new_node;list->tail = new_node;}list->count++;
}// 释放主题列表内存
void free_topic_list(TopicList *list) {TopicNode *current = list->head;TopicNode *next;while (current != NULL) {next = current->next;free(current->topic);free(current);current = next;}list->head = NULL;list->tail = NULL;list->count = 0;
}// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;// 计算剩余长度int remaining_length = 2; // 报文标识符长度TopicNode *current = topics->head;while (current != NULL) {remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoScurrent = current->next;}// 编码剩余长度do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 报文标识符buffer[index++] = 0;buffer[index++] = 1;// 有效载荷,遍历主题列表current = topics->head;while (current != NULL) {// 主题过滤器buffer[index++] = 0;buffer[index++] = strlen(current->topic);memcpy(buffer + index, current->topic, strlen(current->topic));index += strlen(current->topic);// QoSbuffer[index++] = 0;current = current->next;}// 发送 SUBSCRIBE 消息return tcp_send(sockfd, buffer, index);
}// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_PUBLISH << 4;// 剩余长度int remaining_length = 2 + strlen(topic) + strlen(message);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 主题名buffer[index++] = 0;buffer[index++] = strlen(topic);memcpy(buffer + index, topic, strlen(topic));index += strlen(topic);// 有效载荷memcpy(buffer + index, message, strlen(message));index += strlen(message);// 发送 PUBLISH 消息return tcp_send(sockfd, buffer, index);
}
- main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"int main() {int sockfd;TopicList topics;// 建立 TCP 连接sockfd = tcp_connect(SERVER_IP, SERVER_PORT);if (sockfd == -1) {return -1;}// 发送 MQTT CONNECT 消息if (mqtt_connect(sockfd, CLIENT_ID) == -1) {tcp_close(sockfd);return -1;}// 解析 MQTT CONNACK 消息int connack_result = mqtt_parse_connack(sockfd);if (connack_result != 0) {printf("Connection failed with result code %d\n", connack_result);tcp_close(sockfd);return -1;}printf("Connected to MQTT server\n");// 初始化主题列表init_topic_list(&topics);// 添加主题add_topic(&topics, TOPIC1);add_topic(&topics, TOPIC2);// 订阅多个主题if (mqtt_subscribe_topics(sockfd, &topics) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Subscribed to multiple topics\n");// 发布消息if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);// 释放主题列表内存free_topic_list(&topics);// 关闭连接tcp_close(sockfd);return 0;
}






// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;struct timeval timeout;sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 设置连接超时时间timeout.tv_sec = 5;timeout.tv_usec = 0;setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {if (errno == ETIMEDOUT) {printf("Connection timed out\n");} else {perror("connection failed");}close(sockfd);return -1;}return sockfd;
}



相关文章:
MQTT(Message Queuing Telemetry Transport)协议(三)
主题是什么 2. TCP 协议封装 tcp.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>// 建立 TCP 连接 int tcp_connect(const char *server_ip, int s…...
多核cpu与时间片多线程的问题
在多核处理器中,每个核心可以独立运行一个线程。操作系统负责管理和调度这些线程,以确保高效利用处理器资源。下面详细解释如何获取时间片以及四个线程如何在四个核心上同时工作。 ### 时间片和调度 #### 1. 时间片(Time Slice)…...
电脑出现蓝屏英文怎么办?查看修复过程
电脑出现蓝屏英文是一种常见的电脑故障,它通常表示电脑遇到了严重的错误,需要停止运行以防止进一步的损坏。电脑蓝屏英文的原因可能有很多,比如硬件故障、驱动程序错误、系统文件损坏、病毒感染等。那么,当电脑出现蓝屏英文时&…...
安卓基础(第一集)
SharedPreferences(本地存储简单数据) 在 Android 中,SharedPreferences 用于存储小型数据。 (1)存储数据 // 获取 SharedPreferences 对象 SharedPreferences sharedPreferences getSharedPreferences("MyPre…...
【从零开始入门unity游戏开发之——C#篇56】C#补充知识点——模式匹配
考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…...
【数据可视化-16】珍爱网上海注册者情况分析
🧑 博主简介:曾任某智慧城市类企业算法总监,目前在美国市场的物流公司从事高级算法工程师一职,深耕人工智能领域,精通python数据挖掘、可视化、机器学习等,发表过AI相关的专利并多次在AI类比赛中获奖。CSDN…...
c/c++蓝桥杯经典编程题100道(21)背包问题
背包问题 ->返回c/c蓝桥杯经典编程题100道-目录 目录 背包问题 一、题型解释 二、例题问题描述 三、C语言实现 解法1:0-1背包(基础动态规划,难度★) 解法2:0-1背包(空间优化版,难度★…...
电赛DEEPSEEK
以下是针对竞赛题目的深度优化方案,重点解决频率接近时的滤波难题和相位测量精度问题: 以下是使用NI Multisim 14.3实现本项目的详细解决方案: 一、基础要求实现方案(模块化设计) 1. 双频信号发生电路 电路结构&…...
VSOMEIP ROUTING应用和CLIENT应用之间交互的消息
#define VSOMEIP_ASSIGN_CLIENT 0x00 // client应用请求分配client_id #define VSOMEIP_ASSIGN_CLIENT_ACK 0x01 // routing应用返回分配的client_id #define VSOMEIP_REGISTER_APPLICATION 0x02 // client应用注册someip应用 #…...
HTML之基本布局div|span
HTML基本布局使用 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"width<device-width>, initial-scale1.0"><title>布局</title> <…...
Linux下学【MySQL】常用函数助你成为数据库大师~(配sql+实操图+案例巩固 通俗易懂版~)
绪论 每日激励:“唯有努力,才能进步” 绪论: 本章是MySQL中常见的函数,利用好函数能很大的帮助我们提高MySQL使用效率,也能很好处理一些情况,如字符串的拼接,字符串的获取,进制…...
【Rabbitmq篇】高级特性----TTL,死信队列,延迟队列
目录 一.TTL ???1.设置消息的TTL 2.设置队列的TTL 3.俩者区别? 二.死信队列 定义: 消息成为死信的原因: 1.消息被拒绝(basic.reject 或 basic.nack) 2.消息过期(TTL) 3.队列达到最大长度? …...
机器学习赋能的智能光子学器件系统研究与应用
机器学习赋能的智能光子学器件系统研究与应用 时间: 2025年03月29日-03月30日 2025年04月05日-04月06日 机器学习赋能的光子学器件与系统:从创新设计到前沿应用 课程针对光子学方面的从业科研人员及开发者,希望了解和实践在集成光学/空间…...
尚硅谷课程【笔记】——大数据之Linux【三】
课程视频链接:尚硅谷大数据Linux课程 七、定时任务调度 任务调度:指系统在某个时间执行的特定的命令或程序。 1)系统工作:有些重要的工作必须周而复始地执行。 2)个别用户工作:用户可能希望在某些特定的时…...
Visual Studio踩过的坑
统计Unity项目代码行数 编辑-查找和替换-在文件中查找 查找内容输入 b*[^:b#/].*$ 勾选“使用正则表达式” 文件类型留空 也有网友做了指定,供参考 !*\bin\*;!*\obj\*;!*\.*\*!*.meta;!*.prefab;!*.unity 打开Unity的项目 注意:只是看࿰…...
教程 | MySQL 基本指令指南(附MySQL软件包)
此前已经发布了安装教程安装教程,现在让我们来学习一下MySQL的基本指令。 一、数据库连接与退出 连接本地数据库 mysql -uroot -p # 输入后回车,按提示输入密码(密码输入不可见)若需隐藏密码显示,可使用࿱…...
企业数据集成案例:吉客云销售渠道到MySQL
测试-查询销售渠道信息-dange:吉客云数据集成到MySQL的技术案例分享 在企业的数据管理过程中,如何高效、可靠地实现不同系统之间的数据对接是一个关键问题。本次我们将分享一个具体的技术案例——通过轻易云数据集成平台,将吉客云中的销售渠…...
网络编程 day3
思维导图 以select函数模型为例 思维导图2 对应 epoll模型 应使用的函数 题目 使用epoll函数实现 两个客户端 通过服务器 实现聊天 思路 在原先代码基础上 实现 服务器 发向 客户端 使用客户端在服务器上的 套接字描述符 实现 客户端 接收 服务器…...
Excel 融合 deepseek
效果展示 代码实现 Function QhBaiDuYunAIReq(question, _Optional Authorization "Bearer ", _Optional Qhurl "https://qianfan.baidubce.com/v2/chat/completions")Dim XMLHTTP As ObjectDim url As Stringurl Qhurl 这里替换为你实际的URLDim postD…...
【论文笔记】Are Self-Attentions Effective for Time Series Forecasting? (NeurIPS 2024)
官方代码https://github.com/dongbeank/CATS Abstract 时间序列预测在多领域极为关键,Transformer 虽推进了该领域发展,但有效性尚存争议,有研究表明简单线性模型有时表现更优。本文聚焦于自注意力机制在时间序列预测中的作用,提…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)
LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 题目描述解题思路Java代码 题目描述 题目链接:LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...
Python学习(8) ----- Python的类与对象
Python 中的类(Class)与对象(Object)是面向对象编程(OOP)的核心。我们可以通过“类是模板,对象是实例”来理解它们的关系。 🧱 一句话理解: 类就像“图纸”,对…...
