当前位置: 首页 > article >正文

.net core集成MQTT服务端

程序作为MQTT的服务端,也是WebApi 接口地址,在Web页面中MQTTJS用的是Websocker协议,在Winfrom中用MQTT协议。导致程序需要启动两个端口。直接上代码

创建服务

引用包:MQTTnet,MQTTnet.AspNetCore,这包最新的5.*,我引用的是4.3.7.1207。5.*的包在接收消息的处理方式略微不同

MqttConfigServer

  public class MqttConfigServer{/// <summary>/// 客户端最大连接数/// </summary>public int MaxPendingPerClient { get; set; }/// <summary>/// Mqtt端口/// </summary>public int MqttPort { get; set; }/// <summary>/// MqttWs端口/// </summary>public int MqttWsPort { get; set; }/// <summary>/// MqttWs路径/// </summary>public string MqttWsPath { get; set; }}

这是拓展的方法,添加mqtt服务
AppInfo.GetOptions() 这是获取配置文件

 /// <summary>/// 添加mqtt服务/// </summary>/// <param name="services"></param>/// <param name="webHostBuilder"></param>public static void AddMqttService(this IServiceCollection services, IWebHostBuilder webHostBuilder){var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();webHostBuilder.ConfigureKestrel((context, serverOptions) =>{//配置Mqtt端口serverOptions.ListenAnyIP(mqttServerConfig.MqttPort, options => options.UseMqtt());//配置web端口serverOptions.ListenAnyIP(mqttServerConfig.MqttWsPort);});//配置MQTT服务services.AddHostedMqttServerWithServices(options =>{options.WithoutDefaultEndpoint();options.WithMaxPendingMessagesPerClient(mqttServerConfig.MaxPendingPerClient);options.WithKeepAlive();});services.AddMqttConnectionHandler();services.AddConnections();//注入单例mqtt侦听服务services.AddSingleton<MyMqttService>();}

MyMqttService

public class MyMqttService
{public MyMqttService(){ }public Task Server_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg){Console.WriteLine($"客户端:【{arg.SenderId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");return Task.CompletedTask;}/// <summary>/// 客户端验证账号密码/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg){Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");return Task.CompletedTask;}/// <summary>/// 侦听所有消息/// </summary>/// <param name="arg"></param>/// <returns></returns>public async Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg){Console.WriteLine($"侦听所有消息:【{arg.ClientId}】,主题【{arg.ApplicationMessage.Topic}】收到消息:{System.Text.Encoding.Default.GetString(arg.ApplicationMessage.PayloadSegment)}");await Task.CompletedTask;return;}/// <summary>/// 客户端断开连接/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg){Console.WriteLine($"【{arg.ClientId}】 下线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");return Task.CompletedTask;}/// <summary>/// 客户端上线/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg){Console.WriteLine($"{arg.ClientId} 上线了,{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");return Task.CompletedTask;}/// <summary>/// 客户端取消订阅/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg){return Task.CompletedTask;}/// <summary>/// 客户端订阅/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg){return Task.CompletedTask;}/// <summary>/// 客户端保留消息/// </summary>/// <param name="arg"></param>/// <returns></returns>public Task Server_RetainedMessageChangedAsync(RetainedMessageChangedEventArgs arg){Console.WriteLine($"收到消息:{arg.ClientId}");return Task.CompletedTask;}
}

在Program 添加mqtt 服务

  //添加Mqtt服务services.AddMqttService(builder.WebHost);var mqttServerConfig = AppInfo.GetOptions<MqttConfigServer>();//WsMqtt 路径app.MapConnectionHandler<MqttConnectionHandler>(mqttServerConfig.MqttWsPath, http => http.WebSockets.SubProtocolSelector = protocolList => protocolList.FirstOrDefault() ?? string.Empty);var mqttService = app.Services.GetRequiredService<MyMqttService>();//加载MQTT回调app.UseMqttServer(server =>{//验证账号密码server.ValidatingConnectionAsync += mqttService.Server_ValidatingConnectionAsync;//侦听为消费消息server.ApplicationMessageNotConsumedAsync += mqttService.Server_ApplicationMessageNotConsumedAsync;//侦听所有消息server.InterceptingPublishAsync += mqttService.Server_InterceptingPublishAsync;//客户端订阅消息server.ClientSubscribedTopicAsync += mqttService.Server_ClientSubscribedTopicAsync;//客户端取消订阅消息server.ClientUnsubscribedTopicAsync += mqttService.Server_ClientUnsubscribedTopicAsync;//客户端链接事件server.ClientConnectedAsync += mqttService.Server_ClientConnectedAsync;//客户端关闭事件server.ClientDisconnectedAsync += mqttService.Server_ClientDisconnectedAsync;//保留消息server.RetainedMessageChangedAsync += mqttService.Server_RetainedMessageChangedAsync;});

mqttconfig

{"maxPendingPerClient": 1000, //客户端最大等待连接数"mqttPort": 1883, //mqtt端口"mqttWsPort": 8083, //mqtt websocket端口"mqttWsPath": "/mqtt" //mqtt websocket路径}

这样服务就完成,这里mqtt 服务并不影响WebApi 的接口,只是启动的两个端口

服务端主动推送消息

 private IServiceProvider _serviceProvider;public ClientControlController(IServiceProvider serviceProvider){_serviceProvider = serviceProvider;}/// <summary>/// 服务端推送消息/// </summary>/// <returns></returns>private async Task PushMsg(string topic, ClientControl control){var server = _serviceProvider.GetService<MqttHostedServer>() as MqttServer;//获取客户端var client = (await server?.GetClientsAsync()).Where(m => m.Id == "HardwareService").FirstOrDefault();//推送消息var payload = JsonConvert.SerializeObject(control);var message = new MqttApplicationMessageBuilder().WithTopic(topic)//主题.WithPayload(payload).Build();if (client != null){await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message){SenderClientId = client.Id,});}}

相关文章:

.net core集成MQTT服务端

程序作为MQTT的服务端&#xff0c;也是WebApi 接口地址&#xff0c;在Web页面中MQTTJS用的是Websocker协议&#xff0c;在Winfrom中用MQTT协议。导致程序需要启动两个端口。直接上代码 创建服务 引用包&#xff1a;MQTTnet&#xff0c;MQTTnet.AspNetCore&#xff0c;这包最新…...

poetry安装与使用

文章目录 安装方法创建虚拟环境其他常用命令从 poetry.lock 中安装第三方依赖包 安装方法 安装命令&#xff08;全局安装&#xff0c;不要在虚拟环境中安装&#xff0c;方便后面创建环境使用&#xff09; pip install poetry修改虚拟环境路径&#xff08;首次使用poetry时执行&…...

UVM config机制及uvm_resource_pool

目录 1. uvm_config_db 类源码 1.1 set 1.2 get 2. uvm_resource_pool 2.1 uvm_resource_pool::set 2.2 uvm_resource 3. usage 4. 小结 uvm提供一种uvm_config_db机制使得在仿真中通过变量设置来修改环境,使环境更加灵活。本文主要介绍uvm_config_db#(type)::get/set…...

JAVA学习*接口

接口 在生活中我们常听说USB接口&#xff0c;那接口是什么呢&#xff1f; 在Java中&#xff0c;接口相当于多个类的一种公共规范&#xff0c;是一种引用数据类型。 定义接口 public interface IUSB {public static final String SIZE "small";public abstract vo…...

Day11 动态规划入门

动态规划 就是 : 给定一个问题&#xff0c;我们把它拆成一个个子问题&#xff0c;直到子问题可以直接解决。然后把子问题的答案保存起来&#xff0c;以减少重复计算。再根据子问题答案反推&#xff0c;得出原问题解的一种方法. 记忆化搜索 暴力dfs 记录答案 动态规划入门思…...

WPF UI元素保存为图像文件

WPF UI元素保存为图像文件 实现功能示例代码使用示例关键代码说明WPF UI元素保存为图像文件 实现功能 将WPF界面元素(如控件、布局容器)的当前视觉内容保存为图像文件适用场景:截取控件的实时显示内容(如图表、界面快照);将动态生成的UI元素导出为图片用于分享、存档或打…...

指令型样本或偏好型样本有什么区别和联系

两者都是基于给定文本生成的训练样本&#xff0c;但侧重点和用途不同&#xff1a; 指令型样本&#xff08;Instruction-based samples&#xff09; 结构&#xff1a;通常是一个简单的指令和对应的回答&#xff0c;例如一对“问题&#xff0d;答案”或“指令&#xff0d;回答”。…...

neo4j-如何让外部设备访问wsl中的neo4j

WSL 运行在一个虚拟网络环境中&#xff0c;它的 IP 只能被宿主 Windows 访问&#xff0c;外部设备无法直接访问 WSL 的端口。你需要在 Windows 上转发端口&#xff0c;让外部设备可以访问 Windows 并映射到 WSL。 1. 获取 WSL 的 IP 地址 在 WSL 中运行以下命令获取其 IP 地址…...

Python实验:读写文本文件并添加行号

[实验目的] 熟练掌握内置函数open()的用法&#xff1b;熟练运用内置函数len()、max()、和enumerate()&#xff1b;熟练运用字符串的strip()、ljust()和其它方法&#xff1b;熟练运用列表推导式。 [实验和内容] 1.编写一个程序demo.py&#xff0c;要求运行该程序后&#xff0…...

IDEA导入jar包后提示无法解析jar包中的类,比如无法解析符号 ‘log4j‘

IDEA导入jar包后提示无法解析jar包中的类 问题描述解决方法 问题描述 IDEA导入jar包的Maven坐标后&#xff0c;使用jar中的类比如log4j&#xff0c;仍然提示比如无法解析符号 log4j。 解决方法 在添加了依赖和配置文件后&#xff0c;确保刷新你的IDE项目和任何缓存&#xff…...

抖音用户视频批量下载工具开发全解析

一、逆向工程原理剖析 1.1 抖音Web端防护体系 抖音采用五层防御机制保护数据接口: graph LRA[浏览器指纹检测] --> B[请求参数签名]B --> C[Cookie动态验证]C --> D[请求频率限制]D --> E[IP信誉评级] 1.2 核心参数解密 参数名称作用原理生成方式有效期x-bogu…...

数据结构——顺序栈seq_stack

前言&#xff1a;大家好&#x1f60d;&#xff0c;本文主要介绍了数据结构——顺序栈 目录 一、概念 1.1 顺序栈的基本概念 1.2 顺序栈的存储结构 二、基本操作 2.1 结构体定义 2.2 初始化 2.3 判空 2.4 判满 2.5 扩容 2.6 插入 入栈 2.7 删除 出栈 2.8 获取栈顶元…...

LangChain其它五类组件详解(1)—— 文档加载器(Document loaders)

LangChain其它五类组件详解(1)—— 文档加载器(Document loaders) 前言本篇摘要15. LangChain其它五类组件详解15.1 文档加载器(Document loaders)15.1.1 文档加载概述15.1.2 加载Markdown1. 基本用法2. 保留元素参考文献前言 本系列文章主要介绍WEB界面工具Gradio。Gra…...

JVM常见面试总结

JVM&#xff08;Java虚拟机&#xff09;是Java程序运行的核心&#xff0c;掌握JVM相关知识对于Java开发者至关重要。以下是JVM常见的面试问题总结&#xff1a; 1. JVM内存模型 问题&#xff1a;JVM的内存结构分为哪些部分&#xff1f; 答案&#xff1a; 方法区&#xff08;Met…...

美团Leaf分布式ID生成器使用教程:号段模式与Snowflake模式详解

引言 在分布式系统中&#xff0c;生成全局唯一ID是核心需求之一。美团开源的Leaf提供了两种分布式ID生成方案&#xff1a;号段模式&#xff08;高可用、依赖数据库&#xff09;和Snowflake模式&#xff08;高性能、去中心化&#xff09;。本文将手把手教你如何配置和使用这两种…...

python3.13.2安装详细步骤(附安装包)

文章目录 前言一、python3.13.2下载二、python3.13.2安装详细步骤1.查看安装文件2.启动安装程序3.安装模式选择4.自定义安装配置5.高级选项设置6.执行安装7.开始安装8.安装完成8.打开软件9.安装验证 前言 在数字化时代&#xff0c;Python 已成为不可或缺的编程语言。无论是开发…...

AI-Talk开发板之更换串口引脚

一、默认引脚 CSK6011A使用UART0作为Debug uart&#xff0c;AI-Talk开发板默认使用的GPIOA2和GPIOA3作为Debug uart的RX和TX&#xff0c;通过连接器CN6引出。 二 、更换到其它引脚 查看60xx_iomux_v1.0可以&#xff0c;UART0的tx和rx可以映射到很多管脚上。 结合AI-Talk开发板…...

深度解读DeepSeek:源码解读 DeepSeek-V3

深度解读DeepSeek&#xff1a;开源周&#xff08;Open Source Week&#xff09;技术解读 深度解读DeepSeek&#xff1a;源码解读 DeepSeek-V3 深度解读DeepSeek&#xff1a;技术原理 深度解读DeepSeek&#xff1a;发展历程 文章目录 整体流程模型初始化模型前向传播MoE https:/…...

JavaIO流的使用和修饰器模式(直击心灵版)

系列文章目录 JavaIO流的使用和修饰器模式 文章目录 系列文章目录前言一、字节流&#xff1a; 1.FileInputStream(读取文件)2.FileOutputStream(写入文件) 二、字符流&#xff1a; 1..基础字符流:2.处理流&#xff1a;3.对象处理流&#xff1a;4.转换流&#xff1a; 三、修饰器…...

爬虫入门re+bs4

目录 前言 1. 导入必要的库 2. 定义获取网页HTML内容的函数 get_html 3. 定义获取数据的函数 get_data 4. 定义获取文章正文内容的函数 content_text 5. 定义获取单条课程数据的函数 get_one_course_data 6. 定义保存数据的函数 save_data 7. 定义文件名合法化处理函数 sanitiz…...

【WebGL】texImage2D函数

参数 从像素数据加载纹理 gl.texImage2D(target, level, internalformat, width, height, border, format, type, source);从图像元素加载纹理 gl.texImage2D(target, level, internalformat, format, type, image);target gl.TEXTURE_2D&#xff08;2D 纹理&#xff09; T…...

北斗设备启动流程与时长解析

北斗卫星导航系统作为我国自主研发的全球卫星导航系统&#xff0c;广泛应用于交通、通信、农业等多个领域。今天&#xff0c;我们就来详细探讨一下北斗设备的启动流程以及不同启动方式下的时长。 一、北斗设备的启动流程 北斗设备的启动流程可以分为以下几个关键步骤&#xf…...

MySQL身份验证的auth_socket插件

在Ubuntu 20.04 LTS上&#xff0c;MySQL 8.0默认使用auth_socket插件进行身份验证&#xff0c;可能存在意想不到的情况。 一、auth_socket插件 在使用sudo mysql或通过sudo切换用户后执行任何MySQL命令时&#xff0c;不需要输入密码或错误密码都可以正常登入mysql数据库&…...

openstack安装部署

在OpenStack的安装和部署中&#xff0c;你需要按照一定的步骤来完成整个环境的搭建。OpenStack是一个开源的云计算平台&#xff0c;它提供了基础设施即服务&#xff08;IaaS&#xff09;的能力&#xff0c;包括计算、存储和网络等资源的管理。下面是一些基本的步骤来安装和部署…...

【日志库】—— log4cpp 部署套路

部署&#xff1a; 1、安装log4cpp&#xff0c;执行如下指令进行编译安装 log4cpp的官网是&#xff1a; http://log4cpp.sourceforge.net/ wget https://nchc.dl.sourceforge.net/project/log4cpp/log4cpp-1.1.x%20%28new%29/log4cpp-1.1/log4cpp-1.1.3.tar.gz tar xzvf log4cpp…...

使用Gitee Go流水线部署个人项目到服务器指南

使用Gitee Go流水线部署个人项目到服务器指南 前言&#xff01;&#xff01;&#xff01; 本文解决的问题&#xff1a; 你有一台ECS服务器&#xff0c;你在上面部署了一个Java服务也就是一个jar&#xff0c;你觉着你每次手动本地打包&#xff0c;上传&#xff0c;在通过命令去…...

BlockChain.java

BlockChain 区块链&#xff0c;举个栗子 注意啦&#xff0c;列子里面的hashcode相等&#xff0c;但是字符串是不一样的哦&#xff0c;之前有记录这个问题 String.hashCode()-CSDN博客...

SystemVerilog 数据类型

1、内建数据类型 verilog有两种基本的数据类型&#xff1a;变量和线网&#xff0c;他们各自都可以有四种取值&#xff1a;0 1 z x&#xff1b; RTL代码使用 变量 来存放组合和时序值&#xff1b;变量可以是单bit或者是多bit的无符号数 reg [7:0] m&#xff0c; 32bit的有符号…...

【技术简析】触觉智能RK3506 Linux星闪网关开发板:重新定义工业物联新标杆

在工业智能化与物联网深度融合的今天&#xff0c;深圳触觉智能推出首款搭载瑞芯微RK3506芯片的Linux星闪网关开发板&#xff0c;为大家技术解析。 RK3506-国产芯的硬核实力 作为瑞芯微2024年第四季度推出的入门级工业芯片平台&#xff0c;RK3506以三核Cortex-A7&#xff08;1.…...

YOLO-UniOW: 高效通用开放世界目标检测模型【附论文与源码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...