RxJava取消订阅的各种方式的实现

这篇文章主要介绍了RxJava取消订阅的各种方式的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

手动取消订阅

Consumer类型

Observable创建返回Disposable取消


public class SecondActivity extends AppCompatActivity {

  private static final String TAG = "SecondActivity";
  private Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_second);
    disposable = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d(TAG, "accept: "+s);
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //取消订阅
    if(disposable != null && !disposable.isDisposed()){
      disposable.dispose();
      Log.d(TAG, "onDestroy: dispose");
    }
  }
}

普通类型Observer

在Observer中获取Disposable然后取消


public class ThirdActivity extends AppCompatActivity {
  private static final String TAG = "ThirdActivity";
  Disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_third);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG, "onDestroy: ");
    //然后在需要取消订阅的地方调用即可
    if (disposable != null && !disposable.isDisposed()) {
      Log.d(TAG, "dispose: ");
      disposable.dispose();
    }
  }
}

DisposableObserver类型

利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消


public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";
  private DisposableObserver<String> observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);
    observer = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableObserver<String>() {
      @Override
      public void onNext(String o) {
        Log.d(TAG, "onNext: "+o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete: ");
      }
    });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    if (observer != null && !observer.isDisposed()) {
      Log.d(TAG, "dispose: ");
      observer.dispose();
    }
  }
}

取消多个Observer

把多个Observer添加CompositeDisposable,一次取消


public class ComDisposableActivity extends AppCompatActivity {

  private Disposable disposable1;
  private Disposable disposable2;
  private static final String TAG = "ComDisposableActivity";
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_com_disposable);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "run: Unsubscribing subscription from onCreate()");
          }
        })
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable1 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            disposable2 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG, "onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG, "onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG, "onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    //批量添加
    compositeDisposable.add(disposable1);
    compositeDisposable.add(disposable2);
    //最后一次性全部取消订阅
    compositeDisposable.dispose();
  }
}

RxLifecyle取消

OnDestory取消


Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
          }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long num) throws Exception {
            Log.d(TAG, "accept: " + num);
          }
        });

指定生命周期取消


Observable.interval(1,TimeUnit.SECONDS)
        .doOnDispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
          }
        }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            Log.d(TAG, "bindUntilEvent accept: " + aLong);
          }
        });

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持得得之家。

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

这篇文章主要给大家介绍了关于RxJava2中异常及处理方法的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用RxJava2具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
这篇文章主要为大家详细介绍了RXjava网络获取图片数据,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
这篇文章主要介绍了RxJava操作符,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
Retrofit是当前应用非常广泛的网络请求框架,通常结合RxJava来进行网络请求,本文将展示一个采用RxJava+Retrofit的网络请求demo,感兴趣的可以了解一下
Android线程优化知识点学习 目录 前言 一.线程调度原理解析 线程调度的原理 线程调度模型 Android 的线程调度 线程调度小结 二.Android 异步方式汇总 Thread HandlerThread IntentService AsyncTask 线程池 RxJava 三.Android线程优化实战 线程使用准则 线程池优化实战
Android实现倒计时的方案梳理 目录 前言 一.CountDownTimer的实现 二.直接用Handler的实现 三.直接用Time.TimeTask的实现 四.使用Theard倒计时 五.使用框架RxJava 六.Kotlin Flow 的实现 总结 前言 关于倒计时可以说我们App开发中常见的一种场景了,比如Splash倒计时