diskodev

DESIGN & DEVELOPMENT AGENCY

Part 2 - Observables and Subjects

This is part of a series on Reactive Programming in RxSwift.

In the previous post, we looked at the basics of Reactive Programming and it's extensions for Swift - RxSwift. In this post, we will look at Observables and subjects and how they are used to build reactive apps.

Reactive programming is programming with asynchronous streams or observable sequences (or simply sequences).

For example, below is the sequence of random numbers between 1 and 20. This stream is represented over time and it terminates normally:

Below is a sequence of network requests that terminates with an error:

There are sequences that do not complete and those that are infinite. Consider an UISwitch that can either be ON or OFF. A stream to represent the state of such switch would look like:

The above representations are called marble diagrams in the reactive world. You can check out various operators and their interactive marble diagrams from http://rxmarbles.com/.

Observables

Lifecycle of an Observable Sequence

A sequence has the following properties:

  • A sequence can have 0 or more elements. The observer do not know when the sequence will produce values or terminate. But you can control when you will accept values and when you choose to stop accepting values
  • The various events emitted by a sequence are: next event that contains the value, and error event and a completed event
  • Once an error event or a completed event is received, the sequence cannot produce any other event
  • When a sequence sends the completed or error event, all resources that are used to produce the elements should be freed

The events, Observables and the ObserverType are represented in RxSwift as:

/// Represents a sequence event.
///
/// Sequence grammar:
/// **next\* (error | completed)**
enum Event<Element>  {
    // Next event with the element produced
    case Next(Element)
    // Error event
    case Error(ErrorType)
    // Completed event
    case Completed
}

class Observable<Element> {
    // Passed an Observer to which the event methods are called
    // Returns a Disposable that should be freed
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

Observables vs Swift's built-in Sequence

Looking at the interfaces above, we can see that the observables are similar to Swift's built-in Sequence. The observable is push based where it's methods are called via callbacks and can also receive elements asynchronously. Swift's Sequence is pull based where you need an iterator and have to call next() to iterate over it.

To print out a list using Swift's Sequence, the code will look like:

let ss = [1, 2, 3, 4]
var iter = ss.makeIterator()

while let number = iter.next() {
    print(number)
}

And the same using Rx will be:

let observable = Observable.of(1, 2, 3, 4)
observable.subscribe(onNext: { element in
    print(element)
})

The patterns between Swift's Sequence and observable are kind of similar and make it easy to reason about observables.

Creating an Observable

An observer subscribes to an observable. The observer then reacts to sequences emitted by the observable. The observer is sometimes called a subscriber and an observable, a publisher.

There are various class methods that will help with the creation of observables. We will look at some of them below.

just method

let justObservable1 = Observable.just(7.5)
let justObservable2 = Observable.just("Hello")

The just method creates an observable sequence containing just a single element.

from method

let fromObservable = Observable.from([0, 4, 8, 12])

The from method creates a sequence for each element in the given list.

of method

let ofObservable = Observable.of(1, 2, 3, 4)

The of method creates a sequence for each element in the vardic list passed to it.

empty method

let emptyObservable = Observable<String>.empty()

The empty method creates a sequence that publishes a single completed event.

never method

let neverObservable = Observable<Int>.never()

The never method creates an infinite sequence that goes on and on.

range method

let rangeObservable = Observable<Int>.range(start: 22, count: 10)

The range method returns a range of numbers. The above code creates a sequence from 22 to 31.

generate method

let generateObservable = Observable<Int>.generate(initialState: 22, condition: { $0 < 32 }, iterate: { $0 + 1 })

The generate method generates a set and returns it. The above code creates a sequence from 22 to 31.

create method

The create method is different from the above creation methods. It is a helper method that lets you easily implement the observer's subscribe operator using closures. It takes a observer argument and returns a disposable. It signature is:

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E>

The create method can be used to build your own factory operators, if you wish. For example, the from method can be created using the create as:

func customFrom<E>(_ elements: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in elements {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
}

And the just method can be created as:

func customJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

The create method is the preferred operator to create custom types that implement the Observable interface. There is no need to implement the observer/observable interface yourself.

The create method will generate elements and terminate before the subscribe method returns a Disposable. The sequences are lazily evaluated and the process of generating elements cannot be interrupted. This is different from subjects as we shall see later.

Subscribing to Observables

When an observable is created, it does not start producing sequences simply because it has been created. There is no sequence generation just yet. Sequence generation starts when a observer's subscribe method is called.

For example, let's assume you have a long running operation with the signature:

func someLongRunningOp(param: String) -> Observable<Result> {}

And when you use it as below,

let opObservable = someLongRunningOp("param")

The long running operation is not executed solely by calling the method. The actual work is only done and the sequence produced, only when the subscribe method is called.

// The long operations is going to be done now
opObservable.subscribe { result in
    print(result)
}

Similarly, we can subscribe to the various observables as shown below:

let ofObservable = Observable.of(1, 2, 3, 4)
ofObservable.subscribe { event in
    print(event)
}

The output for the above code is:

next(1)
next(2)
next(3)
next(4)
completed

If you need to only subscribe to the next events, you can use onNext handler as:

let fromObservable = Observable.from([0, 4, 8, 12])
fromObservable.subscribe(onNext: { number in
    print(number)
})

Which will print only the next events:

0
4
8
12

Implicit Observables guarantees

The below are the guarantees that an observables must provide:

  • If the observable generate a single sequence and send it over to the observer using observer.on(.next(Element)), then they cannot send the next element until observer.on method has finished execution
  • Observers also cannot send a completed or an error event, if the next event has not finished executing

So, the below code:

observable.subscribe { event in
    print("Event processing")
    // Do something
    print("Event processed")
}

will always print:

Event processing
Event processed
Event processing
Event processed
Event processing
Event processed

and will never print the below or any of its combinations:

Event processing
Event processing
Event processed
Event processed

Observable lifetime management

As you can see, there is no unsubscribe method for an observable. The subscribe method will return a Disposable that needs to be managed. The Disposable interface is simple and has standard usage and patterns. The Disposable returned can be thought of a token representing the subscription. Disposing it will dispose the subscription and effectively unsubscribe.

When we are done listening for events from a subscriber, we can call dispose on it to close and release all of the resources associated with that subscription. If a sequence terminates in finite time, not calling dispose would not cause any permanent resource leaks. However, if a sequence does not terminate on it's own or is infinite, you need to call dispose or put them inside a DisposeBag.

let fromObservable = Observable.from([0, 4, 8, 12])
let fromSubscription = fromObservable.subscribe(onNext: { number in
    print(number)
})

fromSubscription.dispose()

After, dispose no elements will be generated by that subscription.

Usually, in RxSwift, you will not be calling dispose for each subscription. An easier pattern is to add the subscription to a DisposeBag. The DisposeBag brings ARC to RxSwift, and when you subscribe to an observable, you add it to the DisposeBag using the addDisposableTo() method. Then, when that DisposeBag is out of scope and deallocated, it will call dispose() on each added disposable thereby not leaking any resources.

let disposeBag = DisposeBag()
let fromObservable = Observable.from([0, 4, 8, 12])
let fromSubscription = fromObservable.subscribe(onNext: { number in
    print(number)
}).addDisposableTo(disposeBag)

Note that, subscriptions are independent and hence disposing a result of a subscribe call will not cause side effects for other subscribers.

Also, if a subscription subscribes only to the onNext handler, and if such subscription throws an error - the error is propagated up as an exception. It is best practice to always provide an OnError handler to prevent an exception being thrown in an otherwise difficult to handle manner. Always, by using the standard Rx patterns, your apps are predictable, easier to understand, maintain and leak free.

Subjects

A Subject acts as both an Observable and Observer. Since it is an observer, it can subscribe to one or more Observables and also is an Observer, and hence, it can pass through the items it observes by re-emitting them. It can also emit new items instead of acting as a middle.

From reactivex.io:

Because a Subject subscribes to an Observable, it will trigger that Observable to begin emitting items (if that Observable is “cold” — that is, if it waits for a subscription before it begins to emit items). This can have the effect of making the resulting Subject a “hot” Observable variant of the original “cold” Observable.

Hot and Cold Observables

So, what are hot and cold observables?

An example of a cold observable is an Observable seen above. They do not generate sequences until an observer subscribes. The resources involved in producing the sequence are assigned on a per subscriber basis. They are usually stateless and they are mainly used for one time async / network operations.

Subjects are an example of a hot observable. Sequences are generated no matter if any observers are listening to it. The resources involved in producing the sequences are shared among the subscribers. They are stateful and are used to represent UI control values, variables, infinite streams like touches, tap co-ordinates etc.

Types of Subjects

Below, we shall look at the various subjects and how they work.

PublishSubject:

A PublishSubject starts empty and emits to an observer, only those items emitted subsequent to the time of the subscription.

A PublishSubject may begin emitting items immediately upon creation, hence there is a risk that one or more items may be lost between the time the subject is created and the observer subscribes to it.

Let's take a look at the code below:

enum CustomError: Error {
    case someError
}

let publishSubject = PublishSubject<String>()

publishSubject.onNext("Hello")

let sub1 = publishSubject.subscribe { event in
    print("Sub1")
    print(event)
}

publishSubject.onNext("World")

let sub2 = publishSubject.subscribe { event in
    print("Sub2")
    print(event)
}

publishSubject.onNext("Again")
publishSubject.onError(CustomError.someError)

let sub3 = publishSubject.subscribe { event in
    print("Sub3")
    print(event)
}

The output is as follows:

Sub1
next(World)
Sub1
next(Again)
Sub2
next(Again)
Sub1
error(someError)
Sub2
error(someError)
Sub3
error(someError)

In the code above, the event .on(.Next("Hello")) is lost since no subscribers are listening at that time. The other messages are getting transmitted to the respective subscribers after registering itself using the subscribe method. PublishSubject cannot capture and relay past messages.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are only passed the error or completed event.

BehaviorSubject:

A BehaviorSubject is a subject that will store in its state the last next event and replay them to it's new subscribers.

Consider the code:

let behaviorSubject = BehaviorSubject(value: 10)
behaviorSubject.onNext(15)

let sub1 = behaviorSubject.subscribe { event in
    print("sub1")
    print(event)
}

behaviorSubject.onNext(20)

let sub2 = behaviorSubject.subscribe { event in
    print("sub2")
    print(event)
}

behaviorSubject.onCompleted()

let sub3 = behaviorSubject.subscribe { event in
    print("sub3")
    print(event)
}

And it's corresponding output:

sub1
next(15)
sub1
next(20)
sub2
next(20)
sub1
completed
sub2
completed
sub3
completed

As you can see, the last value is stored in the BehaviorSubject's state and is replayed to the next subscriber.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are passed the error or completed event. The last next event is not passed to the new observer.

BehaviorSubject is useful when we need to capture the latest value along with future changes that occur.

ReplaySubject:

ReplaySubject is a subject that will maintain a buffer of size n in its state, and that buffer will be replayed to future subscribers.

Let's look at using ReplaySubject below:

let replaySubject = ReplaySubject<Int>.create(bufferSize: 2)
replaySubject.onNext(22)

let sub1 = replaySubject.subscribe { event in
    print("sub1")
    print(event)
}

replaySubject.onNext(33)

let sub2 = replaySubject.subscribe { event in
    print("sub2")
    print(event)
}

replaySubject.onNext(44)

let sub3 = replaySubject.subscribe { event in
    print("sub3")
    print(event)
}

replaySubject.onError(CustomError.someError)

let sub4 = replaySubject.subscribe { event in
    print("sub4")
    print(event)
}

And it's output:

sub1
next(22)
sub1
next(33)
sub2
next(22)
sub2
next(33)
sub1
next(44)
sub2
next(44)
sub3
next(33)
sub3
next(44)
sub1
error(someError)
sub2
error(someError)
sub3
error(someError)
sub4
next(33)
sub4
next(44)
sub4
error(someError)

As we can see above, the last n items are held in the subject's state and they are replayed to the future subscribers.

It is important to note that when an error or completed event has occurred with the subject, future observers who subscribe are passed the n buffer along with the error or completed event. This is different from other subjects seen above. And this is the reason that, with a buffer of 1 - ReplaySubject is not the same as a BehaviorSubject.

Variable:

Variable is a wrapper for BehaviorSubject. But unlike BehaviorSubject, it can not terminate with error and when the variable is deallocated, it will complete it's observable sequence.

You can use it's value property to access the current value. You can also use it to set a new value.

Similar to BehaviorSubject, a variable is created with an initial value and it will replay it's initial / latest to future subscribers.

Code to use a Variable is shown below:

let variable = Variable(7.5)
variable.value = 8.5

let variableObserver = variable.asObservable().subscribe { event in
    print(event)
}

variable.value = 9.5

And the output:

next(8.5)
next(9.5)

The code above is very simple and the thing is keep in mind is - In order to create an observable from a Variable, you need to use the asObservable() method. You can then subscribe to this observable.

create() vs Subjects

A significant benefit of using the create() method to using a subject is that the sequence will be lazily evaluated. This can provide you with a lot of benefits over using subjects. The create() method provides a non-blocking implementation, where as a subject generates sequences in a single go. Look at the code below:

func sampleCreate() -> Observable<String> {
    return Observable.create { observer in
        observer.on(.next("Hello"))
        observer.on(.next("World"))
        print("sleeping")
        Thread.sleep(forTimeInterval: 2.0)
        print("slept")
        observer.onCompleted()
        
        return Disposables.create()
    }
}

sampleCreate().subscribe { event in
    print(event)
}

And the output will be:

next(Hello)
next(World)
sleeping
slept
completed

As you see, the sequence is lazily evaluated and the next events are processed before we sleep on the rest of the instructions.

Contrast it to using a subject in the below code:

func sampleSubject() -> ReplaySubject<String> {
    let replaySubject = ReplaySubject<String>.create(bufferSize: 2)
    replaySubject.on(.next("Hello"))
    replaySubject.on(.next("World"))
    print("sleeping")
    Thread.sleep(forTimeInterval: 2.0)
    print("slept")
    replaySubject.onCompleted()
    
    return replaySubject
}

sampleSubject().subscribe { event in
    print(event)
}

And the output:

sleeping
slept
next(Hello)
next(World)
completed

The subject implements a blocking kind of sequence generation, and the events are processed together and not lazily evaluated.

Subjects also maintain state, which is mutating, which goes against functional programming principles. Mutating state and asynchronous programming are very hard to get right. The create() method is an alternative to using subjects if your use case can be modelled using the method.

Debugging Observables and Subjects

Below are some tips on working with issues that may occur when using RxSwift.

  • There might be issues in deducing the types of properties in an Rx closure. It might be helpful to annotate the type when you run into weird errors regarding types
  • You can use the debug() method on the observable to see additional messages. For example, the below code will print extra debug information to the console:
let fromObservable = Observable.from([1, 2, 3, 4, 5])
fromObservable
    .debug("From Debug")
    .subscribe { event in
        print(event)
}

This will print:

2017-06-30 20:24:46.078: From Debug -> subscribed
2017-06-30 20:24:46.080: From Debug -> Event next(1)
next(1)
2017-06-30 20:24:46.081: From Debug -> Event next(2)
next(2)
2017-06-30 20:24:46.081: From Debug -> Event next(3)
next(3)
2017-06-30 20:24:46.081: From Debug -> Event next(4)
next(4)
2017-06-30 20:24:46.081: From Debug -> Event next(5)
next(5)
2017-06-30 20:24:46.082: From Debug -> Event completed
completed
2017-06-30 20:24:46.082: From Debug -> isDisposed
  • You can debug memory leaks using the RxSwift.Resources struct. To enable debug method, you need to set TRACE_RESOURCES flag to the RxSwift target build settings. You can check this link for more detail. You can track all allocated resources using RxSwift.Resources.total. You can simply print out the total in a subscription and view the count over multiple runs of the same operation. The count should give you a clear picture about any leaks if any.

Practical Usage

Let us look at some practical usages of observables and subjects.

Retry Operations

Let us say, you have an operation that might fail. And you need to retry that operation during the times of failure. How would you implement such functionality? It would usually involve maintaining a global state and and managing that state during the application's lifetime.

Rx provides a retry() operator that you can use in such situations. It is easy to use and can also take in custom retry operators. The way to use it would be:

networkRequest.retry(4)
    .subscribe(onNext: { next in
        // Do work on next event
    }, onError: { error in
        // Log the error
})

Here, the networkRequest observable retries until four times in the case of a failure and then errors out.

Key-Value Observing

To observe a property, instead of using:

-(void)observeValueForKeyPath:(NSString *)keyPath
                     ofObject:(id)object
                       change:(NSDictionary *)change
                      context:(void *)context

use the simpler rx.observe and rx.observerWeakly method like below:

view.rx.observe(CGFloat.self, "alpha")
    .subscribe(onNext: { alpha in
        // Work with the new value
    })
    .disposed(by: disposeBag)

Note - The above code uses RxCocoa, which, we will be looking at later in the series.

Replacing Delegates

Using subject, you can replace delegates to communicate between view controllers in your application.

Consider a simple application where you have multiple flows. The first view controller displays the current cartoon show being run. If you are not happy with it, you can change the show by navigating to the second view controller and entering your favourite cartoon show.

Imagine how you would implement this. Delegates would be the first pattern that comes to your mind. In fact, Apple encourages you to use this design pattern to interact with it's various frameworks. But using subject, we can simplify this further to easily pass data between view controllers.

In the first view controller (CartoonViewController), there is a label displaying the current running cartoon show. If you do not like it, you can press the change cartoon button which presents the second view controller (ChangeCartoonViewController) that displays a textfield where you can enter a new show. Once you press the change button, the new data is passed to the first view controller and dismissed. The various screens are shown below:

In the second view controller, we create a PublishSubject that we will use to pass data to the observer.

var cartoonPublishSubject = PublishSubject<String>()
var cartoonObserver: Observable<String> {
    return cartoonPublishSubject.asObservable()
}

When the change button is pressed after entering a new show, the following code is run:

func changeButtonPressed(_ button: UIButton) {
    cartoonPublishSubject.onNext(cartoonTextField.text!)
    cartoonPublishSubject.onCompleted()

    self.dismiss(animated: true)
}

Above, we pass the entered text to the observers along with the completed event.

In the first view controller, we shall observe for the above change before pushing the second view controller.

func changeButtonPressed(_ button: UIButton) {
    self.view.endEditing(true)

    let changeCartoonViewController = ChangeCartoonViewController()

    changeCartoonViewController.cartoonObserver
        .subscribe(onNext: { [weak self] cartoon in
            self?.nameLabel.text = cartoon
        }, onCompleted: {
            print("Completed.")
        }, onDisposed: {
            print("Disposed.")
        })
        .addDisposableTo(disposeBag)

    self.present(changeCartoonViewController, animated: true)
 }

So once a new show is entered, we get the latest data and display that in the label. This is a simple application to get a gist as to how the data is passed between view controllers.

The entire code for this sample application can be downloaded here.

That's it for observables and subject. Hope, this post made it easier for you to understand the various concepts. In the next part, we shall look at some of the operators available for RxSwift and see how we can use them to transform and manipulate streams.

You can view my projects on GitHub.

You can follow me on twitter.

Observer Design Pattern

The Observer design pattern defines a one-to-many mapping between objects so that when one object changes state, all its dependents are notified and updated automatically.

This pattern is a behavioral pattern and makes it easier to 'observe' complex state of an object and be notified when it changes. The views and the model need not be tightly coupled with each other to observe the change in state. This pattern is commonly called Publish-Subscribe and is the cornerstone for Reactive Programming.

Examples

Some examples of this pattern are:

  • In a MVC application, the views observe the model (state) of the app and changes its representations accordingly
  • Stock ticker clients observe the price of a stock and gets notified when the price changes. The stock ticker can then change its views to reflect the latest price
  • A CI environment would observe a remote repository for any code changes and then run a build if new code was added to the repository

Why do we need this design pattern?

Consider a realtime / IOT kind of application. Here, the events are generated many times in a given period of time. You can design a system that frequently polls for the state - but this design brings with it a lot of difficulties. How frequently do you poll when you have no control of when the data changes? Poll very frequently and the model layer does little work apart from servicing such requests, poll less frequently and you might not work with the latest data. You also need to decide how the model and its various observers are to be coupled so that the observers are consistent with the model layer. These are some of the many design decisions that you need to take care of, that further increases the system's complexity.

In these type of applications, you can change your programming model from "pull" to "push". In the push model, you register your clients to the concerned model, and when the state changes, a notification is sent to the clients and the clients can update to the latest snapshot of the model and work with it.

When do you need to use this pattern?

  • When a change to one object requires others to be notified, so that they can use that data downstream to do other work
  • When you do not know how many or what objects need to be notified of the change in model state
  • When an object should be able to notify other objects without making assumptions about who or what these objects are. This makes the design loosely coupled and flexible to changes

Class structure and participants

The class structure and the participants of the Observer design pattern are as below,

Class Structure

Let us take a look at all the participants in this design pattern.

Subject: This class provides an interface for registering and deregistering observers. This has a list of observers that has registered with it.

Observer: This class defines an interface that gets called when an subject changes. Based on a subject's implementation, the new data gets pushed to the observer or it is the duty of the observer to get the refreshed state of the subject.

ConcreteSubject: This class is a subclass of Subject and contains the state which the observers are interested in. This sends a notification to its observers when its state changes.

ConcreteObserver: This class is a subclass of Observer and stores a copy of the subject's state that it is interested in. This has a reference to the subject so that, it can query the latest state of the subject. This object implements the interface that gets called when the subject state's changes.

How it works

  • The subject changes its state after some work/task
  • The subject notifies the observers, registered with it, of the need to update itself since the observer's state is now inconsistent with that of the subject. To do this, the subject's notify() method is called, which in turn calls the update() method of the observer
  • Depending upon the implementation, the new data is either pushed to the observer using the update() method or the observer queries for the new state of the subject. The information received from the subject is used to reconcile the observer's state

Example

Consider an environment, where, various clients need to monitor the price of a stock. The client has to display the correct price at all times and the price should be reflected in realtime. Now, consider an approach when the clients need to request the price from the server. This approach fails easily since we do not control the fluctuations in price and we would not know how frequently to poll for the price of a stock. This type of applications is made for the Observer pattern. We will use the stock model as the subject and have all the clients act as the observers. So, when the price of a stock changes, the observers get notified of the change in stock price and the observer can then get the new price from the model.

In code, the stock_model will be represented as:

class stock_observer;

class stock_model {
public:
    stock_model(float stockPrice);

    void register_observer(stock_observer *obs);
    void deregister_observer(stock_observer *obs);

    void set_price(float newValue);
    float get_price();

    void notify();

private:
    std::vector<stock_observer *> views;
    float price;
};

The model has methods that register, deregister observers and various getters / setters. The notify() is the method that gets called when the model's state (price) changes. 

As for the observer, the interface of stock_observer is as follows:

class stock_observer {
public:
    stock_observer(stock_model *);
    virtual void update() = 0;

    virtual ~stock_observer();

protected:
    stock_model *get_subject();

private:
    stock_model *model;
};

The observer has a reference to the model that it listens to. The model's notify() method is called when the price changes and this is the method that informs the clients that the price has changed. The clients then gets the new state from the model. The implementation of the notify() method of the model is as follows:

void stock_model::notify() {
    for (unsigned int i = 0; i < views.size(); ++i) {
       views[i]->update();
    }
}

The stock_view (client) will be represented as:

class stock_view: public stock_observer {
public:
    stock_view(stock_model *);
    ~stock_view();
    void update();

    float get_stock_price();

private:
    void update_price(float newPrice);

    float current_price;
};

The client's update() method is called from the model's notify() method and the update() method is implemented as:

void stock_view::update_price(float newPrice) {
    this->current_price = newPrice;
}

void stock_view::update() {
    update_price(get_subject()->get_price());
}

The various code manifests itself in the main() function as:

int main() {
    stock_model stock(17.43);

    stock_view desktop_client(&stock);
    stock_view mobile_client(&stock);

    std::cout << "Stock price (Desktop - moment 1) : " <<desktop_client.get_stock_price()<<std::endl;
    std::cout << "Stock price (Mobile - moment 1) : " <<mobile_client.get_stock_price()<<std::endl;

    stock.set_price(16.29);

    std::cout << "Stock price (Desktop - moment 2) : " <<desktop_client.get_stock_price()<<std::endl;
    std::cout << "Stock price (Mobile - moment 2) : " <<mobile_client.get_stock_price()<<std::endl;

    return 0;
}

In the main() function above, the model's initial value is 17.43 and the two clients observe the model. The price is printed onto the console. Then we make a change to the stock price. Once the model's price changes, the model's notify() method is called which in turn calls the observer's update() method, which syncs the new value and that new value is printed out to the console now. When you run the above program, the output you get is:

Stock price (Desktop - moment 1) : 17.43
Stock price (Mobile - moment 1) : 17.43
Stock price (Desktop - moment 2) : 16.29
Stock price (Mobile - moment 2) : 16.29
Deregistering observer...
Deregistering observer...

This is a simple overview as to how the Observer pattern works. The complete code can be viewed / downloaded from github.

Consequences of this pattern

  • The Observer pattern lets you decouple the subjects and observers. You can add as many observers without modifying the subjects. The subjects can be reused without reusing the observers and vice versa. All the subject knows, is that it has a list of observers, each conforming to a simple interface. The subject is not concerned with the concrete observer class
  • Any number of observers are supported by the subject. The subject notifies all observers under it of the change that has happened to its model
  • The model notifies of its changes which may cause a cascade of updates to observers and its dependents. The observers will be out of sync if the rules and criteria of the dependencies are not defined correctly

Implementation Notes

  • You need to decide on how you are going to store your observers. You can either optimize for speed or space. The application's high level requirements should drive this decision
  • When an observer registers with more than a single subject, the observer needs to know from which model the notify() method was called. This changes the definition of the observer's update() method. The new definition will be: virtual void update(subject *). The subject that caused the update is also passed to the client and hence it knows which model has changed
  • An important consideration is then - who triggers the call to a model's notify() method. An external operation that changes the model or is it the model itself? If it is the model, the advantage is that the clients do not have to remember to call the notify() method. The disadvantage is that consecutive operations will cause consecutive calls, which may be inefficient. If it is the external operation, the advantage is that the client can call the notify() method after it finishes a set of calls that updates the model. The disadvantage is that the external application needs to remember now to call the notify() method. If it forgets, then the new update is not pushed to the downstream clients
  • What happens when a subject is dereferenced? You need to provide a mechanism that allows the subject to notify the observers that the subject is being deleted and no further updates will be provided
  • The subject's state should be consistent before the call of the notify() method. If this is not the case, the state of the observers that query for the model's state will not be consistent with other observers
  • Should the changed payload be included in the update() method by the model? If so, the observer's update() method definition is as follows: virtual void update(subject *, payload *). The observer does not need to query for the state after the update() method. The new data is pushed to it. This also will help in get all the observers consistent with each other
  • If you are going with the above approach, it would also help if you include the type of update that occurs within the model. The type of update can be of any - insert, delete or edit
  • Sometimes in your application, you might need an object that can act as both a subject and as an observer. This is especially useful in Reactive Programming, where we need objects that can act as an observer and an observable

That's it.

You can view my projects on github.

You can follow me on twitter.

Part 1 - Introduction to RxSwift

This is part of a series on Reactive Programming in RxSwift.

RxSwift is a reactive extension for Swift. Reactive extensions are a combination of the best ideas from the Observer pattern, Iterator pattern and Functional Programming. Reactive extensions are available for Java, Scala, C#, C++, JavaScript, Python, Clojure, Kotlin among others. The programming model of RxSwift is similar to reactive extensions present for other languages and is more than a set of APIs. It is a new way of building applications that are responsive, resilient, elastic and message driven.

What is Reactive Programming?

Reactive programming is programming with asynchronous streams or observable sequences (or simply sequences). It involves streams, its transformations and the bindings that goes with it. State changes / new data are passed on as streams which you can observe and work on. Anything can be a stream: variables, events, properties, requests / responses, etc. You can listen to any number of streams and react.

It is not useful to just observe streams. You are provided tools to transform any of those streams. You can chain any of those transformations and create a new stream altogether. And, you can bind streams to properties, UI elements, etc. This makes it easier to write and structure your asynchronous program.

Consider the following piece of code:  

var health = 100.00
var playerCritical = health <= healthCutoffValue
if playerCritical { // Do Something } else { // Do Another Thing }

Here, the variable playerCritical is dependent on health being less than a cutoff value. We check if the player is critical and do some work based on it. But what happens when the player becomes critical sometime later in the game? Maybe, you need to execute a piece of code when the player is critical. This is where Reactive Programming comes in. You can subscribe to playerCritical and when the value changes, you are passed the current stream (or value) and execute your code when it reaches a cutoff level. See how simple your program's data flow becomes. And as specified earlier, anything can be a stream.

This is just a simple example, but we will see much more in the later series about the full power of Rx and in specific to iOS / macOS development.

Benefits of Reactive Programming

So, what are the benefits of Rx? why should you be using it in your projects?

  • Rx enables you to use immutable definitions and pure functions to process snapshots of streams in a reliable composable way. You can easily compose your program into clean input/output functions with its own handlers
  • Rx helps building asynchronous programs to react to new data/events. It lets you write highly performant code that handles huge number of streams/events. Using the tools provided you can write highly interactive applications very easily
  • It is very easy to reason about what your asynchronous program does and how the handlers works for the asynchronous streams. The dataflow is clear and you can easily understand how a block of code works
  • You can easily manage state across your app. Rx provides tools to share state across various asynchronous streams - if you want to
  • The code you write with Rx is very declarative since definitions are immutable and only the streams change over time thus reducing side effects
  • Rx easily lets you bind to UI elements, always representing the latest app state with less work
  • Rx tools provide tools to make resource management simple and easy
  • Reactive extensions exhibits consistent behavior across all platforms. They have common patterns, similar operators and along with libraies for major languages, they aid in cross-platform development. A developer familiar with the Rx library can easily work on code bases with the extensions irrespective of the language or platform
  • Rx has a huge community. Help is always around if you are stuck at a problem. Reactive extensions are being developed for all environments imaginable and the community is very forward thinking. Extensions also specifically target a particular platform. e.g, ReactiveCocoa wraps various aspects of cocoa frameworks with Rx primitives to aid cocoa development

RxSwift Installation

Using Cocoapods, your Podfile will look similiar to this (For testing, you will use a blocking version of RxSwift), before running $pod install:

use_frameworks!

target 'YOUR_TARGET_NAME' do
    pod 'RxSwift',    '~> 3.0'
    pod 'RxCocoa',    '~> 3.0'
end

target 'YOUR_TESTING_TARGET' do
    pod 'RxBlocking', '~> 3.0'
    pod 'RxTest',     '~> 3.0'
end

Using Carthage, your Cartfile will look like the following before running $carthage update

github "ReactiveX/RxSwift" ~> 3.0

Using the Swift Package Manager, your Package.swift will look like the following before running $swift build:

import PackageDescription

let package = Package(
    name: "RxTestProject",
    targets: [],
    dependencies: [
        .Package(url: "https://github.com/ReactiveX/RxSwift.git", majorVersion: 3)
    ]
)

If you want to use it as a git submodule, installation instructions can be found here.

You can run RxSwift in the Playground too. Instructions to run it on the Playgrounds can be found here.

Other Links

You can read more about Rx and the various operators under it here. You can also check the tutorials section for other language Rx implementations. Swift projects using Rx can be browsed here.

What Next?

We will be looking into the core concepts of RxSwift, the operators available, error handling, writing concurrent code and also testing your Rx code. We shall also look at the best architecture for your reactive applications together with platform specific features with a lot of code samples.

That's it for now.

You can view my projects on github.

You can follow me on twitter.

Data Structures and Operations

Let us look at this problem definition. Here, given a word, we need to find all the words that can appear next to it in a word ladder. The list of words is given in a text file. And a word ladder is a sequence of words made by changing one letter at a time. For example, the below sequence defines a word ladder,

cold -> cord -> card -> ward -> warm

What we need to find out are,
1) Given a word, list all the words that can appear next to it in a word ladder
2) Which word has the longest ladder?
3) How many words can be reached, from a source word, in n or fewer steps?

Let us try to come up with a naive algorithm that given a word, it finds all the words that can appear next to it. The code would be like something given below,

In the above naive algorithm, O(n) of time is required. For a given word, we parse the entire list to check if the diff between a pair is 1 and hence the 0(n) time. [The function in the above gist, getDiffCount(), gives the diff count between the pair of words.]

Now, let us try to find the word that has the longest ladder. Making use of the code we have written earlier, we can write the below function,

The runtime of the above code is O(n^2). This is so because we try to get the next words (which is a O(n) operation) for each word which makes the algorithm run in O(n^2) time.

The importance of Data Structures
What if, now, these operations were not one off and are to be repeated often. Using the above naive functions to do these operations is sub-optimal. This is where a good Data Structure can help us. We need to come up with a DS such that we can maintain the state of the program. This is so because during every operation we do not need to create the state afresh.(In the above naive functions, re-creating the state was more expensive than other work we did)

Operations define a Data Structure
How do we choose what Data Structure that we are going to use? The golden rule to effectively choose a Data Structure is to choose it in such a way so as to easily support the operations that we plan to do during the life of the program.

Let us come up with a data structure for the above operations. Looking at the operations that we are going to perform, let us represent each words as an element in the DS as follows,

where, the given word is stored as a string in the element. The noOfLadders contains the value of number of ladders for that particular element. The links contains the list of all the words that can replace the element's word in a word ladder. The next two members in the element struct are there to aid us in traversing the DS specifically to support operation 3 (How many words can be reached, from a source word, in n or fewer steps?).

The data structure (similar to a adjacency lists) can be diagrammatically shown below as,

image001.png

Each node has an element and the links contains the list to the words that can replace the current word.

The code to initialize the vector of elements is given below.

Above, wordToVectorIndex is a map that maps a word to an vector index. Each element is marked as not visited and the distance from source is set as 0. Once the words are collected in the vector, we need to build the ladder DS where we set the links to point to the list of words that can replace the element's word in a word ladder. The code to do that is given below,

Here, we link a pair of words in their respective links if the difference between the words is 1. Note that, the link contains the pointer to the element and not a copy of the element. The built structure is similar to the figure shown earlier.

After building the ladder data structure, we need to update the number of ladders (links) for each element. The code to do that is given below,

Now, let us try to come with the runtime of the operations using our new data structure.

To find the list of words that can appear next to a word in a word ladder, we can use the following code,

The runtime of the above code block is O(m) where m is the number of words that can replace the given word in a ladder. This is very optimal with respect to the naive solution seen earlier.

Similarly, to find the longest ladder in the set of words, it very easy since we have stored the state in each element. Hence the runtime of the operation is 0(n) where n is the number of words. The code to find the longest ladder is given below

Now to find the words that can be reached, from a source word, in n or fewer steps? Using the ladder data structure, we can easily do a depth limited BFS walk from the source to the given depth. The code to do it is given below,

The runtime of the above code block is O(V + E) where V is the number of words in that depth and E is the relationship between the words.

As you can see, we have really improved upon our runtime using the ladder data structure we have built. Remember this - you need a data struture that can help you with your future operations hence operations always define the data structure and not the other way around.

Let me know if anything can be improved.

If you have read this far, you should follow me on twitter.

Also feel free to follow my projects on github.