TPL architectural question

2023-09-03 10:27:24 Author: 想进哥哥的鱼塘

I'm currently working on a project where we have the challenge of processing items in parallel. So far not a big deal ;) Now to the problem. We have a list of IDs, and periodically (every 2 seconds) we want to call a StoredProcedure for each ID. The 2 seconds need to be checked for each item individually, as items are added and removed during runtime. In addition, we want to configure the maximum degree of parallelism, as the DB should not be flooded with 300 threads concurrently. An item which is being processed should not be rescheduled for processing until its previous execution has finished. The reason is that we want to prevent queueing up a lot of items in case of delays on the DB.

Right now we are using a self-developed component that has a main thread, which periodically checks which items need to be scheduled for processing. Once it has the list, it drops those items on a custom IOCP-based thread pool and then uses wait handles to wait for the items to be processed. Then the next iteration starts. We use IOCP because of the work stealing it provides.

I would like to replace this custom implementation with a TPL/.NET 4 version, and I would like to know how you would solve it (ideally simple and nicely readable/maintainable). I know about this article: http://msdn.microsoft.com/en-us/library/ee789351.aspx, but it only limits the number of threads being used. That still leaves work stealing, periodically executing the items, and so on.

Ideally it will become a generic component that can be used for all the tasks that need to be done periodically for a list of items.

any input welcome, tia Martin

Recommended answer

I don't think you actually need to get down and dirty with direct TPL Tasks for this. For starters I would set up a BlockingCollection around a ConcurrentQueue (the default) with no BoundedCapacity set on the BlockingCollection to store the IDs that need to be processed.

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

From there I would just use Parallel::ForEach on the enumeration returned from BlockingCollection::GetConsumingEnumerable. In the ForEach call you set your ParallelOptions::MaxDegreeOfParallelism, and inside the body of the ForEach you execute your stored procedure.

Now, once the stored procedure execution completes, you're saying you don't want to re-schedule the execution for at least two seconds. No problem, schedule a System.Threading.Timer with a callback which will simply add the ID back to the BlockingCollection in the supplied callback.

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(),
    new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4 // read this from config
    },
    (id) =>
    {
       // ... execute sproc ...

       // Need to declare/assign this before the delegate so that we can dispose of it inside 
       Timer timer = null;

       timer = new Timer(
           _ =>
           {
               // Add the id back to the collection so it will be processed again
               idsToProcess.Add(id);

               // Cleanup the timer
               timer.Dispose();
           },
           null, // no state, the id we need is "captured" in the anonymous delegate
           2000, // probably should read this from config
           Timeout.Infinite);
    }
);

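Finally, when the process is shutting down, you would call BlockingCollection::CompleteAdding so that the enumeration being processed stops blocking and completes, and the Parallel::ForEach exits. If this is a Windows service, for example, you would do this in OnStop.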

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
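
To illustrate the shutdown sequence, here is a minimal sketch, not a definitive implementation. It assumes the loop runs on its own task (Parallel.ForEach blocks the calling thread until the enumeration completes), and `ShutdownSketch`, `Run` and `ExecuteSproc` are hypothetical names standing in for your own code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    // Hypothetical stand-in for the real stored procedure call.
    static void ExecuteSproc(string id)
    {
        Thread.Sleep(10);
    }

    public static int Run()
    {
        var idsToProcess = new BlockingCollection<string>();
        int processed = 0;

        // Parallel.ForEach blocks until the consuming enumeration ends,
        // so it is started on a separate long-running task.
        Task consumer = Task.Factory.StartNew(
            () => Parallel.ForEach(
                idsToProcess.GetConsumingEnumerable(),
                new ParallelOptions { MaxDegreeOfParallelism = 4 },
                id =>
                {
                    ExecuteSproc(id);
                    Interlocked.Increment(ref processed);
                }),
            TaskCreationOptions.LongRunning);

        foreach (string id in new[] { "a", "b", "c" })
        {
            idsToProcess.Add(id);
        }

        // Signal that no more ids will arrive. The loop drains what is
        // already queued and then returns, which completes the task.
        idsToProcess.CompleteAdding();
        consumer.Wait();

        return processed;
    }
}
```

Note that calling Add on the collection after CompleteAdding throws an InvalidOperationException, so any timer that re-adds IDs should be stopped, or should check IsAddingCompleted, before shutdown.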

Update

You raised a valid concern in your comment that you might be processing a large amount of IDs at any given point and fear that there would be too much overhead in a timer per ID. I would absolutely agree with that. So in the case that you are dealing with a large list of IDs concurrently, I would change from using a timer-per-ID to using another queue to hold the "sleeping" IDs which is monitored by a single short interval timer instead. First you'll need a ConcurrentQueue onto which to place the IDs that are asleep:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

Now, I'm using a two-part Tuple here for illustration purposes, but you may want to create a more strongly typed struct for it (or at least alias it with a using statement) for better readability. The tuple has the id and a DateTime which represents when it was put on the queue.
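
As a sketch of what such a strongly typed struct might look like (the name `SleepingId` and its members are hypothetical, not from the original answer):

```csharp
using System;

// Hypothetical replacement for Tuple<string, DateTime>.
public struct SleepingId
{
    public readonly string Id;
    public readonly DateTime EnqueuedUtc;

    public SleepingId(string id, DateTime enqueuedUtc)
    {
        Id = id;
        EnqueuedUtc = enqueuedUtc;
    }

    // True once the entry has been asleep for at least the given delay.
    public bool IsDue(DateTime utcNow, TimeSpan delay)
    {
        return utcNow - EnqueuedUtc >= delay;
    }
}
```

The queue would then be declared as `ConcurrentQueue<SleepingId>`, and the timer callback could test `entry.IsDue(utcNow, TimeSpan.FromSeconds(2))` instead of reading `Item2`.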

Now you'll also want to set up the timer that will monitor this queue:

Timer wakeSleepingIdsTimer = new Timer(
   _ =>
   {
       DateTime utcNow = DateTime.UtcNow;
       Tuple<string, DateTime> entry;

       // Entries are enqueued in time order, so stop at the first one
       // that has been asleep for less than 2 seconds
       while (sleepingIds.TryPeek(out entry) && (utcNow - entry.Item2).TotalSeconds >= 2)
       {
           // Remove the entry from the sleeping queue and
           // add its id back to the processing collection
           if (sleepingIds.TryDequeue(out entry))
           {
               idsToProcess.Add(entry.Item1);
           }
       }
   },
   null, // no state
   100, // first tick after 100ms (Timeout.Infinite here would mean the timer never fires)
   100 // wake up every 100ms, probably should read this from config
 );

Then you would simply change the Parallel::ForEach to do the following instead of setting up a timer for each one:

(id) =>
{
       // ... execute sproc ...

       sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
}
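
Putting the pieces together, the following is one possible sketch of the generic component the question asks for, under several assumptions that are not in the original answer: the class name `PeriodicProcessor<T>` and its members are hypothetical, the wake-up interval is hard-coded to 100ms, and the loop uses `Partitioner.Create` with `EnumerablePartitionerOptions.NoBuffering` (available from .NET 4.5) so that the default chunking partitioner does not hold items back while waiting to fill a chunk. Removing items at runtime is left out for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical generic wrapper around the two-queue approach.
public sealed class PeriodicProcessor<T>
{
    private readonly BlockingCollection<T> _ready = new BlockingCollection<T>();
    private readonly ConcurrentQueue<Tuple<T, DateTime>> _sleeping =
        new ConcurrentQueue<Tuple<T, DateTime>>();
    private readonly object _wakeLock = new object();
    private readonly Action<T> _work;
    private readonly TimeSpan _delay;
    private readonly Task _consumer;
    private readonly Timer _wakeTimer;

    public PeriodicProcessor(Action<T> work, TimeSpan delay, int maxDegreeOfParallelism)
    {
        _work = work;
        _delay = delay;
        _consumer = Task.Factory.StartNew(
            () => Consume(maxDegreeOfParallelism), TaskCreationOptions.LongRunning);
        _wakeTimer = new Timer(_ => WakeDueItems(), null, 100, 100);
    }

    // Schedules an item for its first execution.
    public void Add(T item)
    {
        _ready.Add(item);
    }

    // Stops waking items, lets in-flight work finish, then returns.
    public void Stop()
    {
        _wakeTimer.Dispose();
        lock (_wakeLock)
        {
            _ready.CompleteAdding();
        }
        _consumer.Wait();
    }

    private void Consume(int maxDegreeOfParallelism)
    {
        Parallel.ForEach(
            Partitioner.Create(
                _ready.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
            item =>
            {
                _work(item);
                // The delay is measured from the end of the previous run.
                _sleeping.Enqueue(Tuple.Create(item, DateTime.UtcNow));
            });
    }

    private void WakeDueItems()
    {
        // Skip this tick if a previous tick (or Stop) holds the lock.
        if (!Monitor.TryEnter(_wakeLock)) return;
        try
        {
            DateTime utcNow = DateTime.UtcNow;
            Tuple<T, DateTime> entry;
            while (!_ready.IsAddingCompleted
                   && _sleeping.TryPeek(out entry)
                   && utcNow - entry.Item2 >= _delay
                   && _sleeping.TryDequeue(out entry))
            {
                _ready.Add(entry.Item1);
            }
        }
        finally
        {
            Monitor.Exit(_wakeLock);
        }
    }
}
```

A caller would construct it with the stored procedure call as the `work` delegate, for example `new PeriodicProcessor<string>(id => ExecuteSproc(id), TimeSpan.FromSeconds(2), 4)` where `ExecuteSproc` is your own method, then call `Add` for each ID and `Stop` on shutdown. Because an item only re-enters the ready collection after its previous run has finished and the delay has elapsed, the same item is never processed twice concurrently.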