/ RxJava

Cleanup when a subscriber unsubscribes

Often times, I need to clean up stuff after a subscriber unsubscribes. There's a handy methods just for that: Subscriptions.create(Action0). The parameter Action0 is a function passed in to do things after a subscriber unsubscribes.

For example, if a callback is set on an instance when a subscriber subscribes to a stream, so the instance can emit items whenever the callback gets called. After the Subscriber unsubscribes, we need to free the listener from the instance to avoid side effects.
Or if the observable is a network call, you may want to cancel it when a subscriber unsubscribes.

Let's look at some code.

public static Observable<Integer> textLengthChanges(final Text text) {
  return Observable.create(subscriber -> {
    Text.OnLengthChangedListener listener = length -> {
      if (!subscriber.isUnsubscribed()) {


This textLengthChanges observable emits the length of the text whenever it's changed.

A subscriber at some point would unsubscribe to the stream after it has done the work. Then we need to release the Text.OnLengthChangedListener from the text instance. It can be done by simply adding:

subscriber.add(Subscriptions.create(() -> text.setOnLengthChangedListener(null)));

Under the hood, Subscriptions.create(Action0) creates a BooleanSubscription which checks if it's got unsubscribed and if so, it will invoke the Action0.

I must say the naming Subscriptions.create(Action0) isn't that intuitive to me (and someone agrees on that). But because Subscription is equivalent to Disposable in Rx.Net, the naming makes some sense. It's also worth mentioning that it seems RxJava 2.0 is going with Disposable instead of Subscription.

Last but not least, don't be confused with Observable.using. This operator is useful when you have full control over lifespan of an object, so you can do some cleanups of the object when the Observable terminates.