如何加入多的IObservable序列?序列、IObservable

2023-09-03 02:05:51 作者:曾经的私宠、现在的失宠

  VAR一个= Observable.Range(0,10);
        变种B = Observable.Range(5,10);
        变种邮编= a.Zip(二,(X,Y)=&X的催化剂+ - + y)的;
        zip.Subscribe(Console.WriteLine);
 

打印 0 - 5 1 - 6 2 - 7 ......

相反,我想加入相同的值 5 - 5 6 - 6 7 - 7 8 - 8 ......

这就是问题所在合并有序异步序列的100年代的一个简单的例子。这是很容易将两个IEnumerable的的,但我不能找到一种方法,这样做的接收。任何想法?

更多关于输入什么,我想要的目的。基本上,整个系统是一个实时的管道与的fork-join模式连接的多个状态机(聚合器,缓冲,平滑滤波器等)。 RX是一个良好的适合于实现这样的事情? 每个输入都可以重新psented为

$ P $

 公共结构数据点
{
    公众双重价值;
    公众的DateTimeOffset时间戳;
}
 
如何在word中插入序号

每个数据输入位是由他们结合键(时间戳)时间戳抵达,因此,所有的活动都自然排序。随着事件通过管道行进,他们得到分叉和加入。联接需要通过时间戳相关并在predefined顺序应用。例如,加入(A,B,C,D)=>加盟(加盟(加盟(A,B),C),D)。

修改 下面是我能与匆忙上来。希望有基于现有的Rx运营一个简单的解决方案。

 静态无效测试()
    {
        变种一个= Observable.Range(0,10);
        变种B = Observable.Range(5,10);
        //变种邮编= a.Zip(二,(X,Y)=&X的催化剂+ - + y)的;
        //zip.Subscribe(Console.WriteLine);

        变种加入= MergeJoin(A,B,(X,Y)=&X的催化剂+ - + y)的;
        joined.Subscribe(Console.WriteLine);
    }

静态的IObservable<字符串> MergeJoin(的IObservable< INT>左,的IObservable< INT>右,Func键< INT,INT,字符串>选择器)
    {
        返回Observable.CreateWithDisposable<字符串>(O =>
            {
                队列< INT> A =新的队列和LT;诠释>();
                队列< INT> B =新问答其中,诠释>();
                目标门=新的对象();

                left.Subscribe(X =>
                    {
                        锁(门)
                        {
                            如果(a.Count == 0 || a.Peek()< X)
                                a.Enqueue(X);

                            而(a.Count = 0&放大器;!&安培;!b.Count = 0)
                            {
                                如果(a.Peek()== b.Peek())
                                {
                                    o.OnNext(选择器(a.D​​equeue(),b.Dequeue()));
                                }
                                否则,如果(a.Peek()< b.Peek())
                                {
                                    a.Dequeue();
                                }
                                其他
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(X =>
                {
                    锁(门)
                    {
                        如果(b.Count == 0 || b.Peek()< X)
                            b.Enqueue(X);

                        而(a.Count = 0&放大器;!&安培;!b.Count = 0)
                        {
                            如果(a.Peek()== b.Peek())
                            {
                                o.OnNext(选择器(a.D​​equeue(),b.Dequeue()));
                            }
                            否则,如果(a.Peek()< b.Peek())
                            {
                                a.Dequeue();
                            }
                            其他
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                返回Disposable.Empty;
            });
 

解决方案

老实说,我不认为基于现有运营商的解决方案能对未知的顺序热资源(也就是前XS的YS VS XS之前YS )。您的解决方案似乎罚款(嘿,如果它的工作原理),但我会做一些修改,如果它是我的code:

支持取消正确使用 MutableDisposable CompositeDisposable 呼叫的OnError 从选择抛出的异常(这使得它与其他运营商更一致的) 在考虑支持完成,如果有可能为一个信号源到另一个前完成

在code以下已经过测试,您的双量程输入,相同的输入翻转,以及与空缺< INT> +切勿< INT>

 公共静态的IObservable<字符串> MergeJoin(
    的IObservable< INT>左侧的IObservable< INT>没错,Func键和LT; INT,INT,字符串>选择器)
{
    返回Observable.CreateWithDisposable<字符串>(O =>
    {
        队列< INT> A =新的队列和LT;诠释>();
        队列< INT> B =新问答其中,诠释>();
        目标门=新的对象();

        布尔leftComplete = FALSE;
        布尔rightComplete = FALSE;

        MutableDisposable leftSubscription =新MutableDisposable();
        MutableDisposable rightSubscription =新MutableDisposable();

        行动tryDequeue =()=>
        {
            锁(门)
            {
                而(a.Count = 0&放大器;!&安培;!b.Count = 0)
                {
                    如果(a.Peek()== b.Peek())
                    {
                        字符串值= NULL;

                        尝试
                        {
                            值=选择(a.Dequeue(),b.Dequeue());
                        }
                        赶上(例外前)
                        {
                            o.OnError(前);
                            返回;
                        }

                        o.OnNext(值);
                    }
                    否则,如果(a.Peek()< b.Peek())
                    {
                        a.Dequeue();
                    }
                    其他
                    {
                        b.Dequeue();
                    }
                }
            }
        };

        leftSubscription.Disposable = left.Subscribe(X =>
        {
            锁(门)
            {
                如果(a.Count == 0 || a.Peek()< X)
                    a.Enqueue(X);

                tryDequeue();

                如果(rightComplete&安培;&安培; b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        },()=>
        {
            leftComplete = TRUE;

            如果(a.Count == 0 || rightComplete)
            {
                o.OnCompleted();
            }
        });

        rightSubscription.Disposable = right.Subscribe(X =>
        {
            锁(门)
            {
                如果(b.Count == 0 || b.Peek()< X)
                    b.Enqueue(X);

                tryDequeue();

                如果(rightComplete&安培;&安培; b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        },()=>
        {
            rightComplete = TRUE;

            如果(b.Count == 0 || leftComplete)
            {
                o.OnCompleted();
            }
        });

        返回新CompositeDisposable(leftSubscription,rightSubscription);
    });
}
 

        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        var zip = a.Zip(b, (x, y) => x + "-" + y);
        zip.Subscribe(Console.WriteLine);

Prints 0 - 5 1 - 6 2 - 7 ...

Instead, I would like to join identical values 5 - 5 6 - 6 7 - 7 8 - 8 ...

This is a simplified example of the problem merging 100s of ordered asynchronous sequences. It is very easy to join two IEnumerable's, but I could not find a way to do something like this in Rx. Any ideas?

More about inputs and what I am trying to achieve. Basically, the whole system is a real-time pipeline with multiple state machines (aggregators, buffers, smoothing filters, etc) connected by fork-join pattern. Is RX a good fit for implementing such things? Each input can be represented as

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

Each input bit of data is timestamped on arrival, thus all events are naturally ordered by their joining key (timestamp). As the events travel through the pipeline they get forked and joined. Joins need to be correlated by timestamp and applied in predefined order. For example, join(a,b,c,d) => join(join(join(a,b),c),d).

Edit Below is what I could come up with in a hurry. Hopefully there is a simpler solution based on the existing Rx operators.

static void Test()
    {
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        //var zip = a.Zip(b, (x, y) => x + "-" + y);
        //zip.Subscribe(Console.WriteLine);

        var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
        joined.Subscribe(Console.WriteLine);
    }

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
    {
        return Observable.CreateWithDisposable<string>(o =>
            {
                Queue<int> a = new Queue<int>();
                Queue<int> b = new Queue<int>();
                object gate = new object();

                left.Subscribe(x =>
                    {
                        lock (gate)
                        {
                            if (a.Count == 0 || a.Peek() < x)
                                a.Enqueue(x);

                            while (a.Count != 0 && b.Count != 0)
                            {
                                if (a.Peek() == b.Peek())
                                {
                                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                                }
                                else if (a.Peek() < b.Peek())
                                {
                                    a.Dequeue();
                                }
                                else
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (b.Count == 0 || b.Peek() < x)
                            b.Enqueue(x);

                        while (a.Count != 0 && b.Count != 0)
                        {
                            if (a.Peek() == b.Peek())
                            {
                                o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                            }
                            else if (a.Peek() < b.Peek())
                            {
                                a.Dequeue();
                            }
                            else
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                return Disposable.Empty;
            });

解决方案

I honestly can't think of a solution based on existing operators that works for hot sources of unknown order (that is, xs before ys vs ys before xs). Your solution seems fine (hey, if it works), but I'd make a few changes if it were my code:

Support cancellation properly using MutableDisposable and CompositeDisposable Call OnError for exceptions thrown from the selector (making it more consistant with other operators) Consider supporting completion if it's possible for one source to complete before the other

The code below has been tested with your dual-Range input, the same inputs flipped, as well as with Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    return Observable.CreateWithDisposable<string>(o =>
    {
        Queue<int> a = new Queue<int>();
        Queue<int> b = new Queue<int>();
        object gate = new object();

        bool leftComplete = false;
        bool rightComplete = false;

        MutableDisposable leftSubscription = new MutableDisposable();
        MutableDisposable rightSubscription = new MutableDisposable();

        Action tryDequeue = () =>
        {
            lock (gate)
            {
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        string value = null;

                        try
                        {
                            value = selector(a.Dequeue(), b.Dequeue());
                        }
                        catch (Exception ex)
                        {
                            o.OnError(ex);
                            return;
                        }

                        o.OnNext(value);
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        };

        leftSubscription.Disposable = left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);

                tryDequeue();

                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, () =>
        {
            leftComplete = true;

            if (a.Count == 0 || rightComplete)
            {
                o.OnCompleted();
            }
        });

        rightSubscription.Disposable = right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);

                tryDequeue();

                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, () =>
        {
            rightComplete = true;

            if (b.Count == 0 || leftComplete)
            {
                o.OnCompleted();
            }
        });

        return new CompositeDisposable(leftSubscription, rightSubscription);
    });
}