车联网架构设计(一)_消息平台的搭建
车联网是物联网的一个主要应用方向,车辆通过连接车联网平台,实时进行消息的交互,平台可以提供车辆远程控制,故障检测,车路协同等各方面的功能。
我在车联网行业从事了很长时间的技术工作,参与了整个车联网平台的构建以及很多不同车联网应用的开发工作,这里打算以构建一个车联网平台作为例子,总结一下涉及到的架构设计方面的东西。
系统功能
一个车联网平台需要实现以下的一些功能:
1. 与车辆的消息交互
在物联网中,主要应用的通信协议是MQTT,这是一个基于发布/订阅模式的物联网通信协议,具备了支持QoS,简单易实现,报文紧凑等特点。在车联网中,大部分的车企也是采用MQTT协议来进行通讯。因此在车联网的架构中,我们需要考虑设置一个MQTT Broker集群来与大量的车辆进行连接通信。目前有很多的开源的Broker,例如ActiveMQ, EMQ, RocketMQ等等,其中EMQ和RocketMQ都是国内的产品,有详尽的中文资料介绍。这里我选择EMQ作为MQTT Broker。
2. 车辆消息的消费与存储
MQTT Broker接收到车辆的消息后,需要把消息给到上层应用来进行处理。我们可以把这些消息保存到数据库或者转发到一个消息队列来缓存。这里我选择Kafka。上层应用通过订阅Kafka主题,来获得其需要的相关车辆信息,进行处理。上层应用也可以把要下发给车辆的消息发送到Kafka的主题,然后让MQTT Broker再转发给车辆,也可以直接通过MQTT主题发布消息的方式来直接发送给车辆。
3. V2X应用
包括了V2V, V2I, V2P等应用场景,车辆需要能和不同的数据源进行消息交互,从而为驾驶提供决策信息。我将基于这个平台展示一些V2X应用的开发设计,实现3GPP规范里面制定的一些V2X场景。
4. 车辆数据分析与报表
车联网平台每天都收集和生成了大量的数据,通过对这些数据进行发掘分析,可以更好的了解业务运行的情况,同时也可以更好的为商业决策提供参考。我们可以基于目前流行的大数据处理平台,例如Spark/Beam/Flink等,对数据进行即时的处理,保存到数据仓库,随后再进行各种数据分析和报表呈现。
在这篇文章中,我先对以上提到的第一点功能进行介绍,搭建一个MQTT消息平台。
MQTT消息平台
我选择EMQX来搭建这个平台,EMQX是国内的一个优秀的MQTT broker软件,有企业版和开源版,这里我选择开源版。在官网上有介绍安装方式,在Kubernetes上是采用Operator的方式来安装的,但是我这里采用kustomization的方式来安装,因为这样方便我进行一些设置上的改动。在我本地用minikube启动了一个kubernetes cluster。
安装EMQX集群
定义一个新的namespace
apiVersion: v1
kind: Namespace
metadata:name: emqx
为这个namespace创建一个service account并赋予相关权限
apiVersion: v1
kind: ServiceAccount
metadata:namespace: emqxname: emqx
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:namespace: emqxname: emqx
rules:
- apiGroups:- ""resources:- endpoints verbs: - get- watch- list
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:namespace: emqxname: emqx
subjects:- kind: ServiceAccountname: emqxnamespace: emqx
roleRef:kind: Rolename: emqxapiGroup: rbac.authorization.k8s.io
定义一个configmap,因为我们要创建一个statefulset的emqx多个节点,要实现auto cluster的功能,自动把这多个节点组成一个cluster,因此需要定义相关的配置:
apiVersion: v1
kind: ConfigMap
metadata:name: emqx-confignamespace: emqx
data:EMQX_NAME: "emqx"EMQX_CLUSTER__DISCOVERY_STRATEGY: "k8s"EMQX_CLUSTER__K8S__SERVICE_NAME: "emqx-headless"EMQX_CLUSTER__K8S__NAMESPACE: "emqx"EMQX_CLUSTER__K8S__ADDRESS_TYPE: "hostname"EMQX_CLUSTER__K8S__APISERVER: "https://kubernetes.default.svc:443"EMQX_CLUSTER__K8S__SUFFIX: "svc.cluster.local"
定义一个headless的service,用于statefulset的服务暴露和通信。
apiVersion: v1
kind: Service
metadata:name: emqx-headlessnamespace: emqx
spec:type: ClusterIPclusterIP: Noneselector:app: emqxports:- name: mqttport: 1883protocol: TCPtargetPort: 1883- name: mqttsslport: 8883protocol: TCPtargetPort: 8883- name: mgmtport: 8081protocol: TCPtargetPort: 8081- name: websocketport: 8083protocol: TCPtargetPort: 8083- name: wssport: 8084protocol: TCPtargetPort: 8084- name: dashboardport: 18083protocol: TCPtargetPort: 18083
最后是定义一个statefulset,里面包含了2个节点。
apiVersion: apps/v1
kind: StatefulSet
metadata:name: emqx-statefulsetlabels:app: emqxnamespace: emqx
spec:serviceName: emqx-headlessupdateStrategy:type: RollingUpdatereplicas: 2selector:matchLabels:app: emqxtemplate:metadata:labels:app: emqxspec:serviceAccountName: emqxcontainers:- name: emqximage: emqx/emqx:5.1.6resources:requests:memory: "1Gi"cpu: "250m"limits:memory: "1Gi"cpu: "250m"ports:- name: mqttcontainerPort: 1883- name: mqttsslcontainerPort: 8883- name: mgmtcontainerPort: 8081- name: wscontainerPort: 8083- name: wsscontainerPort: 8084- name: dashboardcontainerPort: 18083envFrom:- configMapRef:name: emqx-config
定义一个kustomization.yaml文件,把以上定义的manifest包括进来:
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- namespace.yaml
- rbac.yaml
- configmap.yaml
- headless.yaml
- statefulset.yaml
最后运行kubectl apply -k即可部署,我们可以运行以下命令来查看emqx cluster的状态:
kubectl exec emqx-statefulset-0 -n emqx -- emqx_ctl cluster status
如果成功运行,将显示如下信息:
Cluster status: #{running_nodes =>['emqx@emqx-statefulset-0.emqx-headless.emqx.svc.cluster.local','emqx@emqx-statefulset-1.emqx-headless.emqx.svc.cluster.local'],stopped_nodes => []}
可见当前的EMQX cluster包括了两个节点并已成功运行。
配置HAProxy
下一步我将配置一个HAProxy来作为Load balancer,连接EMQX集群。这种方式可以提供如下好处:
- HAProxy作为一个反向代理可以隐藏emqx节点的信息,并为外部提供一个统一的地址来连接
- 可以用作MQTT over TLS的终结,减轻emqx节点处理SSL加密的计算负荷,并且简化证书部署和管理的工作
- 提供内在的MQTT支持,支持解析MQTT消息以实现粘性附着和智能负荷分配等功能
- 通过主备方式提供高可靠性
同样我也是以kustomization的方式来部署HAProxy
定义一个namespace
apiVersion: v1
kind: Namespace
metadata:name: haproxy
定义一个configmap,因为haproxy启动需要读取haproxy.cfg配置文件的信息,把这个文件通过configmap的方式来加载
apiVersion: v1
kind: ConfigMap
metadata:name: haproxy-confignamespace: haproxy
data:haproxy.cfg: |global log 127.0.0.1 local3 info daemon maxconn 10240defaults log global mode tcp option tcplog #option dontlognull timeout connect 10000 # timeout > mqtt's keepalive * 1.2 timeout client 240s timeout server 240s maxconn 20000backend mqtt_backendmode tcp# 粘性会话负载均衡stick-table type string len 32 size 1000k expire 30mstick on req.payload(0,0),mqtt_field_value(connect,client_identifier)server emqx0 emqx-statefulset-0.emqx-headless.emqx.svc.cluster.local:1883server emqx1 emqx-statefulset-1.emqx-headless.emqx.svc.cluster.local:1883frontend mqtt_serversbind *:1883mode tcp# 拒绝非 MQTT 连接# tcp-request content reject unless { req.payload(0,0),mqtt_is_valid }default_backend mqtt_backend
定义一个deployment
apiVersion: apps/v1
kind: Deployment
metadata:labels:app: haproxyname: haproxynamespace: haproxy
spec:replicas: 1selector:matchLabels:app: haproxytemplate:metadata:labels:app: haproxyspec:containers:- name: haproxyimage: haproxy:2.8ports:- name: httpcontainerPort: 80- name: httpscontainerPort: 443- name: haproxy-mgmtcontainerPort: 1024- name: mqttcontainerPort: 1883- name: mqttsslcontainerPort: 8883- name: mgmtcontainerPort: 8081- name: wscontainerPort: 8083- name: wsscontainerPort: 8084- name: dashboardcontainerPort: 18083volumeMounts:- name: haproxy-configmountPath: /usr/local/etc/haproxy/haproxy.cfgsubPath: haproxy.cfgvolumes:- name: haproxy-configconfigMap:name: haproxy-configitems:- key: haproxy.cfgpath: haproxy.cfg
定义一个service,暴露harpoxy的端口
apiVersion: v1
kind: Service
metadata:name: haproxy-servicenamespace: haproxy
spec:selector:app: haproxyports:- name: mqttport: 1883protocol: TCPtargetPort: mqtt- name: mqttsport: 8883protocol: TCPtargetPort: 8883- name: wsport: 8083protocol: TCPtargetPort: 8083- name: wssport: 8084protocol: TCPtargetPort: 8084- name: dashboardport: 18083protocol: TCPtargetPort: 18083
最后定义一个kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- namespace.yaml
- configmap.yaml
- haproxy_deployment.yaml
- service.yaml
运行kubectl apply -k来部署
配置Ingress
在我的minikube k8s集群上暴露HAProxy的端口,使得外部可以访问MQTT。因为我想仍然暴露1883端口给外部访问,所以需要在minikube启动的时候设置
minikube start --extra-config=apiserver.service-node-port-range=1-65535
然后安装HAProxy ingress,通过helm的方式安装
helm repo add haproxytech https://haproxytech.github.io/helm-charts
helm repo update
helm install haproxy-kubernetes-ingress haproxytech/kubernetes-ingress \--create-namespace \--namespace haproxy-controller
安装完成之后,我们需要创建一个configmap,配置要暴露的TCP端口,通过kubectl apply -f来部署。
apiVersion: v1
kind: ConfigMap
metadata:name: tcpnamespace: haproxy
data:1883:haproxy/haproxy-service:18838883:haproxy/haproxy-service:8883 8083:haproxy/haproxy-service:80838084:haproxy/haproxy-service:808418083:haproxy/haproxy-service:18083
读取HAProxy ingress的配置信息,保存在values.yaml文件
helm show values haproxytech/kubernetes-ingress > values.yaml
然后在values.yaml里面找到以下对应位置,进行修改:
tcpPorts:- name: mqttport: 1883targetPort: 1883nodePort: 1883- name: mqttsport: 8883targetPort: 8883nodePort: 8883- name: wsport: 8083targetPort: 8083nodePort: 8083- name: wssport: 8084targetPort: 8084nodePort: 8084- name: dashboardport: 18083targetPort: 18083nodePort: 18083
# add extra args in controller sectionextraArgs:- --configmap-tcp-services=haproxy/tcp
运行以下命令更新haproxy-ingress的配置
helm upgrade -f values.yaml haproxy-kubernetes-ingress -n haproxy-controller haproxytech/kubernetes-ingress
现在我们就可通过一个MQTT客户端来通过HAPROXY来连接EMQX了,服务器地址是minikubeip:1883,通过访问minikubeip:18083可以访问EMQX的dashboard
配置证书
在实际应用中,车辆和平台之间是通过TLS加密来通信的,有单向认证和双向认证两种方式。单向认证指客户端需要验证服务器端是否持有受信任的证书,双向认证则指双方都需要验证。这里以双向认证为例进行配置。
1. 创建根CA证书
以自签证书的方式来做,首先是创建一个根CA证书,如以下命令。
openssl req -newkey rsa:2048 -nodes -x509 -days 3650 -keyout root-ca.key -out root-ca.crt
用以下命令可以查看创建的证书内容
openssl x509 -noout -text -in root-ca.crt
2. 创建中间CA证书
有了根CA之后,我们可以用来签发中间CA证书,因为一般证书都不会直接用根CA来签发的。
输入以下命令来创建中间CA的Key和证书签发请求csr,注意common name不能和之前根CA的重复
openssl req -newkey rsa:2048 -nodes -days 3650 -keyout intermediate-ca.key -out intermediate-ca.csr
因为我们这个证书是要用作CA,需要定义一个扩展文件来描述是一个CA type,创建一个ca-cert-extension.cnf,内容如下:
basicConstraints = CA:TRUE
keyUsage = keyCertSign, cRLSign
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
然后就可以用根CA证书对这个csr进行签发,生成中间CA证书
openssl x509 -req -in intermediate-ca.csr -out intermediate-ca.crt -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -days 3650 -extfile ca-cert-extensions.cnf
3. 为客户端签发证书
有了中间CA证书后,我们就可以为客户端创建证书了,例如为一辆ID为vehicle-1的车辆签发证书。
openssl req -newkey rsa:2048 -nodes -days 365 -subj "/CN=vehicle-1/O=vehicle" -keyout client.key -out client.csr
创建一个扩展文件client-cert-extensions.cnf,声明其是client certificate type
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
然后用中间CA证书签发
openssl x509 -req -in client.csr -out client.crt -CA intermediate-ca.crt -CAkey intermediate-ca.key -CAcreateserial -days 365 -extfile client-cert-extensions.cnf
4. 为服务器端签发证书
同理,为服务器端也签发证书
openssl req -newkey rsa:2048 -nodes -days 365 -subj "/CN=emqx/O=server" -keyout server.key -out server.csr
创建一个扩展文件server-cert-extensions.cnf
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = serverAuth
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
用中间CA证书签发
openssl x509 -req -in server.csr -out server.crt -CA intermediate-ca.crt -CAkey intermediate-ca.key -CAcreateserial -days 365 -extfile server-cert-extensions.cnf
然后把server.crt和server.key合成一个文件
相关文章:
车联网架构设计(一)_消息平台的搭建
车联网是物联网的一个主要应用方向,车辆通过连接车联网平台,实时进行消息的交互,平台可以提供车辆远程控制,故障检测,车路协同等各方面的功能。 我在车联网行业从事了很长时间的技术工作,参与了整个车联网…...
(蓝桥杯)1125 第 4 场算法双周赛题解+AC代码(c++/java)
题目一:验题人的生日【算法赛】 验题人的生日【算法赛】 - 蓝桥云课 (lanqiao.cn) 思路: 1.又是偶数,又是质数,那么只有2喽 AC_Code:C #include <iostream> using namespace std; int main() {cout<<2;return 0; …...
也可Adobe Animate
Animate CC 由原Adobe Flash Professional CC 更名得来,2015年12月2日:Adobe 宣布Flash Professional更名为Animate CC,在支持Flash SWF文件的基础上,加入了对HTML5的支持。并在2016年1月份发布新版本的时候,正式更名为…...
【面试HOT200】回溯篇
系列综述: 💞目的:本系列是个人整理为了秋招面试的,整理期间苛求每个知识点,平衡理解简易度与深入程度。 🥰来源:材料主要源于【CodeTopHot300】进行的,每个知识点的修正和深入主要参…...
JVM——内存溢出和内存泄漏
目录 1. 内存溢出和内存泄漏内存泄漏的常见场景解决内存溢出的思路1.发现问题 – Top命令2.发现问题 – VisualVM3.发现问题 – Arthas4.发现问题 – Prometheus Grafana5.发现问题 – 堆内存状况的对比:将指定名称绑定到初始化程序的子对象或元素。简而言之,它们使我们能够从元组或结构中声明多个变量。与引用一样,结构化绑定是现有对象的别名;与引用不同,结构化绑定不必是引用类型(referen…...
Mover Creator 用户界面
1 “开始”对话框 首次打开 Mover Creator 时,出现的第一个页面是“开始”对话框,如下所示。从这里开始,用户可以选择开始设计飞机、武器或发动机。在上述每种情况下,用户都可以创建新模型或编辑现有模型。 1.1 新建模型 如果用…...
『Nginx安全访问控制』利用Nginx实现账号密码认证登录的最佳实践
📣读完这篇文章里你能收获到 如何创建用户账号和密码文件,并生成加密密码配置Nginx的认证模块,实现基于账号密码的登录验证 文章目录 一、创建账号密码文件1. 安装htpasswd工具1.1 CentOS1.2 Ubuntu 二、配置Nginx三、重启Nginx 在Web应用程…...
MongoDB导入导出命令
(1)mongoexport命令 例如: mongoexport --db testdb --collection person --out person.json mongoexport --db testdb --collection person --fields name,age --out person.json mongoexport --db testdb --collection person --query {&qu…...
软件工程期末复习(1)
学习资料 软件工程知识点总结_嘤桃子的博客-CSDN博客 软件工程学习笔记_软件工程导论第六版张海藩pdf-CSDN博客 【软件工程】软件工程期末试卷习题课讲解!!_哔哩哔哩_bilibili 【拯救者】软件工程速成(期末考研复试软考)均适用. 支持4K_哔哩哔哩_bil…...
nextjs入门
创建项目 npx create-next-app 项目名 体验文件路由 nextjs提供了文件路由的功能, 根据文件系统的目录结构, 可以识别为对应的页面路由 创建页面 首先, 在src下创建pages目录, 然后创建一个about文件(对应about页面)和main/index.js文件(对应首页) pages/main/index con…...
【C语言】字符串函数strlen #strcpy #strcmp #strcat #strstr及其模拟实现
在C语言中,有一种特殊的数据类型,即字符串类型。C 并没有专门定义一个字符串类型,这对我们使用字符串造成了一定的麻烦。但是,C标准库<string.h> 中定义了各种字符串函数,这对于我们来说是一件值得庆幸的事情。…...
递归实现组合型枚举
递归实现组合型枚举 #include<iostream> #include<vector>int n, m; std::vector<int>res; bool st[30];void Print() {for(int i0;i<res.size();i){printf("%d ",res[i]);}puts(""); }void dfs(int num) {if (res.size() m){Print(…...
SCAU:1065 数组中的指针
1065 数组中的指针 时间限制:1000MS 代码长度限制:10KB 提交次数:3436 通过次数:1692 题型: 编程题 语言: G;GCC Description 设有如下数组定义: int a[3][4]{{1,3,5,7},{9,11,13,15},{17,19,21,23}}; 计算下面各项的值(设数组a的首地址为2000&…...
找不到msvcp110.dll如何修复?分享5个亲测有效的修复方法
在计算机使用过程中,我们经常会遇到一些错误提示,其中之一就是“msvcp110.dll丢失”。这个错误通常发生在运行某些程序时,系统无法找到所需的动态链接库文件。那么,msvcp110.dll到底是什么呢?它又有什么作用࿱…...
D2RML:暗黑破坏神2重制版多开终极指南,告别繁琐登录流程
D2RML:暗黑破坏神2重制版多开终极指南,告别繁琐登录流程 【免费下载链接】D2RML Diablo 2 Resurrected Multilauncher 项目地址: https://gitcode.com/gh_mirrors/d2/D2RML 还在为暗黑破坏神2重制版的多账户切换而烦恼吗?每次登录战网…...
告别手动处理!用MATLAB App Designer打造你的专属数据(图片/表格)预处理小工具
告别手动处理!用MATLAB App Designer打造你的专属数据预处理小工具 在数据分析与科研工作中,我们常常陷入重复性劳动的泥潭:每次收到新数据集都要用不同软件打开图片查看尺寸、用Excel检查表格结构、用统计工具计算基础指标。这种碎片化操作不…...
ElevenLabs藏文语音生成上线仅72小时:开发者必须立即掌握的5个API调用避坑要点
更多请点击: https://intelliparadigm.com 第一章:ElevenLabs藏文语音生成上线背景与技术意义 藏语作为中国官方认可的少数民族语言之一,拥有超过600万母语使用者,主要分布在西藏、青海、四川、甘肃和云南等地区。长期以来&…...
SAP S/4HANA Cloud Public Edition 3-System Landscape 里的系统与 Tenant 设计
做 SAP S/4HANA Cloud Public Edition 项目时,最容易被低估的一件事,不是功能点本身,而是系统与 tenant 的边界。很多实施风险,并不是来自某个配置字段填错,也不是来自某段 ABAP 扩展代码写得不够优雅,而是项目一开始就没有把 Development、Test、Production、Customizin…...
C++二叉树控制台可视化:从递归布局到层序遍历的图形化实现
1. 项目概述:为什么我们需要“看见”二叉树?在C的学习和数据结构实践中,二叉树是一个绕不开的核心概念。我们经常需要实现它的插入、删除、遍历等操作。然而,无论是调试一个复杂的平衡算法,还是向他人展示你的数据结构…...
终极指南:如何用UniversalSplitScreen在一台电脑上玩多人游戏
终极指南:如何用UniversalSplitScreen在一台电脑上玩多人游戏 【免费下载链接】UniversalSplitScreen Split screen multiplayer for any game with multiple keyboards, mice and controllers. 项目地址: https://gitcode.com/gh_mirrors/un/UniversalSplitScree…...
从指标到版图:基于Cadence与gmid方法的两级运放实战设计
1. 两级运放设计入门:从指标到晶体管的思维转换 第一次接触两级运放设计时,我盯着性能指标表发呆了半小时。AV≥10M、CL10pf、SR10V/us这些数字就像天书,直到导师扔给我一本《模拟集成电路设计艺术》和一份Cadence使用手册。现在回想起来&…...
如何快速掌握明日方舟自动化助手:5大核心功能告别重复操作
如何快速掌握明日方舟自动化助手:5大核心功能告别重复操作 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手,全日常一键长草!| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址: https:/…...
什么是 TRAE IDE?
TRAE IDE 是一款深度融合 AI 能力的开发工具,提供从代码编写、项目理解、调试运行到变更管理的完整开发体验。你可以像使用传统 IDE 一样掌控每一步,也可以把复杂任务交给 AI 智能体规划和执行。使用场景TRAE IDE 覆盖日常开发与复杂工程任务,…...
基于MCP协议连接AI与Postal邮件服务器的自动化实践
1. 项目概述:一个连接Postal与MCP的桥梁最近在折腾一些自动化工作流,发现很多内部系统的数据都通过Postal(一个开源的邮件服务器管理平台)来流转,而我又想用上新兴的模型上下文协议(MCP)来让AI助…...
