Operators API
How to create a custom operator
If you need to write an operator that cannot be made from a combination of existing operators, then you can write a custom operator from scratch.
Each operator (e.g. MyFancyOperator
) needs to either be (1) a subtype of one of abstract OperatorTrait
trait types, or (2) define a
Rocket.as_operator(::Type{<:MyFancyOperator}) = TypedOperatorTrait{T, R}()
# or
Rocket.as_operator(::Type{<:MyFancyOperator}) = InferableOperatorTrait()
trait behavior.
In addition, an operator must implement a specific method for on_call!
function with custom logic which has to return another Observable as a result of applying MyFancyOperator
to a source
.
Rocket.on_call!(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where L = # some custom logic here
# or
# for inferable trait types you have to specify 'right' type with Rocket.operator_right which should specify a type of data of produced Observable
Rocket.on_call(::Type{L}, ::Type{R}, operator::MyFancyOperator, source) where L = # some custom logic here
Rocket.operator_right(::MyFancyOperator, ::Type{L}) where L = R # where R should be an actual type, Int or even L itself e.g.
It is not allowed to modify the source
Observable in any way; you have to return a new observable.
See more examples on custom operators in Traits API section
It might be useful to return a ProxyObservable from an on_call!
function. ProxyObservable is a special Observable which proxying actors with the source and/or source with actors.
Traits
Rocket.OperatorTrait
— TypeAbstract type for all possible operator traits
See also: TypedOperatorTrait
, LeftTypedOperatorTrait
, RightTypedOperatorTrait
, InferableOperatorTrait
, InvalidOperatorTrait
,
Rocket.as_operator
— Functionas_operator(any)
This function checks operator trait behavior. May be used explicitly to specify operator trait behavior for any object.
See also: OperatorTrait
, AbstractOperator
Rocket.TypedOperatorTrait
— TypeTyped operator trait specifies operator to be statically typed with input and output data types. Typed operator with input type L
and output type R
can only operate on input Observable with data type L
and will always produce an Observable with data type R
.
Examples
using Rocket
struct MyTypedOperator <: TypedOperator{Int, Int} end
function Rocket.on_call!(::Type{Int}, ::Type{Int}, op::MyTypedOperator, source)
return proxy(Int, source, MyTypedOperatorProxy())
end
struct MyTypedOperatorProxy <: ActorProxy end
Rocket.actor_proxy!(::Type{Int}, ::MyTypedOperatorProxy, actor::A) where A = MyTypedOperatorProxiedActor{A}(actor)
struct MyTypedOperatorProxiedActor{A} <: Actor{Int}
actor :: A
end
function Rocket.on_next!(actor::MyTypedOperatorProxiedActor, data::Int)
# Do something with a data and/or redirect it to actor.actor
next!(actor.actor, data + 1)
end
Rocket.on_error!(actor::MyTypedOperatorProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::MyTypedOperatorProxiedActor) = complete!(actor.actor)
source = from([ 0, 1, 2 ])
subscribe!(source |> MyTypedOperator(), logger())
;
# output
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
See also: OperatorTrait
, TypedOperator
, OperatorTrait
, ProxyObservable
, ActorProxy
, logger
Rocket.LeftTypedOperatorTrait
— TypeLeft typed operator trait specifies operator to be statically typed with input data type. To infer output data type this object should specify a special function operator_right(operator, ::Type{L}) where L
which will be used to infer output data type. Left typed operator with input type L
can only operate on input Observable with data type L
and will always produce an Observable with data type inferred from operator_right(operator, ::Type{L})
.
Examples
using Rocket
struct CountIntegersOperator <: LeftTypedOperator{Int} end
function Rocket.on_call!(::Type{Int}, ::Type{Tuple{Int, Int}}, op::CountIntegersOperator, source)
return proxy(Tuple{Int, Int}, source, CountIntegersOperatorProxy())
end
Rocket.operator_right(::CountIntegersOperator, ::Type{Int}) = Tuple{Int, Int}
struct CountIntegersOperatorProxy <: ActorProxy end
Rocket.actor_proxy!(::Type{Tuple{Int, Int}}, ::CountIntegersOperatorProxy, actor::A) where A = CountIntegersProxiedActor{A}(0, actor)
mutable struct CountIntegersProxiedActor{A} <: Actor{Int}
current :: Int
actor :: A
end
function Rocket.on_next!(actor::CountIntegersProxiedActor, data::Int)
current = actor.current
actor.current += 1
next!(actor.actor, (current, data)) # e.g.
end
Rocket.on_error!(actor::CountIntegersProxiedActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::CountIntegersProxiedActor) = complete!(actor.actor)
source = from([ 0, 0, 0 ])
subscribe!(source |> CountIntegersOperator(), logger())
;
# output
[LogActor] Data: (0, 0)
[LogActor] Data: (1, 0)
[LogActor] Data: (2, 0)
[LogActor] Completed
See also: OperatorTrait
, LeftTypedOperator
, operator_right
, OperatorTrait
, ProxyObservable
, ActorProxy
, enumerate
, logger
Rocket.RightTypedOperatorTrait
— TypeRight typed operator trait specifies operator to be statically typed with output data type. It can operate on input Observable with any data type L
but will always produce an Observable with data type R
.
Examples
using Rocket
struct ConvertToFloatOperator <: RightTypedOperator{Float64} end
function Rocket.on_call!(::Type{L}, ::Type{Float64}, op::ConvertToFloatOperator, source) where L
return proxy(Float64, source, ConvertToFloatProxy{L}())
end
struct ConvertToFloatProxy{L} <: ActorProxy end
function Rocket.actor_proxy!(::Type{Float64}, proxy::ConvertToFloatProxy{L}, actor::A) where { L, A }
return ConvertToFloatProxyActor{L, A}(actor)
end
struct ConvertToFloatProxyActor{L, A} <: Actor{L}
actor :: A
end
function Rocket.on_next!(actor::ConvertToFloatProxyActor{L}, data::L) where L
next!(actor.actor, convert(Float64, data)) # e.g.
end
Rocket.on_error!(actor::ConvertToFloatProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::ConvertToFloatProxyActor) = complete!(actor.actor)
source = from([ 1, 2, 3 ])
subscribe!(source |> ConvertToFloatOperator(), logger())
;
# output
[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed
See also: OperatorTrait
, RightTypedOperator
, OperatorTrait
, ProxyObservable
, ActorProxy
, logger
Rocket.InferableOperatorTrait
— TypeInferable operator trait specifies operator to be statically typed neither with input data type nor with output data type. To infer output data type this object should specify a special function operator_right(operator, ::Type{L}) where L
where L
is input data type which will be used to infer output data type.
using Rocket
struct IdentityOperator <: InferableOperator end
function Rocket.on_call!(::Type{L}, ::Type{L}, op::IdentityOperator, source) where L
return proxy(L, source, IdentityProxy())
end
struct IdentityProxy <: ActorProxy end
Rocket.operator_right(::IdentityOperator, ::Type{L}) where L = L
Rocket.actor_proxy!(::Type{L}, proxy::IdentityProxy, actor::A) where L where A = IdentityProxyActor{L, A}(actor)
struct IdentityProxyActor{L, A} <: Actor{L}
actor :: A
end
function Rocket.on_next!(actor::IdentityProxyActor{L}, data::L) where L
next!(actor.actor, data) # e.g.
end
Rocket.on_error!(actor::IdentityProxyActor, err) = error!(actor.actor, err)
Rocket.on_complete!(actor::IdentityProxyActor) = complete!(actor.actor)
source = from([ 1, 2, 3 ])
subscribe!(source |> IdentityOperator(), logger())
source = from([ 1.0, 2.0, 3.0 ])
subscribe!(source |> IdentityOperator(), logger())
;
# output
[LogActor] Data: 1
[LogActor] Data: 2
[LogActor] Data: 3
[LogActor] Completed
[LogActor] Data: 1.0
[LogActor] Data: 2.0
[LogActor] Data: 3.0
[LogActor] Completed
See also: OperatorTrait
, InferableOperator
, operator_right
, OperatorTrait
, ProxyObservable
, ActorProxy
, logger
Rocket.InvalidOperatorTrait
— TypeInvalidOperatorTrait trait specifies special 'invalid' behavior and types with such a trait specification cannot be used as an operator for an observable stream. By default any type has InvalidOperatorTrait trait specification
See also: OperatorTrait
Types
Rocket.AbstractOperator
— TypeSupertype for all operators
See also: TypedOperator
, LeftTypedOperator
, RightTypedOperator
, InferableOperator
Rocket.TypedOperator
— TypeCan be used as a supertype for any operator. Automatically specifies TypedOperatorTrait behavior.
Examples
using Rocket
struct MyOperator <: TypedOperator{Float64,String} end
as_operator(MyOperator) === TypedOperatorTrait{Float64,String}()
# output
true
See also: AbstractOperator
, TypedOperatorTrait
Rocket.LeftTypedOperator
— TypeCan be used as a supertype for any operator. Automatically specifies LeftTypedOperatorTrait behavior.
Examples
using Rocket
struct MyOperator <: LeftTypedOperator{Float64} end
as_operator(MyOperator) === LeftTypedOperatorTrait{Float64}()
# output
true
See also: AbstractOperator
, LeftTypedOperatorTrait
, operator_right
Rocket.RightTypedOperator
— TypeCan be used as a supertype for any operator. Automatically specifies RightTypedOperatorTrait behavior.
Examples
using Rocket
struct MyOperator <: RightTypedOperator{Float64} end
as_operator(MyOperator) === RightTypedOperatorTrait{Float64}()
# output
true
See also: AbstractOperator
, RightTypedOperatorTrait
Rocket.InferableOperator
— TypeCan be used as a supertype for any operator. Automatically specifies InferableOperatorTrait behavior.
Examples
using Rocket
struct MyOperator <: InferableOperator end
as_operator(MyOperator) === InferableOperatorTrait()
# output
true
See also: AbstractOperator
, InferableOperatorTrait
, operator_right
Rocket.on_call!
— Functionon_call!(::Type, ::Type, operator, source)
Each operator must implement its own method for on_call!
function. This function is used to invoke operator on some Observable and to produce another Observable with new logic (operator specific).
See also: AbstractOperator
Rocket.operator_right
— Functionoperator_right(operator, L)
Both LeftTypedOperator and InferableOperator must implement its own method for operator_right
function. This function is used to infer type of data of output Observable given the type of data of input Observable.
See also: AbstractOperator
, LeftTypedOperator
, InferableOperator
Rocket.OperatorsComposition
— TypeOperatorsComposition(operators)
OperatorsComposition is an object which helps to create a composition of multiple operators. To create a composition of two or more operators overloaded +
or |>
can be used.
using Rocket
composition = map(Int, (d) -> d ^ 2) + filter(d -> d % 2 == 0)
source = from(1:5) |> composition
subscribe!(source, logger())
;
# output
[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed
using Rocket
composition = map(Int, (d) -> d ^ 2) |> filter(d -> d % 2 == 0)
source = from(1:5) |> composition
subscribe!(source, logger())
;
# output
[LogActor] Data: 4
[LogActor] Data: 16
[LogActor] Completed
Errors
Rocket.InvalidOperatorTraitUsageError
— TypeThis error will be thrown if |>
pipe operator is called with invalid operator object
See also: on_call!
Rocket.InconsistentSourceOperatorDataTypesError
— TypeThis error will be thrown if |>
pipe operator is called with inconsistent data type
See also: on_call!
Rocket.MissingOnCallImplementationError
— TypeThis error will be thrown if Julia cannot find specific method of on_call!
function for a given operator.
See also: on_call!
Rocket.MissingOperatorRightImplementationError
— TypeThis error will be thrown if Julia cannot find specific method of operator_right
function for a given operator.
See also: operator_right