我有一些库(套筒联网)code,它提供了一个工作
基于API挂起响应请求,根据 TaskCompletionSource&LT ; T>
。然而,有一个在TPL的烦恼,因为它似乎是不可能的prevent同步延续。我会的喜欢的,能够做的是两种:
TaskCompletionSource< T>
是不应该允许呼叫者与 TaskContinuationOptions.ExecuteSynchronously
来连接,或
设置的结果(的setResult
/ TrySetResult
)的方式,指定 TaskContinuationOptions .ExecuteSynchronously
应该被忽略,使用泳池,而不是
具体而言,这个问题我已经是输入的数据是由一个专门的阅读器处理,如果呼叫者可以与 TaskContinuationOptions.ExecuteSynchronously
连接就可以搪塞读者(它影响的不仅仅是他们)。 previously,我身边这个被一些两轮牛车,检测是否任何的延续是present,如果他们是它推动完成到线程池,但是这有主叫者是否已饱和自己的工作队列,因为建成后,将不能得到及时处理显著的影响。如果他们使用
Task.Wait()
(或类似),他们会那么基本上死锁自己。同样的,这就是为什么读者在一个专门的线程,而不是使用工人。
所以,之前,我尝试和老马第三方物流团队:?我失去了一个选项
要点:
我不希望外部调用者可以劫持我的线程 在我无法使用线程池
作为一个实现,因为它需要工作时,池饱和
下面的例子生成输出(顺序可以根据时间而有所不同):
续上:主线程
preSS [返回]
续上:线程池
的问题是这样一个事实:随机呼叫者设法延续上的主线程。在真实code,这将是中断初级读者;不好的事情!
code:
使用系统;
使用的System.Threading;
使用System.Threading.Tasks;
静态类节目
{
静态无效的标识()
{
VAR线程= Thread.CurrentThread;
字符串名称= thread.IsThreadPoolThread
? 线程池:thread.Name;
如果(string.IsNullOrEmpty(名))
NAME =#+ thread.ManagedThreadId;
Console.WriteLine(关于继续:+姓名);
}
静态无效的主要()
{
Thread.CurrentThread.Name =主线程;
无功源=新TaskCompletionSource<诠释>();
VAR任务= source.Task;
task.ContinueWith(委托{
识别();
});
task.ContinueWith(委托{
识别();
},TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine(preSS [返回]);
到Console.ReadLine();
}
}
解决方案
新的.NET 4.6:
.NET 4.6包含了一个新的 TaskCreationOptions
: RunContinuationsAsynchronously
既然你愿意使用反射来访问私有字段...
您可以标记TCS的任务与 TASK_STATE_THREAD_WAS_ABORTED
标记,这将导致不被内联的所有延续。
const int的TASK_STATE_THREAD_WAS_ABORTED = 134217728;
VAR stateField = typeof运算(任务).GetField(m_stateFlags,BindingFlags.NonPublic可| BindingFlags.Instance);
stateField.SetValue(任务,(int)的stateField.GetValue(任务)| TASK_STATE_THREAD_WAS_ABORTED);
编辑:
而不是使用反射的发出,我建议你使用EX pressions。这是更具可读性,并有被PCL兼容的优点:
VAR taskParameter =前pression.Parameter(typeof运算(任务));
常量字符串stateFlagsFieldName =m_stateFlags;
VAR二传手=
防爆pression.Lambda<作用<任务>>(
防爆pression.Assign(出pression.Field(taskParameter,stateFlagsFieldName)
防爆pression.Or(出pression.Field(taskParameter,stateFlagsFieldName)
防爆pression.Constant(TASK_STATE_THREAD_WAS_ABORTED))),taskParameter).Compile();
如果不使用反射:
如果任何人的兴趣,我想出了一个办法做到这一点不用思考,但它是一个有点脏,以及,当然,带有不可忽略的PERF的处罚:
尝试
{
Thread.CurrentThread.Abort();
}
赶上(ThreadAbortException)
{
source.TrySetResult(123);
Thread.ResetAbort();
}
I have some library (socket networking) code that provides a Task
-based API for pending responses to requests, based on TaskCompletionSource<T>
. However, there's an annoyance in the TPL in that it seems to be impossible to prevent synchronous continuations. What I would like to be able to do is either:
TaskCompletionSource<T>
that is should not allow callers to attach with TaskContinuationOptions.ExecuteSynchronously
, or
set the result (SetResult
/ TrySetResult
) in a way that specifies that TaskContinuationOptions.ExecuteSynchronously
should be ignored, using the pool instead
Specifically, the issue I have is that the incoming data is being processed by a dedicated reader, and if a caller can attach with TaskContinuationOptions.ExecuteSynchronously
they can stall the reader (which affects more than just them). Previously, I have worked around this by some hackery that detects whether any continuations are present, and if they are it pushes the completion onto the ThreadPool
, however this has significant impact if the caller has saturated their work queue, as the completion will not get processed in a timely fashion. If they are using Task.Wait()
(or similar), they will then essentially deadlock themselves. Likewise, this is why the reader is on a dedicated thread rather than using workers.
So; before I try and nag the TPL team: am I missing an option?
Key points:
I don't want external callers to be able to hijack my thread I can't use theThreadPool
as an implementation, as it needs to work when the pool is saturated
The example below produces output (ordering may vary based on timing):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
The problem is the fact that a random caller managed to get a continuation on "Main thread". In the real code, this would be interrupting the primary reader; bad things!
Code:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
解决方案
New in .NET 4.6:
.NET 4.6 contains a new TaskCreationOptions
: RunContinuationsAsynchronously
.
Since you're willing to use Reflection to access private fields...
You can mark the TCS's Task with the TASK_STATE_THREAD_WAS_ABORTED
flag, which would cause all continuations not to be inlined.
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Edit:
Instead of using Reflection emit, I suggest you use expressions. This is much more readable and has the advantage of being PCL-compatible:
var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
Expression.Lambda<Action<Task>>(
Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Without using Reflection:
If anyone's interested, I've figured out a way to do this without Reflection, but it is a bit "dirty" as well, and of course carries a non-negligible perf penalty:
try
{
Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
source.TrySetResult(123);
Thread.ResetAbort();
}