管理与RxJava相同的请求RxJava

2023-09-04 23:00:33 作者:承诺つ敷衍了谁的青春

假设我有一个读取器的读取图像从一个单独的线程给定链路。该图像将被缓存在内存中。一旦图像已经被缓存的读取器不会重新获取链接。取出器被认为是一个可观察到的。可能有许多订户询问读取器为图像。经过第一次的用户订阅取出器,取出器将拍摄网络。但是,如果有第2个用户来订阅,然后,取出器应该不会拍另一个请求,同时它已经被前取之一。在此之后,如果取结束,两个订户将得到的图像。现在,如果有自带一个3户,取出器会发出图像的时候了。

Assume that I have a fetcher that fetches an image from a given link on a separate thread. The image will then be cached in memory. Once the image already gets cached, the fetcher won't re-fetch the link. The fetcher is considered as an Observable. There may be many subscribers that ask the fetcher for the image. After the first-ever subscriber subscribe the fetcher, the fetcher will shoot network. However, if there's a 2nd subscriber comes to subscribe then, the fetcher shouldn't shoot yet another request while it's already been fetching one before. After that, if the fetch finishes, both subscribers will get the image. Right now, if there's a 3rd subscriber that comes, the fetcher will emit the image right away.

我如何能实现上面RxJava办法的情况?我想到的是利用某种现有经营者,撰写他们的方式更为声明,最重要的是,避免了同步,锁的开销,和原子的东西。

How can I implement the scenario above with RxJava approach? What I expect is to utilise some sort of existing operators, compose them in ways much more declarative, and most importantly, to avoid the overhead of synchronised, lock, and atomic stuff.

推荐答案

这可以通过ConcurrentMap和AsyncSubject完成的:

This can be accomplished via ConcurrentMap and AsyncSubject:

import java.awt.image.BufferedImage;
import java.io.*;
import java.net.URL;
import java.util.concurrent.*;

import javax.imageio.ImageIO;

import rx.*;
import rx.Scheduler.Worker;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;


public class ObservableImageCache {
    final ConcurrentMap<String, AsyncSubject<BufferedImage>> image = 
        new ConcurrentHashMap<>();
    public Observable<BufferedImage> get(String url) {
        AsyncSubject<BufferedImage> result = image.get(url);
        if (result == null) {
            result = AsyncSubject.create();
            AsyncSubject<BufferedImage> existing = image.putIfAbsent(url, result);
            if (existing == null) {
                System.out.println("Debug: Downloading " + url);
                AsyncSubject<BufferedImage> a = result;
                Worker w = Schedulers.io().createWorker();
                w.schedule(() -> {
                    try {
                        Thread.sleep(500); // for demo
                        URL u = new URL(url);

                        try (InputStream openStream = u.openStream()) {
                            a.onNext(ImageIO.read(openStream));
                        }
                        a.onCompleted();
                    } catch (IOException | InterruptedException ex) {
                        a.onError(ex);
                    } finally {
                        w.unsubscribe();
                    }
                });
            } else {
                result = existing;
            }
        }
        return result;
    }
    public static void main(String[] args) throws Exception {
        ObservableImageCache cache = new ObservableImageCache();
        CountDownLatch cdl = new CountDownLatch(4);

        Observable<BufferedImage> img1 = cache.get("https://m.xsw88.com/allimgs/daicuo/20230904/132.png");
        System.out.println("Subscribing for IMG1");
        img1.subscribe(e -> System.out.println("IMG1: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(500);
        Observable<BufferedImage> img2 = cache.get("https://m.xsw88.com/allimgs/daicuo/20230904/132.png");
        System.out.println("Subscribing for IMG2");
        img2.subscribe(e -> System.out.println("IMG2: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);

        Observable<BufferedImage> img3 = cache.get("https://m.xsw88.com/allimgs/daicuo/20230904/133.png");
        Observable<BufferedImage> img4 = cache.get("https://m.xsw88.com/allimgs/daicuo/20230904/133.png");

        Thread.sleep(500);

        System.out.println("Subscribing for IMG3");
        img3.subscribe(e -> System.out.println("IMG3: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(1000);
        System.out.println("-> Should be immediate: ");
        System.out.println("Subscribing for IMG4");
        img4.subscribe(e -> System.out.println("IMG4: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);

        cdl.await();
    }
}

我使用的是ConcurrentMap的的putIfAbsent,以确保只有一个下载时触发​​一个新的URL;每个人都将获​​得相同的AsyncSubject上,他们可以等待,并获得一次数据可用,后立即。通常情况下,你会希望通过使用自定义的调度器来限制并发下载的数量。

I'm using the ConcurrentMap's putIfAbsent to make sure only one download is triggered for a new url; everyone else will receive the same AsyncSubject on which they can 'wait' and get the data once available and immediately after that. Usually, you'd want to limit the number of concurrent downloads by using a custom Scheduler.