Parallel.ForEach spawning too many threads

2023-09-02 23:44:24 Author: 鱼沉深海

Although I wrote the code I will talk about here in F#, it is based on the .NET 4 framework and does not depend on any particularity of F# (at least it seems so!).

I have some pieces of data on my disk that I should update from the network, saving the latest version to the disk:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

The problem is that to loadAndSaveAndUpdate all my data, I would have to execute the function many times:

{1 .. 5000} |> Seq.iter loadAndSaveAndUpdate

Each step would do:

- some disk IO,
- some data crunching,
- some network IO (with possibly lots of latency),
- some more data crunching, and
- some disk IO.

Wouldn't it be nice to have this done in parallel, to some degree? Unfortunately, none of my reading and parsing functions are "async-workflows-ready".

The first thing I did was to set up a Task[] and start them all:

let createTask id = new Task(fun _ -> loadAndSaveAndUpdate id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Then I hit CTRL+ESC just to see how many threads it was using. 15, 17, ..., 35, ..., 170, ... until I killed the application! Something was going wrong.

I did almost the same thing but using Parallel.ForEach(...) and the results were the same: lots and lots and lots of threads.

Then I decided to start only n Tasks at a time, Task.WaitAll on them, then another n, and so on until there were no more tasks available.

This works, but the problem is that when it has finished processing, say, n-1 tasks, it will wait, wait, wait for the damn last Task that insists on blocking due to lots of network latency. This is not good!
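The batch-of-n approach described above might be sketched as follows (a reconstruction, not the original code; the function name `runInBatches` is hypothetical, and `Seq.chunkBySize` requires a modern FSharp.Core):

```fsharp
open System.Threading.Tasks

// Reconstruction of the batch-of-n approach: start n tasks, Task.WaitAll
// on them, then start the next n. Each batch blocks until its slowest
// member finishes -- which is exactly the problem described above.
let runInBatches n (work : int -> unit) ids =
    ids
    |> Seq.chunkBySize n
    |> Seq.iter (fun batch ->
        batch
        |> Array.map (fun id ->
            let t = new Task(fun () -> work id)
            t.Start()
            t)
        |> Task.WaitAll)

// Example with a dummy workload standing in for loadAndSaveAndUpdate:
runInBatches 4 (fun _ -> System.Threading.Thread.Sleep 5) {1 .. 16}
```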

So, how would you attack this problem? I'd appreciate seeing different solutions, involving either async workflows (and in this case how to adapt my non-async functions), Parallel Extensions, weird parallel patterns, etc.

Thanks.

Recommended answer

Are you sure that your individual tasks are completing in a timely manner? I believe that both Parallel.ForEach and the Task class already use the .NET threadpool. Tasks should generally be short-lived work items, in which case the threadpool will only spawn a small number of actual threads, but if your tasks are not making progress and there are other tasks queued then the number of threads used will steadily increase up to the maximum (which by default is 250/processor in .NET 2.0 SP1, but is different under different versions of the framework). It's also worth noting that (at least in .NET 2.0 SP1) new thread creation is throttled to 2 new threads per second, so getting up to the number of threads you're seeing indicates that the tasks are not completing in a short amount of time (so it may not be completely accurate to pin the blame on Parallel.ForEach).
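The thread-pool limits mentioned above can be inspected at runtime; a small sketch (the printed values vary by runtime version and machine, so treat the defaults quoted above as version-specific):

```fsharp
open System.Threading

// GetMaxThreads/GetMinThreads have two out parameters each;
// F# exposes them as returned tuples.
let maxWorkers, maxIo = ThreadPool.GetMaxThreads()
let minWorkers, minIo = ThreadPool.GetMinThreads()
printfn "worker threads: min %d, max %d" minWorkers maxWorkers
printfn "IO completion threads: min %d, max %d" minIo maxIo
```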

I think that Brian's suggestion to use async workflows is a good one, particularly if the source of the long-lived tasks is IO, since async will return your threads to the threadpool until the IO completes. Another option is to simply accept that your tasks aren't completing quickly and allow the spawning of many threads (which can be controlled to some extent by using System.Threading.ThreadPool.SetMaxThreads) - depending on your situation it may not be a big deal that you're using a lot of threads.
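A minimal sketch of the async-workflow route (not the answerer's code, and the workload below is a hypothetical stand-in for the real pipeline): wrap each synchronous call in an async block and let Async.Parallel schedule them on the thread pool. Note that wrapping a blocking call in async does not by itself free the thread during IO; the real win comes from switching to genuinely asynchronous IO primitives.

```fsharp
// Hypothetical stand-in for the real loadAndSaveAndUpdate pipeline.
let loadAndSaveAndUpdate (id : int) =
    System.Threading.Thread.Sleep 10   // simulate blocking disk/network IO
    id

// Wrap each call in an async workflow and run them all via Async.Parallel.
let results =
    {1 .. 50}
    |> Seq.map (fun id -> async { return loadAndSaveAndUpdate id })
    |> Async.Parallel
    |> Async.RunSynchronously
```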