Substitute Operator

Rocket.substitute - Function
substitute(::Type{T}, mapFn::F, handler::SubstituteHandler) where { T, F <: Function }

This operator replaces each value emitted by the source observable with a cached value computed by mapFn. mapFn is always called on the first value from the source, and the result is emitted for that value and for every later one until release! is called on the handler. Calling release! on the handler recomputes the cached value by applying mapFn to the latest source value, but the new value is not emitted until the next emission from the source observable.
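The caching rule described above can be modelled in a few lines of plain Julia. This is a minimal sketch for illustration, not Rocket's implementation: `SubstituteModel`, `emit!`, and `release_model!` are hypothetical names that do not exist in Rocket. The sketch assumes that release! has no effect before the first source value arrives.

```julia
# Hypothetical stand-alone model of the caching rule; it does not use Rocket.
mutable struct SubstituteModel{F}
    mapFn   :: F                       # mapping function, as passed to `substitute`
    current :: Union{Nothing, Int}     # latest value seen from the source
    recent  :: Union{Nothing, String}  # cached result of mapFn
end

SubstituteModel(mapFn) = SubstituteModel(mapFn, nothing, nothing)

# Models `release!`: recompute the cached value from the latest source value.
# Assumption: does nothing if the source has not emitted yet.
function release_model!(m::SubstituteModel)
    if m.current !== nothing
        m.recent = m.mapFn(m.current)
    end
    return nothing
end

# Models one emission from the source and returns the value passed downstream.
function emit!(m::SubstituteModel, value::Int)
    m.current = value
    if m.recent === nothing   # first value: mapFn is always called
        release_model!(m)
    end
    return m.recent
end

model   = SubstituteModel(i -> string("i = ", i))
outputs = String[]

for i in 1:3
    push!(outputs, emit!(model, i))
end

release_model!(model)   # cache becomes "i = 3"; nothing is emitted yet

for i in 4:6
    push!(outputs, emit!(model, i))
end

foreach(println, outputs)
```

In this model mapFn runs only twice for six source values: once on the first emission and once on release. The printed values are "i = 1" three times, then "i = 3" three times.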

Producing

Stream of type <: Subscribable{T}

Examples

using Rocket

subject = Subject(Int)

handler = SubstituteHandler()
source  = subject |> substitute(String, i -> string("i = ", i), handler)

subscription = subscribe!(source, logger())

next!(subject, 1)
next!(subject, 2)
next!(subject, 3)

release!(handler) # recompute the cached value from the latest source value (3)

next!(subject, 4)
next!(subject, 5)
next!(subject, 6)

unsubscribe!(subscription)
;

# output

[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 1
[LogActor] Data: i = 3
[LogActor] Data: i = 3
[LogActor] Data: i = 3

See also: SubstituteHandler


See also

Operators