Stream postprocessors

A stream postprocessor is a composable transformation applied to one of the reactive observables produced during graph Activation. It wraps a Rocket.jl observable and returns a new observable of the same element type, leaving the message passing logic itself untouched.

The same postprocessor can be applied to three different kinds of streams produced by the inference engine:

  • streams of outbound messages leaving a factor node interface or a leg of an ReactiveMP.EqualityChain;
  • streams of marginals emitted by a RandomVariable or by the local cluster of a factor node;
  • streams of scores (free-energy contributions) used to assemble Bethe Free Energy.

Stream postprocessors are useful for:

  • Scheduling — controlling when downstream subscribers observe updates (e.g. batching a wave of inbound observations into a single propagation step using a PendingScheduler, or moving work onto a worker thread using an AsyncScheduler).
  • Custom instrumentation — applying any Rocket.jl operator (filtering, sampling, side-effects) on top of every stream produced by activation.
Note

The previous AbstractPipelineStage API and the per-node scheduler argument have been unified into ReactiveMP.AbstractStreamPostprocessor. The old LoggerPipelineStage is gone — equivalent behaviour can now be achieved through callbacks without subscribing to the streams themselves. The migration guide also covers this change.

Available stream postprocessors

PostprocessorPurpose
nothingNo-op; the implicit default when no postprocessor is attached. The three postprocess_stream_of_* methods all have a ::Nothing fallback that returns the stream unchanged.
ReactiveMP.ScheduleOnStreamPostprocessorRedirects every emission to a Rocket.jl scheduler via the schedule_on(scheduler) operator.
ReactiveMP.CompositeStreamPostprocessorApplies a sequence of postprocessors in order.

Composing stream postprocessors

Multiple postprocessors are chained by wrapping them in a ReactiveMP.CompositeStreamPostprocessor:

postprocessor = CompositeStreamPostprocessor((
    ScheduleOnStreamPostprocessor(PendingScheduler()),
    MyCustomStreamPostprocessor(),
))

The output of stage i is fed as the input of stage i + 1, independently for each of the three stream kinds.

Attaching a stream postprocessor

Stream postprocessors are provided when activating a factor node via ReactiveMP.FactorNodeActivationOptions and a random variable via ReactiveMP.RandomVariableActivationOptions. In practice this is done through the model specification layer (e.g. RxInfer.jl's @model macro), but at the low level it looks like:

postprocessor = ScheduleOnStreamPostprocessor(PendingScheduler())

# For a factor node
options = ReactiveMP.FactorNodeActivationOptions(
    metadata,
    dependencies,
    postprocessor,   # <-- attached to all streams produced for this node
    annotations,
    rulefallback,
    callbacks,
)
ReactiveMP.activate!(node, options)

# For a random variable
ReactiveMP.activate!(
    var,
    ReactiveMP.RandomVariableActivationOptions(
        postprocessor,
        ReactiveMP.MessageProductContext(),
        ReactiveMP.MessageProductContext(),
    ),
)

The same postprocessor instance is applied to every outbound message stream, every marginal stream, and every score stream produced by these activations. A subtype of ReactiveMP.AbstractStreamPostprocessor must therefore implement every postprocess_stream_of_* method that the kinds of streams it is attached to will go through; to opt out for a particular kind of stream, just forward the stream unchanged.

Custom stream postprocessors

Custom postprocessors are created by subtyping ReactiveMP.AbstractStreamPostprocessor and implementing one or more of ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, and ReactiveMP.postprocess_stream_of_scores:

using Rocket

struct MyStreamPostprocessor <: ReactiveMP.AbstractStreamPostprocessor end

# Postprocess outbound messages — `tap` performs a side effect and forwards
# the value unchanged.
function ReactiveMP.postprocess_stream_of_outbound_messages(::MyStreamPostprocessor, stream)
    return stream |> tap(msg -> println("Intercepted: ", msg))
end

# Pass marginals and scores through unchanged.
ReactiveMP.postprocess_stream_of_marginals(::MyStreamPostprocessor, stream) = stream
ReactiveMP.postprocess_stream_of_scores(::MyStreamPostprocessor, stream)    = stream

If a postprocessor is attached to a stream whose corresponding postprocess_stream_of_* method is not implemented for it, a MethodError is raised at activation time. To pass a kind of stream through unchanged, simply return the input stream as shown above.

API reference

ReactiveMP.AbstractStreamPostprocessorType
AbstractStreamPostprocessor

Abstract supertype for stream postprocessors — composable transformations applied to the reactive observables produced during graph activation.

A stream postprocessor wraps a Rocket.jl observable and returns a new observable of the same element type. The same postprocessor can be applied to three different kinds of streams produced by the inference engine, each with its own entry point:

Stream postprocessors are attached to an inference run via ReactiveMP.FactorNodeActivationOptions and ReactiveMP.RandomVariableActivationOptions. Multiple postprocessors can be chained with ReactiveMP.CompositeStreamPostprocessor.

Built-in implementations

See also: ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, ReactiveMP.postprocess_stream_of_scores.

source
ReactiveMP.postprocess_stream_of_outbound_messagesFunction
postprocess_stream_of_outbound_messages(postprocessor, stream)

Apply postprocessor to a stream of outbound Messages and return the transformed stream. Called by ReactiveMP.activate! on every outbound message stream produced by a factor node interface.

The default fallback for ::Nothing returns stream unchanged. Subtypes of ReactiveMP.AbstractStreamPostprocessor may override this method to e.g. redirect emissions to a Rocket.jl scheduler.

source
ReactiveMP.CompositeStreamPostprocessorType
CompositeStreamPostprocessor{T} <: AbstractStreamPostprocessor

A ReactiveMP.AbstractStreamPostprocessor that applies a sequence of inner postprocessors in order. The output of stage i is fed as the input of stage i + 1, for each of the three stream kinds independently.

Fields

  • stages::T — a tuple (or any iterable) of postprocessors to apply in order.

Example

composite = CompositeStreamPostprocessor((
    ScheduleOnStreamPostprocessor(PendingScheduler()),
    MyCustomPostprocessor(),
))

See also: ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, ReactiveMP.postprocess_stream_of_scores.

source
ReactiveMP.ScheduleOnStreamPostprocessorType
ScheduleOnStreamPostprocessor{S} <: AbstractStreamPostprocessor

A ReactiveMP.AbstractStreamPostprocessor that redirects every emission of the wrapped stream onto a Rocket.jl scheduler via the schedule_on(scheduler) operator. This is the standard way to control when downstream subscribers observe updates — for example, to batch a wave of inbound observations into a single propagation step using a PendingScheduler, or to move work onto a worker thread using an AsyncScheduler.

The same scheduler is applied to all three stream kinds (outbound messages, marginals, scores), which makes ScheduleOnStreamPostprocessor the direct successor of the v5/early-v6 ScheduleOnPipelineStage + node-level scheduler pair.

Fields

  • scheduler::S — a Rocket.jl scheduler. Must be compatible with Rocket.schedule_on.

Releasing scheduled updates

If the wrapped scheduler buffers updates (e.g. PendingScheduler), call Rocket.release! on the postprocessor to flush them. release! is also defined for tuples and arrays of ScheduleOnStreamPostprocessors for convenience.

See also: ReactiveMP.AbstractStreamPostprocessor, ReactiveMP.CompositeStreamPostprocessor.

source