ObserveOn与Scheduler.NewThread不遵守的话,如果观察者的OnNext受阻并继续的话、观察者、继续、Scheduler

2023-09-06 22:00:14 作者:我比你白a.

有人请帮助可以解释为什么当我阻止并继续观察者的onNext序列认购随着时间可观察序列的缓冲,这Scheduler.NewThread已经不适用?

例如:

如果我缓冲区数的序列通过

  VAR的查询=从Enumerable.Range数(1200)
            选择SnoozeNumberProduction(号码);

变种observableQuery = query.ToObservable();
变种bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
 
Dawn of a new dimension

在哪里SnoozeNumberProduction由250毫秒延迟数生成

 静态INT SnoozeNumberProduction(的Int32号)
{
    Thread.sleep代码(250);
    返回数;
}
 

现在如果以后我订阅了bufferedSequence与ObserveOn(Scheduler.NewThread)这样的,我阻塞了Console.ReadKey第四缓冲

 随机随机=新的随机();
INT32计数= 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(名单=>
{
    Console.WriteLine(({0})从{1}数字 -  {2}产生的线程ID {3},list.Count,列表[0],列表[list.Count -1],Thread.CurrentThread。 ManagedThreadId);

    Thread.sleep代码(1000);
    算上++;
    如果(计数== 4)
    {
        Console.WriteLine(计数达到4,拦截... preSS任意键继续);
        Console.ReadKey(); //阻止并建立队列
    }

    Console.WriteLine(醒来+列表[0] + - +列表[list.Count  -  1]);
});
 

在这种情况下,如果我按任意键比如​​10秒左右,我看到下面接下来的几个缓冲区上,即使Scheduler.NewThread中提到的ObserveOn相同ManagedThread执行后。是否有人可以帮助解释这种现象?

示例输出:

 (7),1-7号产生的线程ID 12
唤醒1  -  7
(9)从8-16号产生的线程ID 14
唤醒8  -  16
(8)17-24号产生的线程ID 15
唤醒17  -  24
(8)25-32号产生的线程ID 16
计数达到4,拦截... preSS任意键继续。
唤醒25  -  32
(8)33-40号产生的线程ID ** 16 **
唤醒33  -  40
(8)41-48号产生的线程ID ** 16 **
唤醒41  -  48
(8)49-56号产生的线程ID ** 16 **
唤醒49  -  56
(8)57-64号产生的线程ID ** 16 **
唤醒57  -  64
(8)65-72号产生的线程ID ** 16 **
唤醒65  -  72
(8)73-80号产生的线程ID ** 16 **
唤醒73  -  80
(8)81-88号产生的线程ID ** 16 **
唤醒81  -  88
(8)89-96号产生的线程ID ** 16 **
 

解决方案

我交叉张贴了这个问题MSDN接收论坛的http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5并了解到,这是出于效率的考虑

您正在阻塞调用OnNext的订阅。 的ObserveOn运营商保证在OnNext将被称为当前线程上多次越好。 该ObserveOn运营商重复使用当前线程调用OnNext,顺序,尽可能多的值目前已经上市。 因为你堵在订阅,多次调用OnNext会累积。您解除后,排队的呼叫是在相​​同的线程中执行。我相信这是为了避免造成每通知一个新的线程时,它的不必要的开销。

Can some one please help explain why when I "block and continue" observer's onNext sequence subscribed to a buffer with time observable sequence, that Scheduler.NewThread does not apply anymore?

For example:

If I buffer a sequence of number via

var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));

Where SnoozeNumberProduction delays the number generation by 250 ms

static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}

Now later if i subscribe to the bufferedSequence with an "ObserveOn(Scheduler.NewThread)" such that I block on the fourth buffer with a Console.ReadKey

Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count++;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken " + list[0] + " - " + list[list.Count - 1]);
});

In this case if I hit any key after say 10 seconds or so, I see that the following next few buffers execute on the same ManagedThread even when Scheduler.NewThread is mentioned in the ObserveOn. Can someone please help explain this behavior?

Sample output:

(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**

解决方案

I cross posted this question to MSDN Rx forum http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 and learnt that this is for efficiency reasons

You're blocking the call to OnNext in Subscribe. The ObserveOn operator ensures that OnNext will be called as many times as possible on the current thread. The ObserveOn operator reuses the current thread to call OnNext, sequentially, for as many values as are currently available. Since you're blocking in Subscribe, multiple calls to OnNext will accumulate. After you unblock, the queued calls are executed on the same thread. I believe this is to avoid the overhead of creating a new thread per notification when it's unnecessary.

 
精彩推荐
图片推荐