Java 8 Streams 中副作用的危险是什么?副作用、危险、Java、Streams

2023-09-07 01:20:27 作者:空虛de心

我正在尝试理解我在 Streams 文档中发现的警告.我已经养成了使用 forEach() 作为通用迭代器的习惯.这导致我编写这种类型的代码:

I'm trying to understand warnings I found in the Documentation on Streams. I've gotten in the habit of using forEach() as a general purpose iterator. And that's lead me to writing this type of code:

public class FooCache {
    private static Map<Integer, Integer> sortOrderCache = new ConcurrentHashMap<>();
    private static Map<Integer, String> codeNameCache = new ConcurrentHashMap<>();

    public static void populateCache() {
        List<Foo> myThings = getThings();

        myThings.forEach(thing -> {
            sortOrderCache.put(thing.getId(), thing.getSortOrder());
            codeNameCache.put(thing.getId(), thing.getCodeName())
        });
    }
}

这是一个简单的例子.我了解此代码违反了 Oracle 对有状态 lamdas 和副作用的警告.但我不明白为什么会出现这个警告.

This is a trivialized example. I understand that this code violates Oracle's warning against stateful lamdas and side-effects. But I don't understand why this warning exists.

运行此代码时,它的行为似乎符合预期.那么我该如何打破它来证明为什么这是一个坏主意呢?

When running this code it appears to behave as expected. So how do I break this to demonstrate why it's a bad idea?

总的来说,我读到了这个:

In sort, I read this:

如果并行执行,ArrayList 的非线程安全性将导致不正确的结果,并且添加所需的同步会导致争用,破坏了并行性的好处.

If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

但是任何人都可以澄清一下以帮助我理解警告吗?

But can anyone add clarity to help me understand the warning?

推荐答案

来自 Javadoc:

From the Javadoc:

还要注意,尝试从行为访问可变状态参数为您提供了一个关于安全性和表现;如果您不同步对该状态的访问,则您有数据竞争,因此您的代码已损坏,但如果您这样做同步对该状态的访问,您可能会遇到竞争破坏您正在寻求从中受益的并行性.最好的方法是避免有状态的行为参数完全流式操作;通常有一种方法可以重组流管道以避免状态.

Note also that attempting to access mutable state from behavioral parameters presents you with a bad choice with respect to safety and performance; if you do not synchronize access to that state, you have a data race and therefore your code is broken, but if you do synchronize access to that state, you risk having contention undermine the parallelism you are seeking to benefit from. The best approach is to avoid stateful behavioral parameters to stream operations entirely; there is usually a way to restructure the stream pipeline to avoid statefulness.

这里的问题是,如果你访问一个可变状态,你会失去两个方面:

The problem here is that if you access a mutable state, you loose on two side:

安全,因为您需要 Stream 试图最小化的同步性能,因为所需的同步成本是您的(在您的示例中,如果您使用 ConcurrentHashMap,这是有成本的). Safety, because you need synchronization which the Stream tries to minimize Performance, because the required synchronization cost you (in your example, if you use a ConcurrentHashMap, this has a cost).

现在,在您的示例中,这里有几点:

Now, in your example, there are several points here:

如果你想使用Stream和多线程流,你需要像myThings.parralelStream()那样使用parralelStream();就目前而言,java.util.Collection 提供的 forEach 方法很简单 for each.您将 HashMap 用作 static 成员并对其进行变异.HashMap 不是线程安全的;您需要使用 ConcurrentHashMap. If you want to use Stream and multi threading stream, you need to use parralelStream() as in myThings.parralelStream(); as it stands, the forEach method provided by java.util.Collection is simple for each. You use HashMap as a static member and you mutate it. HashMap is not threadsafe; you need to use a ConcurrentHashMap.

在 lambda 和 Stream 的情况下,您不得改变流的来源:

In the lambda, and in the case of a Stream, you must not mutate the source of your stream:

myThings.stream().forEach(thing -> myThings.remove(thing));

这可能行得通(但我怀疑它会抛出 ConcurrentModificationException),但这可能行不通:

This may work (but I suspect it will throw a ConcurrentModificationException) but this will likely not work:

myThings.parallelStream().forEach(thing -> myThings.remove(thing));

那是因为 ArrayList 不是线程安全的.

That's because the ArrayList is not thread safe.

如果您使用同步视图(Collections.synchronizedList),那么您将获得性能,因为您在每次访问时都进行了同步.

If you use a synchronized view (Collections.synchronizedList), then you would have a performance it because you synchronize on each access.

在您的示例中,您宁愿使用:

In your example, you would rather use:

sortOrderCache = myThings.stream()
                         .collect(Collectors.groupingBy(
                           Thing::getId, Thing::getSortOrder);
codeNameCache= myThings.stream()
                       .collect(Collectors.groupingBy(
                         Thing::getId, Thing::getCodeName);

finisher(这里是 groupingBy)完成你正在做的工作,并且可能会被顺序调用(我的意思是,Stream 可能会被拆分到多个线程中,finisher 可能会被调用多次(在不同的线程中),然后它可能需要合并.

The finisher (here the groupingBy) does the work you were doing and might be called sequentially (I mean, the Stream may be split across several thread, the the finisher may be invoked several times (in different thread) and then it might need to merge.

顺便说一句,您最终可能会删除 codeNameCache/sortOrderCache 并简单地存储 id->Thing 映射.

By the way, you might eventually drop the codeNameCache/sortOrderCache and simply store the id->Thing mapping.