C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述
生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。
二.Channels 概念
Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel和无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。
三.Channels 生产者-消费者模式实现
创建通道来作为生产者和消费者之间的共享缓冲区
- 无界通道
- 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用
Channel.CreateUnbounded<T>()方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
- 有界通道
- 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略
BoundedChannelFullMode枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
使用Channel.CreateBounded<T>(int capacity)方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
- 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用
WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
- 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。
async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}
下面展示一个完整的生产者和消费者示例
- 启动
Program类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
- 生产者
Producer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
- 消费者
Consumer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

- [ 参考] :
https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
相关文章:
C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述 生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区&#x…...
git: hint:use --reapply-cherry-picks to include skipped commits
问: 当我在feture分支写完功能,切换到dev更新了远端dev代码,切回feture分支,git rebase dev分支后出现报错: warning skipped previously applied commit 709xxxx hint:use --reapply-cherry-picks to include skippe…...
AI:对比ChatGPT这类聊天机器人,人形机器人对人类有哪些不一样的影响?
人形机器人与像ChatGPT这样的聊天机器人相比,虽然都属于人工智能技术的应用,但由于其具备的物理形态和与环境的互动能力,它们对人类的影响会有很大的不同。下面从多个角度进行对比,阐述它们各自对人类的不同影响: 1. …...
vue3 +ts 学习记录
1 父子传参 父传子 父组件 <TestFuzichuancan :title"title"/> const title 父组件标题子组件 import { defineProps } from vue; interface Props {title?: string,arr: number[]; } const props withDefaults(defineProps<Props>(), {title: 默认…...
微服务的配置共享
1.什么是微服务的配置共享 微服务架构中,配置共享是一个重要环节,它有助于提升服务间的协同效率和数据一致性。以下是对微服务配置共享的详细阐述: 1.1.配置共享的概念 配置共享是指在微服务架构中,将某些通用或全局的配置信息…...
Scala分布式语言二(基础功能搭建、面向对象基础、面向对象高级、异常、集合)
章节3基础功能搭建 46.函数作为值三 package cn . itbaizhan . chapter03 // 函数作为值,函数也是个对象 object FunctionToTypeValue { def main ( args : Array [ String ]): Unit { //Student stu new Student() /*val a ()>{"GTJin"…...
Chromium 132 编译指南 Windows 篇 - 配置核心环境变量 (三)
1. 引言 在之前的 Chromium 编译指南系列文章中,我们已经完成了编译前的准备工作以及 depot_tools 工具的安装与配置。本篇我们将聚焦于 Chromium 编译过程中至关重要的环境变量设置,这些配置是您顺利进行 Chromium 构建的基石。 2. 启用本地编译&…...
开源文件存储分享平台Seafile部署与应用
Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...
MYSQL-创建数据库 CREATE DATABASE (十一)
13.1.11 CREATE DATABASE 语句 -- 创建 数据库的 CREATE 权限 CREATE {DATABASE | SCHEMA} [IF NOT EXISTS] db_name[create_option] ...create_option: [DEFAULT] {CHARACTER SET [] charset_name| COLLATE [] collation_name } -- 删除 数据库具有 DROP 权限 DROP {DATABASE…...
Java高频面试之SE-11
hello啊,各位观众姥爷们!!!本牛马baby今天又来了!哈哈哈哈哈嗝🐶 Java中是引用传递还是值传递? 在 Java 中,方法参数传递是通过 值传递 的方式实现的,但这可能会引起一…...
C#结构体,枚举,泛型,事件,委托--10
目录 一.结构体 二.特殊的结构体(ref struct): 三.枚举 四.泛型 泛型的使用: 1.泛型类:定义一个泛型类,使用类型参数T 2.泛型方法:在方法定义中使用类型参数 3.泛型接口 五.委托及泛型委托 委托 泛型委托 六.事件 事件: 泛型事件:使用泛型委托(如Event…...
MapReduce完整工作流程
1、mapreduce工作流程(终极版) 0. 任务提交 1. 拆-split逻辑切片--任务切分。 FileInputFormat--split切片计算工具 FileSplit--单个计算任务的数据范围。 2. 获得split信息和个数。 MapTask阶段 1. 读取split范围内的数据。k(偏移量)-v(行数据) 关键API:TextI…...
网络编程(1)
网络编程概述 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库,可以实现无痛的网络连接,联网的底层细节被隐藏在 Java 的本机安装系统里&#…...
mysql中创建计算字段
目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 (1)去除空格 (2)使用别名:AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS,附上示例资料: 百度网盘链接: https://pan.baidu.co…...
【算法】判断一个链表是否为回文结构
问: 给定一个单链表的头节点head,请判断该链表是否为回文结构 例: 1 -> 2 -> 1返回true;1 -> 2 -> 2 -> 1返回true;15 -> 6 -> 15返回true 答: 笔试:初始化一个栈用来…...
计算机网络之---ICMP协议与Ping命令
ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议,主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断,帮助设备检测网络连接问题。它是 IP 协议的核心部分之一,用于发送错误消息和操作信息…...
【硬件介绍】Type-C接口详解
一、Type-C接口概述 Type-C接口特点:以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验,避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构:内部上下两排引脚的设计虽然可能不…...
【Pandas】pandas Series rtruediv
Pandas2.2 Series Binary operator functions 方法描述Series.add()用于对两个 Series 进行逐元素加法运算Series.sub()用于对两个 Series 进行逐元素减法运算Series.mul()用于对两个 Series 进行逐元素乘法运算Series.div()用于对两个 Series 进行逐元素除法运算Series.true…...
项目开发版本控制Git流程规范
个人&测试&预发布&生产分支命名 1)个人分支: 从sit或者master进行切出,姓名切出分支命名,或者日期切出分支命名 示例:liuys_sit、20250110_sit2)测试分支: sit3)用户验…...
STM32 : 波特率发生器
波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中,发送器和接收器的波特率是由波特率寄存器(BRR)中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…...
无机布防火卷帘门报价透明,包工包料,一次说清所有费用
很多客户在选购无机布防火卷帘门时,最关心实际成交价格,也担心报价不清晰,后期产生各类额外支出。行业内产品定价参差不齐,选材做工不同,最终价位自然存在差距,挑选时不能只看表面低价。 👉 点击…...
Godot中型项目工程化实践:目录规范、资源引用与状态管理
1. 这不是续集,而是项目落地的分水岭“Godot 游戏引擎项目(二)”——看到这个标题,很多人第一反应是:“哦,上一篇讲了环境搭建和Hello World,这篇该讲节点树和信号了?”但我在带三个…...
智能手机相机光谱特性测量与多光谱成像技术
1. 智能手机相机光谱特性测量基础智能手机相机的光谱灵敏度函数(Spectral Sensitivity Function, SSF)和透射率函数是计算摄影领域的核心参数,它们决定了设备对光信号的响应特性。准确获取这些参数对色彩还原、光谱重建和白平衡校准等任务至关重要。1.1 光谱灵敏度函…...
pan-baidu-download:百度网盘多线程下载加速器架构解析与性能优化指南
pan-baidu-download:百度网盘多线程下载加速器架构解析与性能优化指南 【免费下载链接】pan-baidu-download 百度网盘下载脚本 项目地址: https://gitcode.com/gh_mirrors/pa/pan-baidu-download pan-baidu-download是一款基于Python开发的百度网盘命令行下载…...
2026年一键生成论文工具对比实测:5款神器从选题到格式全流程护航
写论文的焦虑,是每个科研人和学生都心照不宣的“隐形压力”。选题无从下手,文献检索耗时费力,逻辑框架反复推翻,格式排版让人抓狂,查重降重更是像在和系统玩“猫鼠游戏”。2026年的AI工具早已不是过去那种“打字机”&a…...
基于ESP8266与MQTT的家庭水压自动控制系统设计与实现
1. 项目概述与核心需求解析家里水压不稳、供水时断时续,这大概是很多朋友都遇到过的烦心事。我所在的城市供水情况就很不理想,为了解决这个问题,我不得不自己动手,搭建了一套基于ESP8266微控制器的家庭水压增压与储水自动控制系统…...
破解材料数据荒:合成数据与随机森林预测聚合物阻燃性能
1. 项目概述与核心挑战在材料研发领域,尤其是涉及公共安全的聚合物阻燃性研究,传统实验方法正面临巨大瓶颈。想象一下,你是一位材料工程师,需要设计一种用于高铁内饰或高层建筑电缆护套的新型聚合物,其阻燃性能必须满足…...
微信小程序项目实战:从npm安装Vant Weapp到解决样式冲突的完整避坑指南
微信小程序工程化实战:Vant Weapp集成与样式冲突解决方案全解析 第一次在小程序里引入Vant Weapp时,我对着满屏错位的组件样式发呆了半小时——原本优雅的按钮变成了扭曲的色块,表单元素叠在一起像抽象画。这不是个例,根据社区反…...
手机也能玩转无人机仿真:用安卓QGC App连接同一WiFi下的PX4 JMAVSim模拟器
手机也能玩转无人机仿真:用安卓QGC App连接同一WiFi下的PX4 JMAVSim模拟器 无人机开发者和爱好者们,是否曾想过用手机就能完成整个无人机仿真测试流程?告别笨重的电脑束缚,只需一部安卓设备,就能在沙发上调试飞控算法。…...
Playwright文件上传避坑指南:遇到动态生成的文件选择框怎么办?
Playwright文件上传避坑指南:动态生成文件选择框的实战解决方案最近在为一个电商平台做自动化测试时,遇到了一个棘手的问题——商品图片上传功能总是失败。页面上的"上传图片"按钮明明可以点击,但传统的set_input_files()方法却毫无…...
