Substitute Operator

Rocket.substituteFunction
substitute(::Type{T}, mapFn::F, handler::SubstituteHandler) where { T, F <: Function }

This operator forces observable to substitute each emmited value with the latest computed value with the corresponding handler and release! function. After release! on handler substitute operator computes new value with mapFn but does not emit it until next emission from source observable. Always calls mapFn on first value from source observable.

Producing

Stream of type <: Subscribable{T}

Examples

using Rocket

subject = Subject(Int)

handler = SubstituteHandler()
source  = subject |> substitute(String, i -> string("i = ", i), handler)

subscription = subscribe!(source, logger())

next!(subject, 1)
next!(subject, 2)
next!(subject, 3)

release!(handler)

next!(subject, 4)
next!(subject, 5)
next!(subject, 6)

unsubscribe!(subscription)
;

# output

[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 3
[LogActor] Data: i = 3
[LogActor] Data: i = 3

See also: SubstituteHandler

source

See also

Operators