Substitute Operator
Rocket.substitute
— Function
substitute(::Type{T}, mapFn::F, handler::SubstituteHandler) where { T, F <: Function }
This operator makes the observable substitute each emitted value with the latest value computed by mapFn. A new value is computed only when the release! function is called on the corresponding handler. After release! on the handler, the substitute operator computes a new value with mapFn but does not emit it until the next emission from the source observable. It always calls mapFn on the first value from the source observable.
Producing
Stream of type <: Subscribable{T}
Examples
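using Rocket
subject = Subject(Int)
handler = SubstituteHandler()
# Map each Int to a String, but only recompute when the handler is released
source = subject |> substitute(String, i -> string("i = ", i), handler)
subscription = subscribe!(source, logger())
# mapFn is called on the first value; later values re-emit that same result
next!(subject, 1)
next!(subject, 2)
next!(subject, 3)
# Compute a new value with mapFn; it is emitted on the next source emission
release!(handler)
next!(subject, 4)
next!(subject, 5)
next!(subject, 6)
unsubscribe!(subscription)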
;
# output
[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 3
[LogActor] Data: i = 3
[LogActor] Data: i = 3
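The output above can be reproduced with a small plain-Julia model of the described behaviour. This is only a sketch under assumptions, not Rocket.jl's implementation: the names SubstituteModel, emit! and release_model! are hypothetical, and the model assumes that release! recomputes the value from the most recent source value, which is what the logged "i = 3" lines suggest.

```julia
# Hypothetical model of the documented behaviour; this is NOT Rocket.jl code.
mutable struct SubstituteModel{F}
    mapFn::F
    latest::Any     # most recent value received from the source
    current::Any    # value emitted in place of each source value
    started::Bool   # whether a first source value has been seen
end

SubstituteModel(mapFn) = SubstituteModel(mapFn, nothing, nothing, false)

# Called for every source emission; returns what would be emitted downstream.
function emit!(m::SubstituteModel, value)
    m.latest = value
    if !m.started
        # mapFn is always called on the first value from the source
        m.current = m.mapFn(value)
        m.started = true
    end
    return m.current
end

# Assumption: releasing recomputes from the latest source value and
# emits nothing by itself; the new value appears on the next emission.
function release_model!(m::SubstituteModel)
    if m.started
        m.current = m.mapFn(m.latest)
    end
    return nothing
end

model = SubstituteModel(i -> string("i = ", i))
before = [emit!(model, i) for i in 1:3]   # emissions before the release
release_model!(model)
after = [emit!(model, i) for i in 4:6]    # emissions after the release
println(before)
println(after)
```

In this model the values 4, 5 and 6 never reach mapFn, because no further release happens after they arrive.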
See also: SubstituteHandler
Rocket.SubstituteHandler
— Type