Operators API

How to create a custom operator

If you need to write an operator that cannot be made from a combination of existing operators, then you can write a custom operator from scratch.

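Each operator (e.g. MyFancyOperator) must either (1) be a subtype of one of the abstract operator types associated with an OperatorTrait (for example TypedOperator or InferableOperator), or (2) specify its trait behavior by defining an as_operator method:

Rocket.as_operator(::Type{<:MyFancyOperator}) = TypedOperatorTrait{T, R}() # T and R stand for the concrete input and output data types
# or
Rocket.as_operator(::Type{<:MyFancyOperator}) = InferableOperatorTrait()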

In addition, an operator must implement a method of the on_call! function. This method holds the custom logic and must return another Observable: the result of applying MyFancyOperator to a source.

Rocket.on_call!(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where { L, R } = # some custom logic here

# or
# for inferable trait types you also have to define Rocket.operator_right, which specifies the data type of the produced Observable

Rocket.on_call!(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where { L, R } = # some custom logic here
Rocket.operator_right(::MyFancyOperator, ::Type{L}) where L = R # where R should be an actual type, e.g. Int or even L itself

Note

It is not allowed to modify the source Observable in any way; you have to return a new observable.

See more examples of custom operators in the Traits section below.

Note

It might be useful to return a ProxyObservable from an on_call! method. ProxyObservable is a special Observable that proxies actors with the source and/or the source with actors.
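As a minimal sketch of the note above, the operator below returns a ProxyObservable whose proxied actor decides which values reach the downstream actor. The names KeepEvenOperator, KeepEvenProxy and KeepEvenActor are hypothetical and exist only for this illustration. The sketch also assumes that the installed Rocket version provides the keep actor with a values field, which is used here to collect the emitted data.

```julia
using Rocket

# Hypothetical operator: forwards only even integers downstream
struct KeepEvenOperator <: TypedOperator{Int, Int} end

function Rocket.on_call!(::Type{Int}, ::Type{Int}, ::KeepEvenOperator, source)
    # `source` is left untouched; a new proxy observable wraps it
    return proxy(Int, source, KeepEvenProxy())
end

struct KeepEvenProxy <: ActorProxy end

# Wrap every subscribing actor into an actor that filters the data
Rocket.actor_proxy!(::Type{Int}, ::KeepEvenProxy, actor::A) where A = KeepEvenActor{A}(actor)

struct KeepEvenActor{A} <: Actor{Int}
    actor :: A
end

function Rocket.on_next!(actor::KeepEvenActor, data::Int)
    # Odd values are dropped, even values are redirected to the wrapped actor
    if iseven(data)
        next!(actor.actor, data)
    end
end

Rocket.on_error!(actor::KeepEvenActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::KeepEvenActor)   = complete!(actor.actor)

source    = from([ 1, 2, 3, 4 ])
collected = keep(Int) # assumed: `keep` actor that stores incoming values in `collected.values`

subscribe!(source |> KeepEvenOperator(), collected)
```

The source observable is never modified here: all operator-specific behavior lives in the proxied actor, so the same source can still be subscribed to without the operator.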

Traits

Rocket.TypedOperatorTrait - Type

The typed operator trait specifies that an operator is statically typed with both input and output data types. A typed operator with input type L and output type R can only operate on an input Observable with data type L and always produces an Observable with data type R.

Examples

using Rocket

struct MyTypedOperator <: TypedOperator{Int, Int} end

function Rocket.on_call!(::Type{Int}, ::Type{Int}, op::MyTypedOperator, source)
    return proxy(Int, source, MyTypedOperatorProxy())
end

struct MyTypedOperatorProxy <: ActorProxy end

Rocket.actor_proxy!(::Type{Int}, ::MyTypedOperatorProxy, actor::A) where A = MyTypedOperatorProxiedActor{A}(actor)

struct MyTypedOperatorProxiedActor{A} <: Actor{Int}
    actor :: A
end

function Rocket.on_next!(actor::MyTypedOperatorProxiedActor, data::Int)
    # Do something with the data and/or redirect it to actor.actor
    next!(actor.actor, data + 1)
end

Rocket.on_error!(actor::MyTypedOperatorProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::MyTypedOperatorProxiedActor)   = complete!(actor.actor)

source = from([ 0, 1, 2 ])
subscribe!(source |> MyTypedOperator(), logger())
;

# output

[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed

See also: OperatorTrait, TypedOperator, ProxyObservable, ActorProxy, logger

Rocket.LeftTypedOperatorTrait - Type

The left typed operator trait specifies that an operator is statically typed with its input data type only. The output data type is inferred from a method operator_right(operator, ::Type{L}) where L, which the operator has to define. A left typed operator with input type L can only operate on an input Observable with data type L and always produces an Observable with the data type returned by operator_right(operator, ::Type{L}).

Examples

using Rocket

struct CountIntegersOperator <: LeftTypedOperator{Int} end

function Rocket.on_call!(::Type{Int}, ::Type{Tuple{Int, Int}}, op::CountIntegersOperator, source)
    return proxy(Tuple{Int, Int}, source, CountIntegersOperatorProxy())
end

Rocket.operator_right(::CountIntegersOperator, ::Type{Int}) = Tuple{Int, Int}

struct CountIntegersOperatorProxy <: ActorProxy end

Rocket.actor_proxy!(::Type{Tuple{Int, Int}}, ::CountIntegersOperatorProxy, actor::A) where A = CountIntegersProxiedActor{A}(0, actor)

mutable struct CountIntegersProxiedActor{A} <: Actor{Int}
    current :: Int
    actor   :: A
end

function Rocket.on_next!(actor::CountIntegersProxiedActor, data::Int)
    current = actor.current
    actor.current += 1
    next!(actor.actor, (current, data)) # e.g.
end

Rocket.on_error!(actor::CountIntegersProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::CountIntegersProxiedActor)   = complete!(actor.actor)

source = from([ 0, 0, 0 ])
subscribe!(source |> CountIntegersOperator(), logger())
;

# output

[LogActor] Data: (0, 0)
[LogActor] Data: (1, 0)
[LogActor] Data: (2, 0)
[LogActor] Completed

See also: OperatorTrait, LeftTypedOperator, operator_right, ProxyObservable, ActorProxy, enumerate, logger

Rocket.RightTypedOperatorTrait - Type

The right typed operator trait specifies that an operator is statically typed with its output data type only. It can operate on an input Observable with any data type L but always produces an Observable with data type R.

Examples

using Rocket

struct ConvertToFloatOperator <: RightTypedOperator{Float64} end

function Rocket.on_call!(::Type{L}, ::Type{Float64}, op::ConvertToFloatOperator, source) where L
    return proxy(Float64, source, ConvertToFloatProxy{L}())
end

struct ConvertToFloatProxy{L} <: ActorProxy end

function Rocket.actor_proxy!(::Type{Float64}, proxy::ConvertToFloatProxy{L}, actor::A) where { L, A }
    return ConvertToFloatProxyActor{L, A}(actor)
end

struct ConvertToFloatProxyActor{L, A} <: Actor{L}
    actor :: A
end

function Rocket.on_next!(actor::ConvertToFloatProxyActor{L}, data::L) where L
    next!(actor.actor, convert(Float64, data)) # e.g.
end

Rocket.on_error!(actor::ConvertToFloatProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::ConvertToFloatProxyActor)   = complete!(actor.actor)

source = from([ 1, 2, 3 ])
subscribe!(source |> ConvertToFloatOperator(), logger())
;

# output

[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed

See also: OperatorTrait, RightTypedOperator, ProxyObservable, ActorProxy, logger

Rocket.InferableOperatorTrait - Type

The inferable operator trait specifies that an operator is statically typed with neither its input data type nor its output data type. The output data type is inferred from a method operator_right(operator, ::Type{L}) where L, which the operator has to define and in which L is the input data type.

Examples

using Rocket

struct IdentityOperator <: InferableOperator end

function Rocket.on_call!(::Type{L}, ::Type{L}, op::IdentityOperator, source) where L
    return proxy(L, source, IdentityProxy())
end

struct IdentityProxy <: ActorProxy end

Rocket.operator_right(::IdentityOperator, ::Type{L}) where L = L

Rocket.actor_proxy!(::Type{L}, proxy::IdentityProxy, actor::A) where L where A = IdentityProxyActor{L, A}(actor)

struct IdentityProxyActor{L, A} <: Actor{L}
    actor :: A
end

function Rocket.on_next!(actor::IdentityProxyActor{L}, data::L) where L
    next!(actor.actor, data) # e.g.
end

Rocket.on_error!(actor::IdentityProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::IdentityProxyActor)   = complete!(actor.actor)

source = from([ 1, 2, 3 ])
subscribe!(source |> IdentityOperator(), logger())

source = from([ 1.0, 2.0, 3.0 ])
subscribe!(source |> IdentityOperator(), logger())
;

# output

[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed

See also: OperatorTrait, InferableOperator, operator_right, ProxyObservable, ActorProxy, logger

Rocket.InvalidOperatorTrait - Type

InvalidOperatorTrait specifies a special 'invalid' behavior: types with this trait specification cannot be used as an operator for an observable stream. By default, any type has the InvalidOperatorTrait trait specification.

See also: OperatorTrait
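
The default described above can be checked with a short sketch. NotAnOperator is a hypothetical type that has neither an operator supertype nor its own as_operator method; the names are qualified with Rocket. because this sketch does not assume that they are exported.

```julia
using Rocket

# Hypothetical type: no operator supertype and no `as_operator` method
struct NotAnOperator end

# Falls back to the default trait specification for arbitrary types
is_invalid = Rocket.as_operator(NotAnOperator) === Rocket.InvalidOperatorTrait()
```

Given the default stated above, is_invalid is expected to be true, which is why such a type cannot be applied to an observable stream as an operator.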


Types

Rocket.TypedOperator - Type

Can be used as a supertype for any operator. Automatically specifies TypedOperatorTrait behavior.

Examples

using Rocket

struct MyOperator <: TypedOperator{Float64,String} end

as_operator(MyOperator) === TypedOperatorTrait{Float64,String}()

# output
true

See also: AbstractOperator, TypedOperatorTrait
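
The other abstract supertypes used in the Traits section (LeftTypedOperator, RightTypedOperator and InferableOperator) can be checked in the same way. The sketch below uses hypothetical operator types and tests the resulting trait with isa instead of ===, so it does not rely on the exact type parameters of the trait objects.

```julia
using Rocket

# Hypothetical operator types, one per abstract supertype
struct OnlyInputTyped  <: LeftTypedOperator{Int}      end
struct OnlyOutputTyped <: RightTypedOperator{Float64} end
struct NothingTyped    <: InferableOperator           end

# Each supertype is expected to select the matching trait automatically
left_ok      = Rocket.as_operator(OnlyInputTyped)  isa Rocket.LeftTypedOperatorTrait
right_ok     = Rocket.as_operator(OnlyOutputTyped) isa Rocket.RightTypedOperatorTrait
inferable_ok = Rocket.as_operator(NothingTyped)    isa Rocket.InferableOperatorTrait
```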

Rocket.on_call! - Function
on_call!(::Type, ::Type, operator, source)

Each operator must implement its own method of the on_call! function. This function is invoked when an operator is applied to an Observable and produces another Observable with the new, operator-specific logic.

See also: AbstractOperator
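
A minimal sketch of an on_call! method follows. DoubleOperator is a hypothetical operator, and its method simply delegates to the existing map operator to keep the example short; an operator this simple would normally not need a custom type at all. The keep actor and its values field are assumed to be available in the installed Rocket version.

```julia
using Rocket

# Hypothetical operator used for illustration only
struct DoubleOperator <: TypedOperator{Int, Int} end

# Invoked when `source |> DoubleOperator()` is evaluated.
# Returns a new observable built from `source`; `source` itself is not modified.
function Rocket.on_call!(::Type{Int}, ::Type{Int}, ::DoubleOperator, source)
    return source |> map(Int, d -> 2 * d)
end

doubled = keep(Int) # assumed: `keep` actor that stores incoming values in `doubled.values`
subscribe!(from([ 1, 2, 3 ]) |> DoubleOperator(), doubled)
```

The two Type arguments carry the input and output data types, so a method can dispatch on them or, as here, fix them to the types declared by the operator's supertype.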

Rocket.OperatorsComposition - Type
OperatorsComposition(operators)

OperatorsComposition is an object that represents a composition of multiple operators. To compose two or more operators, use the overloaded + or |> operators.

Examples

using Rocket

composition = map(Int, (d) -> d ^ 2) + filter(d -> d % 2 == 0)

source = from(1:5) |> composition

subscribe!(source, logger())
;

# output

[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed

using Rocket

composition = map(Int, (d) -> d ^ 2) |> filter(d -> d % 2 == 0)

source = from(1:5) |> composition

subscribe!(source, logger())
;

# output

[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed
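
Because a composition is an ordinary object, it can be stored once and applied to several sources. The sketch below illustrates this with two independent sources; it assumes that the keep actor and its values field are available in the installed Rocket version, and the variable names are illustrative only.

```julia
using Rocket

# Build the composition once: increment, then keep values greater than 2
composition = map(Int, d -> d + 1) + filter(d -> d > 2)

first_values  = keep(Int) # assumed: `keep` actor that stores incoming values
second_values = keep(Int)

# Apply the same composition to two different sources
subscribe!(from([ 1, 2, 3 ]) |> composition, first_values)
subscribe!(from(10:12) |> composition, second_values)
```

In this sketch the first source yields 3 and 4 after the composition is applied, while the second source passes all of its incremented values (11, 12 and 13) through the filter.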

Errors