如何访问阻塞集合的底层默认并发队列队列、底层

2023-09-07 15:10:10 作者:你不爱我、我不怪你

我有多个生产者和单次消费。然而,如果有一些在一个尚未消耗生产者不应再排队它的队列。 (独特的无重复阻挡集合,使用默认的并发队列)

I have multiple producers and a single consumer. However if there is something in the queue that is not yet consumed a producer should not queue it again. (unique no duplicates blocking collection that uses the default concurrent queue)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

然而,阻挡科尔没有包含方法,也没有提供任何形式trypeek()之类的方法。我怎样才能访问底层并发队列,所以我可以做的像做

However the blocking coll does not have a contains method nor does it provide any kind of trypeek() like method. How can I access the underlying concurrent queue so I can do do something like

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

在尾旋。请帮忙。谢谢

推荐答案

这是一个有趣的问题。这是我第一次看到有人问一个阻塞队列,忽略重复。奇怪的是我能找到一点都不像你想的已经存在于BCL什么。我说这是奇怪,因为 BlockingCollection 可以接受 IProducerConsumerCollection 作为底层集合其中有 TryAdd 方法是标榜能够检测到重复的时候失败。问题是,我没有看到具体的实施 IProducerConsumerCollection 是prevents重复。至少我们可以写我们自己的。

This is an interesting question. This is the first time I have seen someone ask for a blocking queue that ignores duplicates. Oddly enough I could find nothing like what you want that already exists in the BCL. I say this is odd because BlockingCollection can accept a IProducerConsumerCollection as the underlying collection which has the TryAdd method that is advertised as being able to fail when duplicates are detected. The problem is that I see no concrete implementation of IProducerConsumerCollection that prevents duplicates. At least we can write our own.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

现在,我们有我们的 IProducerConsumerCollection 不接受重复的,我们可以用这样的:

Now that we have our IProducerConsumerCollection that does not accept duplicates we can use it like this:

public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

您可以不喜欢我实施 NoDuplicatesConcurrentQueue 。你当然可以自由地实现自己的使用 ConcurrentQueue 或者如果你认为你需要低锁性能的TPL集合提供力所能及的。

You may not like my implementation of NoDuplicatesConcurrentQueue. You are certainly free to implement your own using ConcurrentQueue or whatever if you think you need the low-lock performance that the TPL collections provide.

更新:

我能够今天早上测试code。有一些好消息和坏消息。好消息是,这将在技术上的工作。坏消息是,你可能不希望这样做,因为 BlockingCollection.TryAdd 从底层 IProducerConsumerCollection.TryAdd 方法并抛出时中检测到异常。是的,这是正确的。它不会返回就像你所期望的,而是产生异常。我必须说实话,这既令人惊讶的和荒谬的。的TryXXX方法的要点是,他们不应该抛出异常。我深感失望。

I was able to test the code this morning. There is some good news and bad news. The good news is that this will technically work. The bad news is that you probably will not want to do this because BlockingCollection.TryAdd intercepts the return value from the underlying IProducerConsumerCollection.TryAdd method and throws an exception when false is detected. Yep, that is right. It does not return false like you would expect and instead generates an exception. I have to be honest, this is both surprising and ridiculous. The whole point of the TryXXX methods is that they should not throw exceptions. I am deeply disappointed.