Pipeline - how to control parallelism?

2023-09-03 01:36:26 Author: 凉城


I'm building a straightforward processing pipeline where an item is fetched as input, operated on by multiple processors in sequence, and finally output. The image below describes the overall architecture:

The way it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item is processed, the output is notified. While an individual item is processed sequentially, multiple items may be processed in parallel (depending on how quickly they are fetched from the provider).
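For reference, here is a minimal sketch of the provider and processor contracts that the snippets below rely on; the interface names are my own, and only the members HasNext, GetNextAsync and ProcessAsync come from the code in this post:

using System.Threading.Tasks;

// Hypothetical contract for the item source (name assumed).
public interface IItemProvider<T>
{
    bool HasNext { get; }        // true while more items can be fetched
    Task<T> GetNextAsync();      // asynchronously fetches the next item
}

// Hypothetical contract for one processing stage (name assumed).
public interface IProcessor<T>
{
    Task<T> ProcessAsync(T item);  // transforms an item and passes it on
}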

The IObservable created and returned from the pipeline looks like this:

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }                
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
            .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};
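For readers less used to chained ContinueWith calls, the Aggregate above is roughly equivalent to this sequential await loop (an illustrative sketch with a helper name of my own, assuming the same T and _processors as the surrounding pipeline class; it is not code from the original post):

async Task<T> RunProcessorsAsync(T item)
{
    var current = item;
    foreach (var processor in _processors)
    {
        // Each processor receives the previous processor's output.
        current = await processor.ProcessAsync(current);
    }
    return current;
}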

The missing part: I need a mechanism that controls how many items (at most) can be in the pipeline at any given time.

For example, if the maximum number of items processed in parallel is 3, that would result in the following workflow:

- Item 1 is fetched and passed to the processors.
- Item 2 is fetched and passed to the processors.
- Item 3 is fetched and passed to the processors.
- Item 1 completed processing.
- Item 4 is fetched and passed to the processors.
- Item 3 completed processing.
- Item 5 is fetched and passed to the processors.
- Etc...

Solution

Merge provides an overload which takes a max concurrency.

Its signature looks like: IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
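As a small standalone illustration of that overload (my own example, with System.Reactive referenced), the following keeps at most three of the deferred inner observables subscribed at any time; the fourth starts only when one of the first three completes:

using System;
using System.Reactive.Linq;

var sources = Observable.Range(1, 10)
    .Select(i => Observable.Defer(() =>
        // Simulate half a second of work per item.
        Observable.Return(i).Delay(TimeSpan.FromMilliseconds(500))));

// At most 3 inner subscriptions are active at once.
IObservable<int> merged = sources.Merge(3);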

Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):

return Observable
  // The reactive While loop also takes care of OnCompleted for you
  .While(() => _provider.HasNext,
         Observable.FromAsync(_provider.GetNextAsync))
  // Wrap each item in a deferred observable that only executes once it is subscribed to
  .Select(item => Observable.Defer(() => {
    return _processors.Aggregate(
      seed: Observable.Return(item),
      func: (current, processor) => current.SelectMany(processor.ProcessAsync));
    }))
  // Only allow 3 streams to execute in parallel.
  .Merge(3);

To break down what this does:

- While checks on each iteration whether _provider.HasNext is true; if so, it resubscribes to get the next value from _provider, otherwise it emits onCompleted.
- Inside the Select a new observable stream is created, but it is not evaluated yet because of Defer.
- The returned IObservable<IObservable<T>> is passed to Merge, which subscribes to a maximum of 3 observables simultaneously.
- The inner observable is finally evaluated when it is subscribed to.
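A quick aside on the Defer point above (my own illustration, not from the answer, with System.Reactive referenced): the factory passed to Defer only runs when something subscribes, which is what lets Merge(3) hold back the work for the fourth and later items:

using System;
using System.Reactive.Linq;

var lazy = Observable.Defer(() =>
{
    // Runs once per subscription, not when the observable is created.
    Console.WriteLine("Factory invoked - work starts now");
    return Observable.Return(42);
});

Console.WriteLine("Nothing has run yet");
lazy.Subscribe(x => Console.WriteLine($"Got {x}"));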

Alternative 1

If you also need to control the number of parallel requests, you need to get a little trickier, since you will need to signal that your observable is ready for new values:

return Observable.Create<T>(observer => 
{
  var subject = new Subject<Unit>();
  var disposable = new CompositeDisposable(subject);

  disposable.Add(subject
    //This will complete when the provider has run out of values
    .TakeWhile(_ => _provider.HasNext)
    .SelectMany(
      _ => _provider.GetNextAsync(),
     (_, item) => 
     {
       return _processors
        .Aggregate(
         seed: Observable.Return(item),
         func: (current, processor) => current.SelectMany(processor.ProcessAsync))
        //Could also use `Finally` here; this signals the chain
        //to start on the next item.
        .Do(dontCare => {}, () => subject.OnNext(Unit.Default));
     }
    )
    .Merge(3)
    .Subscribe(observer));

  //Queue up 3 requests for the initial kickoff
  disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

  return disposable;
});
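To round this off, here is a hedged sketch of how a caller might consume either variant; BuildPipeline() is a hypothetical method standing in for whichever snippet above returns the IObservable<T>:

using System;

// BuildPipeline() is assumed to return the IObservable<T> built above (hypothetical name).
var subscription = BuildPipeline()
    .Subscribe(
        item => Console.WriteLine($"Processed: {item}"),
        ex => Console.WriteLine($"Pipeline failed: {ex.Message}"),
        () => Console.WriteLine("Provider exhausted, pipeline complete"));

// Disposing the subscription tears down the whole chain,
// including the CompositeDisposable created in Alternative 1.
// subscription.Dispose();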