Register a Stream "completion" hook

2023-09-07 01:30:04 Author: 李是我易生中最美的峰景i

Using the Java 8 Stream API, I would like to register a "completion hook", along the lines of:

Stream<String> stream = Stream.of("a", "b", "c");

// additional filters / mappings that I don't control
// Wished-for API: Stream has no onComplete() method and there is no Completion type
stream.onComplete((Completion c) -> {
    // This is what I'd like to do:
    closeResources();

    // This might also be useful:
    Optional<Throwable> exception = c.exception();
    exception.ifPresent(e -> { throw new ExceptionWrapper(e); });
});

The reason why I want to do that is because I want to wrap a resource in a Stream for API clients to consume, and I want that Stream to clean up the resource automatically once it is consumed. If that were possible, then the client could call:

Collected collectedInOneGo =
Utility.something()
       .niceLookingSQLDSL()
       .moreDSLFeatures()
       .stream()
       .filter(a -> true)
       .map(c -> c)
       .collect(collector);

Instead of what is currently necessary:

try (Stream<X> meh = Utility.something()
                            .niceLookingSQLDSL()
                            .moreDSLFeatures()
                            .stream()) {

    Collected collectedWithUglySyntacticDissonance =
    meh.filter(a -> true)
       .map(c -> c)
       .collect(collector);
}
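With the existing JDK 8 API, the closest built-in mechanism is BaseStream.onClose(Runnable). It registers a handler that runs when close() is called on the stream, for example by try-with-resources. It does not fire by itself after a terminal operation, which is exactly why the try block above is still needed. Here is a minimal sketch. FakeResource and the stream(...) helper are hypothetical stand-ins for the library's real resource (such as a JDBC result set) and its stream() method:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OnCloseDemo {

    // Hypothetical stand-in for a real resource such as a JDBC connection or result set.
    static final class FakeResource implements AutoCloseable {
        boolean closed = false;

        Stream<String> rows() {
            return Stream.of("a", "b", "c");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Hypothetical library method: the returned stream's close() also closes the resource.
    static Stream<String> stream(FakeResource resource) {
        return resource.rows().onClose(resource::close);
    }

    public static void main(String[] args) {
        FakeResource resource = new FakeResource();

        List<String> collected;
        try (Stream<String> s = stream(resource)) {
            collected = s.filter(a -> true).map(c -> c).collect(Collectors.toList());
            // The terminal operation has finished, but the close handler has not run yet.
            System.out.println("closed inside try: " + resource.closed); // false
        }
        // try-with-resources called s.close(), which ran the onClose handler.
        System.out.println("closed after try: " + resource.closed); // true
        System.out.println(collected); // [a, b, c]
    }
}
```

The design point is that onClose ties the resource's lifetime to close(), not to the end of consumption. The client still has to close the stream explicitly.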

Ideally, I'd like to hook into the various methods of java.util.stream.ReferencePipeline, such as:

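@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    try {

        // Existing loop
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));

        // These would be nice ("completion" is a wished-for hook, not a JDK field):
        // report success only once the loop has finished normally...
        completion.onSuccess();
    }
    catch (Throwable t) {
        // ...and report failure otherwise, then rethrow the original exception
        completion.onFailure(t);
        throw t;
    }
}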

Is there an easy way to do this with the existing JDK 8 API?

Recommended answer

Any solution that intercepts the terminal operations, except the flatMap-based solution (as proposed by @Holger), would be fragile against the following code:

Stream<String> stream = getAutoCloseableStream();
if(stream.iterator().hasNext()) {
    // do something if stream is non-empty
}

Such usage is absolutely legal by the specification. Do not forget that iterator() and spliterator() are terminal stream operations, but after they execute you still need access to the stream source. Also, it's perfectly valid to abandon the Iterator or Spliterator in any state, so you simply cannot know whether it will be used further or not.
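A small experiment shows this laziness. peek() counts how many elements have been read from the source, and an onClose flag records whether anything ever closed the stream. The stream here is only a stand-in for getAutoCloseableStream():

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class IteratorDemo {

    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger(); // elements read from the source so far
        AtomicBoolean closed = new AtomicBoolean(); // set only if close() is ever called

        Stream<String> stream = Stream.of("a", "b", "c")
                .peek(x -> pulled.incrementAndGet())
                .onClose(() -> closed.set(true));

        // iterator() is a terminal operation, yet it reads nothing from the source...
        Iterator<String> it = stream.iterator();
        System.out.println("pulled after iterator(): " + pulled.get()); // 0

        // ...the source is touched only later, when the caller asks for elements.
        if (it.hasNext()) {
            System.out.println("stream is non-empty");
        }
        System.out.println("pulled after hasNext(): " + pulled.get());

        // The iterator is now abandoned part-way; nothing tells the stream
        // that consumption is over, so the close handler never runs.
        System.out.println("closed: " + closed.get()); // false
    }
}
```

Any hook that fired when iterator() returned would be too early. A hook waiting for the iterator to be exhausted would never fire.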

You may consider advising users not to use iterator() and spliterator(), but what about this code?

Stream<String> stream = getAutoCloseableStream();
Stream.concat(stream, Stream.of("xyz")).findFirst();

This internally uses spliterator().tryAdvance() for the first stream, then abandons it (though the first stream is closed if close() is called explicitly on the resulting stream). You will need to ask your users not to use Stream.concat as well. And as far as I know, your library uses iterator()/spliterator() internally quite often, so you will need to revisit all those places for possible problems. And of course, there are plenty of other libraries that also use iterator()/spliterator() and may short-circuit after that: all of them would become incompatible with your feature.
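The concat case can be observed the same way. The first stream is again a stand-in for getAutoCloseableStream(), with an onClose flag. Stream.concat is documented to invoke both input streams' close handlers when the resulting stream is closed, and that explicit close is the only thing that reaches the flag:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class ConcatDemo {

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean();

        // Stand-in for getAutoCloseableStream(): a stream that should release a resource.
        Stream<String> stream = Stream.of("a", "b", "c").onClose(() -> closed.set(true));

        Stream<String> both = Stream.concat(stream, Stream.of("xyz"));
        Optional<String> first = both.findFirst();

        // findFirst() read from the first stream only as far as it needed, then abandoned it.
        System.out.println(first.get() + ", closed: " + closed.get()); // a, closed: false

        // Only an explicit close() on the concatenated stream reaches the first stream's handler.
        both.close();
        System.out.println("closed after close(): " + closed.get()); // true
    }
}
```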

Why does the flatMap-based solution work here? Because upon the first call to hasNext() or tryAdvance(), it dumps the entire stream content into an intermediate buffer and closes the original stream source. So, depending on the stream size, you may waste a lot of intermediate memory or even get an OutOfMemoryError.
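The sketch below shows the general idea behind a flatMap-based wrapper. It is not @Holger's exact code, and openResourceStream is a hypothetical resource-backed stream. The approach relies on the documented flatMap guarantee that each mapped stream is closed after its contents have been placed into the outer stream:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapDemo {

    // Hypothetical resource-backed stream; the flag records whether it was closed.
    static Stream<String> openResourceStream(AtomicBoolean closed) {
        return Stream.of("a", "b", "c").onClose(() -> closed.set(true));
    }

    // Wraps the resource stream in a one-element stream and flatMaps it, so that
    // flatMap itself closes the inner stream once its contents have been pushed downstream.
    static Stream<String> selfClosing(AtomicBoolean closed) {
        return Stream.of(closed).flatMap(FlatMapDemo::openResourceStream);
    }

    public static void main(String[] args) {
        AtomicBoolean closed1 = new AtomicBoolean();
        List<String> all = selfClosing(closed1).collect(Collectors.toList());
        System.out.println(all + ", closed: " + closed1.get()); // [a, b, c], closed: true

        // Partial consumption through iterator(): on JDK 8, the first hasNext() pushes the
        // whole inner stream into an internal buffer and then closes it.
        AtomicBoolean closed2 = new AtomicBoolean();
        Iterator<String> it = selfClosing(closed2).iterator();
        it.hasNext();
        System.out.println("closed after hasNext(): " + closed2.get());
    }
}
```

Because the inner stream is drained in one go, memory use grows with the size of the underlying result. That buffering is the cost of this approach.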

You may also consider keeping PhantomReferences to the Stream objects and monitoring the ReferenceQueue. In this case, the completion will be triggered by the garbage collector (which also has some drawbacks).
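Here is a rough sketch of that idea using only JDK 8 classes (java.lang.ref.Cleaner only arrived in JDK 9). StreamReaper, register and drain are hypothetical names. Cleanup runs only after the collector finds the stream unreachable, which may happen late or never. That uncertainty is the main drawback:

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

public class StreamReaper {

    // Phantom reference that remembers the cleanup action for its stream.
    private static final class CleanupRef extends PhantomReference<Stream<?>> {
        final Runnable cleanup;

        CleanupRef(Stream<?> stream, ReferenceQueue<Stream<?>> queue, Runnable cleanup) {
            super(stream, queue);
            this.cleanup = cleanup;
        }
    }

    private final ReferenceQueue<Stream<?>> queue = new ReferenceQueue<>();
    // The references themselves must stay strongly reachable until they are enqueued.
    private final Set<CleanupRef> pending = new HashSet<>();

    // The cleanup action must NOT capture the stream, or the stream never becomes unreachable.
    public synchronized <T> Stream<T> register(Stream<T> stream, Runnable cleanup) {
        pending.add(new CleanupRef(stream, queue, cleanup));
        return stream;
    }

    // Runs the cleanup of every registered stream the garbage collector has found
    // unreachable so far and returns how many ran; e.g. call it from a background thread.
    public synchronized int drain() {
        int count = 0;
        Reference<? extends Stream<?>> ref;
        while ((ref = queue.poll()) != null) {
            CleanupRef cleanupRef = (CleanupRef) ref;
            pending.remove(cleanupRef);
            cleanupRef.cleanup.run();
            count++;
        }
        return count;
    }
}
```

As long as a client still holds the stream, drain() does nothing for it. There is also no guarantee about when, after the client drops it, the resource will actually be released.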

In conclusion, my advice is to stay with try-with-resources.