讨论/《Java 实战(第 2 版)》 - 17.2.2 创建你的第一个反应式应用/
《Java 实战(第 2 版)》 - 17.2.2 创建你的第一个反应式应用
共 1 个回复

答案:这段代码的问题在于每次TempSubscriber接受一个新的元素都会调用它的onNext方法,onNext方法又会向TempSubscription发送一个新请求,接着request方法又会向TempSubscriber发送另一个元素。这种递归的调用一个接着一个被压入栈,最终导致栈溢出,造成像下面这样的StackOverflowError错误:

Exception in thread "main" java.lang.StackOverflowError
at java.base/java.io.PrintStream.print(PrintStream.java:666)
at java.base/java.io.PrintStream.println(PrintStream.java:820)
at flow.TempSubscriber.onNext(TempSubscriber.java:36)
at flow.TempSubscriber.onNext(TempSubscriber.java:24)
at flow.TempSubscription.request(TempSubscription.java:60)
at flow.TempSubscriber.onNext(TempSubscriber.java:37)
at flow.TempSubscriber.onNext(TempSubscriber.java:24)
at flow.TempSubscription.request(TempSubscription.java:60)
...

怎样才能修复这个问题,避免发生栈溢出呢?一种可行的解决方案是在TempSubscription中添加Executor,使用它通过另外一个线程向TempSubscriber发送新的元素。为了达到这个目标,你可以像下面的代码清单那样修改TempSubscription。(注意,这个类的实现是不完整的,完整的定义需要结合代码清单17-6剩余的部分。)

代码清单 17-9 为TempSubscription添加Executor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TempSubscription implements Subscription {       ←---- 为了节省页面,刻意省略了原TempSubscription类中未改动的代码

    private static final ExecutorService executor =
                                       Executors.newSingleThreadExecutor();

    @Override
    public void request( long n ) {
        executor.submit( () -> {       ←---- 另起一个线程向subscriber发送下一个元素
            for (long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext( TempInfo.fetch( town ) );
                } catch (Exception e) {
                    subscriber.onError( e );
                    break;
               }
            }
        });
    }
}

Flow API定义了四个接口,目前为止,你仅使用了其中的三个。那么,什么时候使用Processor接口呢?为了解释这个问题,我们举一个例子,通过它你大概就能理解什么时候采用Processor接口了。譬如你需要创建一个Publisher,用来汇报温度数据,不过你收到了一个额外的要求,这些收集的数据要以摄氏温度而不是华氏温度的方式表示(假设你要收集的城市并不在美国)。这时使用Processor接口就非常适合了。