RxJava 触发流基本原理源码解析
itbird01 人气:0正文
本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。
为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。
触发流
到目前为止,我们讲了构建流、订阅流,但是依然没有触发真正的observer中的事件,例如:
@Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError"); } @Override public void onComplete() { Log.d(TAG, "onComplete"); }
各位看官,莫急莫急,且听老衲娓娓道来。
还记得上面的订阅流吗?订阅流从右往左执行的,执行到最后的observable,执行了它的subscribe方法。我们从使用代码知道,最左端的observable是啥来着,大家还记得吗?当然是ObservableJust
private void test() { //第一步:just调用 Observable.just("https://img-blog.csdn.net/20160903083319668") //第二步:map调用 .map(new Function<String, Bitmap>() { @Override public Bitmap apply(String s) throws Exception { //Bitmap bitmap = downloadImage(s); return null; } }) //第三步:subscribeOn、observeOn调用 .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) //第四步:subscribe调用 .subscribe(new Observer<Bitmap>() { @Override public void onSubscribe() { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Bitmap s) { Log.d(TAG, "onNext s = " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError ", e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
我们就顺坡下驴,看一下ObservableJust的subscribe方法做啥了
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T get() { return value; } }
仔细一看,这里面没有subscribe方法,那么肯定就是调用父类observable的subscribe方法了
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //对象封装,暂时不是重点,我们跳过 observer = RxJavaPlugins.onSubscribe(this, observer); //判空 ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
大家看到这里,其实关键在于,最终调用了一个subscribeActual方法,所以我们继续看子类ObservableJust的subscribeActual方法干啥了?
@Override protected void subscribeActual(Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value); observer.onSubscribe(sd); sd.run(); }
接续根据ScalarDisposable的run方法
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable { private static final long serialVersionUID = 3880992722410194083L; final Observer<? super T> observer; final T value; //...省略很多代码 @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { //可以看到这里执行了onNext、onComplete方法 observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }
小结
看到这里,我们知道了,开始一层一层的从左往右去调用observer的相关方法了。 由订阅流可知,每层的observable实际上拥有下一层的observer的代理类,所以自然而然,从最左边开始调用observer的相关方法开始,触发流,就是从左往右,一层一层的剥开之前包裹的observer,然后顺序调用里面的onNext、onComplete等方法。 不信,我们挑一个ObservableMap来验证一下。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //此处调用了下游的observer的onNext方法 downstream.onNext(v); } } }
可以看到里面,的确调用了下游的observer的onNext方法。
总结
整个过程,分为构建流、订阅流、触发流。
构建流:
从左到右执行不同的操作符的过程,其实很简单,就是根据不同的操作符,对原始的 observable进行逐层包装,这里可以看出,每层的节点 N* 就持有了上一层的observable。
订阅流:
从右到左的 subscribe 调用过程,这个过程中,每个observable内部的subscribeActual执行两个关键操作,一个是对自己已有的observer进行一层重新包装,另外一个就是使用前面节点的observable,订阅包装好的observer。
触发流:
在订阅流执行完成之后,执行到最左端的observable,我们发现它内部的subscribeActual实现,实际上就是调用里面拥有的observer的相关回调方法(onNext、onComplete、onError等),那么这层回调流就简单了,就是一层一层的调用里面的observer,最终执行到最右端的observer。
篇幅所限,大家也发现了,我们本节课,我们详细讲解Rxjava线程切换的实现原理,这个有两个原因,一是篇幅所限,本节内容已经够多了,大家先吃透框架,另外一方面是,线程切换我相信我们后面实践环节,待框架自我搭建实现之后,里面的线程切换功能就是水到渠成的事情,相信凭借大家已有的知识,都可以做到的。
所以建议大家,先别看这块Rxjava是如何实现线程切换的,而是想一下,它是怎么实现的?到时我们自己的Rxjava框架搭建起来之后,填充实现一下。
提个醒儿,大家还记得我们之前EventBus源码分析、实践环节吗?其中也说到了线程切换。其实原理差不多,大家先想一下。
加载全部内容