深入了解Channel类:.NET CORE 3.0的新特性

Channel 类位于 System.Threading.Channels 命名空间,提供了一组同步数据结构,用于异步在生产者和消费者之间传递数据。该库针对 .NET Standard 并适用于所有.NET实现。在程序设计中,Channels 是生产者/消费者概念模型的一种实现。

简而言之,它可以被视为一个内存消息队列。

⼩标题:示例 1

以下是一个简单的示例说明如何使用 Channel 类创建生产者-消费者模型:


static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<int>();
    var producer = Task.Run(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(i);
            await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
        }
        channel.Writer.Complete();
    });
    var consumer = Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"消费者接收到: {item}");
        }
    });
    await Task.WhenAll(producer, consumer);
}

在此示例中,我们创建了一个无界的通道,然后创建了两个任务,分别是生产者和消费者。生产者每秒生成一个数字,然后将其写入通道。消费者从通道中读取数据并打印。当生产者完成写入后,调用 channel.Writer.Complete() 来通知消费者没有更多的数据可读。

⼩标题:示例 2

使用 Channel.CreateBounded (capacity) 方法可以创建一个有界通道,其中 capacity 参数指定了通道的容量。当通道已满时,尝试写入的操作将会被阻塞,直到有空间可用。


static async Task Main(string[] args)
{
    var channel = Channel.CreateBounded<int>(5); // 创建一个容量为5的有界通道
    var producer = Task.Run(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(i);
            Console.WriteLine($"生产者生成了: {i}");
            await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
        }
        channel.Writer.Complete();
    });
    var consumer = Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"消费者接收到: {item}");
            await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
        }
    });
    await Task.WhenAll(producer, consumer);
}

在本示例中,我们创建了一个容量为5的有界通道。生产者每秒生成一个数字,然后将其写入通道。消费者从通道中读取数据并打印出来。消费者处理数据的速度比生产者慢,所以当通道满时,生产者的 WriteAsync 操作将会阻塞,直到消费者读取了一些数据,使得通道有空间可用。

⼩标题:示例 3

以下示例展示了如何在多个生产者和消费者之间共享一个通道:


static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<int>();
    var producer1 = Produce(channel.Writer, id: 1);
    var producer2 = Produce(channel.Writer, id: 2);
    var consumer1 = Consume(channel.Reader, id: 1);
    var consumer2 = Consume(channel.Reader, id: 2);
    await Task.WhenAll(producer1, producer2, consumer1, consumer2);
}
static async Task Produce(ChannelWriter<int> writer, int id)
{
    for (int i = 0; i < 10; i++)
    {
        await writer.WriteAsync(i);
        Console.WriteLine($"生产者{id}生成了: {i}");
        await Task.Delay(1000); // 模拟生产者需要一些时间来生成数据
    }
    writer.Complete();
}
static async Task Consume(ChannelReader<int> reader, int id)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        Console.WriteLine($"消费者{id}接收到: {item}");
        await Task.Delay(2000); // 模拟消费者需要一些时间来处理数据
    }
}

在这个示例中,我们创建了两个生产者和两个消费者,它们共享同一个通道。这是一个非常重要的使用模式,因为在消息队列中,通常会有多个生产者和多个消费者。通过控制生产者的生产速度,来控制数据入队的量。还可以通过控制消费者的数量来控制数据的处理速度,从而调节系统的流量,实现消峰填谷。

⼩标题:总结

Channel 类是 .NET CORE 3.0 后新增的类,为我们提供了便利的生产者/消费者模式实现方案。相当于是一个进程内的内存队列,它没有持久化,是纯内存操作,性能非常非常高。在面临高并发时,可以为系统提供高吞吐量。当然,代价是一定的内存开销和一些实时性的牺牲。

⼩标题:关注我的公众号一起玩转技术

热门手游下载
相关文章
下载排行榜