线程安全的异步字节队列队列、线程、字节、安全

2023-09-07 04:39:21 作者:无泪人

我有一个回调方法被调用,每当新的数据是可用的:

I've got a callback method that is called whenever new data is available:

public delegate void DataCallback(
    byte[] buffer,
    int offset,
    int count);

我想这包装在实现类似于此接口的类:

I want to wrap this in a class that implements an interface similar to this:

public interface IDataSource
{
    IAsyncResult BeginRead(
        byte[] buffer,
        int offset,
        int size,
        TimeSpan timeout,
        AsyncCallback callback,
        object state);

    int EndRead(
        IAsyncResult asyncResult);

    int Read(
        byte[] buffer,
        int offset,
        int size,
        TimeSpan timeout);
}

这显然是一个经典的生产者 - 消费者的问题:字节通过调用回调方法制作,并开​​始/ EndRead和读取方法消耗。在开始/如果没有数据可用(直到发生超时)EndRead和读取方法应该阻止。实施中应使用一个固定大小的内部缓冲区,因此回调方法需要在缓冲区目前已满阻止。

This is obviously a classical producer-consumer problem: the bytes are produced by calls to the callback method, and consumed by the Begin/EndRead and Read methods. The Begin/EndRead and Read methods should block if no data is available (until a timeout occurs). The implementation should use a fixed-size internal buffer, so the callback method needs to block when the buffer is currently full.

由于考虑多线程通常会导致剧烈的头痛,我的问题是:?是否已有这样的数据结构的实现

Since thinking about multithreading usually results in a severe headache, my question is: Is there already an implementation of such a data structure?

(我认为落实读法应该是很简单的,但我想,以避免执行开始/ EndRead与读取。开始 / EndInvoke会

(I think implementing the Read method should be quite simple, but I'd like to avoid implementing Begin/EndRead with Read.Begin/EndInvoke.)

推荐答案

是否必须是异步通过的IAsyncResult ?我有一个通用的阻止的队列here (即读者阻塞,直到有数据或者是封闭的;作家块,直到有空间);它不是专门为优化的byte [] ,但只要规模不是巨大的,应该适应 - 但作为一个阻塞队列,它需要(至少一个)专用消费者线程,这样做的:

Does it have to be async via IAsyncResult? I have a generic blocking queue here (i.e. readers block until there is data or it is closed; writers block until there is space); it isn't optimised specifically for byte[], but as long as the size isn't vast it should cope - but as a blocking queue it requires (at least one) dedicated consumer thread, doing:

T val;
while(queue.TryDequeue(out val)) {
    // process val
}