当前位置: 首页 > news >正文

C++编程:模拟实现CyberRT的DataVisitor和DataDispatcher

文章目录

    • 0. 引言
    • 1. 设计概要
      • 1.1 主要组件
      • 1.2 类关系图
      • 1.3 工作流程
    • 2. 代码实现
      • 2.1. 定义数据结构
      • 2.2. 实现 DataVisitor
      • 2.3. 实现 DataDispatcher
      • 2.4. 实现 Receiver
      • 2.5. 实现具体的 DataVisitor
      • 2.6. 示例主程序
      • 2.7. 编译和运行

0. 引言

使用 C++ 实现一个类似CyberRT 架构的 DataVisitorDataDispatcher。在 CyberRT 中:

  • Receiver 接收到消息后,会触发回调。
  • 回调中调用 DataDispatcher(消息分发器)发布消息。
  • DataDispatcher 是一个单例,负责所有的数据分发,并将数据放入对应的缓存中。
  • 然后,DataDispatcher 会通知对应的协程(在此简化为线程)去处理消息。
  • DataVisitor(消息访问器)是辅助类,用于管理数据处理过程,包括注册通知机制和绑定回调函数。

1. 设计概要

1.1 主要组件

  • DataDispatcher

    • 单例模式。
    • 管理所有 DataVisitor
    • 分发数据到对应的 DataVisitor 的缓冲区。
    • 通知 DataVisitor 处理数据。
  • DataVisitor

    • 负责特定类型数据的处理。
    • 包含一个线程,等待 DataDispatcher 的通知。
    • 绑定一个回调函数用于处理数据。
    • 管理自己的数据缓冲区。
  • Receiver

    • 模拟消息接收器,接收到消息后调用 DataDispatcher 发布数据。

1.2 类关系图

以下类关系图,反映了 DataDispatcher 作为单例管理多个 DataVisitor,并与 Receiver 交互的关系。

管理
1
*
调用 Dispatch
Data
+int id
+std::string content
«abstract»
DataVisitor
+Notify(std::shared_ptr<data> data)
LoggingVisitor
+Create()
ProcessingVisitor
+Create()
DataDispatcher
-std::vector> visitors
-std::mutex mutex
+Instance()
+RegisterVisitor(std::shared_ptr visitor)
+UnregisterVisitor(std::shared_ptr visitor)
+Dispatch(std::shared_ptr<data> data)
Receiver
+ReceiveMessage(int id, const std::string& content)

1.3 工作流程

  1. Receiver 接收到消息后,调用 DataDispatcher::Instance()->Dispatch(data)
  2. DataDispatcher 将数据放入对应的 DataVisitor 的缓冲区。
  3. DataDispatcher 通知对应的 DataVisitor
  4. DataVisitor 的线程被唤醒,取出数据并执行绑定的回调函数进行处理。

2. 代码实现

完整代码见:data-visitor-dispatcher

2.1. 定义数据结构

首先,定义一个通用的数据结构 Data

// data.h
#ifndef DATA_H
#define DATA_H#include <string>// 定义一个通用的数据类型
struct Data {int id;std::string content;
};#endif  // DATA_H

2.2. 实现 DataVisitor

DataVisitor 负责处理特定类型的数据。它包含一个线程,该线程等待 DataDispatcher 的通知,然后处理数据。

// data_visitor.h
#ifndef DATA_VISITOR_H
#define DATA_VISITOR_H#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>#include "data.h"class DataVisitor : public std::enable_shared_from_this<DataVisitor> {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit DataVisitor(Callback callback) : callback_(callback), stop_flag_(false) {worker_thread_ = std::thread(&DataVisitor::ProcessData, this);}// 析构函数,确保线程安全停止~DataVisitor() {Stop();if (worker_thread_.joinable()) {worker_thread_.join();}}// 用于 DataDispatcher 发布数据时调用void Notify(std::shared_ptr<Data> data) {{std::lock_guard<std::mutex> lock(queue_mutex_);data_queue_.push(data);}cv_.notify_one();}private:// 数据处理线程函数void ProcessData() {while (!stop_flag_) {std::unique_lock<std::mutex> lock(queue_mutex_);cv_.wait(lock, [this]() { return stop_flag_ || !data_queue_.empty(); });while (!data_queue_.empty()) {auto data = data_queue_.front();data_queue_.pop();lock.unlock();// 执行绑定的回调函数if (callback_) {try {callback_(data);} catch (const std::exception& e) {// 处理回调中的异常,防止线程终止// 可以记录日志或采取其他措施// 这里简单输出错误信息std::cerr << "Exception in callback: " << e.what() << std::endl;}}lock.lock();}}}// 停止线程void Stop() {stop_flag_ = true;cv_.notify_all();}Callback callback_;std::thread worker_thread_;std::mutex queue_mutex_;std::condition_variable cv_;std::queue<std::shared_ptr<Data>> data_queue_;std::atomic<bool> stop_flag_;
};#endif  // DATA_VISITOR_H

2.3. 实现 DataDispatcher

DataDispatcher 是一个单例,负责管理所有的 DataVisitor 并分发数据。

// data_dispatcher.h
#ifndef DATA_DISPATCHER_H
#define DATA_DISPATCHER_H#include <algorithm>
#include <memory>
#include <mutex>
#include <vector>#include "data.h"
#include "data_visitor.h"// 单例的 DataDispatcher
class DataDispatcher {public:// 获取单例实例static DataDispatcher& Instance() {static DataDispatcher instance;return instance;}// 禁止拷贝和赋值DataDispatcher(const DataDispatcher&) = delete;DataDispatcher& operator=(const DataDispatcher&) = delete;// 注册一个 DataVisitorvoid RegisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.emplace_back(visitor);}// 注销一个 DataVisitorvoid UnregisterVisitor(const std::shared_ptr<DataVisitor>& visitor) {std::lock_guard<std::mutex> lock(mutex_);visitors_.erase(std::remove(visitors_.begin(), visitors_.end(), visitor), visitors_.end());}// 分发数据到所有注册的 DataVisitorvoid Dispatch(const std::shared_ptr<Data>& data) {std::lock_guard<std::mutex> lock(mutex_);for (auto& visitor : visitors_) {if (visitor) {visitor->Notify(data);}}}private:// 私有构造函数DataDispatcher() = default;~DataDispatcher() = default;std::vector<std::shared_ptr<DataVisitor>> visitors_;std::mutex mutex_;
};#endif  // DATA_DISPATCHER_H

2.4. 实现 Receiver

模拟消息接收器,每当接收到消息时,调用 DataDispatcher 分发数据。

// receiver.h
#ifndef RECEIVER_H
#define RECEIVER_H#include <functional>
#include <memory>
#include <string>#include "data.h"
#include "data_dispatcher.h"// Receiver,模拟消息接收器
class Receiver {public:using Callback = std::function<void(std::shared_ptr<Data>)>;// 构造函数,绑定回调函数explicit Receiver(Callback callback) : callback_(callback) {}// 模拟接收消息void ReceiveMessage(int id, const std::string& content) {auto data = std::make_shared<Data>();data->id = id;data->content = content;// 触发回调if (callback_) {callback_(data);}}private:Callback callback_;
};#endif  // RECEIVER_H

2.5. 实现具体的 DataVisitor

例如,创建 LoggingVisitorProcessingVisitor,它们各自有不同的处理逻辑。

// logging_visitor.h
#ifndef LOGGING_VISITOR_H
#define LOGGING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// LoggingVisitor,负责记录数据
class LoggingVisitor {public:// 创建一个 LoggingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {std::cout << "[LoggingVisitor] Received data: ID=" << data->id << ", Content=\"" << data->content << "\""<< std::endl;});}
};#endif  // LOGGING_VISITOR_H
// processing_visitor.h
#ifndef PROCESSING_VISITOR_H
#define PROCESSING_VISITOR_H#include <iostream>
#include <memory>
#include "data_visitor.h"// ProcessingVisitor,负责处理数据
class ProcessingVisitor {public:// 创建一个 ProcessingVisitor 的 DataVisitor 实例static std::shared_ptr<DataVisitor> Create() {return std::make_shared<DataVisitor>([](std::shared_ptr<Data> data) {// 简单示例:打印数据长度std::cout << "[ProcessingVisitor] Processed data ID=" << data->id << ", Length=" << data->content.length()<< std::endl;});}
};#endif  // PROCESSING_VISITOR_H

2.6. 示例主程序

展示如何使用上述组件,实现数据接收、分发和处理。

// main.cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>#include "data_dispatcher.h"
#include "logging_visitor.h"
#include "processing_visitor.h"
#include "receiver.h"int main() {// 创建并注册 DataVisitorauto logger = LoggingVisitor::Create();auto processor = ProcessingVisitor::Create();DataDispatcher::Instance().RegisterVisitor(logger);DataDispatcher::Instance().RegisterVisitor(processor);// 创建 Receiver,并绑定 DataDispatcher 的 Dispatch 方法Receiver receiver([](std::shared_ptr<Data> data) { DataDispatcher::Instance().Dispatch(data); });// 模拟接收消息std::cout << "=== 接收第1条消息 ===" << std::endl;receiver.ReceiveMessage(1, "Hello, CyberRT!");std::cout << "=== 接收第2条消息 ===" << std::endl;receiver.ReceiveMessage(2, "Another data packet.");// 等待一段时间以确保所有消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));// 注销一个 DataVisitorstd::cout << "\n=== 移除 LoggingVisitor ===" << std::endl;DataDispatcher::Instance().UnregisterVisitor(logger);// 再次接收消息std::cout << "=== 接收第3条消息 ===" << std::endl;receiver.ReceiveMessage(3, "Data after removing logger.");// 等待一段时间以确保消息被处理std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "\n=== 程序结束 ===" << std::endl;return 0;
}

2.7. 编译和运行

假设所有文件在同一目录下,可以使用以下命令进行编译:

g++ -std=c++14 main.cpp -o dispatcher -pthread

运行程序后,您将看到类似如下的输出:

test@pi:~/dataVisitor$ g++ -std=c++14 main.cpp -o dispatcher -pthread
test@pi:~/dataVisitor$ ./dispatcher 
=== 接收第1条消息 ===
=== 接收第2条消息 ===
[LoggingVisitor] Received data: ID=[ProcessingVisitor] Processed data ID=11, Content="Hello, CyberRT!"
[LoggingVisitor] Received data: ID=2, Content="Another data packet."
, Length=15
[ProcessingVisitor] Processed data ID=2, Length=20=== 移除 LoggingVisitor ===
=== 接收第3条消息 ===
[ProcessingVisitor] Processed data ID=3, Length=27=== 程序结束 ===

注意,第三条消息只被 ProcessingVisitor 处理,因为 LoggingVisitor 已被移除。

相关文章:

C++编程:模拟实现CyberRT的DataVisitor和DataDispatcher

文章目录 0. 引言1. 设计概要1.1 主要组件1.2 类关系图1.3 工作流程 2. 代码实现2.1. 定义数据结构2.2. 实现 DataVisitor2.3. 实现 DataDispatcher2.4. 实现 Receiver2.5. 实现具体的 DataVisitor2.6. 示例主程序2.7. 编译和运行 0. 引言 使用 C 实现一个类似CyberRT 架构的 …...

【Flutter】WillPopScope组件-监听物理返回键事件自定义返回事件

WillPopScope(onWillPop: () async {if ( flutterWebViewPlugin ! null && await flutterWebViewPlugin.canGoBack() true) {flutterWebViewPlugin!.goBack();return false; // 阻止默认的返回行为} else {return true; // 允许默认的返回行为}},child: Scaffold(),);…...

【sqlserver】mssql 批量加载数据文件 bulk copy使用

参考文章&#xff1a; Using bulk copy with the JDBC driver SqlServer数据批量写入 SqlServer批量插入数据方法–SqlBulkCopy sqlserver buld copy需要提供&#xff0c;数据文件的对应表的元数据信息主要的字段的位置、字段的名称、字段的数据类型。 执行bulk load时候不一…...

flinkSql中累计窗口CUMULATE

eventTime package com.bigdata.day08;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class _05_flinkSql_Cumulate_eventTime {/*** 累积窗口 eventTime* …...

关于在ubuntu上无法运行EasyConnect的解决方法

需要这三个文件 libpangocairo-1.0-0_1.40.14-1_amd64.deb libpangoft2-1.0-0_1.40.14-1_amd64.deb libpango-1.0-0_1.40.14-1_amd64.deb然后执行 cp source /usr/share/sangfor/EasyConnect再重启EasyConnect即可 下载链接 http://kr.archive.ubuntu.com/ubuntu/pool/main/…...

【Axure高保真原型】数值条件分组

今天和大家分享数值条件分组的原型模板&#xff0c;效果包括&#xff1a; 点击添加分组按钮&#xff0c;可以显示添加弹窗&#xff0c;填写分组名称和数值区间后&#xff0c;可以新增该分组信息‘’ 修改分组区间&#xff0c;可以直接在输入框里修改已有的分组区间&#xff0c…...

python学习——字符串的拼接操作

在Python中&#xff0c;字符串拼接是一项基本操作&#xff0c;用于将多个字符串合并成一个字符串。以下是几种常见的字符串拼接方式&#xff1a; 1. 使用 运算符 最简单和直接的方式是使用 运算符来拼接字符串。 str1 "Hello, " str2 "World!" resu…...

多线程篇-8--线程安全(死锁,常用保障安全的方法,安全容器,原子类,Fork/Join框架等)

1、线程安全和不安全定义 &#xff08;1&#xff09;、线程安全 线程安全是指一个类或方法在被多个线程访问的情况下可以正确得到结果&#xff0c;不会出现数据不一致或其他错误行为。 线程安全的条件 1、原子性&#xff08;Atomicity&#xff09; 多个操作要么全部完成&a…...

el-select的搜索功能

el-select的相关信息&#xff1a; 最基本信息 v-model的值为当前被选中的el-option的 value 属性值 :label是选择器可以看到的内容 过滤搜索 普通过滤搜索 <el-selectv-model"selectedCountry"placeholder"请选择国家"filterable:loading"lo…...

MFC实现全屏功能

之前全屏都是参考&#xff1a; MFC单文档&#xff08;SDI&#xff09;全屏程序的实现 主要思路就是将各种菜单工具栏隐藏恢复。 随着MFC的升级&#xff0c;MFC框架本身就具备了全屏的功能。 微软有一个全屏实现类&#xff1a; CFullScreenImpl Class managing full-screen mod…...

网络安全技术详解:虚拟专用网络(VPN) 安全信息与事件管理(SIEM)

虚拟专用网络&#xff08;VPN&#xff09;详细介绍 虚拟专用网络&#xff08;VPN&#xff09;通过在公共网络上创建加密连接来保护数据传输的安全性和隐私性。 工作原理 VPN的工作原理涉及建立安全隧道和数据加密&#xff1a; 隧道协议&#xff1a;使用协议如PPTP、L2TP/IP…...

v-model 根据后端接口返回的数据动态地确定要绑定的变量

在 Vue 中&#xff0c;v-model 是用于创建双向绑定的指令。通常&#xff0c;它用于与组件或表单元素的值进行绑定。但有时你可能需要根据后端接口返回的数据动态地确定要绑定的变量。 你可以通过以下步骤来实现这个需求&#xff1a; 步骤 1: 获取后端接口数据 首先&#xff…...

图形开发基础之在WinForms中使用OpenTK.GLControl进行图形绘制

前言 GLControl 是 OpenTK 库中一个重要的控件&#xff0c;专门用于在 Windows Forms 应用程序中集成 OpenGL 图形渲染。通过 GLControl&#xff0c;可以轻松地将 OpenGL 的高性能图形绘制功能嵌入到传统的桌面应用程序中。 1. GLControl 的核心功能 OpenGL 渲染上下文&…...

离散数学重点复习

第一章.集合论 概念 1.集合是不能精确定义的基本数学概念.通常是由指定范围内的满足给定条件的所有对象聚集在一起构成的 2.制定范围内的每一个对象称为这个集合的元素 3.固定符号如下: N:自然数集合 Z:整数集合 Q:有理数集合 R:实数集合 C:复数集合 4.集合中的元素是…...

Javaweb梳理21——Servlet

Javaweb梳理21——Servlet 21 Servlet21.1 简介21.3 执行流程21.4 生命周期4.5 方法介绍21.6 体系结构21.7 urlPattern配置21.8 XML配置 21 Servlet 21.1 简介 Servlet是JavaWeb最为核心的内容&#xff0c;它是Java提供的一门动态web资源开发技术。使用Servlet就可以实现&…...

推荐学习笔记:矩阵补充和矩阵分解

参考&#xff1a; 召回 fun-rec/docs/ch02/ch2.1/ch2.1.1/mf.md at master datawhalechina/fun-rec GitHub 业务 隐语义模型与矩阵分解 协同过滤算法的特点&#xff1a; 协同过滤算法的特点就是完全没有利用到物品本身或者是用户自身的属性&#xff0c; 仅仅利用了用户与…...

etcd分布式存储系统快速入门指南

在分布式系统的复杂世界中&#xff0c;确保有效的数据管理至关重要。分布式可靠的键值存储在维护跨分布式环境的数据一致性和可伸缩性方面起着关键作用。 在这个全面的教程中&#xff0c;我们将深入研究etcd&#xff0c;这是一个开源的分布式键值存储。我们将探索其基本概念、特…...

解决VUE3 Vite打包后动态图片资源不显示问题

解决VUE3 Vite打包后动态图片资源不显示问题 <script setup> let url ref()const setimg (item)>{let src ../assets/image/${e}.pngurl.value src }</script><template><div v-for"item in 6"><h1 click"setimg(item)"…...

大数据新视界 -- 大数据大厂之 Hive 临时表与视图:灵活数据处理的技巧(上)(29 / 30)

&#x1f496;&#x1f496;&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎你们来到 青云交的博客&#xff01;能与你们在此邂逅&#xff0c;我满心欢喜&#xff0c;深感无比荣幸。在这个瞬息万变的时代&#xff0c;我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...

Android学习14--charger

1 概述 最近正好在做关机充电这个&#xff0c;就详细看看吧。还是本着保密的原则&#xff0c;项目里的代码也不能直接用&#xff0c;这里就用的Github的。https://github.com/aosp-mirror 具体位置是&#xff1a;https://github.com/aosp-mirror/platform_system_core/tree/mai…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...