交叉处理事件 - 释放所有服务员可靠服务员、可靠、事件

2023-09-03 16:16:12 作者:洎己の杺洎己慬

我创建通过ManualResetEvent的交叉过程中的事件。当这个事件确实发生潜在N的N个不同进程中的线程应畅通,并开始运行以获取新的数据。的问题是,它似乎ManualResetEvent.Set随后立即复位不导致所有等待的线程醒来。该文档是pretty的模糊的还有

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx

  

在手动重置事件对象的状态信号,它仍然   信号,直到它被明确地重置由ResetEvent无信号   功能。任意数量的等待线程,或线程的随后   开始为指定事件对象等待操作,可以释放   而对象的状态信号。

有一个名为方法PulseEvent这似乎做的正是我需要的,但不幸的是它也有缺陷。

  

一个线程等待一个同步对象可以暂时   从等待状态除去一个内核模式的APC,然后返回到   在APC后等待状态完成。如果调用PulseEvent   期间,当该线程已被从等待除去时间发生   状态,该线程将不会被释放,因为PulseEvent发布   只有那些等待的时刻它被称为线程。   因此,PulseEvent是不可靠的,不应该使用的新   应用程序。相反,使用条件变量。

峰会回顾 身份简史 可信数字身份创新成果展备受关注

现在MS不推荐使用条件变量。

  

条件变量的同步原语,使线程   等到特定情况发生。条件变量   该用户模式的对象不能在进程间共享

接着我似乎已经用完了运气的文档可靠地做到这一点。有没有一种简单的方法来完成同样的事情没有规定的限制有一个ManualResetEvent的还是我需要为每个侦听程序的响应事件得到一个ACK每个订阅来电?在这种情况下,我将需要一个小的共享存储器来注册订阅过程的PID但似乎带来在其自己的一组问题。什么是发生在一个进程崩溃或不响应? ......

要给予一定的范围内。我有新的国家要发布的所有其他进程应该从共享存储位置读取。它是确定会错过一个更新的时候几次更新发生一次,但这个过程必须读取至少在过去的最新值。我可以轮询一个超时,但似乎不像是正确的解决方案。

目前,我到

  =的ChangeEvent新的EventWaitHandle(假,EventResetMode.ManualReset,counterName +_Event);

ChangeEvent.Set();
Thread.sleep代码(1); //增加胜算释放所有的服务员
ChangeEvent.Reset();
 

解决方案

一个通用的选择处理,生产者必须唤醒所有的消费者和消费者的数量正在不断发展的情况下使用移动栅栏的方法。此选项需要共享内存IPC区了。该方法不会导致有时在消费者被唤醒时没有工作是present,特别是如果大量进程需要调度和负载是高的,但它们将总是唤醒除外无望重载机

创建若干手动复位事件和具有生产者保持计数器到将被设置的下一个事件。所有的事件都留一套,除了NextToFire事件。消费过程中等待的NextToFire事件。当生产商希望将其唤醒复位下一步+ 1事件的所有消费者,并将当前的事件。所有的消费者最终会被排定,然后等待新的NextToFire事件。其效果是,只有制作者使用ResetEvent,但消费者总是知道该事件将唤醒他们旁边。

所有用户初始化:(伪code是C / C ++,不是C#)

  //创建共享内存并初始化NextToFire;
pSharedMemory = MapMySharedMemory();
如果(先创建内存)pSharedMemory-> NextToFire = 0;

HANDLE数组[4];
阵列[0] = CreateEvent的(NULL,1,0,事件1);
阵列[1] = CreateEvent的(NULL,1,0,EVENT2);
阵列[2] = CreateEvent的(NULL,1,0,EVENT3);
阵列[3] = CreateEvent的(NULL,1,0,Event4);
 

制片人,以唤醒所有

 长CurrentNdx = pSharedMemory-> NextToFire;
长NextNdx =(CurrentNdx +​​ 1)及3;

//重置下一个事件让消费者块
ResetEvent(数组[NextNdx]);

//标志给消费者新的价值
长实际= InterlockedIncrement(安培; pSharedMemory-> NextToFire)及3;

//下一行,如果需要多个生产者积极。
//不是一个完美的解决方案
如果(实际= NextNdx!)ResetEvent(实际);

//现在唤醒他们全部
SetEvent的(CurrentNdx);
 

消费者等待逻辑

 长CurrentNdx =(pSharedMemory-> NextToFire)及3;
WaitForSingleObject的(数组[CurrentNdx],超时);
 

I have created a cross process event via ManualResetEvent. When this event does occur potentially n threads in n different processes should be unblocked and start running to fetch the new data. The problem is that it seems that ManualResetEvent.Set followed by an immediate Reset does not cause all waiting threads to wake up. The docs are pretty vague there

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx

When the state of a manual-reset event object is signaled, it remains signaled until it is explicitly reset to nonsignaled by the ResetEvent function. Any number of waiting threads, or threads that subsequently begin wait operations for the specified event object, can be released while the object's state is signaled.

There is a method called PulseEvent which seems to do exactly what I need but unfortunately it is also flawed.

A thread waiting on a synchronization object can be momentarily removed from the wait state by a kernel-mode APC, and then returned to the wait state after the APC is complete. If the call to PulseEvent occurs during the time when the thread has been removed from the wait state, the thread will not be released because PulseEvent releases only those threads that are waiting at the moment it is called. Therefore, PulseEvent is unreliable and should not be used by new applications. Instead, use condition variables.

Now MS does recommend to use condition variables.

Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. Condition variables are user-mode objects that cannot be shared across processes.

Following the docs I seem to have run out of luck to do it reliably. Is there an easy way to accomplish the same thing without the stated limitations with one ManualResetEvent or do I need to create for each listener process a response event to get an ACK for each subscribed caller? In that case I would need a small shared memory to register the pids of the subscribed processes but that seems to bring in its own set of problems. What does happen when one process crashes or does not respond? ....

To give some context. I have new state to publish which all other processes should read from a shared memory location. It is ok to miss one update when several updates occur at once but the process must read at least the last up to date value. I could poll with a timeout but that seems not like a correct solution.

Currently I am down to

ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

ChangeEvent.Set();
Thread.Sleep(1); // increase odds to release all waiters
ChangeEvent.Reset();

解决方案

One general purpose option for handling the case where producers must wake all consumers and the number of consumers is evolving is to use a moving fence approach. This option requires a shared memory IPC region too. The method does sometimes result in consumers being woken when no work is present, especially if lots of processes need scheduling and load is high, but they will always wake except on hopelessly overloaded machines.

Create several manual reset events and have the producers maintain a counter to the next event that will be set. All Events are left set, except the NextToFire event. Consumer processes wait on the NextToFire event. When the producer wishes to wake all consumers it resets the Next+1 event and sets the current event. All consumers will eventually be scheduled and then wait on the new NextToFire event. The effect is that only the producer uses ResetEvent, but consumers always know which event will be next to wake them.

All Users Init: (pseudo code is C/C++, not C#)

// Create Shared Memory and initialise NextToFire;
pSharedMemory = MapMySharedMemory();
if (First to create memory) pSharedMemory->NextToFire = 0;

HANDLE Array[4];
Array[0] = CreateEvent(NULL, 1, 0, "Event1");
Array[1] = CreateEvent(NULL, 1, 0, "Event2");
Array[2] = CreateEvent(NULL, 1, 0, "Event3");
Array[3] = CreateEvent(NULL, 1, 0, "Event4");

Producer to Wake all

long CurrentNdx = pSharedMemory->NextToFire;
long NextNdx = (CurrentNdx+1) & 3;

// Reset next event so consumers block
ResetEvent(Array[NextNdx]);

// Flag to consumers new value
long Actual = InterlockedIncrement(&pSharedMemory->NextToFire) & 3;

// Next line needed if multiple producers active.
// Not a perfect solution
if (Actual != NextNdx) ResetEvent(Actual);

// Now wake them all up
SetEvent(CurrentNdx);

Consumers wait logic

long CurrentNdx = (pSharedMemory->NextToFire) & 3;
WaitForSingleObject(Array[CurrentNdx],  Timeout);