unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下
1、整体设计
设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topic
和 bytes
进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。
public class MsgQueue
{// 用于存储接收到的二进制消息private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();public static BlockingCollection<Msg> RecvMessageQueue{get { return recMessageQueue; }}public static BlockingCollection<Msg> SendMessageQueue{get { return sendMessageQueue; }}
}
2、zeroMq功能实现
这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client
,一个server
。
2.1 zeroMq 介绍
第一次使用zeroMq
,稍微介绍下;ZeroMQ
是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ
的一些关键特性和概念:ZeroMQ
支持多种消息模式,包括:
-
请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。
-
发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。
-
推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。
-
管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个
TCP
通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq
的在C#
上主要是通过netMq
库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装
Install-Package NetMQ
不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dll
到plugins
拷贝的时候注意依赖项,总共有三个,要不然会报错
2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;public class ZeroStarter : MonoBehaviour
{private SubscriberSocket subscriber;private PublisherSocket publisherSocket;private bool isRunning = true;void Start(){AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作// 初始化发布者publisherSocket = new PublisherSocket();publisherSocket.Bind("tcp://*:5557");subscriber = new SubscriberSocket();subscriber.Connect("tcp://localhost:5556");// 启动发布和订阅任务Task.Run(() => StartPub());Task.Run(() => StartSubscriber());}private void StartPub(){while (isRunning){try{// 使用 BlockingCollection 的 Take 方法获取消息Msg message = MsgQueue.SendMessageQueue.Take();publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);}catch (Exception e){Debug.LogError("Error in StartPub: " + e.Message);}}publisherSocket?.Close(); // 确保发布套接字在退出时关闭}private void StartSubscriber(){subscriber.Subscribe("");while (isRunning){try{// 通过超时和 TryReceiveFrameString 检查订阅消息if (subscriber.TryReceiveFrameString(out string topic)){byte[] bytes = subscriber.ReceiveFrameBytes();MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));string str = Encoding.Default.GetString(bytes);Debug.Log($"{topic} {str}");}Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环}catch (Exception e){Debug.LogError("Error in StartSubscriber: " + e.Message);}}subscriber.Close(); // 手动关闭订阅者套接字}private void OnDestroy(){isRunning = false; // 设置标志位,通知任务退出// 确保发布套接字关闭publisherSocket?.Close();NetMQConfig.Cleanup(); // 清理 NetMQ}
}
这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列,主要是用在IOT
,简单来说就是一个TCP
服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx
,还行,挺好用。工具客户端使用的是mqttx
,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例:https://github.com/emqx/MQTT-Client-Examples
我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity
,所以打开了mono
的库
编译之后
将生成的dll
拷贝到plugins
下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;public class MqttStarter : MonoBehaviour
{// Start is called before the first frame updatevoid Start(){string ipAddr = "192.168.3.8";int emqxPort = 1883;string clientId = "csharpclientid";//服务器默认密码是这个string username = "username";string password = "pwd";MqttClient client = new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);// register to message receivedclient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;client.Connect(clientId,username,password);// client.Subscribe(new string[] { "idse/cloud2veh/#" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });}private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e){MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));Debug.Log("mqtt--> " +e.Topic +" ==== >" + Encoding.Default.GetString(e.Message));}
}
这里需要注意的是m2mqtt
会自动开启线程,不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq
的收发
验证下Mqtt的收发
5、Java代码zeromq的使用
pom中加入
<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>0.5.2</version></dependency>
zeromq 发布者
package org.example;/*** Hello world!**/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;public class App {public static void main(String[] args) {int i = 0;// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个发布者socketZMQ.Socket publisher = context.createSocket(ZMQ.PUB);// 绑定到指定的端口String address = "tcp://*:5556";publisher.bind(address);System.out.println("Publisher started at " + address);// 持续发送消息int messageNumber = 0;while (!Thread.currentThread().isInterrupted()) {String message = "Message " + messageNumber;publisher.sendMore("t:"+ (messageNumber%2));publisher.send("abc".getBytes());System.out.println("Sent: " + message);// 增加消息计数并休眠1秒messageNumber++;Thread.sleep(1000);}// 关闭发布者socketpublisher.close();} catch (Exception e) {e.printStackTrace();}}
}
zeromq 订阅者
package org.example;import org.zeromq.ZMQ;
import org.zeromq.ZContext;import java.nio.ByteBuffer;public class ZmqSubscriber {public static void main(String[] args) {// 创建一个ZeroMQ的上下文try (ZContext context = new ZContext()) {// 创建一个订阅者socketZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);// 连接到发布者的地址String address = "tcp://localhost:5557"; // 确保使用正确的地址subscriber.connect(address);System.out.println("Subscriber connected to " + address);// 订阅所有消息subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题// 持续接收消息while (!Thread.currentThread().isInterrupted()) {// 接收消息String topic = subscriber.recvStr(0);byte[] message = subscriber.recv(0);ByteBuffer wrap = ByteBuffer.wrap(message);if (message != null) {System.out.println("Received: " + topic +" " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());}}// 关闭订阅者socketsubscriber.close();} catch (Exception e) {e.printStackTrace();}}
}
6、总结
第一次搞zeromq
的消息队列,和平常用的kafka
和rocketmq
差的很多,甚至完全不在同一个讨论方向。
还有一些需要研究
-
在
unity
中nuget
的学习 -
c#
不同平台的学习 -
启动后台线程之后无法关闭,导致
unity
死掉。
byte[] bytes = BitConverter.GetBytes(66666);
-
C#侧生成的bytes 是小端序列
相关文章:

unity 中使用zeroMq和Mqtt 进行通讯
最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验&…...

四、k8s快速入门之Kubernetes资源清单
kubernetes中的资源 ⭐️ k8s中所有的内容都抽象为资源,资源实列化之后,叫做对象 1️⃣名称空间级别 ⭐️ kubeadm在执行k8s的pod的时候会在kube-system这个名称空间下执行,所以说当你kubectl get pod 的时候是查看不到的查看的是默认的po…...

掌握ElasticSearch(六):分析过程
文章目录 一、什么是分析1. 字符过滤 (Character Filtering)2. 分词 (Breaking into Tokens)3. 词条过滤 (Token Filtering)4. 词条索引 (Token Indexing) 二、内置分析器分类1. 标准分析器 (Standard Analyzer)2. 简单分析器 (Simple Analyzer)3. 语言分析器 (Language Analyz…...
【设计模式】使用python 实践框架设计
单一职责原则(SRP):一个类应该只有一个职责,意味着该类只应该有一个引起变化的原因。这使得代码更易于维护和理解。 开放封闭原则(OCP):软件实体(类、模块、函数等)应该…...
Apache paimon-CDC
CDC集成 paimon支持五种方式通过模式转化数据提取到paimon表中。添加的列会实时同步到Paimon表中 MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。MySQL同步数据库:将MySQL的整个数据库同步到一个Paimon数据库中。API同步表:将您的自定义DataStream输入同步到一…...
如何分析算法的执行效率和资源消耗
分析算法的执行效率和资源消耗可以从以下几个方面入手: 一、时间复杂度分析 定义和概念 时间复杂度是衡量算法执行时间随输入规模增长的速度的指标。它通常用大 O 符号表示,表示算法执行时间与输入规模之间的关系。例如,一个算法的时间复杂度为 O(n),表示该算法的执行时间…...

提示工程(Prompt Engineering)指南(进阶篇)
在 Prompt Engineering 的进阶阶段,我们着重关注提示的结构化、复杂任务的分解、反馈循环以及模型的高级特性利用。随着生成式 AI 技术的快速发展,Prompt Engineering 已经从基础的单一指令优化转向了更具系统性的设计思维,并应用于多轮对话、…...
音视频入门基础:FLV专题(19)——FFmpeg源码中,解码Audio Tag的AudioTagHeader,并提取AUDIODATA的实现
一、引言 从《音视频入门基础:FLV专题(18)——Audio Tag简介》可以知道,未加密的情况下,FLV文件中的一个Audio Tag Tag header AudioTagHeader AUDIODATA。本文讲述FFmpeg源码中是怎样解码Audio Tag的AudioTagHead…...

前端零基础入门到上班:【Day3】从零开始构建网页骨架HTML
HTML 基础入门:从零开始构建网页骨架 目录 1. 什么是 HTML?HTML 的核心作用 2. HTML 基本结构2.1 DOCTYPE 声明2.2 <html> 标签2.3 <head> 标签2.4 <body> 标签 3. HTML 常用标签详解3.1 标题标签3.2 段落和文本标签3.3 链接标签3.4 图…...
字符脱敏工具类
1、字符脱敏工具类 import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils;/*** 数据脱敏工具类** date 2024/10/30 13:44*/Slf4j public class DataDesensitizationUtils {public static final String STAR_1 "*";public static final …...
【jvm】jvm对象都分配在堆上吗
目录 1. 说明2. 堆上分配3. 栈上分配(逃逸分析和标量替换)4. 方法区分配5. 直接内存(非堆内存) 1. 说明 1.JVM的对象并不总是分配在堆上。2.堆是JVM用于存储对象实例的主要内存区域,存在一些特殊情况,对象…...
@AutoWired和 @Resource原理深度分析!
嗨,你好呀,我是猿java Autowired和Resource是 Java程序员经常用来实现依赖注入的两个注解,这篇文章,我们将详细分析这两个注解的工作原理、使用示例和它们之间的对比。 依赖注入概述 依赖注入是一种常见的设计模式,…...

C++设计模式创建型模式———原型模式
文章目录 一、引言二、原型模式三、总结 一、引言 与工厂模式相同,原型模式(Prototype)也是创建型模式。原型模式通过一个对象(原型对象)克隆出多个一模一样的对象。实际上,该模式与其说是一种设计模式&am…...

重学SpringBoot3-Spring WebFlux之SSE服务器发送事件
更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ Spring WebFlux之SSE服务器发送事件 1. 什么是 SSE?2. Spring Boot 3 响应式编程与 SSE为什么选择响应式编程实现 SSE? 3. 实现 SSE 的基本步骤3.…...

YOLO即插即用模块---AgentAttention
Agent Attention: On the Integration of Softmax and Linear Attention 论文地址:https://arxiv.org/pdf/2312.08874 问题: 普遍使用的 Softmax 注意力机制在视觉 Transformer 模型中计算复杂度过高,限制了其在各种场景中的应用。 方法&a…...
探索开源语音识别的未来:高效利用先进的自动语音识别技术20241030
🚀 探索开源语音识别的未来:高效利用自动语音识别技术 🌟 引言 在数字化时代,语音识别技术正在引领人机交互的新潮流,为各行业带来了颠覆性的改变。开源的自动语音识别(ASR)系统,如…...

学习路之TP6--workman安装
一、安装 首先通过 composer 安装 composer require topthink/think-worker 报错: 分析:最新版本需要TP8,或装低版本的 composer require topthink/think-worker:^3.*安装后, 增加目录 vendor\workerman vendor\topthink\think-w…...

.NET内网实战:通过白名单文件反序列化漏洞绕过UAC
01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏,主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧,对内网和后渗透感兴趣的朋友们可以订阅该电子报刊,解锁更多的报刊内容。 02基本介绍 03原理分析 在渗透测试和红…...
AI Agents - 自动化项目:计划、评估和分配
Agents: Role 角色Goal 目标Backstory 背景故事 Tasks: Description 描述Expected Output 期望输出Agent 代理 Automated Project: Planning, Estimation, and Allocation Initial Imports 1.本地文件helper.py # Add your utilities or helper functions to…...
Git的.gitignore文件
一、各语言对应的.gitignore模板文件 项目地址:https://github.com/github/gitignore 二、.gitignore文件不生效 .gitignore文件只是ignore没有被追踪的文件,已被追踪的文件,要先删除缓存文件。 # 单个文件 git rm --cached file/path/to…...

51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...

C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...

基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...