如何(如果)来编写使用TPL单消费者队列?队列、消费者、TPL

2023-09-03 08:54:05 作者:生命旳过客

我已经听说了一堆播客最近关于第三方物流在.NET 4.0中。他们中的大多数形容像下载图像或做一个计算,使用任务,使工作不以GUI线程干扰的背景活动。

I've heard a bunch of podcasts recently about the TPL in .NET 4.0. Most of them describe background activities like downloading images or doing a computation, using tasks so that the work doesn't interfere with a GUI thread.

在大多数情况下code,我的工作有更多的多生产者/单消费者的口味,其中来自多个来源的工作项目必须排队,然后按顺序处理。一个例子将被记录,其中,来自多个线程日志行sequentialized成为最终写入到文件或数据库中的单个队列。所有来自任何单个源中的记录必须留在顺序,和从同一时刻的记录应为关闭,以彼此在最终输出

Most of the code I work on has more of a multiple-producer / single-consumer flavor, where work items from multiple sources must be queued and then processed in order. One example would be logging, where log lines from multiple threads are sequentialized into a single queue for eventual writing to a file or database. All the records from any single source must remain in order, and records from the same moment in time should be "close" to each other in the eventual output.

所以多线程或任务或什么都调用queuer:

So multiple threads or tasks or whatever are all invoking a queuer:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

和一个专门的工作线程处理队列:

And a dedicated worker thread processes the queue:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

它似乎总是合理的,专用的工作线程对这些任务消费者身边。我应该写未来计划使用来自第三方物流,而不是一些结构?哪一个?为什么呢?

It's always seemed reasonable to dedicate a worker thread for the consumer side of these tasks. Should I write future programs using some construct from the TPL instead? Which one? Why?

推荐答案

您可以使用一个长期运行的任务,以所建议的Wilka从BlockingCollection处理项目。下面是其中pretty的多少满足您的应用需求为例。你会看到的输出是这样的:

You can use a long running Task to process items from a BlockingCollection as suggested by Wilka. Here's an example which pretty much meets your applications requirements. You'll see output something like this:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

这并不是说从A,B,C和输出;因为它们依赖于线的开始时间,但是B总是出现B1之前Ð会出现随机的。

Not that outputs from A, B, C & D appear random because they depend on the start time of the threads but B always appears before B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}