这看起来像一个合理的方法来并发组/队列组合?组合、队列、方法来、合理

2023-09-04 12:09:18 作者:扬起微笑丶活得高傲

更新:正如布赖恩指出,我最初的想法的确有一个并发的问题。设置 - .AddOrUpdate 方法,它可以麻痹一个懒惰的思想家(像我一样),以为一切; TKEY的,TValue>这是某种程度上的 ConcurrentDictionary&LT的签名遮蔽增加以及队列推送 - 会莫名其妙地一下子都发生,原子(即神奇地)

Update: As Brian pointed out, my original idea did indeed have a concurrency issue. This was somewhat obscured by the signature of the ConcurrentDictionary<TKey, TValue>.AddOrUpdate method, which can lull a lazy thinker (like myself) into believing that everything--the set add as well as the queue push--will somehow happen all at once, atomically (i.e., magically).

在回想起来,那是愚蠢的我有这个期望。事实上,无论 AddOrUpdate 的实施,应该明确的是竞争状态仍然在我最初的想法存在,正如布赖恩指出:推到队列会发生添加到该组之前,事件因而以下顺序可能发生:

In retrospect, it was foolish of me to have this expectation. In fact, regardless of the implementation of AddOrUpdate, it should be clear that a race condition would still exist in my original idea, as Brian pointed out: pushing to the queue would happen before adding to the set, hence the following sequence of events could occur:

项目推到排队 项目从队列弹出 在项目(不)从集合中删除 项目加入到设置

上面的序列将导致在该组是不是在队列中,有效地黑名单从数据结构该项目的一个项目。

The above sequence would result in an item in the set that isn't in the queue, effectively blacklisting that item from the data structure.

现在,我想了很短的一段时间,我开始觉得下面的方法的可以的解决这些问题:

Now, I thought about this for a short while, and I'm beginning to think that the following approach may resolve these issues:

public bool Enqueue(T item)
{
    // This should:
    // 1. return true only when the item is first added to the set
    // 2. subsequently return false as long as the item is in the set;
    //    and it will not be removed until after it's popped
    if (_set.TryAdd(item, true))
    {
        _queue.Enqueue(item);
        return true;
    }

    return false;
}

构建这种方式,在排队呼叫只会发生一次 - 的在的项目是在集合。所以在队列中的重复项目不应该是一个问题。它似乎认为,由于队列操作bookended由一组操作 - 即一个项目推只的在的它添加到集合,它的突然出现的在的它是从集合中删除 - 不应该发生上述事件的问题的顺序

Structuring it this way, the Enqueue call only happens once -- after the item is in the set. So duplicate items in the queue should not be a problem. And it seems that since the queue operations are "bookended" by the set operations--i.e., an item is pushed only after it's added to the set, and it's popped before it's removed from the set--the problematic sequence of events outlined above should not occur.

人怎么看?难道说这样可以解决问题吗? (布莱恩一样,我倾向于怀疑自己和猜测,答案是的没有的,而我失去了一些东西一次。但是,嘿,它不会是一个有趣的挑战,如果它很容易,对不对?)

What do people think? Could it be that this resolves the problem? (Like Brian, I am inclined to doubt myself and guess that the answer is No, and I am missing something once again. But hey, it wouldn't be a fun challenge if it were easy, right?)

我也肯定看到了类似的问题在这里提出对SO,但令人惊讶的(考虑如何.NET重这个网站),他们似乎都对Java的。

I have definitely seen similar questions posed here on SO, but surprisingly (considering how .NET-heavy this site is) they all seemed to be for Java.

我还需要实际上是一组/队列组合类是线程安全的。换句话说,它应该是一个FIFO集合不允许重复(因此,如果相同的项目已经在队列中,随后排队调用将返回假,直到该项目从该队列弹出)。

I have need of essentially a set/queue combo class that is thread-safe. In other words, it should be a FIFO collection that does not allow duplicates (so if the same item is already in the queue, subsequent Enqueue calls will return false, until the item is popped from the queue).

我知道我可以用一个简单的实现这个很容易的HashSet&LT; T&GT; 问答LT; T&GT; 与锁定在所有必要的地方。不过,我很感兴趣,与 ConcurrentDictionary&LT完成它; TKEY的,TValue&GT; ConcurrentQueue&LT; T&GT; 从.NET 4.0类(也可作为接收扩展.NET 3.5的一部分,这是我使用的是什么),我的理解是莫名其妙无锁收藏*。

I realize I could implement this quite easily with a simple HashSet<T> and a Queue<T> with locking in all the necessary places. However, I was interested in accomplishing it with the ConcurrentDictionary<TKey, TValue> and ConcurrentQueue<T> classes from .NET 4.0 (also available as part of Rx extensions for .NET 3.5, which is what I'm using), which I understand to be somehow lock-free collections*.

我的基本计划是实施该集合是这样的:

My basic plan was to implement this collection something like this:

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue;
    ConcurrentDictionary<T, bool> _set;

    public ConcurrentSetQueue(IEqualityComparer<T> comparer)
    {
        _queue = new ConcurrentQueue<T>();
        _set = new ConcurrentDictionary<T, bool>(comparer);
    }

    public bool Enqueue(T item)
    {
        // This should:
        // 1. if the key is not present, enqueue the item and return true
        // 2. if the key is already present, do nothing and return false
        return _set.AddOrUpdate(item, EnqueueFirst, EnqueueSecond);
    }

    private bool EnqueueFirst(T item)
    {
        _queue.Enqueue(item);
        return true;
    }

    private bool EnqueueSecond(T item, bool dummyFlag)
    {
        return false;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out item))
        {
            // Another thread could come along here, attempt to enqueue, and
            // fail; however, this seems like an acceptable scenario since the
            // item shouldn't really be considered "popped" until it's been
            // removed from both the queue and the dictionary.
            bool flag;
            _set.TryRemove(item, out flag);

            return true;
        }

        return false;
    }
}

我有没有恰当地考虑了这一点?从表面上看,我看不到,因为我已经上面写的是这个基本理念任何明显的错误。不过,也许我忽视的东西。或者,也许使用 ConcurrentQueue&LT的; T&GT; ConcurrentDictionary&LT; T,布尔&GT; 实际上不是一个明智的选择,对于还没有发生,我认为原因。或者,也许别人已经实现了这一想法在一次战斗中验证库的地方,我应该只使用了。

Have I thought this through properly? On the surface, I can't see any obvious bugs in this basic idea as I've written it above. But maybe I'm overlooking something. Or maybe the use of a ConcurrentQueue<T> with a ConcurrentDictionary<T, bool> isn't actually a wise choice, for reasons that have not occurred to me. Or maybe somebody else has already implemented this idea in a battle-proven library somewhere, and I should just use that.

关于这个问题的任何想法或有用的信息将是多少AP preciated!

Any thoughts or useful info on this subject would be much appreciated!

*这是否是严格准确的,我不知道;但性能测试表明,我认为他们这样做优于可比手卷的集合,使用了许多消费者线程锁定。

推荐答案

短是否定的,psented的问题code $ P $不是线程安全的。

The short is no, the code presented in the question is not thread-safe.

MSDN文档是相当稀少的 AddOrUpdate 方法,所以我看了看在反射器的 AddOrUpdate 方法。这是基本的算法(我不是张贴反射输出出于法律原因和其很容易做你自己)。

The MSDN documentation is rather sparse on the AddOrUpdate method so I took a look at the AddOrUpdate method in Reflector. Here is the basic algorithm (I am not posting the Reflector output for legal reasons and its easy enough to do yourself).

TValue value;
do
{
  if (!TryGetValue(...))
  {
    value = AddValueFactoryDelegate(key);
    if (!TryAddInternal(...))
    {
      continue;
    }
    return value;
  }
  value = UpdateValueFactoryDelegate(key);
} 
while (!TryUpdate(...))
return value;

所以很明显的 AddValueFactoryDe​​legate UpdateValueFactoryDe​​legate 可以执行一次以上。没有必要在这里进一步解释。它应该是很明显这将如何打破你的code。我其实是一个有点震惊,代表可以被多次执行。该documenation没有提到这一点。你可能会认为这将是一个非常重要的点,以便调用者知道要避免将具有副作用(如您遇到这种情况)的代表。

So clearly the AddValueFactoryDelegate and UpdateValueFactoryDelegate can execute more than once. There is no need for further explanation here. It should be quite obvious how this will break your code. I am actually a little shocked that the delegates can be executed multiple times. The documenation makes no mention of this. You would think that would be an incredibly important point so callers know to avoid passing delegates that have side-effects (as is your case).

不过,即使代表们保证为只执行一次,那么这仍将是一个问题。它应该很容易通过可视化与 AddOrUpdate 方法的内容替换你的排队方法的问题序列。该 AddValueFactoryDe​​legate 可以执行与原件插入 _queue ,但线程可以通过上下文切换加入之前被打断项目 _set 。第二个线程可以再打电话给你的 TryDequeue 方法和提取 _queue 该项目,但未能从 _set ,因为它是不是在那里。

But even if the delegates were guarenteed to only execute once there would still be a problem. It should be easy to visualize a problem sequence by replacing your Enqueue method with the contents of the AddOrUpdate method. The AddValueFactoryDelegate could execute and insert an item into _queue, but the thread could get interrupted by a context switch before adding the item to _set. A second thread could then call your TryDequeue method and extract that item from _queue, but fail to remove it from _set since it is not in there yet.

更新:

好吧,我不认为这是有可能得到它的工作。 ConcurrentQueue 缺少一个关键的操作。我相信你需要相当于为 TryDequeue 方法。如果这样的操作存在的话,我的认为的下面的code是正确的。我用的是神话中的 TryDequeueCas 方法,它接受作为条件,说执行此操作比较值原子当且仅当在队列中的首个项目等于比较值。我们的想法是正好在 Interlocked.CompareExchange 方法中使用的相同。

Okay, I do not think it is possible to get it to work. ConcurrentQueue is missing one crucial operation. I believe you need a CAS equivalent for the TryDequeue method. If such an operation existed then I think the following code would be correct. I use the mythical TryDequeueCas method which accepts a comparison value that is used as condition to say perform this operation atomically if and only if the top item on the queue equals the comparison value. The idea is exactly the same used in the Interlocked.CompareExchange method.

请注意如何code使用了 ConcurrentDictionary 布尔值作为一个虚拟锁同步队列和字典的协调。数据结构也包含了CAS操作相当于 TryUpdate 被利用来获取和释放这种虚拟的锁。而且因为锁是虚而实际上并没有阻止并发访问,而循环中的 TryDequeue 方法是强制性的。这符合CAS操作的规范模式,因为它们在一个循环通常进行,直到取得成功。

Notice how the code uses the bool value in the ConcurrentDictionary as a "virtual" lock to synchronize the coordination of the queue and the dictionary. The data structure also contains the CAS equivalent operation TryUpdate which is exploited to acquire and release this "virtual" lock. And because the lock is "virtual" and does not actually block concurrent access the while loop in the TryDequeue method is mandatory. That fits the canonical pattern of CAS operations in that they are usually performed in a loop until they succeed.

在code还采用了的。NET 4.0风格的尝试,终于格式的锁获取symantics,以帮助防止造成了带外(异步)异常的问题。

The code also uses the .NET 4.0 style try-finally pattern for lock acquire symantics to help guard against the problems caused by out-of-band (asynchronous) exceptions.

注:同样,code使用的神话 ConcurrentQueue.TryDequeueCas 方法的

class ConcurrentSetQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    ConcurrentDictionary<T, bool> _set = new ConcurrentDictionary<T, bool>();

    public ConcurrentSetQueue()
    {
    }

    public bool Enqueue(T item)
    {
        bool acquired = false;
        try
        {
            acquired = _set.TryAdd(item, true);
            if (acquired)
            {
                _queue.Enqueue(item);
                return true;
            }
            return false;
        }
        finally
        {
            if (acquired) _set.TryUpdate(item, false, true);
        }
    }

    public bool TryDequeue(out T item)
    {
        while (_queue.TryPeek(out item))
        {
            bool acquired = false;
            try
            {
                acquired = _set.TryUpdate(item, true, false);
                if (acquired)
                {
                    if (_queue.TryDequeueCas(out item, item))
                    {
                        return true;
                    }
                }
            }
            finally
            {
                if (acquired) _set.TryRemove(item, out acquired);
            }
        }
        item = default(T);
        return false;
    }
}

更新2:

在引用您的修改通知是多么similiar时相比,我的。事实上,如果你从我的变化中删除所有的绒毛排队方法具有的确切的语句顺序相同。

In reference to your modification notice how similiar it is when compared to mine. In fact, if you remove all of the fluff from my variation the Enqueue method has the exact same sequence of statements.

我更担心的是 TryDequeue 虽然,这就是为什么我说这需要很多额外的东西在我的实行虚拟锁的概念。我当时特别担心访问的顺序相反的数据结构(字典,然后排队在排队的方法,但排队随后字典中的 TryDequeue 法)不过,我越觉得你修改的方法,我就越喜欢它。我觉得现在是反转的访问顺序是安全的的,因为的!

I was more worried about the TryDequeue though and that is why I added the "virtual" lock concept which required a lot of extra stuff in my implemenation. I was specifically worried about the opposite order of access to the data structures (dictionary then queue in the Enqueue method, but queue then dictionary in the TryDequeue method) But, the more I think about your modified approach the more I like it. And I now think it is because of the reversed access order that it is safe!