Combined Observable
Rocket.combineLatest — Function
combineLatest(sources...; strategy = PushEach())
combineLatest(sources::S, strategy::G = PushEach()) where { S <: Tuple, G }
Combines multiple Observables to create an Observable whose values are calculated from the latest values of each of its input Observables. Accepts an optional update strategy object.
Arguments
sources: input sources
strategy: optional update strategy for batching new values together
Note: combineLatest() completes immediately if sources are empty.
Examples
using Rocket
latest = combineLatest(of(1), from(2:5))
subscribe!(latest, logger())
;
# output
[LogActor] Data: (1, 2)
[LogActor] Data: (1, 3)
[LogActor] Data: (1, 4)
[LogActor] Data: (1, 5)
[LogActor] Completed
using Rocket
latest = combineLatest(of(1) |> async(0), from(2:5) |> async(0), strategy = PushNew())
subscribe!(latest, logger())
;
# output
[LogActor] Data: (1, 2)
[LogActor] Completed
See also: Subscribable, subscribe!, PushEach, PushEachBut, PushNew, PushNewBut, PushStrategy
Rocket.PushEach — Type
PushEach
The PushEach update strategy emits a new value each time an inner observable emits a new value.
See also: combineLatest, PushEachBut, PushNew, PushNewBut, PushStrategy
Rocket.PushEachBut — Type
PushEachBut{I}
The PushEachBut update strategy emits a new value if and only if an inner observable with index I has a new value.
See also: combineLatest, PushEach, PushNew, PushNewBut, PushStrategy
Rocket.PushNew — Type
PushNew
The PushNew update strategy emits a new value if and only if all inner observables have a new value.
See also: combineLatest, PushEach, PushEachBut, PushNewBut, PushStrategy
Rocket.PushNewBut — Type
PushNewBut{I}
The PushNewBut{I} update strategy emits a new value if and only if all inner observables except the one with index I have a new value.
See also: combineLatest, PushEach, PushEachBut, PushNew, PushStrategy
Rocket.PushStrategy — Type
PushStrategy(strategy::BitArray{1})
The PushStrategy update strategy emits a new value if and only if every inner observable whose index satisfies strategy[index] = false has a new value.
See also: combineLatest, PushEach, PushEachBut, PushNew, PushNewBut, collectLatest
Description
combineLatest combines the values from all Observables in its arguments. This is done by subscribing to each Observable in order and, whenever an Observable emits, collecting a tuple of the most recent values from each Observable (in order). If you pass n Observables to combineLatest, the returned Observable will always emit an ordered tuple of n values.
To ensure that the output tuple has a consistent length, combineLatest waits for all input Observables to emit at least once before it starts emitting results. This means that if some Observable emits values before the other Observables have started emitting, all of those values except the last are lost. On the other hand, if some Observable completes without emitting a value, the resulting Observable completes at the same moment without emitting anything. Furthermore, if some input Observable does not emit any value and never completes, combineLatest will also never emit and never complete.
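The waiting behaviour described above can be sketched with two manually driven sources. This is a minimal illustration, not part of the original docstrings: it assumes Rocket's Subject, next!, lambda and unsubscribe! behave as described elsewhere in the library's documentation, and the names first_source, second_source and received are made up for the example.

```julia
using Rocket

# Two hot sources that we drive by hand (names are illustrative).
first_source  = Subject(Int)
second_source = Subject(Int)

# Collect every emitted tuple so the result can be inspected afterwards.
received = Any[]
actor = lambda(on_next = values -> push!(received, values))

subscription = subscribe!(combineLatest(first_source, second_source), actor)

next!(first_source, 1)    # second_source has no value yet, so nothing is emitted
next!(first_source, 2)    # still nothing; the earlier value 1 is overwritten
next!(second_source, 10)  # every source has a value now, so a tuple is emitted
next!(first_source, 3)    # later updates emit again with the latest values

unsubscribe!(subscription)

println(received)
```

Going by the description above, received should end up holding (2, 10) followed by (3, 10): the value 1 is lost because second_source had not emitted yet.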
If at least one Observable was passed to combineLatest and all passed Observables have emitted, then the resulting Observable completes when all combined streams complete. So even if some Observable completes, the result of combineLatest will still emit values when the other Observables do. For a completed Observable, its last emitted value continues to be used in every subsequent tuple. On the other hand, if any Observable errors, combineLatest will also immediately error.
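The completion rule can be illustrated in the same way. The sketch below is an assumption-laden example rather than library documentation: it relies on Rocket's Subject, next!, complete! and lambda, and the names fast, slow and events are hypothetical.

```julia
using Rocket

fast = Subject(Int)
slow = Subject(Int)

# Record data and termination events in the order they arrive.
events = Any[]
actor = lambda(
    on_next     = values -> push!(events, values),
    on_error    = err    -> push!(events, :error),
    on_complete = ()     -> push!(events, :completed)
)

subscribe!(combineLatest(fast, slow), actor)

next!(fast, 1)
next!(slow, 10)   # both sources have emitted, so the first tuple is produced
complete!(fast)   # one source is done; the combined stream stays open
next!(slow, 20)   # the last value of `fast` is reused in the new tuple
complete!(slow)   # every source is done, so the combined stream completes

println(events)
```

If the behaviour matches the description, events should contain (1, 10), (1, 20) and then :completed, with no :error entry.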
It is possible to change the default update/complete behaviour by passing an optional strategy object.
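As a rough sketch of how a non-default strategy changes the output, the example below passes PushNew() through the strategy keyword shown in the signature at the top of this page. Subject, next! and lambda are assumed to work as in the rest of Rocket, and the names left, right and received_new are invented for the illustration.

```julia
using Rocket

left  = Subject(Int)
right = Subject(Int)

received_new = Any[]
actor = lambda(on_next = values -> push!(received_new, values))

# PushNew: emit only once every source has produced a fresh value.
subscribe!(combineLatest(left, right, strategy = PushNew()), actor)

next!(left, 1)
next!(right, 10)  # both sources are fresh, so a tuple is emitted
next!(left, 2)    # `right` has no new value yet, so nothing is emitted
next!(right, 20)  # both sources are fresh again, so a second tuple is emitted

println(received_new)
```

Under the PushNew definition above, received_new should hold (1, 10) and (2, 20). With the default PushEach strategy the same sequence would also be expected to produce the intermediate tuple (2, 10).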