C#排队依赖任务由一个线程池来处理线程、任务

2023-09-03 11:14:57 作者:城市那么空.

欲在多个流动需要以进行处理(每个流)队列相关任务。该流可以被并行处理。

I want to queue dependant tasks across several flows that need to be processed in order (in each flow). The flows can be processed in parallel.

要具体,让我们说,我需要两个队列,我想每个队列的任务​​,以便进行处理。下面是示例伪code来说明所期望的行为:

To be specific, let's say I need two queues and I want the tasks in each queue to be processed in order. Here is sample pseudocode to illustrate the desired behavior:

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

下面是用箭头表示工作项之间的依赖关系图:

Here is a diagram with arrows illustrating dependencies between work items:

现在的问题是我怎么这个用C#4.0 /做.NET 4.0?现在我有两个工作线程,每个队列之一,我用 BlockingCollection<> 为每个队列。我想,而不是使用.NET线程池,并有工作线程处理项目同时(在整个流),但在连续的流动。换句话说,我希望能够指示例如wi1b取决于wi1a完成,而不必跟踪完成并记住wi1a,当wi1b到达。换句话说,我只想说,我想提出一个工作项队列1,这是与我已经提交了队列1,但可能与提交给其他队列工作项目并联等物品串行处理。

The question is how do I do this using C# 4.0/.NET 4.0? Right now I have two worker threads, one per queue and I use a BlockingCollection<> for each queue. I would like to instead leverage the .NET thread pool and have worker threads process items concurrently (across flows), but serially within a flow. In other words I would like to be able to indicate that for example wi1b depends on completion of wi1a, without having to track completion and remember wi1a, when wi1b arrives. In other words, I just want to say, "I want to submit a work item for queue1, which is to be processed serially with other items I have already submitted for queue1, but possibly in parallel with work items submitted to other queues".

我希望这说明是有意义的。如果没有,请随时提问的意见,我会相应地更新这个问题。

I hope this description made sense. If not please feel free to ask questions in the comments and I will update this question accordingly.

感谢您的阅读。

更新:

要总结硬伤的解决方案,到目前为止,这里有来自答案部分,我不能使用的解决方案和原因(S)我为什么不能使用它们:

To summarize "flawed" solutions so far, here are the solutions from the answers section that I cannot use and the reason(s) why I cannot use them:

TPL任务需要指定先行任务的 ContinueWith()。我不希望提交一个新的任务时,保持每个队列的前提任务的知识。

TPL tasks require specifying the antecedent task for a ContinueWith(). I do not want to maintain knowledge of each queue's antecedent task when submitting a new task.

TDF ActionBlocks看起来很有希望,但它会出现贴到ActionBlock项目并行处理。我需要的物品的特定队列进行串行处理。

TDF ActionBlocks looked promising, but it would appear that items posted to an ActionBlock are processed in parallel. I need for the items for a particular queue to be processed serially.

更新2:

RE:ActionBlocks

RE: ActionBlocks

这样看来,设置 MaxDegreeOfParallelism 选项之一prevents提交给一个 ActionBlock 。因此,它似乎有一个 ActionBlock 每个队列解决了我的问题,唯一的缺点是,这需要从微软的TDF库的安装和部署,我是希望有一个纯粹的.NET 4.0的解决方案。到目前为止,这是考生公认的答案,除非有人能想出办法用纯.NET 4.0的解决方案,不变质为每个队列工作线程(这我已经使用)做到这一点。

It would appear that setting the MaxDegreeOfParallelism option to one prevents parallel processing of work items submitted to a single ActionBlock. Therefore it seems that having an ActionBlock per queue solves my problem with the only disadvantage being that this requires the installation and deployment of the TDF library from Microsoft and I was hoping for a pure .NET 4.0 solution. So far, this is the candidate accepted answer, unless someone can figure out a way to do this with a pure .NET 4.0 solution that doesn't degenerate to a worker thread per queue (which I am already using).

推荐答案

我知道你有很多队列和不想占用线程。你可以有每个队列的 ActionBlock 。该ActionBlock自动化大部分你所需要的:它串行处理的工作项目,并且只有在启动时工作,正在等待一个任务。当没有工作正在等待,没有任务/线程被阻塞。

I understand you have many queues and don't want to tie up threads. You could have an ActionBlock per queue. The ActionBlock automates most of what you need: It processes work items serially, and only starts a Task when work is pending. When no work is pending, no Task/Thread is blocked.