Operators API

How to create a custom operator

If you need to write an operator that cannot be made from a combination of existing operators, then you can write a custom operator from scratch.

Each operator (e.g. MyFancyOperator) must either (1) be a subtype of one of the abstract OperatorTrait trait types, or (2) define its trait behavior through an as_operator method:

Rocket.as_operator(::Type{<:MyFancyOperator}) = TypedOperatorTrait{T, R}()
# or
Rocket.as_operator(::Type{<:MyFancyOperator}) = InferableOperatorTrait()
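
Option (2) can be sketched as follows. This is a minimal illustration, assuming a hypothetical AddOneOperator type that does not subtype any of Rocket's abstract operator types; only the trait declaration is shown, not the on_call! logic.

```julia
using Rocket

# Hypothetical operator type (not part of Rocket) with no operator supertype
struct AddOneOperator end

# Declare the trait explicitly: input data type Int, output data type Int
Rocket.as_operator(::Type{<:AddOneOperator}) = TypedOperatorTrait{Int, Int}()

# Query the trait that Rocket will see for this type
trait = Rocket.as_operator(AddOneOperator)
```

Querying as_operator this way can be a quick check that the trait declaration is picked up before writing the rest of the operator.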

In addition, an operator must implement a method of the on_call! function that contains its custom logic. This method must return another Observable, the result of applying MyFancyOperator to a source.

Rocket.on_call!(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where { L, R } = # some custom logic here

# or
# for inferable trait types you also have to define Rocket.operator_right, which returns the data type of the produced Observable

Rocket.on_call!(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where { L, R } = # some custom logic here
Rocket.operator_right(::MyFancyOperator, ::Type{L}) where L = R # where R should be an actual type, e.g. Int or even L itself

It is not allowed to modify the source Observable in any way; you have to return a new observable.
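
To illustrate the rule above, here is a small sketch using the built-in map operator and the keep actor (both are assumed to be available from Rocket; the variable names are illustrative). Applying an operator yields a new observable, while the source keeps emitting its original values.

```julia
using Rocket

source = from([ 1, 2, 3 ])

# Applying an operator returns a new observable; `source` is left untouched
scaled = source |> map(Int, d -> d * 10)

# `keep` actors store every received value in their `values` field
original_values = keep(Int)
scaled_values   = keep(Int)

subscribe!(source, original_values)
subscribe!(scaled, scaled_values)
```

After both subscriptions, original_values.values holds the unchanged source data and scaled_values.values holds the transformed data.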

See more examples of custom operators in the Traits API section.


It might be useful to return a ProxyObservable from an on_call! function. ProxyObservable is a special Observable that proxies actors with the source and/or the source with actors.



The typed operator trait specifies that an operator is statically typed with both input and output data types. A typed operator with input type L and output type R can only operate on an input Observable with data type L and will always produce an Observable with data type R.


using Rocket

struct MyTypedOperator <: TypedOperator{Int, Int} end

function Rocket.on_call!(::Type{Int}, ::Type{Int}, op::MyTypedOperator, source)
    return proxy(Int, source, MyTypedOperatorProxy())
end

struct MyTypedOperatorProxy <: ActorProxy end

Rocket.actor_proxy!(::Type{Int}, ::MyTypedOperatorProxy, actor::A) where A = MyTypedOperatorProxiedActor{A}(actor)

struct MyTypedOperatorProxiedActor{A} <: Actor{Int}
    actor :: A
end

function Rocket.on_next!(actor::MyTypedOperatorProxiedActor, data::Int)
    # Do something with the data and/or redirect it to actor.actor
    next!(actor.actor, data + 1)
end

Rocket.on_error!(actor::MyTypedOperatorProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::MyTypedOperatorProxiedActor)   = complete!(actor.actor)

source = from([ 0, 1, 2 ])
subscribe!(source |> MyTypedOperator(), logger())

# output

[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed

The left typed operator trait specifies that an operator is statically typed with its input data type only. To infer the output data type, the operator must define a special function operator_right(operator, ::Type{L}) where L. A left typed operator with input type L can only operate on an input Observable with data type L and will always produce an Observable with the data type returned by operator_right(operator, ::Type{L}).


using Rocket

struct CountIntegersOperator <: LeftTypedOperator{Int} end

function Rocket.on_call!(::Type{Int}, ::Type{Tuple{Int, Int}}, op::CountIntegersOperator, source)
    return proxy(Tuple{Int, Int}, source, CountIntegersOperatorProxy())
end

Rocket.operator_right(::CountIntegersOperator, ::Type{Int}) = Tuple{Int, Int}

struct CountIntegersOperatorProxy <: ActorProxy end

Rocket.actor_proxy!(::Type{Tuple{Int, Int}}, ::CountIntegersOperatorProxy, actor::A) where A = CountIntegersProxiedActor{A}(0, actor)

mutable struct CountIntegersProxiedActor{A} <: Actor{Int}
    current :: Int
    actor   :: A
end

function Rocket.on_next!(actor::CountIntegersProxiedActor, data::Int)
    # Emit the running index together with the data, then advance the counter
    current = actor.current
    actor.current += 1
    next!(actor.actor, (current, data)) # e.g.
end

Rocket.on_error!(actor::CountIntegersProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::CountIntegersProxiedActor)   = complete!(actor.actor)

source = from([ 0, 0, 0 ])
subscribe!(source |> CountIntegersOperator(), logger())

# output

[LogActor] Data: (0, 0)
[LogActor] Data: (1, 0)
[LogActor] Data: (2, 0)
[LogActor] Completed

The right typed operator trait specifies that an operator is statically typed with its output data type only. It can operate on an input Observable with any data type L but will always produce an Observable with data type R.


using Rocket

struct ConvertToFloatOperator <: RightTypedOperator{Float64} end

function Rocket.on_call!(::Type{L}, ::Type{Float64}, op::ConvertToFloatOperator, source) where L
    return proxy(Float64, source, ConvertToFloatProxy{L}())
end

struct ConvertToFloatProxy{L} <: ActorProxy end

function Rocket.actor_proxy!(::Type{Float64}, proxy::ConvertToFloatProxy{L}, actor::A) where { L, A }
    return ConvertToFloatProxyActor{L, A}(actor)
end

struct ConvertToFloatProxyActor{L, A} <: Actor{L}
    actor :: A
end

function Rocket.on_next!(actor::ConvertToFloatProxyActor{L}, data::L) where L
    next!(actor.actor, convert(Float64, data)) # e.g.
end

Rocket.on_error!(actor::ConvertToFloatProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::ConvertToFloatProxyActor)   = complete!(actor.actor)

source = from([ 1, 2, 3 ])
subscribe!(source |> ConvertToFloatOperator(), logger())

# output

[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed

The inferable operator trait specifies that an operator is statically typed with neither its input nor its output data type. To infer the output data type, the operator must define a special function operator_right(operator, ::Type{L}) where L, in which L is the input data type.

using Rocket

struct IdentityOperator <: InferableOperator end

function Rocket.on_call!(::Type{L}, ::Type{L}, op::IdentityOperator, source) where L
    return proxy(L, source, IdentityProxy())
end

struct IdentityProxy <: ActorProxy end

Rocket.operator_right(::IdentityOperator, ::Type{L}) where L = L

Rocket.actor_proxy!(::Type{L}, proxy::IdentityProxy, actor::A) where { L, A } = IdentityProxyActor{L, A}(actor)

struct IdentityProxyActor{L, A} <: Actor{L}
    actor :: A
end

function Rocket.on_next!(actor::IdentityProxyActor{L}, data::L) where L
    next!(actor.actor, data) # e.g.
end

Rocket.on_error!(actor::IdentityProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::IdentityProxyActor)   = complete!(actor.actor)

source = from([ 1, 2, 3 ])
subscribe!(source |> IdentityOperator(), logger())

source = from([ 1.0, 2.0, 3.0 ])
subscribe!(source |> IdentityOperator(), logger())

# output

[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed
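
The identity case returns L unchanged, so it may help to see an inferable operator whose output type differs from its input type. The sketch below follows the same pattern as the examples in this section; WrapInVectorOperator, WrapInVectorProxy and WrapInVectorProxyActor are hypothetical names, and the keep actor is assumed to be available from Rocket.

```julia
using Rocket

# Hypothetical inferable operator: wraps each value of type L in a Vector{L}
struct WrapInVectorOperator <: InferableOperator end

# The output type is derived from the input type L
Rocket.operator_right(::WrapInVectorOperator, ::Type{L}) where L = Vector{L}

function Rocket.on_call!(::Type{L}, ::Type{Vector{L}}, op::WrapInVectorOperator, source) where L
    return proxy(Vector{L}, source, WrapInVectorProxy{L}())
end

struct WrapInVectorProxy{L} <: ActorProxy end

function Rocket.actor_proxy!(::Type{Vector{L}}, ::WrapInVectorProxy{L}, actor::A) where { L, A }
    return WrapInVectorProxyActor{L, A}(actor)
end

# Receives values of type L and forwards Vector{L} to the wrapped actor
struct WrapInVectorProxyActor{L, A} <: Actor{L}
    actor :: A
end

function Rocket.on_next!(actor::WrapInVectorProxyActor{L}, data::L) where L
    next!(actor.actor, L[ data ])
end

Rocket.on_error!(actor::WrapInVectorProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::WrapInVectorProxyActor)   = complete!(actor.actor)

# The inferred output type for an Int source
inferred = Rocket.operator_right(WrapInVectorOperator(), Int)

collected = keep(Vector{Int})
subscribe!(from([ 1, 2, 3 ]) |> WrapInVectorOperator(), collected)
```

Here the same operator would also accept a Float64 source, in which case operator_right would report Vector{Float64} as the output type.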

The InvalidOperatorTrait trait specifies a special 'invalid' behavior: types with this trait specification cannot be used as an operator for an observable stream. By default, any type has the InvalidOperatorTrait trait specification.
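
The default can be checked directly. This is a small sketch with a hypothetical NotAnOperator type that declares no operator trait at all:

```julia
using Rocket

# Hypothetical plain type: no operator supertype and no as_operator method
struct NotAnOperator end

trait = Rocket.as_operator(NotAnOperator)

# Without an explicit declaration the type falls back to the invalid trait
is_invalid = trait isa Rocket.InvalidOperatorTrait
```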

TypedOperator{L, R} can be used as a supertype for any operator. It automatically specifies TypedOperatorTrait behavior.


using Rocket

struct MyOperator <: TypedOperator{Float64,String} end

as_operator(MyOperator) === TypedOperatorTrait{Float64,String}()

# output

true

on_call!(::Type, ::Type, operator, source)

Each operator must implement its own method of the on_call! function. This function is called to apply the operator to an Observable and to produce another Observable with new, operator-specific logic.

OperatorsComposition is an object that represents a composition of multiple operators. To compose two or more operators, use the overloaded + or |> operators.

using Rocket

composition = map(Int, (d) -> d ^ 2) + filter(d -> d % 2 == 0)

source = from(1:5) |> composition

subscribe!(source, logger())

# output

[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed

using Rocket

composition = map(Int, (d) -> d ^ 2) |> filter(d -> d % 2 == 0)

source = from(1:5) |> composition

subscribe!(source, logger())

# output

[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed
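
Because a composition is an ordinary value, it can be stored and applied to more than one source. The sketch below reuses the composition from the examples above on two ranges; the variable names are illustrative and the keep actor is assumed to be available from Rocket.

```julia
using Rocket

# Build the composition once
square_then_keep_even = map(Int, d -> d ^ 2) + filter(d -> d % 2 == 0)

first_run  = keep(Int)
second_run = keep(Int)

# Apply the same composition to two independent sources
subscribe!(from(1:5)  |> square_then_keep_even, first_run)
subscribe!(from(6:10) |> square_then_keep_even, second_run)
```

Each subscription runs the composed operators independently, so the values collected from one source do not affect the other.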
