RxJava构建流基本原理示例解析
itbird01 人气:0正文
本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。
为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。
1.构建流
大家一定郁闷,怎么突然跑出来构建流的词汇,小编你之前也没讲到过呀?大家先别急,我们一步一步来。 首先从一个最简单的,RxJava的样例使用代码,我们去分解,打上步骤
private void test() { //第一步:just调用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map调用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn调用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe调用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
从上面的样例代码分析、分解,我们明面上看到四个步骤,暂且列下来:
- 第一步:just调用
- 第二步:map调用
- 第三步:subscribeOn、observeOn调用
- 第四步:subscribe调用
这里运行一下,我们看到日志打印
大家看一下这里几个关键的函数对象
Observable 被观察者,subscribe订阅的意思,Observer观察者的意思,just是仅仅输入url,subscribeOn和observeOn是线程切换功能,map是中间的一个操作符。接下来,我们就这些看到的操作符、关键字,进行解读。
1.1 just的解读
从整体链式调用的代码形式上,我们大概可以猜测到,just、subscribeOn、observeOn、map等中间操作符,必定都是基于同一个对象的变形的builder模式,也就是说,他们应该都是同一个类内部的方法。
我们看一下的Observable的just源码
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
可以看到这里,仅仅是将传入的T 也就是我们上面样例中的urlString,包装为了ObservableJust对象。而just的调用,返回的依然是一个Observable被观察者对象。 我们看一下ObservableJust类代码
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
从上面看到,ObservableJust仅仅是将传入的T封装了一层而已,它继承与Observable抽象类,而Observable抽象类实现了ObservableSource接口
public abstract class Observable<T> implements ObservableSource<T> {
而ObservableSource接口,就是我们外界调用的subscribe订阅方法的源头
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); }
好了,just讲到这里,我们继续map,到目前为止,ObservableJust的subscribe方法仅仅是一个方法而已,并未调用到,所以我们暂时不予理睬,后面调用到,我们再去分析。
1.2 map的解读
private void test() { //第一步:just调用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map调用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn调用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe调用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
从这个使用代码来看,map是just与subscribe之前的一个操作符,起到承上启下的作用。
我们看到首先是just返回的对象,可以直接调用map方法,我们之前分析、实现过just,知道just实际上返回的就是一个Observable,所以map方法就是Observable里面的一个方法。
再往下看,map返回后的对象,直接调用了subscribe方法,我们之前设计实现响应式功能代码的时候知道,subscribe是Observable里面的一个方法,那么这里也就知道了,map方法肯定也是返回一个Observable对象。
从上面分析,我们看到一个流向图
大家发现,这个流。只做了一件事情,每个操作符,都是对上一层的Observable进行了一层包装代理而已。这时我们提出一个概念,这个流,我们命名为构建流。
同理,subscribeOn、observeOn是否也是这样的呢?我们通过源码验证一下。
1.3 subscribeOn、observeOn
有了上面map的经验,我们猜测,subscribeOn、observeOn顺序调用的时候,应该也仅仅是对上一层的Observable进行了一层包装代理而已。我们看一下subscribeOn、observeOn的源码
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) @NonNull public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); //果然这里包装为了ObservableSubscribeOn对象 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler)); } public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { Objects.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)); }
我们继续看一下ObservableSubscribeOn、ObservableObserveOn类的源码
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } //...省略若干 } public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } }
而AbstractObservableWithUpstream本身就是继承与Observable
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {}
1.4 小结
从上面可以看到,也就是到目前为止,just、map、subscribeOn、observeOn一系列调用下来,依然都是执行一个操作而已,即每个操作符,都是对上一层的Observable进行了一层包装代理而已。
这就是构建流,构建流的作用就是,从开始操作符到订阅为止,从左往右执行,期间每个操作符,都是对上一层的Observable进行了一层包装代理而已。如图:
加载全部内容