Collected Observable

Rocket.collectLatest - Function
collectLatest(sources::S, mappingFn::F = copy, callbackFn::C = nothing)
collectLatest(::Type{T}, ::Type{R}, sources::S, mappingFn::F = copy, callbackFn::C = nothing)

Collects values from multiple Observables and emits them in a single array every time each inner Observable has a new value. Re-emits errors from inner Observables. Completes when all inner Observables complete.

Arguments

  • sources: input sources
  • mappingFn: optional mapping function applied to the array of emitted values, copy by default, should return a Vector
  • callbackFn: optional callback function, which is called right after mappingFn has been evaluated, accepts the state of the inner actor and the computed value, nothing by default

Note: collectLatest completes immediately if sources are empty.

Optional arguments

  • ::Type{T}: optional type of the values emitted by the inner observables
  • ::Type{R}: optional return type after applying mappingFn to a vector of values

Examples

using Rocket

collected = collectLatest([ of(1), from([ 1, 2 ]) ])

subscribe!(collected, logger())
;

# output

[LogActor] Data: [1, 1]
[LogActor] Data: [1, 2]
[LogActor] Completed
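
The optional arguments can be combined with the same sources as above. The following is a minimal sketch rather than an excerpt from the library documentation: the doubling mappingFn, the `seen` vector, and the two-argument callback are illustrative choices, and the callback's argument order (state first, computed value second) is assumed from the argument description above.

```julia
using Rocket

# Illustrative storage filled by the callback (hypothetical name).
seen = Vector{Vector{Int}}()

doubled = collectLatest(
    Int,                                  # T: element type of the inner observables
    Vector{Int},                          # R: type returned by mappingFn
    [ of(1), from([ 1, 2 ]) ],            # same sources as in the example above
    vals -> 2 .* vals,                    # mappingFn: returns a new Vector
    (state, value) -> push!(seen, value)  # callbackFn: assumed (state, value) order
)

subscribe!(doubled, logger())
;
```

After the subscription has finished, `seen` holds every vector that mappingFn produced, in emission order.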

See also: Subscribable, subscribe!, combineLatest


Description

collectLatest collects the values from all Observables in its vector argument. This is done by subscribing to each Observable in order and, whenever an Observable emits, collecting a vector of the most recent values from each Observable (in order). If you pass n Observables to collectLatest, the returned Observable will always emit an ordered vector of n values.

To ensure that the output vector has a consistent length, collectLatest waits for all input Observables to emit at least once before it starts emitting results. This means that if some Observable emits several values before the other Observables have started emitting, all of those values except the last are lost. On the other hand, if some Observable completes without emitting a value, the resulting Observable completes at the same moment without emitting anything. Furthermore, if some input Observable neither emits a value nor completes, collectLatest will never emit and never complete.
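
The case of a source that completes without emitting can be sketched as follows. This is an illustration under assumptions, not an official example: it uses Rocket's `completed(Int)` as the empty source and a `lambda` actor to record what happens, and the names `emitted` and `finished` are hypothetical.

```julia
using Rocket

emitted  = Vector{Vector{Int}}()  # values received from collectLatest
finished = Ref(false)             # set once the completion signal arrives

# `completed(Int)` completes right away without emitting anything.
source = collectLatest([ completed(Int), of(1) ])

subscribe!(source, lambda(
    on_next     = v -> push!(emitted, v),
    on_error    = e -> nothing,
    on_complete = () -> (finished[] = true)
))
;
```

According to the paragraph above, `emitted` should stay empty while `finished[]` becomes true, even though `of(1)` does emit a value.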

If at least one Observable was passed to collectLatest and all passed Observables have emitted, the resulting Observable completes when all input Observables complete. So even if some Observable completes, the result of collectLatest will still emit values when the other Observables do. A completed Observable keeps contributing its last emitted value to the output vector. On the other hand, if any Observable errors, collectLatest also errors immediately.
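
Error propagation can be sketched in the same way. The example below is a hedged illustration: `faulted(Int, "source failed")` stands in for any failing source, and the `errors` vector and the error message are made up for the demonstration.

```julia
using Rocket

errors = Any[]  # errors forwarded by collectLatest

# The second source fails immediately instead of emitting.
source = collectLatest([ of(1), faulted(Int, "source failed") ])

subscribe!(source, lambda(
    on_next     = v -> nothing,
    on_error    = e -> push!(errors, e),
    on_complete = () -> nothing
))
;
```

The error raised by the inner source should arrive unchanged at the subscriber of the collected Observable.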