Rxjava笔记

Author Avatar
w4ctech 3月18日
  • 在其它设备中阅读本文章

最后更新于2019年03月21日; 如遇到问题,请留言及时通知站长

使用步骤

build.gradle 引入依赖
compile 'io.reactivex:rxjava:1.0.14' 
compile 'io.reactivex:rxandroid:1.0.1' 

创建数据源(被观察者 / 可观察者),泛型必须是 Object 的子类

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    
    //事件源,可以指定
    @Override
    public void call(Subscriber<? super String> subscriber) {
        //执行多次
        subscriber.onNext("第一次执行");
        subscriber.onNext("第二次执行");
        //标记事件结束
        subscriber.onCompleted();
        //标记事件发送错误
        //subscriber.onError();
    }
});

//from(T[]),返回的对象一般都是数值类型
Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Observable observable = Observable.from(items);

//指定某一时刻进行数据发送
Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);//每隔一秒发送数据

//just(T...),处理任意类型的数组集合或数值,参数上限10个,参数类型必须一致
Integer[] items1 = {1, 2, 3, 4, 5, 6};
Integer[] items2 = {3, 5, 6, 8, 3, 8};
Observable observable = Observable.just(items1, items2);

//使用范围数据,指定输出数据的范围(1-40的数值)
Observable observable = Observable.range(1, 40);

创建事件的接收者(观察者 | 订阅者),onNext 方法中的数据类型必须被观察者指定的泛型

onNext(T item):Observable 调用这个方法发射数据,方法的参数就是 Observable 发射的数据,这个方法可能会被调用多次,取决于业务逻辑

onCompleted():正常终止,在没有遇到错误的情况下,Observable 在最后一次调用 onNext 之后调用此方法

onError(Throwable e):当 Observable 遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止 Observable,后续不会再调用 onNext 和 onCompleted,onError 方法的参数是抛出异常

Observer<String> observer = new Observer<String>() {

    @Override
    public void onNext(String s) {
        System.out.println("onNext" + s);
    }
    
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
    
    @Override
    public void onError(Throwable e) {
        System.out.println("onError" + e.getMessage());
    }
};

//订阅者
Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onStart(String s) {
        System.out.println("事件开始了");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext" + s);
    }
    
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
    
    @Override
    public void onError(Throwable e) {
        System.out.println("onError" + e.getMessage());
    }
};


//对订阅者进行简化,更简单
Action1<String> action1 = new Action1<String>() {

    @Override
    public void call(String s) {
        System.out.println("call" + s);
    }
};

订阅事件,被观察者必须指定了事件的接收者(观察者 | 订阅者),整个事件流程才可以被启动

observable.subscribe(observer);

observable.subscribe(subscriber);

//选择性参数方法,可对onNext,onCompleted,onError选择性使用,一般只需要onNext方法就足够
observable.subscribe(action1);

//自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
//自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
//自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 
Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;
由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。
Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。
事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法
订阅者是观察者的子类,区别在于订阅者可以取消订阅(在程序销毁后)
if(subscriber != null && !subscriber.isUnsubscribed()) {
    subscriber.unsubscribe();
}

本文链接:https://www.w4ctech.cn/Rxjava/note1.html
This blog is under a CC BY-NC-SA 3.0 Unported License