C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述
生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。
二.Channels 概念
Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel和无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。
三.Channels 生产者-消费者模式实现
创建通道来作为生产者和消费者之间的共享缓冲区
- 无界通道
- 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用
Channel.CreateUnbounded<T>()方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
- 有界通道
- 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略
BoundedChannelFullMode枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
使用Channel.CreateBounded<T>(int capacity)方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
- 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用
WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
- 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。
async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}
下面展示一个完整的生产者和消费者示例
- 启动
Program类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
- 生产者
Producer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
- 消费者
Consumer类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

- [ 参考] :
https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
相关文章:
C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述 生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区&#x…...
git: hint:use --reapply-cherry-picks to include skipped commits
问: 当我在feture分支写完功能,切换到dev更新了远端dev代码,切回feture分支,git rebase dev分支后出现报错: warning skipped previously applied commit 709xxxx hint:use --reapply-cherry-picks to include skippe…...
AI:对比ChatGPT这类聊天机器人,人形机器人对人类有哪些不一样的影响?
人形机器人与像ChatGPT这样的聊天机器人相比,虽然都属于人工智能技术的应用,但由于其具备的物理形态和与环境的互动能力,它们对人类的影响会有很大的不同。下面从多个角度进行对比,阐述它们各自对人类的不同影响: 1. …...
vue3 +ts 学习记录
1 父子传参 父传子 父组件 <TestFuzichuancan :title"title"/> const title 父组件标题子组件 import { defineProps } from vue; interface Props {title?: string,arr: number[]; } const props withDefaults(defineProps<Props>(), {title: 默认…...
微服务的配置共享
1.什么是微服务的配置共享 微服务架构中,配置共享是一个重要环节,它有助于提升服务间的协同效率和数据一致性。以下是对微服务配置共享的详细阐述: 1.1.配置共享的概念 配置共享是指在微服务架构中,将某些通用或全局的配置信息…...
Scala分布式语言二(基础功能搭建、面向对象基础、面向对象高级、异常、集合)
章节3基础功能搭建 46.函数作为值三 package cn . itbaizhan . chapter03 // 函数作为值,函数也是个对象 object FunctionToTypeValue { def main ( args : Array [ String ]): Unit { //Student stu new Student() /*val a ()>{"GTJin"…...
Chromium 132 编译指南 Windows 篇 - 配置核心环境变量 (三)
1. 引言 在之前的 Chromium 编译指南系列文章中,我们已经完成了编译前的准备工作以及 depot_tools 工具的安装与配置。本篇我们将聚焦于 Chromium 编译过程中至关重要的环境变量设置,这些配置是您顺利进行 Chromium 构建的基石。 2. 启用本地编译&…...
开源文件存储分享平台Seafile部署与应用
Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...
MYSQL-创建数据库 CREATE DATABASE (十一)
13.1.11 CREATE DATABASE 语句 -- 创建 数据库的 CREATE 权限 CREATE {DATABASE | SCHEMA} [IF NOT EXISTS] db_name[create_option] ...create_option: [DEFAULT] {CHARACTER SET [] charset_name| COLLATE [] collation_name } -- 删除 数据库具有 DROP 权限 DROP {DATABASE…...
Java高频面试之SE-11
hello啊,各位观众姥爷们!!!本牛马baby今天又来了!哈哈哈哈哈嗝🐶 Java中是引用传递还是值传递? 在 Java 中,方法参数传递是通过 值传递 的方式实现的,但这可能会引起一…...
C#结构体,枚举,泛型,事件,委托--10
目录 一.结构体 二.特殊的结构体(ref struct): 三.枚举 四.泛型 泛型的使用: 1.泛型类:定义一个泛型类,使用类型参数T 2.泛型方法:在方法定义中使用类型参数 3.泛型接口 五.委托及泛型委托 委托 泛型委托 六.事件 事件: 泛型事件:使用泛型委托(如Event…...
MapReduce完整工作流程
1、mapreduce工作流程(终极版) 0. 任务提交 1. 拆-split逻辑切片--任务切分。 FileInputFormat--split切片计算工具 FileSplit--单个计算任务的数据范围。 2. 获得split信息和个数。 MapTask阶段 1. 读取split范围内的数据。k(偏移量)-v(行数据) 关键API:TextI…...
网络编程(1)
网络编程概述 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库,可以实现无痛的网络连接,联网的底层细节被隐藏在 Java 的本机安装系统里&#…...
mysql中创建计算字段
目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 (1)去除空格 (2)使用别名:AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS,附上示例资料: 百度网盘链接: https://pan.baidu.co…...
【算法】判断一个链表是否为回文结构
问: 给定一个单链表的头节点head,请判断该链表是否为回文结构 例: 1 -> 2 -> 1返回true;1 -> 2 -> 2 -> 1返回true;15 -> 6 -> 15返回true 答: 笔试:初始化一个栈用来…...
计算机网络之---ICMP协议与Ping命令
ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议,主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断,帮助设备检测网络连接问题。它是 IP 协议的核心部分之一,用于发送错误消息和操作信息…...
【硬件介绍】Type-C接口详解
一、Type-C接口概述 Type-C接口特点:以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验,避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构:内部上下两排引脚的设计虽然可能不…...
【Pandas】pandas Series rtruediv
Pandas2.2 Series Binary operator functions 方法描述Series.add()用于对两个 Series 进行逐元素加法运算Series.sub()用于对两个 Series 进行逐元素减法运算Series.mul()用于对两个 Series 进行逐元素乘法运算Series.div()用于对两个 Series 进行逐元素除法运算Series.true…...
项目开发版本控制Git流程规范
个人&测试&预发布&生产分支命名 1)个人分支: 从sit或者master进行切出,姓名切出分支命名,或者日期切出分支命名 示例:liuys_sit、20250110_sit2)测试分支: sit3)用户验…...
STM32 : 波特率发生器
波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中,发送器和接收器的波特率是由波特率寄存器(BRR)中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...
