Observables

Observables are lazy Push collections of multiple values. They fill the missing spot in the following table:

| Type | Single   | Multiple   |
|------|----------|------------|
| Pull | Function | Iterator   |
| Push | Promise  | Observable |

First example

For example, the following code specifies an Observable that pushes the values 1, 2, 3 immediately (synchronously) when subscribed to, and the value 4 after one second has passed since subscription.

using Rocket

source = make(Int) do actor
    next!(actor, 1)
    next!(actor, 2)
    next!(actor, 3)
    setTimeout(1000) do
        next!(actor, 4)
        complete!(actor)
    end
end

To invoke the Observable and inspect these values, we need to subscribe to it. It is important to note that Observables are lazy collections, which means they don't emit anything until someone subscribes to them. Every subscription spawns its own independent execution of the Observable. There are some exceptions to this rule, e.g. Subjects and some operators (share(), etc.), which may change this behaviour.

using Rocket

source = make(Int) do actor
    next!(actor, 1)
    next!(actor, 2)
    next!(actor, 3)
    setTimeout(1000) do
        next!(actor, 4)
        complete!(actor)
    end
end

println("Just before subscribe")
subscribe!(source, lambda(
    on_next     = (d) -> println(d),
    on_complete = ()  -> println("Completed")
))
println("Just after subscribe")

# Logs
# Just before subscribe
# 1
# 2
# 3
# Just after subscribe
# 4
# Completed
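
The laziness described above can be checked with a small sketch. It relies only on the make, next!, complete!, subscribe! and lambda functions already used in this section; the started flag and the before / after variables are hypothetical helpers introduced purely for illustration.

```julia
using Rocket

# Hypothetical flag that records whether the producer body has run.
started = Ref(false)

source = make(Int) do actor
    started[] = true
    next!(actor, 1)
    complete!(actor)
end

# Creating the Observable alone should not run the producer body.
before = started[]

subscribe!(source, lambda(on_next = (d) -> println("received: ", d)))

# Subscribing is what triggers the execution.
after = started[]

println("started before subscribe: ", before)
println("started after subscribe:  ", after)
```

If the Observable is lazy as described, the flag should only flip once subscribe! has been called.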

Pull vs Push

Pull and Push are two different protocols that describe how a data Producer communicates with a data Consumer.

In a Pull system, the Consumer determines when it receives data from the Producer. The Producer itself is unaware of when the data are delivered to the Consumer.

Every Julia Function is a Pull system. The function is a Producer of data, and the code that calls the function is consuming data by "pulling" a return value from the call.

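| Type | Producer                               | Consumer                                |
|------|----------------------------------------|-----------------------------------------|
| Pull | Passive: produces data when requested. | Active: decides when data is requested. |
| Push | Active: produces data at its own pace. | Passive: reacts to received data.       |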

In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.
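
The two protocols can be contrasted in plain Julia, without any library. This is only a sketch: next_reading and push_readings are hypothetical names made up for the illustration, not part of Rocket.jl.

```julia
# Pull: the consumer decides when to ask the producer for a value.
next_reading() = 42              # hypothetical producer
pulled = next_reading()          # the consumer "pulls" a return value

# Push: the producer decides when to hand values to a registered callback.
function push_readings(callback) # hypothetical producer
    for value in (1, 2, 3)
        callback(value)          # the producer "pushes" at its own pace
    end
end

received = Int[]
push_readings(value -> push!(received, value))  # the consumer only reacts

println(pulled)    # prints 42
println(received)  # prints [1, 2, 3]
```

In the Pull half the calling code is in control; in the Push half the callback has no say in when, or how often, it is invoked.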

Futures and promises are the most common type of Push systems today. A Promise (the Producer) delivers a resolved value to registered callbacks (the Consumers). Unlike functions, it is the Promise that determines precisely when a value is "pushed" to the callbacks.

Rocket.jl introduces Observables, a new Push system for Julia. An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers or Actors).

  • A Function is a lazily evaluated computation that synchronously returns a single value on invocation.
  • A Generator is a lazily evaluated computation that synchronously returns zero to (potentially) infinite values on iteration.
  • A Promise is a computation that may (or may not) eventually return a single value.
  • An Observable is a lazily evaluated computation that can synchronously or asynchronously return zero to (potentially) infinite values from the time it's invoked.

Observables as generalizations of functions

In contrast to functions, Observables can "return" multiple values over time. For example, functions can't do this:

function foo()
    println("Hello!")
    return 0
    return 1 # Dead code, will never happen
end

Observables, however, can do this:

using Rocket

foo = make(Int) do actor
    next!(actor, 0)
    next!(actor, 1)
    complete!(actor)
end

Observables can also "return" values asynchronously after some time:

using Rocket

foo = make(Int) do actor
    setTimeout(1000) do
        next!(actor, 0)
        complete!(actor)
    end
end

Assume we have a function foo and some observable:

  • Function call foo(args...) means "give me one value synchronously" (pull strategy)
  • In contrast, subscribing to an observable with subscribe!(observable, ...) means "notify me about any number of values, either synchronously or asynchronously" (push strategy)
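
The two bullets above can be put side by side in a short sketch. It assumes that from([0, 1]) delivers its values synchronously on subscription, which is the behaviour this section describes for synchronous sources; the pulled and pushed variables are hypothetical names used only here.

```julia
using Rocket

# Pull: the caller asks for exactly one value and receives it synchronously.
foo() = 0
pulled = foo()

# Push: the subscriber is notified about every value the observable emits.
pushed = Int[]
subscribe!(from([0, 1]), lambda(on_next = (d) -> push!(pushed, d)))

println("pulled: ", pulled)
println("pushed: ", pushed)
```

The function call yields a single value, while the subscription collects however many values the observable chooses to emit.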

When a new value is available in an observable we say that the observable emits or generates an update. Values that are generated by an observable are sometimes also called future values, because there is no principled way to predict when an observable will generate a new value. In some programming languages, observables are referred to as generators or streams and we will use all three terms interchangeably.

Anatomy of an Observable

Observables are (1) created using creation operators (it is also possible to build an Observable from scratch with custom logic); (2) subscribed to with an Actor; (3) execute to deliver next! / error! / complete! notifications to the Actor, and (4) their execution may be disposed. These four aspects are all encoded in an Observable instance, but some of these aspects are related to other types, such as Subscribable and Subscription.

The core responsibilities of an Observable are:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

Creating Observables

You can create an Observable in various ways using Creation operators. You can also build an Observable from scratch. To see how you can build an Observable with custom logic, consult the API Section.

Subscribing to Observables

The Observable source from the first example can be subscribed to.

using Rocket

subscribe!(source, lambda(
    on_next = (d) -> println(d)
))

This example shows that subscribe! calls are not shared among multiple Actors of the same Observable. When subscribe! is called with an Actor, the on_subscribe! function attached to this particular Observable is executed for that Actor. Each call to subscribe! triggers its own independent setup for that Actor.
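
A minimal sketch of this independence, assuming an Observable built with make as elsewhere in this section: the executions counter is a hypothetical helper that records how many times the producer body runs.

```julia
using Rocket

# Hypothetical counter: incremented every time the producer body is executed.
executions = Ref(0)

source = make(Int) do actor
    executions[] += 1
    next!(actor, executions[])
    complete!(actor)
end

# Two separate subscriptions to the same Observable.
subscribe!(source, lambda(on_next = (d) -> println("first actor got ", d)))
subscribe!(source, lambda(on_next = (d) -> println("second actor got ", d)))

println("producer body ran ", executions[], " times")
```

Because each subscribe! call sets up its own execution, the producer body should run once per subscription rather than once overall.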

Note

Subscribing to an Observable is like calling a function and providing callbacks to which the data will be delivered.

The subscribe! function also supports multiple subscriptions at once. If the input argument to subscribe! is a tuple or a vector, it first checks that all of the elements are valid source objects and actors and, if so, subscribes to each of them individually.


using Rocket

source1 = Subject(Int)
source2 = Subject(Int)

subscriptions = subscribe!([
    (source1, logger()),
    (source2, logger()),
])

# Later on
# unsubscribe!(subscriptions)

Executing Observables

The execution produces multiple values over time, either synchronously or asynchronously.

An Observable Execution can deliver three types of notifications:

  • Next: sends a value, such as an Int, String, Dict, etc.;
  • Error: sends any error as a value;
  • Complete: does not send a value.

"Next" notifications are the most important and most common type: they represent actual data being delivered to a subscriber. "Error" and "Complete" notifications terminate the Observable Execution.

Note

In an Observable Execution, any number of Next notifications may be delivered. However, once a single Error or Complete notification is delivered, nothing else can be delivered afterwards.

The following is an example of an Observable execution that delivers three Next notifications and subsequently completes:

using Rocket

source = make(Int) do actor
    next!(actor, 1)
    next!(actor, 2)
    next!(actor, 3)
    complete!(actor)
end

# or the same with creation operator

source = from([ 1, 2, 3 ])

It is advisable to wrap the body of the Observable in a try/catch block that delivers an Error notification when an exception is thrown:

using Rocket

source = make(Int) do actor
    try
        next!(actor, 1)
        next!(actor, 2)
        next!(actor, 3)
        complete!(actor)
    catch e
        error!(actor, e)
    end
end
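
On the receiving side, an Error notification can be handled by the Actor. The sketch below assumes that lambda accepts an on_error callback alongside on_next and on_complete; the simulated failure and the received / failures / completed variables are hypothetical and exist only for illustration.

```julia
using Rocket

source = make(Int) do actor
    try
        next!(actor, 1)
        error("something went wrong")  # simulated failure inside the producer
        next!(actor, 2)                # never reached
        complete!(actor)               # never reached
    catch e
        error!(actor, e)               # deliver the exception as an Error notification
    end
end

received  = Int[]      # values delivered through Next notifications
failures  = Any[]      # values delivered through Error notifications
completed = Ref(false) # set if a Complete notification arrives

subscribe!(source, lambda(
    on_next     = (d) -> push!(received, d),
    on_error    = (e) -> push!(failures, e),
    on_complete = ()  -> (completed[] = true)
))

println("received:  ", received)
println("failures:  ", length(failures))
println("completed: ", completed[])
```

The execution ends with the Error notification, so the Actor should see the first value and the error, and no Complete notification.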

Disposing Observable Executions

It is common for an Actor to abort an Observable Execution. Once the Actor is done receiving values, it may stop the execution in order to free computation power or memory resources.

When subscribe! is called, the Actor gets attached to the newly created Observable execution. This call also returns an object, the Subscription:

subscription = subscribe!(source, actor)

The Subscription represents the ongoing execution, and has a minimal API that allows you to cancel the execution. See the documentation of the Subscription type for details.

With

unsubscribe!(subscription)

you can cancel the ongoing execution.

Note

subscribe! returns a Subscription that represents the ongoing execution. Simply call unsubscribe! on the Subscription to cancel the execution.
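
A short sketch of cancelling an execution, using a Subject as the source so that values can be pushed by hand. It assumes that a Subject delivers values synchronously with its default settings and that next! can be called on it directly; the received vector is a hypothetical helper.

```julia
using Rocket

subject  = Subject(Int)
received = Int[]

# Attach an actor and keep the returned Subscription.
subscription = subscribe!(subject, lambda(on_next = (d) -> push!(received, d)))

next!(subject, 1)            # delivered: the actor is still attached

unsubscribe!(subscription)   # cancel the ongoing execution

next!(subject, 2)            # should not be delivered: the actor has been detached

println(received)
```

Only the value sent before unsubscribe! is expected to reach the actor.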