diskodev

DESIGN & DEVELOPMENT AGENCY

Part 2 - Observables and Subjects

This is part of a series on Reactive Programming in RxSwift.

In the previous post, we looked at the basics of Reactive Programming and it's extensions for Swift - RxSwift. In this post, we will look at Observables and subjects and how they are used to build reactive apps.

Reactive programming is programming with asynchronous streams or observable sequences (or simply sequences).

For example, below is the sequence of random numbers between 1 and 20. This stream is represented over time and it terminates normally:

Below is a sequence of network requests that terminates with an error:

There are sequences that do not complete and those that are infinite. Consider an UISwitch that can either be ON or OFF. A stream to represent the state of such switch would look like:

The above representations are called marble diagrams in the reactive world. You can check out various operators and their interactive marble diagrams from http://rxmarbles.com/.

Observables

Lifecycle of an Observable Sequence

A sequence has the following properties:

  • A sequence can have 0 or more elements. The observer do not know when the sequence will produce values or terminate. But you can control when you will accept values and when you choose to stop accepting values
  • The various events emitted by a sequence are: next event that contains the value, and error event and a completed event
  • Once an error event or a completed event is received, the sequence cannot produce any other event
  • When a sequence sends the completed or error event, all resources that are used to produce the elements should be freed

The events, Observables and the ObserverType are represented in RxSwift as:

/// Represents a sequence event.
///
/// Sequence grammar:
/// **next\* (error | completed)**
enum Event<Element>  {
    // Next event with the element produced
    case Next(Element)
    // Error event
    case Error(ErrorType)
    // Completed event
    case Completed
}

class Observable<Element> {
    // Passed an Observer to which the event methods are called
    // Returns a Disposable that should be freed
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

Observables vs Swift's built-in Sequence

Looking at the interfaces above, we can see that the observables are similar to Swift's built-in Sequence. The observable is push based where it's methods are called via callbacks and can also receive elements asynchronously. Swift's Sequence is pull based where you need an iterator and have to call next() to iterate over it.

To print out a list using Swift's Sequence, the code will look like:

let ss = [1, 2, 3, 4]
var iter = ss.makeIterator()

while let number = iter.next() {
    print(number)
}

And the same using Rx will be:

let observable = Observable.of(1, 2, 3, 4)
observable.subscribe(onNext: { element in
    print(element)
})

The patterns between Swift's Sequence and observable are kind of similar and make it easy to reason about observables.

Creating an Observable

An observer subscribes to an observable. The observer then reacts to sequences emitted by the observable. The observer is sometimes called a subscriber and an observable, a publisher.

There are various class methods that will help with the creation of observables. We will look at some of them below.

just method

let justObservable1 = Observable.just(7.5)
let justObservable2 = Observable.just("Hello")

The just method creates an observable sequence containing just a single element.

from method

let fromObservable = Observable.from([0, 4, 8, 12])

The from method creates a sequence for each element in the given list.

of method

let ofObservable = Observable.of(1, 2, 3, 4)

The of method creates a sequence for each element in the vardic list passed to it.

empty method

let emptyObservable = Observable<String>.empty()

The empty method creates a sequence that publishes a single completed event.

never method

let neverObservable = Observable<Int>.never()

The never method creates an infinite sequence that goes on and on.

range method

let rangeObservable = Observable<Int>.range(start: 22, count: 10)

The range method returns a range of numbers. The above code creates a sequence from 22 to 31.

generate method

let generateObservable = Observable<Int>.generate(initialState: 22, condition: { $0 < 32 }, iterate: { $0 + 1 })

The generate method generates a set and returns it. The above code creates a sequence from 22 to 31.

create method

The create method is different from the above creation methods. It is a helper method that lets you easily implement the observer's subscribe operator using closures. It takes a observer argument and returns a disposable. It signature is:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E>

The create method can be used to build your own factory operators, if you wish. For example, the from method can be created using the create as:

func customFrom<E>(_ elements: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in elements {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
}

And the just method can be created as:

func customJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

The create method is the preferred operator to create custom types that implement the Observable interface. There is no need to implement the observer/observable interface yourself.

The create method will generate elements and terminate before the subscribe method returns a Disposable. The sequences are lazily evaluated and the process of generating elements cannot be interrupted. This is different from subjects as we shall see later.

Subscribing to Observables

When an observable is created, it does not start producing sequences simply because it has been created. There is no sequence generation just yet. Sequence generation starts when a observer's subscribe method is called.

For example, let's assume you have a long running operation with the signature:

func someLongRunningOp(param: String) -> Observable<Result> {}

And when you use it as below,

let opObservable = someLongRunningOp("param")

The long running operation is not executed solely by calling the method. The actual work is only done and the sequence produced, only when the subscribe method is called.

// The long operations is going to be done now
opObservable.subscribe { result in
    print(result)
}

Similarly, we can subscribe to the various observables as shown below:

let ofObservable = Observable.of(1, 2, 3, 4)
ofObservable.subscribe { event in
    print(event)
}

The output for the above code is:

next(1)
next(2)
next(3)
next(4)
completed

If you need to only subscribe to the next events, you can use onNext handler as:

let fromObservable = Observable.from([0, 4, 8, 12])
fromObservable.subscribe(onNext: { number in
    print(number)
})

Which will print only the next events:

0
4
8
12

Implicit Observables guarantees

The below are the guarantees that an observables must provide:

  • If the observable generate a single sequence and send it over to the observer using observer.on(.next(Element)), then they cannot send the next element until observer.on method has finished execution
  • Observers also cannot send a completed or an error event, if the next event has not finished executing

So, the below code:

observable.subscribe { event in
    print("Event processing")
    // Do something
    print("Event processed")
}

will always print:

Event processing
Event processed
Event processing
Event processed
Event processing
Event processed

and will never print the below or any of its combinations:

Event processing
Event processing
Event processed
Event processed

Observable lifetime management

As you can see, there is no unsubscribe method for an observable. The subscribe method will return a Disposable that needs to be managed. The Disposable interface is simple and has standard usage and patterns. The Disposable returned can be thought of a token representing the subscription. Disposing it will dispose the subscription and effectively unsubscribe.

When we are done listening for events from a subscriber, we can call dispose on it to close and release all of the resources associated with that subscription. If a sequence terminates in finite time, not calling dispose would not cause any permanent resource leaks. However, if a sequence does not terminate on it's own or is infinite, you need to call dispose or put them inside a DisposeBag.

let fromObservable = Observable.from([0, 4, 8, 12])
let fromSubscription = fromObservable.subscribe(onNext: { number in
    print(number)
})

fromSubscription.dispose()

After, dispose no elements will be generated by that subscription.

Usually, in RxSwift, you will not be calling dispose for each subscription. An easier pattern is to add the subscription to a DisposeBag. The DisposeBag brings ARC to RxSwift, and when you subscribe to an observable, you add it to the DisposeBag using the addDisposableTo() method. Then, when that DisposeBag is out of scope and deallocated, it will call dispose() on each added disposable thereby not leaking any resources.

let disposeBag = DisposeBag()
let fromObservable = Observable.from([0, 4, 8, 12])
let fromSubscription = fromObservable.subscribe(onNext: { number in
    print(number)
}).addDisposableTo(disposeBag)

Note that, subscriptions are independent and hence disposing a result of a subscribe call will not cause side effects for other subscribers.

Also, if a subscription subscribes only to the onNext handler, and if such subscription throws an error - the error is propagated up as an exception. It is best practice to always provide an OnError handler to prevent an exception being thrown in an otherwise difficult to handle manner. Always, by using the standard Rx patterns, your apps are predictable, easier to understand, maintain and leak free.

Subjects

A Subject acts as both an Observable and Observer. Since it is an observer, it can subscribe to one or more Observables and also is an Observer, and hence, it can pass through the items it observes by re-emitting them. It can also emit new items instead of acting as a middle.

From reactivex.io:

Because a Subject subscribes to an Observable, it will trigger that Observable to begin emitting items (if that Observable is “cold” — that is, if it waits for a subscription before it begins to emit items). This can have the effect of making the resulting Subject a “hot” Observable variant of the original “cold” Observable.

Hot and Cold Observables

So, what are hot and cold observables?

An example of a cold observable is an Observable seen above. They do not generate sequences until an observer subscribes. The resources involved in producing the sequence are assigned on a per subscriber basis. They are usually stateless and they are mainly used for one time async / network operations.

Subjects are an example of a hot observable. Sequences are generated no matter if any observers are listening to it. The resources involved in producing the sequences are shared among the subscribers. They are stateful and are used to represent UI control values, variables, infinite streams like touches, tap co-ordinates etc.

Types of Subjects

Below, we shall look at the various subjects and how they work.

PublishSubject:

A PublishSubject starts empty and emits to an observer, only those items emitted subsequent to the time of the subscription.

A PublishSubject may begin emitting items immediately upon creation, hence there is a risk that one or more items may be lost between the time the subject is created and the observer subscribes to it.

Let's take a look at the code below:

enum CustomError: Error {
    case someError
}

let publishSubject = PublishSubject<String>()

publishSubject.onNext("Hello")

let sub1 = publishSubject.subscribe { event in
    print("Sub1")
    print(event)
}

publishSubject.onNext("World")

let sub2 = publishSubject.subscribe { event in
    print("Sub2")
    print(event)
}

publishSubject.onNext("Again")
publishSubject.onError(CustomError.someError)

let sub3 = publishSubject.subscribe { event in
    print("Sub3")
    print(event)
}

The output is as follows:

Sub1
next(World)
Sub1
next(Again)
Sub2
next(Again)
Sub1
error(someError)
Sub2
error(someError)
Sub3
error(someError)

In the code above, the event .on(.Next("Hello")) is lost since no subscribers are listening at that time. The other messages are getting transmitted to the respective subscribers after registering itself using the subscribe method. PublishSubject cannot capture and relay past messages.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are only passed the error or completed event.

BehaviorSubject:

A BehaviorSubject is a subject that will store in its state the last next event and replay them to it's new subscribers.

Consider the code:

let behaviorSubject = BehaviorSubject(value: 10)
behaviorSubject.onNext(15)

let sub1 = behaviorSubject.subscribe { event in
    print("sub1")
    print(event)
}

behaviorSubject.onNext(20)

let sub2 = behaviorSubject.subscribe { event in
    print("sub2")
    print(event)
}

behaviorSubject.onCompleted()

let sub3 = behaviorSubject.subscribe { event in
    print("sub3")
    print(event)
}

And it's corresponding output:

sub1
next(15)
sub1
next(20)
sub2
next(20)
sub1
completed
sub2
completed
sub3
completed

As you can see, the last value is stored in the BehaviorSubject's state and is replayed to the next subscriber.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are passed the error or completed event. The last next event is not passed to the new observer.

BehaviorSubject is useful when we need to capture the latest value along with future changes that occur.

ReplaySubject:

ReplaySubject is a subject that will maintain a buffer of size n in its state, and that buffer will be replayed to future subscribers.

Let's look at using ReplaySubject below:

let replaySubject = ReplaySubject<Int>.create(bufferSize: 2)
replaySubject.onNext(22)

let sub1 = replaySubject.subscribe { event in
    print("sub1")
    print(event)
}

replaySubject.onNext(33)

let sub2 = replaySubject.subscribe { event in
    print("sub2")
    print(event)
}

replaySubject.onNext(44)

let sub3 = replaySubject.subscribe { event in
    print("sub3")
    print(event)
}

replaySubject.onError(CustomError.someError)

let sub4 = replaySubject.subscribe { event in
    print("sub4")
    print(event)
}

And it's output:

sub1
next(22)
sub1
next(33)
sub2
next(22)
sub2
next(33)
sub1
next(44)
sub2
next(44)
sub3
next(33)
sub3
next(44)
sub1
error(someError)
sub2
error(someError)
sub3
error(someError)
sub4
next(33)
sub4
next(44)
sub4
error(someError)

As we can see above, the last n items are held in the subject's state and they are replayed to the future subscribers.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are passed the n buffer along with the error or completed event. This is different from other subjects seen above. And this is the reason that, with a buffer of 1 - ReplaySubject is not the same as a BehaviorSubject.

Variable:

Variable is a wrapper for BehaviorSubject. But unlike BehaviorSubject, it can not terminate with error and when the variable is deallocated, it will complete it's observable sequence.

You can use it's value property to access the current value. You can also use it to set a new value.

Similar to BehaviorSubject, a variable is created with an initial value and it will replay it's initial / latest to future subscribers.

Code to use a Variable is shown below:

let variable = Variable(7.5)
variable.value = 8.5

let variableObserver = variable.asObservable().subscribe { event in
    print(event)
}

variable.value = 9.5

And the output:

next(8.5)
next(9.5)

The code above is very simple and the thing is keep in mind is - In order to create an observable from a Variable, you need to use the asObservable() method. You can then subscribe to this observable.

create() vs Subjects

A significant benefit of using the create() method to using a subject is that the sequence will be lazily evaluated. This can provide you with a lot of benefits over using subjects. The create() method provides a non-blocking implementation, where as a subject generates sequences in a single go. Look at the code below:

func sampleCreate() -> Observable<String> {
    return Observable.create { observer in
        observer.on(.next("Hello"))
        observer.on(.next("World"))
        print("sleeping")
        Thread.sleep(forTimeInterval: 2.0)
        print("slept")
        observer.onCompleted()
        
        return Disposables.create()
    }
}

sampleCreate().subscribe { event in
    print(event)
}

And the output will be:

next(Hello)
next(World)
sleeping
slept
completed

As you see, the sequence is lazily evaluated and the next events are processed before we sleep on the rest of the instructions.

Contrast it to using a subject in the below code:

func sampleSubject() -> ReplaySubject<String> {
    let replaySubject = ReplaySubject<String>.create(bufferSize: 2)
    replaySubject.on(.next("Hello"))
    replaySubject.on(.next("World"))
    print("sleeping")
    Thread.sleep(forTimeInterval: 2.0)
    print("slept")
    replaySubject.onCompleted()
    
    return replaySubject
}

sampleSubject().subscribe { event in
    print(event)
}

And the output:

sleeping
slept
next(Hello)
next(World)
completed

The subject implements a blocking kind of sequence generation, and the events are processed together and not lazily evaluated.

Subjects also maintain state, which is mutating, which goes against functional programming principles. Mutating state and asynchronous programming are very hard to get right. The create() method is an alternative to using subjects if your use case can be modelled using the method.

Debugging Observables and Subjects

Below are some tips on working with issues that may occur when using RxSwift.

  • There might be issues in deducing the types of properties in an Rx closure. It might be helpful to annotate the type when you run into weird errors regarding types
  • You can use the debug() method on the observable to see additional messages. For example, the below code will print extra debug information to the console:
let fromObservable = Observable.from([1, 2, 3, 4, 5])
fromObservable
    .debug("From Debug")
    .subscribe { event in
        print(event)
}

This will print:

2017-06-30 20:24:46.078: From Debug -> subscribed
2017-06-30 20:24:46.080: From Debug -> Event next(1)
next(1)
2017-06-30 20:24:46.081: From Debug -> Event next(2)
next(2)
2017-06-30 20:24:46.081: From Debug -> Event next(3)
next(3)
2017-06-30 20:24:46.081: From Debug -> Event next(4)
next(4)
2017-06-30 20:24:46.081: From Debug -> Event next(5)
next(5)
2017-06-30 20:24:46.082: From Debug -> Event completed
completed
2017-06-30 20:24:46.082: From Debug -> isDisposed
  • You can debug memory leaks using the RxSwift.Resources struct. To enable debug method, you need to set TRACE_RESOURCES flag to the RxSwift target build settings. You can check this link for more detail. You can track all allocated resources using RxSwift.Resources.total. You can simply print out the total in a subscription and view the count over multiple runs of the same operation. The count should give you a clear picture about any leaks if any.

Practical Usage

Let us look at some practical usages of observables and subjects.

Retry Operations

Let us say, you have an operation that might fail. And you need to retry that operation during the times of failure. How would you implement such functionality? It would usually involve maintaining a global state and and managing that state during the application's lifetime.

Rx provides a retry() operator that you can use in such situations. It is easy to use and can also take in custom retry operators. The way to use it would be:

networkRequest.retry(4)
    .subscribe(onNext: { next in
        // Do work on next event
    }, onError: { error in
        // Log the error
})

Here, the networkRequest observable retries until four times in the case of a failure and then errors out.

Key-Value Observing

To observe a property, instead of using:

-(void)observeValueForKeyPath:(NSString *)keyPath
                     ofObject:(id)object
                       change:(NSDictionary *)change
                      context:(void *)context

use the simpler rx.observe and rx.observerWeakly method like below:

view.rx.observe(CGFloat.self, "alpha")
    .subscribe(onNext: { alpha in
        // Work with the new value
    })
    .disposed(by: disposeBag)

Note - The above code uses RxCocoa, which, we will be looking at later in the series.

Replacing Delegates

Using subject, you can replace delegates to communicate between view controllers in your application.

Consider a simple application where you have multiple flows. The first view controller displays the current cartoon show being run. If you are not happy with it, you can change the show by navigating to the second view controller and entering your favourite cartoon show.

Imagine how you would implement this. Delegates would be the first pattern that comes to your mind. In fact, Apple encourages you to use this design pattern to interact with it's various frameworks. But using subject, we can simplify this further to easily pass data between view controllers.

In the first view controller (CartoonViewController), there is a label displaying the current running cartoon show. If you do not like it, you can press the change cartoon button which presents the second view controller (ChangeCartoonViewController) that displays a textfield where you can enter a new show. Once you press the change button, the new data is passed to the first view controller and dismissed. The various screens are shown below:

In the second view controller, we create a PublishSubject that we will use to pass data to the observer.

var cartoonPublishSubject = PublishSubject<String>()
var cartoonObserver: Observable<String> {
    return cartoonPublishSubject.asObservable()
}

When the change button is pressed after entering a new show, the following code is run:

func changeButtonPressed(_ button: UIButton) {
    cartoonPublishSubject.onNext(cartoonTextField.text!)
    cartoonPublishSubject.onCompleted()

    self.dismiss(animated: true)
}

Above, we pass the entered text to the observers along with the completed event.

In the first view controller, we shall observe for the above change before pushing the second view controller.

func changeButtonPressed(_ button: UIButton) {
    self.view.endEditing(true)

    let changeCartoonViewController = ChangeCartoonViewController()

    changeCartoonViewController.cartoonObserver
        .subscribe(onNext: { [weak self] cartoon in
            self?.nameLabel.text = cartoon
        }, onCompleted: {
            print("Completed.")
        }, onDisposed: {
            print("Disposed.")
        })
        .addDisposableTo(disposeBag)

    self.present(changeCartoonViewController, animated: true)
 }

So once a new show is entered, we get the latest data and display that in the label. This is a simple application to get a gist as to how the data is passed between view controllers.

The entire code for this sample application can be downloaded here.

That's it for observables and subject. Hope, this post made it easier for you to understand the various concepts. In the next part, we shall look at some of the operators available for RxSwift and see how we can use them to transform and manipulate streams.

You can view my projects on GitHub.

You can follow me on twitter.