Task.Yield() in a library needs ConfigureAwait(false) [Task, Yield, ConfigureAwait]

2023-09-03 16:22:52 Author: 檬℃柠叶

It's recommended that one use ConfigureAwait(false) whenever possible, especially in libraries, because it can help avoid deadlocks and improve performance.

I have written a library that makes heavy use of async (accesses web services for a DB). The users of the library were getting a deadlock and after much painful debugging and tinkering I tracked it down to the single use of await Task.Yield(). Everywhere else that I have an await, I use .ConfigureAwait(false), however that is not supported on Task.Yield().

What is the recommended solution for situations where one needs the equivalent of Task.Yield().ConfigureAwait(false)?

I've read about how there was a SwitchTo method that was removed. I can see why that could be dangerous, but why is there no equivalent of Task.Yield().ConfigureAwait(false)?
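
For illustration, the difference between the two kinds of await can be observed with a small console sketch. Everything named here (CountingContext, YieldContextDemo, PostsFor) is hypothetical and not part of the library. The context only counts how many continuations are posted to it and forwards them to the thread pool, so the demo itself cannot deadlock the way a UI context would.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context for illustration: counts posted continuations.
sealed class CountingContext : SynchronizationContext
{
    private int posts;
    public int Posts { get { return Volatile.Read(ref posts); } }

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref posts);
        // Forward to the thread pool so a blocked caller cannot deadlock the demo.
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

static class YieldContextDemo
{
    public static async Task YieldOnce()
    {
        await Task.Yield(); // continuation is posted to the current context
    }

    public static async Task DelayWithoutContext()
    {
        await Task.Delay(10).ConfigureAwait(false); // context is not captured
    }

    // Runs asyncMethod under a CountingContext and reports how many
    // continuations were posted back to that context.
    public static int PostsFor(Func<Task> asyncMethod)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            asyncMethod().Wait();
            return context.Posts;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    static void Main()
    {
        Console.WriteLine(PostsFor(YieldOnce));           // prints 1
        Console.WriteLine(PostsFor(DelayWithoutContext)); // prints 0
    }
}
```

With a single-threaded context (WinForms, WPF, classic ASP.NET) that is blocked waiting on the library's task, the posted continuation of Task.Yield() never gets to run, which is the shape of the deadlock described above.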

Edit:

To provide further context for my question, here is some code. I am implementing an open source library for accessing DynamoDB (a distributed database as a service from AWS) that supports async. A number of operations return IAsyncEnumerable<T> as provided by the IX-Async library. That library doesn't provide a good way of generating async enumerables from data sources that provide rows in "chunks" i.e. each async request returns many items. So I have my own generic type for this. The library supports a read ahead option allowing the user to specify how much data should be requested ahead of when it is actually needed by a call to MoveNext().

Basically, how this works is that I make requests for chunks by calling GetMore() and passing along state between these. I put those tasks in a chunks queue and dequeue them and turn them into actual results that I put in a separate queue. The NextChunk() method is the issue here. Depending on the value of ReadAhead, I will keep getting the next chunk as soon as the last one is done (All), or not get it until a value is needed but not available (None), or only get the next chunk beyond the values that are currently being used (Some). Because of that, getting the next chunk should run in parallel with, and not block, getting the next value. The enumerator code for this is:

private class ChunkedAsyncEnumerator<TState, TResult> : IAsyncEnumerator<TResult>
{
    private readonly ChunkedAsyncEnumerable<TState, TResult> enumerable;
    private readonly ConcurrentQueue<Task<TState>> chunks = new ConcurrentQueue<Task<TState>>();
    private readonly Queue<TResult> results = new Queue<TResult>();
    private CancellationTokenSource cts = new CancellationTokenSource();
    private TState lastState;
    private TResult current;
    private bool complete; // whether we have reached the end

    public ChunkedAsyncEnumerator(ChunkedAsyncEnumerable<TState, TResult> enumerable, TState initialState)
    {
        this.enumerable = enumerable;
        lastState = initialState;
        if(enumerable.ReadAhead != ReadAhead.None)
            chunks.Enqueue(NextChunk(initialState));
    }

    private async Task<TState> NextChunk(TState state, CancellationToken? cancellationToken = null)
    {
        await Task.Yield(); // ** causes deadlock
        var nextState = await enumerable.GetMore(state, cancellationToken ?? cts.Token).ConfigureAwait(false);
        if(enumerable.ReadAhead == ReadAhead.All && !enumerable.IsComplete(nextState))
            chunks.Enqueue(NextChunk(nextState)); // This is a read ahead, so it shouldn't be tied to our token

        return nextState;
    }

    public Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        if(results.Count > 0)
        {
            current = results.Dequeue();
            return TaskConstants.True;
        }
        return complete ? TaskConstants.False : MoveNextAsync(cancellationToken);
    }

    private async Task<bool> MoveNextAsync(CancellationToken cancellationToken)
    {
        Task<TState> nextStateTask;
        if(chunks.TryDequeue(out nextStateTask))
            lastState = await nextStateTask.WithCancellation(cancellationToken).ConfigureAwait(false);
        else
            lastState = await NextChunk(lastState, cancellationToken).ConfigureAwait(false);

        complete = enumerable.IsComplete(lastState);
        foreach(var result in enumerable.GetResults(lastState))
            results.Enqueue(result);

        if(!complete && enumerable.ReadAhead == ReadAhead.Some)
            chunks.Enqueue(NextChunk(lastState)); // This is a read ahead, so it shouldn't be tied to our token

        return await MoveNext(cancellationToken).ConfigureAwait(false);
    }

    public TResult Current { get { return current; } }

    // Dispose() implementation omitted
}

I make no claim this code is perfect. Sorry it is so long, wasn't sure how to simplify. The important part is the NextChunk method and the call to Task.Yield(). This functionality is used through a static construction method:

internal static class AsyncEnumerableEx
{
    public static IAsyncEnumerable<TResult> GenerateChunked<TState, TResult>(
        TState initialState,
        Func<TState, CancellationToken, Task<TState>> getMore,
        Func<TState, IEnumerable<TResult>> getResults,
        Func<TState, bool> isComplete,
        ReadAhead readAhead = ReadAhead.None)
    { ... }
}

Solution

The exact equivalent of Task.Yield().ConfigureAwait(false) (which doesn't exist since ConfigureAwait is a method on Task and Task.Yield returns a custom awaitable) is simply using Task.Factory.StartNew with CancellationToken.None, TaskCreationOptions.PreferFairness and TaskScheduler.Current. In most cases however, Task.Run (which uses the default TaskScheduler) is close enough.
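
As a rough sketch of the Task.Run option, assuming a simplified NextChunk whose state is an int and a hypothetical GetMoreAsync standing in for enumerable.GetMore (neither is the library's real code):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TaskRunSketch
{
    // Hypothetical stand-in for enumerable.GetMore(state, token).
    public static async Task<int> GetMoreAsync(int state, CancellationToken token)
    {
        await Task.Delay(10, token).ConfigureAwait(false);
        return state + 1;
    }

    // Task.Run starts the delegate on the thread pool, so the caller gets the
    // task back immediately and nothing inside the delegate sees the caller's
    // SynchronizationContext. This replaces a leading `await Task.Yield();`.
    public static Task<int> NextChunk(int state, CancellationToken token)
    {
        return Task.Run(() => GetMoreAsync(state, token));
    }

    static void Main()
    {
        int next = NextChunk(0, CancellationToken.None).GetAwaiter().GetResult();
        Console.WriteLine(next); // prints 1
    }
}
```

Because Task.Run has an overload that accepts a Func<Task<TResult>>, the returned task already represents the inner asynchronous operation and needs no manual unwrapping.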

You can verify that by looking at the source for YieldAwaiter: when there is no SynchronizationContext to post to, it uses ThreadPool.QueueUserWorkItem/ThreadPool.UnsafeQueueUserWorkItem if TaskScheduler.Current is the default one (i.e. the thread pool) and Task.Factory.StartNew if it isn't.

You can however create your own awaitable (as I did) that mimics YieldAwaitable but disregards the SynchronizationContext:

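using System;
using System.Runtime.CompilerServices; // INotifyCompletion
using System.Threading;
using System.Threading.Tasks;

async Task Run(int input)
{
    await new NoContextYieldAwaitable();
    // executed on a ThreadPool thread (or on TaskScheduler.Current when it is not the default)
}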

public struct NoContextYieldAwaitable
{
    public NoContextYieldAwaiter GetAwaiter() { return new NoContextYieldAwaiter(); }
    public struct NoContextYieldAwaiter : INotifyCompletion
    {
        public bool IsCompleted { get { return false; } }
        public void OnCompleted(Action continuation)
        {
            var scheduler = TaskScheduler.Current;
            if (scheduler == TaskScheduler.Default)
            {
                ThreadPool.QueueUserWorkItem(RunAction, continuation);
            }
            else
            {
                Task.Factory.StartNew(continuation, CancellationToken.None, TaskCreationOptions.PreferFairness, scheduler);
            }
        }

        public void GetResult() { }
        private static void RunAction(object state) { ((Action)state)(); }
    }
}

Note: I don't recommend actually using NoContextYieldAwaitable; it's just an answer to your question. You should be using Task.Run (or Task.Factory.StartNew with a specific TaskScheduler).
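
For the Task.Factory.StartNew variant, a minimal sketch might look like the following. GetMoreAsync and NextChunk are hypothetical stand-ins with an int state, and the scheduler is passed in as a parameter purely for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class StartNewSketch
{
    // Hypothetical stand-in for an asynchronous chunk request.
    public static async Task<int> GetMoreAsync(int state)
    {
        await Task.Delay(10).ConfigureAwait(false);
        return state + 1;
    }

    public static Task<int> NextChunk(int state, TaskScheduler scheduler)
    {
        // StartNew is not async-aware: with an async delegate it returns
        // Task<Task<int>>, so the result has to be unwrapped explicitly.
        Task<Task<int>> outer = Task.Factory.StartNew<Task<int>>(
            () => GetMoreAsync(state),
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            scheduler);
        return outer.Unwrap();
    }

    static void Main()
    {
        int next = NextChunk(41, TaskScheduler.Default).GetAwaiter().GetResult();
        Console.WriteLine(next); // prints 42
    }
}
```

Passing the scheduler explicitly avoids depending on whatever TaskScheduler.Current happens to be at the call site, which is one reason Task.Run is usually the simpler choice.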