Is there an asynchronous version of PLINQ?

2023-09-03 10:13:20 Author: 于你心安

I want to execute a query over a stream of data while processing items in parallel with a certain degree of parallelism. Normally I'd use PLINQ for that, but my work items are not CPU-bound but IO-bound. I want to use async IO, and PLINQ does not support async work.

What is the smartest way to run a PLINQ-style query, but with asynchronous work items?

Here's a more detailed illustration of the problem:

My goal is to process a potentially infinite stream of "items" in a way that is logically described by the following query:

var items = new int[10]; //simulate data

var results =
 from x in items.AsParallel().WithDegreeOfParallelism(100)
 where Predicate(x)
 select ComputeSomeValue(x);

foreach (var result in results)
 PerformSomeAction(result);

This query is just a sketch of the real query. Now I want each of the placeholder functions to be asynchronous (returning a Task and internally based on async IO).

Note that there might be far more items than can be stored in memory. I must also control the degree of parallelism to max out the underlying network and disk hardware.

This question is not about multi-core. It fully applies to machines with only one CPU core, because the IO can still benefit from parallelism. Think of slow web-service calls and the like.

Recommended answer

This sounds like a job for Microsoft's reactive framework.

I started with this code as my initial variables:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};

Now, I used a regular LINQ query as a baseline:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

This took 50 seconds to compute the results: five items pass the predicate, and each takes 10 seconds.

Then I switched over to an observable (reactive framework) query:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

This took 10 seconds, so the computations clearly ran in parallel.

However, the results are out of order. So I changed the query to this:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

That still took 10 seconds, but I got the results back in the correct order.

Now, the only issue here is the WithDegreeOfParallelism. There are a couple of things to try here.

First up, I changed the code to produce 10,000 values with a 10 ms compute time. My standard LINQ query still took 50 seconds (5,000 values pass the predicate, at 10 ms each), but the reactive query took 6.3 seconds. If it could perform all the computations at the same time, it should have taken much less. This shows that it is maxing out the asynchronous pipeline.

The second point is that the reactive framework uses schedulers for all of the work scheduling. You could try the variety of schedulers that come with the reactive framework to find an alternative if the built-in one doesn't do what you want. Or you could even write your own scheduler to do whatever scheduling you like.
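Besides swapping schedulers, one way to cap concurrency that may be worth trying is the `Merge` overload that takes a `maxConcurrent` argument. The sketch below is not part of the answer's measured code: it assumes the System.Reactive package is referenced, and `ComputeSomeValueAsync` and `BoundedRxSketch` are hypothetical names standing in for the asynchronous placeholder functions in the question.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class BoundedRxSketch
{
    // Hypothetical async stand-in for ComputeSomeValue (simulates slow IO).
    static async Task<int> ComputeSomeValueAsync(int x)
    {
        await Task.Delay(100);
        return x * 3;
    }

    public static int[] Run(int maxConcurrent)
    {
        var items = Enumerable.Range(0, 10);

        return items.ToObservable()
            .Where(x => x % 2 == 0)
            // Wrap each item in a cold observable; the task only starts
            // when Merge subscribes to it.
            .Select(x => Observable.FromAsync(() => ComputeSomeValueAsync(x)))
            // Subscribe to at most maxConcurrent inner observables at a time.
            .Merge(maxConcurrent)
            .ToEnumerable()
            // Sorting by value restores source order here only because
            // y = 3x is monotonic in x.
            .OrderBy(y => y)
            .ToArray();
    }
}
```

Calling `BoundedRxSketch.Run(2)` keeps at most two computations in flight. As far as I know, `Merge` queues the inner observables that are waiting for a slot, so this limits concurrency but does not by itself stop a very large source from being read ahead.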

Here's a version of the query that computes the predicate in parallel too.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };
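For comparison, the bounded degree of parallelism asked for in the question can also be sketched without the reactive framework, using only `Task` and `SemaphoreSlim`. This is a swapped-in technique rather than the answer's approach, and `PredicateAsync`, `ComputeSomeValueAsync`, and `ThrottledAsyncSketch` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledAsyncSketch
{
    // Hypothetical async stand-ins for the question's placeholder functions.
    static async Task<bool> PredicateAsync(int x)
    {
        await Task.Delay(10);
        return x % 2 == 0;
    }

    static async Task<int> ComputeSomeValueAsync(int x)
    {
        await Task.Delay(10);
        return x * 3;
    }

    public static async Task<List<int>> RunAsync(IEnumerable<int> items, int maxConcurrent)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        var results = new List<int>();
        var inFlight = new List<Task>();

        async Task ProcessAsync(int x)
        {
            try
            {
                if (await PredicateAsync(x))
                {
                    var y = await ComputeSomeValueAsync(x);
                    lock (results) results.Add(y); // continuations may run on different threads
                }
            }
            finally
            {
                gate.Release(); // free the slot even if the work throws
            }
        }

        foreach (var x in items)
        {
            // Waiting here means the source is only pulled when a slot is free.
            await gate.WaitAsync();
            inFlight.Add(ProcessAsync(x));
        }

        // For a truly unbounded stream, completed tasks would need pruning
        // from this list; it is kept whole here for simplicity.
        await Task.WhenAll(inFlight);
        return results;
    }
}
```

Results arrive in completion order, so a caller that needs source order would have to carry `x` along and sort, as the ordered reactive query does.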