当前位置: 首页 > news >正文

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2. TCP 协议封装
tcp.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;// 创建套接字sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);// 将 IP 地址从点分十进制转换为二进制形式if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 连接服务器if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("connection failed");close(sockfd);return -1;}return sockfd;
}// 发送数据
int tcp_send(int sockfd, const char *data, int length) {return send(sockfd, data, length, 0);
}// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {return recv(sockfd, buffer, buffer_size, 0);
}// 关闭 TCP 连接
void tcp_close(int sockfd) {close(sockfd);
}

tcp.h

#ifndef TCP_H
#define TCP_Hint tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);#endif
  1. MQTT 协议封装

在这里插入图片描述
mqtt.h

#ifndef MQTT_H
#define MQTT_H#include <stdint.h>// 定义主题列表节点结构体
typedef struct TopicNode {char *topic;struct TopicNode *next;
} TopicNode;// 定义主题列表结构体
typedef struct TopicList {TopicNode *head;TopicNode *tail;int count;
} TopicList;int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);#endif

mqtt.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {char buffer[100];int index = 0;// 固定报头buffer[index++] = MQTT_CONNECT << 4;// 剩余长度int remaining_length = 12 + strlen(client_id);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 协议名buffer[index++] = 0;buffer[index++] = 4;memcpy(buffer + index, "MQTT", 4);index += 4;// 协议级别buffer[index++] = 4;// 连接标志buffer[index++] = 0;// 保持活动时间buffer[index++] = 0;buffer[index++] = 60;// 有效载荷// 客户端 IDbuffer[index++] = 0;buffer[index++] = strlen(client_id);memcpy(buffer + index, client_id, strlen(client_id));index += strlen(client_id);// 发送 CONNECT 消息return tcp_send(sockfd, buffer, index);
}// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {char buffer[10];int len = tcp_receive(sockfd, buffer, sizeof(buffer));if (len < 4) {return -1;}if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {return -1;}return buffer[3];
}// 初始化主题列表
void init_topic_list(TopicList *list) {list->head = NULL;list->tail = NULL;list->count = 0;
}// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));if (new_node == NULL) {perror("Memory allocation failed");return;}new_node->topic = strdup(topic);new_node->next = NULL;if (list->head == NULL) {list->head = new_node;list->tail = new_node;} else {list->tail->next = new_node;list->tail = new_node;}list->count++;
}// 释放主题列表内存
void free_topic_list(TopicList *list) {TopicNode *current = list->head;TopicNode *next;while (current != NULL) {next = current->next;free(current->topic);free(current);current = next;}list->head = NULL;list->tail = NULL;list->count = 0;
}// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;// 计算剩余长度int remaining_length = 2; // 报文标识符长度TopicNode *current = topics->head;while (current != NULL) {remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoScurrent = current->next;}// 编码剩余长度do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 报文标识符buffer[index++] = 0;buffer[index++] = 1;// 有效载荷,遍历主题列表current = topics->head;while (current != NULL) {// 主题过滤器buffer[index++] = 0;buffer[index++] = strlen(current->topic);memcpy(buffer + index, current->topic, strlen(current->topic));index += strlen(current->topic);// QoSbuffer[index++] = 0;current = current->next;}// 发送 SUBSCRIBE 消息return tcp_send(sockfd, buffer, index);
}// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {char buffer[200];int index = 0;// 固定报头buffer[index++] = MQTT_PUBLISH << 4;// 剩余长度int remaining_length = 2 + strlen(topic) + strlen(message);do {unsigned char digit = remaining_length % 128;remaining_length /= 128;if (remaining_length > 0) {digit |= 0x80;}buffer[index++] = digit;} while (remaining_length > 0);// 可变报头// 主题名buffer[index++] = 0;buffer[index++] = strlen(topic);memcpy(buffer + index, topic, strlen(topic));index += strlen(topic);// 有效载荷memcpy(buffer + index, message, strlen(message));index += strlen(message);// 发送 PUBLISH 消息return tcp_send(sockfd, buffer, index);
}
  1. main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"int main() {int sockfd;TopicList topics;// 建立 TCP 连接sockfd = tcp_connect(SERVER_IP, SERVER_PORT);if (sockfd == -1) {return -1;}// 发送 MQTT CONNECT 消息if (mqtt_connect(sockfd, CLIENT_ID) == -1) {tcp_close(sockfd);return -1;}// 解析 MQTT CONNACK 消息int connack_result = mqtt_parse_connack(sockfd);if (connack_result != 0) {printf("Connection failed with result code %d\n", connack_result);tcp_close(sockfd);return -1;}printf("Connected to MQTT server\n");// 初始化主题列表init_topic_list(&topics);// 添加主题add_topic(&topics, TOPIC1);add_topic(&topics, TOPIC2);// 订阅多个主题if (mqtt_subscribe_topics(sockfd, &topics) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Subscribed to multiple topics\n");// 发布消息if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {tcp_close(sockfd);free_topic_list(&topics);return -1;}printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);// 释放主题列表内存free_topic_list(&topics);// 关闭连接tcp_close(sockfd);return 0;
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {int sockfd;struct sockaddr_in server_addr;struct timeval timeout;sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {perror("socket creation failed");return -1;}memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {perror("invalid address/ address not supported");close(sockfd);return -1;}// 设置连接超时时间timeout.tv_sec = 5;timeout.tv_usec = 0;setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {if (errno == ETIMEDOUT) {printf("Connection timed out\n");} else {perror("connection failed");}close(sockfd);return -1;}return sockfd;
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么 2. TCP 协议封装 tcp.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h>// 建立 TCP 连接 int tcp_connect(const char *server_ip, int s…...

多核cpu与时间片多线程的问题

在多核处理器中&#xff0c;每个核心可以独立运行一个线程。操作系统负责管理和调度这些线程&#xff0c;以确保高效利用处理器资源。下面详细解释如何获取时间片以及四个线程如何在四个核心上同时工作。 ### 时间片和调度 #### 1. 时间片&#xff08;Time Slice&#xff09;…...

电脑出现蓝屏英文怎么办?查看修复过程

电脑出现蓝屏英文是一种常见的电脑故障&#xff0c;它通常表示电脑遇到了严重的错误&#xff0c;需要停止运行以防止进一步的损坏。电脑蓝屏英文的原因可能有很多&#xff0c;比如硬件故障、驱动程序错误、系统文件损坏、病毒感染等。那么&#xff0c;当电脑出现蓝屏英文时&…...

安卓基础(第一集)

SharedPreferences&#xff08;本地存储简单数据&#xff09; 在 Android 中&#xff0c;SharedPreferences 用于存储小型数据。 &#xff08;1&#xff09;存储数据 // 获取 SharedPreferences 对象 SharedPreferences sharedPreferences getSharedPreferences("MyPre…...

【从零开始入门unity游戏开发之——C#篇56】C#补充知识点——模式匹配

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…...

【数据可视化-16】珍爱网上海注册者情况分析

&#x1f9d1; 博主简介&#xff1a;曾任某智慧城市类企业算法总监&#xff0c;目前在美国市场的物流公司从事高级算法工程师一职&#xff0c;深耕人工智能领域&#xff0c;精通python数据挖掘、可视化、机器学习等&#xff0c;发表过AI相关的专利并多次在AI类比赛中获奖。CSDN…...

c/c++蓝桥杯经典编程题100道(21)背包问题

背包问题 ->返回c/c蓝桥杯经典编程题100道-目录 目录 背包问题 一、题型解释 二、例题问题描述 三、C语言实现 解法1&#xff1a;0-1背包&#xff08;基础动态规划&#xff0c;难度★&#xff09; 解法2&#xff1a;0-1背包&#xff08;空间优化版&#xff0c;难度★…...

电赛DEEPSEEK

以下是针对竞赛题目的深度优化方案&#xff0c;重点解决频率接近时的滤波难题和相位测量精度问题&#xff1a; 以下是使用NI Multisim 14.3实现本项目的详细解决方案&#xff1a; 一、基础要求实现方案&#xff08;模块化设计&#xff09; 1. 双频信号发生电路 电路结构&…...

VSOMEIP ROUTING应用和CLIENT应用之间交互的消息

#define VSOMEIP_ASSIGN_CLIENT 0x00 // client应用请求分配client_id #define VSOMEIP_ASSIGN_CLIENT_ACK 0x01 // routing应用返回分配的client_id #define VSOMEIP_REGISTER_APPLICATION 0x02 // client应用注册someip应用 #…...

HTML之基本布局div|span

HTML基本布局使用 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"width<device-width>, initial-scale1.0"><title>布局</title> <…...

Linux下学【MySQL】常用函数助你成为数据库大师~(配sql+实操图+案例巩固 通俗易懂版~)

绪论​ 每日激励&#xff1a;“唯有努力&#xff0c;才能进步” 绪论​&#xff1a; 本章是MySQL中常见的函数&#xff0c;利用好函数能很大的帮助我们提高MySQL使用效率&#xff0c;也能很好处理一些情况&#xff0c;如字符串的拼接&#xff0c;字符串的获取&#xff0c;进制…...

【Rabbitmq篇】高级特性----TTL,死信队列,延迟队列

目录 一.TTL ???1.设置消息的TTL 2.设置队列的TTL 3.俩者区别? 二.死信队列 定义&#xff1a; 消息成为死信的原因&#xff1a; 1.消息被拒绝&#xff08;basic.reject 或 basic.nack&#xff09; 2.消息过期&#xff08;TTL&#xff09; 3.队列达到最大长度? …...

机器学习赋能的智能光子学器件系统研究与应用

机器学习赋能的智能光子学器件系统研究与应用 时间&#xff1a; 2025年03月29日-03月30日 2025年04月05日-04月06日 机器学习赋能的光子学器件与系统&#xff1a;从创新设计到前沿应用 课程针对光子学方面的从业科研人员及开发者&#xff0c;希望了解和实践在集成光学/空间…...

尚硅谷课程【笔记】——大数据之Linux【三】

课程视频链接&#xff1a;尚硅谷大数据Linux课程 七、定时任务调度 任务调度&#xff1a;指系统在某个时间执行的特定的命令或程序。 1&#xff09;系统工作&#xff1a;有些重要的工作必须周而复始地执行。 2&#xff09;个别用户工作&#xff1a;用户可能希望在某些特定的时…...

Visual Studio踩过的坑

统计Unity项目代码行数 编辑-查找和替换-在文件中查找 查找内容输入 b*[^:b#/].*$ 勾选“使用正则表达式” 文件类型留空 也有网友做了指定&#xff0c;供参考 !*\bin\*;!*\obj\*;!*\.*\*!*.meta;!*.prefab;!*.unity 打开Unity的项目 注意&#xff1a;只是看&#xff0…...

教程 | MySQL 基本指令指南(附MySQL软件包)

此前已经发布了安装教程安装教程&#xff0c;现在让我们来学习一下MySQL的基本指令。 一、数据库连接与退出 连接本地数据库 mysql -uroot -p # 输入后回车&#xff0c;按提示输入密码&#xff08;密码输入不可见&#xff09;若需隐藏密码显示&#xff0c;可使用&#xff1…...

企业数据集成案例:吉客云销售渠道到MySQL

测试-查询销售渠道信息-dange&#xff1a;吉客云数据集成到MySQL的技术案例分享 在企业的数据管理过程中&#xff0c;如何高效、可靠地实现不同系统之间的数据对接是一个关键问题。本次我们将分享一个具体的技术案例——通过轻易云数据集成平台&#xff0c;将吉客云中的销售渠…...

网络编程 day3

思维导图 以select函数模型为例 思维导图2 对应 epoll模型 应使用的函数 题目 使用epoll函数实现 两个客户端 通过服务器 实现聊天 思路 在原先代码基础上 实现 服务器 发向 客户端 使用客户端在服务器上的 套接字描述符 实现 客户端 接收 服务器…...

Excel 融合 deepseek

效果展示 代码实现 Function QhBaiDuYunAIReq(question, _Optional Authorization "Bearer ", _Optional Qhurl "https://qianfan.baidubce.com/v2/chat/completions")Dim XMLHTTP As ObjectDim url As Stringurl Qhurl 这里替换为你实际的URLDim postD…...

【论文笔记】Are Self-Attentions Effective for Time Series Forecasting? (NeurIPS 2024)

官方代码https://github.com/dongbeank/CATS Abstract 时间序列预测在多领域极为关键&#xff0c;Transformer 虽推进了该领域发展&#xff0c;但有效性尚存争议&#xff0c;有研究表明简单线性模型有时表现更优。本文聚焦于自注意力机制在时间序列预测中的作用&#xff0c;提…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

定时器任务——若依源码分析

分析util包下面的工具类schedule utils&#xff1a; ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类&#xff0c;封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz&#xff0c;先构建任务的 JobD…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

【单片机期末】单片机系统设计

主要内容&#xff1a;系统状态机&#xff0c;系统时基&#xff0c;系统需求分析&#xff0c;系统构建&#xff0c;系统状态流图 一、题目要求 二、绘制系统状态流图 题目&#xff1a;根据上述描述绘制系统状态流图&#xff0c;注明状态转移条件及方向。 三、利用定时器产生时…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

EtherNet/IP转DeviceNet协议网关详解

一&#xff0c;设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络&#xff0c;本网关连接到EtherNet/IP总线中做为从站使用&#xff0c;连接到DeviceNet总线中做为从站使用。 在自动…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...