C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述
生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。
二.Channels 概念
Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel和无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。
三.Channels 生产者-消费者模式实现
创建通道来作为生产者和消费者之间的共享缓冲区
- 无界通道
- 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用
Channel.CreateUnbounded<T>()方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
- 有界通道
- 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略
BoundedChannelFullMode枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
使用Channel.CreateBounded<T>(int capacity)方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
- 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用
WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
- 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。
async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}
下面展示一个完整的生产者和消费者示例
- 启动
Program类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
- 生产者
Producer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
- 消费者
Consumer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

- [ 参考] :
https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
相关文章:
C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述 生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区&#x…...
git: hint:use --reapply-cherry-picks to include skipped commits
问: 当我在feture分支写完功能,切换到dev更新了远端dev代码,切回feture分支,git rebase dev分支后出现报错: warning skipped previously applied commit 709xxxx hint:use --reapply-cherry-picks to include skippe…...
AI:对比ChatGPT这类聊天机器人,人形机器人对人类有哪些不一样的影响?
人形机器人与像ChatGPT这样的聊天机器人相比,虽然都属于人工智能技术的应用,但由于其具备的物理形态和与环境的互动能力,它们对人类的影响会有很大的不同。下面从多个角度进行对比,阐述它们各自对人类的不同影响: 1. …...
vue3 +ts 学习记录
1 父子传参 父传子 父组件 <TestFuzichuancan :title"title"/> const title 父组件标题子组件 import { defineProps } from vue; interface Props {title?: string,arr: number[]; } const props withDefaults(defineProps<Props>(), {title: 默认…...
微服务的配置共享
1.什么是微服务的配置共享 微服务架构中,配置共享是一个重要环节,它有助于提升服务间的协同效率和数据一致性。以下是对微服务配置共享的详细阐述: 1.1.配置共享的概念 配置共享是指在微服务架构中,将某些通用或全局的配置信息…...
Scala分布式语言二(基础功能搭建、面向对象基础、面向对象高级、异常、集合)
章节3基础功能搭建 46.函数作为值三 package cn . itbaizhan . chapter03 // 函数作为值,函数也是个对象 object FunctionToTypeValue { def main ( args : Array [ String ]): Unit { //Student stu new Student() /*val a ()>{"GTJin"…...
Chromium 132 编译指南 Windows 篇 - 配置核心环境变量 (三)
1. 引言 在之前的 Chromium 编译指南系列文章中,我们已经完成了编译前的准备工作以及 depot_tools 工具的安装与配置。本篇我们将聚焦于 Chromium 编译过程中至关重要的环境变量设置,这些配置是您顺利进行 Chromium 构建的基石。 2. 启用本地编译&…...
开源文件存储分享平台Seafile部署与应用
Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...
MYSQL-创建数据库 CREATE DATABASE (十一)
13.1.11 CREATE DATABASE 语句 -- 创建 数据库的 CREATE 权限 CREATE {DATABASE | SCHEMA} [IF NOT EXISTS] db_name[create_option] ...create_option: [DEFAULT] {CHARACTER SET [] charset_name| COLLATE [] collation_name } -- 删除 数据库具有 DROP 权限 DROP {DATABASE…...
Java高频面试之SE-11
hello啊,各位观众姥爷们!!!本牛马baby今天又来了!哈哈哈哈哈嗝🐶 Java中是引用传递还是值传递? 在 Java 中,方法参数传递是通过 值传递 的方式实现的,但这可能会引起一…...
C#结构体,枚举,泛型,事件,委托--10
目录 一.结构体 二.特殊的结构体(ref struct): 三.枚举 四.泛型 泛型的使用: 1.泛型类:定义一个泛型类,使用类型参数T 2.泛型方法:在方法定义中使用类型参数 3.泛型接口 五.委托及泛型委托 委托 泛型委托 六.事件 事件: 泛型事件:使用泛型委托(如Event…...
MapReduce完整工作流程
1、mapreduce工作流程(终极版) 0. 任务提交 1. 拆-split逻辑切片--任务切分。 FileInputFormat--split切片计算工具 FileSplit--单个计算任务的数据范围。 2. 获得split信息和个数。 MapTask阶段 1. 读取split范围内的数据。k(偏移量)-v(行数据) 关键API:TextI…...
网络编程(1)
网络编程概述 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库,可以实现无痛的网络连接,联网的底层细节被隐藏在 Java 的本机安装系统里&#…...
mysql中创建计算字段
目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 (1)去除空格 (2)使用别名:AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS,附上示例资料: 百度网盘链接: https://pan.baidu.co…...
【算法】判断一个链表是否为回文结构
问: 给定一个单链表的头节点head,请判断该链表是否为回文结构 例: 1 -> 2 -> 1返回true;1 -> 2 -> 2 -> 1返回true;15 -> 6 -> 15返回true 答: 笔试:初始化一个栈用来…...
计算机网络之---ICMP协议与Ping命令
ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议,主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断,帮助设备检测网络连接问题。它是 IP 协议的核心部分之一,用于发送错误消息和操作信息…...
【硬件介绍】Type-C接口详解
一、Type-C接口概述 Type-C接口特点:以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验,避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构:内部上下两排引脚的设计虽然可能不…...
【Pandas】pandas Series rtruediv
Pandas2.2 Series Binary operator functions 方法描述Series.add()用于对两个 Series 进行逐元素加法运算Series.sub()用于对两个 Series 进行逐元素减法运算Series.mul()用于对两个 Series 进行逐元素乘法运算Series.div()用于对两个 Series 进行逐元素除法运算Series.true…...
项目开发版本控制Git流程规范
个人&测试&预发布&生产分支命名 1)个人分支: 从sit或者master进行切出,姓名切出分支命名,或者日期切出分支命名 示例:liuys_sit、20250110_sit2)测试分支: sit3)用户验…...
STM32 : 波特率发生器
波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中,发送器和接收器的波特率是由波特率寄存器(BRR)中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
