About subjects

A Rocket.jl Subject is a special type of Observable that allows values to be multicasted to many Actors. While plain Observables are unicast (each subscribed Actor owns an independent execution of the Observable), Subjects are multicast.

Note

A Subject is like an Observable, but can multicast to many Actors. Subjects are like event emitters: they maintain a registry of many listeners.

Every Subject is an Observable. Given a Subject, you can subscribe to it, providing an Actor, which will start receiving values normally. From the perspective of the Actor, it cannot tell whether the Observable execution is coming from a plain unicast Observable or a Subject.

Internally to the Subject, subscribe! does not invoke a new execution that delivers values. Instead, it simply registers the given Actor in a list of Actors.

Every Subject is an Actor itself. It is an object with the methods next!, error!, and complete!. Call next!(subject, theValue) to feed a new value to the Subject, and it will be multicasted to the Actors that listen to the Subject.

In the example below, we have two Observers attached to a Subject, and we feed some values to the Subject:

using Rocket

source = Subject(Int)

subscription1 = subscribe!(source, lambda(
    on_next = (d) -> println("Actor 1: $d")
))

subscription2 = subscribe!(source, lambda(
    on_next = (d) -> println("Actor 2: $d")
))

next!(source, 0)

# Logs
# Actor 1: 0
# Actor 2: 0

unsubscribe!(subscription1)
unsubscribe!(subscription2)

Since a Subject is an actor, this also means you may provide a Subject as the argument to the subscribe of any Observable:

using Rocket

source = Subject(Int)

subscription1 = subscribe!(source, lambda(
    on_next = (d) -> println("Actor 1: $d")
))

subscription2 = subscribe!(source, lambda(
    on_next = (d) -> println("Actor 2: $d")
))

other_source = from([ 1, 2, 3 ])
subscribe!(other_source, source);

# Logs
# Actor 1: 1
# Actor 2: 1
# Actor 1: 2
# Actor 2: 2
# Actor 1: 3
# Actor 2: 3

Here, we essentially convert a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects offer a unique way to share Observable execution with multiple Observers.

Schedulers

A Subject (and some other observables) uses scheduler objects to schedule message delivery to their listeners.

Synchronous scheduler

One of the variants of schedulers is the AsapScheduler, which delivers every message synchronously. AsapScheduler is a default scheduler for all Subject objects.

Rocket.AsapSchedulerType
AsapScheduler

AsapScheduler executes scheduled actions as soon as possible and does not introduce any additional logic. AsapScheduler is a default scheduler for almost all observables.

source
using Rocket

subject = Subject(Int, scheduler = AsapScheduler())

subscription1 = subscribe!(subject, logger("Actor 1"))

println("Before next")
next!(subject, 1)
next!(subject, 2)
println("After next")

subscription2 = subscribe!(subject, logger("Actor 2"))

next!(subject, 3)

# Logs
# Before next
# [Actor 1] Data: 1
# [Actor 1] Data: 2
# After next
# [Actor 1] Data: 3
# [Actor 2] Data: 3

Asynchronous Scheduler

An AsyncScheduler is similar to a AsapScheduler. Both allows for Subject to scheduler their messages for multiple listeners, but a AsyncScheduler delivers all messages asynchronously (but still ordered) using a Julia's built-in Task object.

Rocket.AsyncSchedulerType
AsyncScheduler

AsyncScheduler executes scheduled actions asynchronously and uses Channel object to order different actions on a single asynchronous task

source
using Rocket

subject = Subject(Int, scheduler = AsyncScheduler())

subscription1 = subscribe!(subject, logger("Actor 1"))

println("Before next")
next!(subject, 1)
next!(subject, 2)
println("After next")

subscription2 = subscribe!(subject, logger("Actor 2"))

next!(subject, 3)

# Logs
# Before next
# After next
# [Actor 1] Data: 1
# [Actor 1] Data: 2
# [Actor 1] Data: 3
# [Actor 2] Data: 3