我试图拼凑出一个接收管道的工作原理如下:
I'm attempting to piece together a Rx pipeline that works as follows:
在我写了一个函数,它在的IObservable为我提供一个包含关于公司的信息档案 我查询各种数据源的发现可能涉及公司简介,所有的并行。我合并入公司简介单一的IObservable。 在我回来后这些潜在相关的配置文件,我把它们比作是我已经观察到的轮廓,如果他们有针对性> 80%,是不一样的,因为我已经观察到的任何资料,我认为他们匹配。 我要养活匹配的公司回到步骤1,这样我就可以搜索相关的数据,这些新的配套文件。我引导了一些已知的良好配置文件的过程。
I bootstrap the process with some known good profiles.
最终,没有那些尚未被看见更多匹配的轮廓,所以处理结束。
Eventually, there's no more matching profiles that haven't already been seen, and so the process ends.
我有麻烦的编程这一点。如果我用一个主题,让尾部的管道发送其配置文件的工作流程的开始,那么没有人会打电话OnCompleted,我从来没有发现,这个过程已经结束。如果我开发这个递归相反,我似乎永远结束了一个堆栈溢出,因为我想调用一个函数与它自己的返回值。
I'm having trouble programming this. If I use a Subject to allow the tail end of the pipeline to send its profiles to the beginning of the workflow, then no one is going to call OnCompleted and I never find out that the process has ended. If I develop this with recursion instead, I seem to always end up with a stack overflow since I'm trying to call a function with its own return value.
谁能帮我我怎么能在某种程度上,我可以确定的过程已经结束完成这项任务?
Can anyone help me with how I can accomplish this task in a way where I can determine that the process has ended?
这听起来像你想这样的数据流:
It sounds like you want a dataflow like this:
seed profiles --> source --> get related --> output
^ |
| v
-<--- transform <-----
这似乎是一个情况下,解决普遍的问题是不是具体的一个容易或更容易,所以我提出一个通用的反馈功能,应该给你你需要的构建块:
This seems like a case where solving the general problem is as easy or easier than the specific one, so I'll propose a generic "feedback" function that should give you the building blocks you need:
编辑:固定功能来完成的
IObservable<TResult> Feedback<T, TResult>(this IObservable<T> seed,
Func<T, IObservable<TResult>> produce,
Func<TResult, IObservable<T>> feed)
{
return Observable.Create<TResult>(
obs =>
{
var ret = new CompositeDisposable();
Action<IDisposable> partComplete =
d =>
{
ret.Remove(d);
if (ret.Count == 0) obs.OnCompleted();
};
Action<IObservable<T>, Action<T>> ssub =
(o, n) =>
{
var disp = new SingleAssignmentDisposable();
ret.Add(disp);
disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
};
Action<IObservable<TResult>, Action<TResult>> rsub =
(o, n) =>
{
var disp = new SingleAssignmentDisposable();
ret.Add(disp);
disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
};
Action<T> recurse = null;
recurse = s =>
{
rsub(produce(s),
r =>
{
obs.OnNext(r);
ssub(feed(r), recurse);
});
};
ssub(seed, recurse);
return ret;
});
}
在你的情况, T
和 TResult
看起来是一样的,所以饲料
将是身份的功能。 制作
将用于实施步骤2和3的功能。
In your case, T
and TResult
appear to be the same, so feed
will be the identity function. produce
will be the functions used to implement step 2 and 3.
有些样品code我测试了此功能:
Some sample code I tested this function with:
void Main()
{
var seed = new int[] { 1, 2, 3, 4, 5, 6 };
var found = new HashSet<int>();
var mults = seed.ToObservable()
.Feedback(i =>
{
return Observable.Range(0, 5)
.Select(r => r * i)
.TakeWhile(v => v < 100)
.Where(v => found.Add(v));
},
i => Observable.Return(i));
using (var disp = mults.Dump())
{
Console.WriteLine("Press any key to stop");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
static IDisposable Dump<T>(this IObservable<T> source)
{
return source.Subscribe(item => Console.WriteLine(item),
ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()),
() => Console.WriteLine("Dump completed"));
}