Getting started
Rocket.jl is a Julia package for reactive programming that makes it easier to work with asynchronous data. It is inspired by the RxJS and ReactiveX communities.
In order to combine good performance with a convenient API, Rocket.jl employs Observer patterns, Actor models and Functional programming.
Installation
Install Rocket.jl through the Julia package manager:
] add Rocket
Concepts
Rocket.jl has been designed with a focus on performance and modularity.
The essential concepts in Rocket.jl are:
- Observable: represents a collection of future messages (data or/and events).
- Actor: is an object that knows how to react on incoming messages delivered by the Observable.
- Subscription: represents a teardown logic that is useful for cancelling the execution of an Observable.
- Operator: an object that deals with collection operations, such as
map
,filter
,reduce
, etc. - Subject: the way of multicasting a message to multiple Observers.
First example
Conventionally, arrays are used for processing data.
for value in array_of_values
doSomethingWithMyData(value)
end
In contrast, Rocket.jl uses observables.
subscription = subscribe!(source_of_values, lambda(
on_next = (data) -> doSomethingWithMyData(data),
on_error = (error) -> doSomethingWithAnError(error),
on_complete = () -> println("Completed!")
))
At some point in time you may decide to stop listening for new messages.
unsubscribe!(subscription)
Actors
In order to process messages from an observable you will need to define an Actor that knows how to react to incoming messages.
struct MyActor <: Rocket.Actor{Int} end
Rocket.on_next!(actor::MyActor, data::Int) = doSomethingWithMyData(data)
Rocket.on_error!(actor::MyActor, error) = doSomethingWithAnError(error)
Rocket.on_complete!(actor::MyActor) = println("Completed!")
An actor can also have its own local state.
struct StoreActor{D} <: Rocket.Actor{D}
values :: Vector{D}
StoreActor{D}() where D = new(Vector{D}())
end
Rocket.on_next!(actor::StoreActor{D}, data::D) where D = push!(actor.values, data)
Rocket.on_error!(actor::StoreActor, error) = doSomethingWithAnError(error)
Rocket.on_complete!(actor::StoreActor) = println("Completed: $(actor.values)")
For debugging purposes you can use a general LambdaActor
actor or just pass a function object as an actor in subscribe!
function..
Operators
What makes Rocket.jl powerful is its ability to help you process, transform and modify the messages that flow through your observables, using Operators.
subscribe!(squared_int_values |> map(Int, (d) -> d ^ 2), lambda(
on_next = (data) -> println(data)
))