Stream postprocessors
A stream postprocessor is a composable transformation applied to one of the reactive observables produced during graph Activation. It wraps a Rocket.jl observable and returns a new observable of the same element type, leaving the message passing logic itself untouched.
The same postprocessor can be applied to three different kinds of streams produced by the inference engine:
- streams of outbound messages leaving a factor node interface or a leg of an
ReactiveMP.EqualityChain; - streams of marginals emitted by a
RandomVariableor by the local cluster of a factor node; - streams of scores (free-energy contributions) used to assemble Bethe Free Energy.
Stream postprocessors are useful for:
- Scheduling — controlling when downstream subscribers observe updates (e.g. batching a wave of inbound observations into a single propagation step using a
PendingScheduler, or moving work onto a worker thread using anAsyncScheduler). - Custom instrumentation — applying any Rocket.jl operator (filtering, sampling, side-effects) on top of every stream produced by activation.
The previous AbstractPipelineStage API and the per-node scheduler argument have been unified into ReactiveMP.AbstractStreamPostprocessor. The old LoggerPipelineStage is gone — equivalent behaviour can now be achieved through callbacks without subscribing to the streams themselves. The migration guide also covers this change.
Available stream postprocessors
| Postprocessor | Purpose |
|---|---|
nothing | No-op; the implicit default when no postprocessor is attached. The three postprocess_stream_of_* methods all have a ::Nothing fallback that returns the stream unchanged. |
ReactiveMP.ScheduleOnStreamPostprocessor | Redirects every emission to a Rocket.jl scheduler via the schedule_on(scheduler) operator. |
ReactiveMP.CompositeStreamPostprocessor | Applies a sequence of postprocessors in order. |
Composing stream postprocessors
Multiple postprocessors are chained by wrapping them in a ReactiveMP.CompositeStreamPostprocessor:
postprocessor = CompositeStreamPostprocessor((
ScheduleOnStreamPostprocessor(PendingScheduler()),
MyCustomStreamPostprocessor(),
))The output of stage i is fed as the input of stage i + 1, independently for each of the three stream kinds.
Attaching a stream postprocessor
Stream postprocessors are provided when activating a factor node via ReactiveMP.FactorNodeActivationOptions and a random variable via ReactiveMP.RandomVariableActivationOptions. In practice this is done through the model specification layer (e.g. RxInfer.jl's @model macro), but at the low level it looks like:
postprocessor = ScheduleOnStreamPostprocessor(PendingScheduler())
# For a factor node
options = ReactiveMP.FactorNodeActivationOptions(
metadata,
dependencies,
postprocessor, # <-- attached to all streams produced for this node
annotations,
rulefallback,
callbacks,
)
ReactiveMP.activate!(node, options)
# For a random variable
ReactiveMP.activate!(
var,
ReactiveMP.RandomVariableActivationOptions(
postprocessor,
ReactiveMP.MessageProductContext(),
ReactiveMP.MessageProductContext(),
),
)The same postprocessor instance is applied to every outbound message stream, every marginal stream, and every score stream produced by these activations. A subtype of ReactiveMP.AbstractStreamPostprocessor must therefore implement every postprocess_stream_of_* method that the kinds of streams it is attached to will go through; to opt out for a particular kind of stream, just forward the stream unchanged.
Custom stream postprocessors
Custom postprocessors are created by subtyping ReactiveMP.AbstractStreamPostprocessor and implementing one or more of ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, and ReactiveMP.postprocess_stream_of_scores:
using Rocket
struct MyStreamPostprocessor <: ReactiveMP.AbstractStreamPostprocessor end
# Postprocess outbound messages — `tap` performs a side effect and forwards
# the value unchanged.
function ReactiveMP.postprocess_stream_of_outbound_messages(::MyStreamPostprocessor, stream)
return stream |> tap(msg -> println("Intercepted: ", msg))
end
# Pass marginals and scores through unchanged.
ReactiveMP.postprocess_stream_of_marginals(::MyStreamPostprocessor, stream) = stream
ReactiveMP.postprocess_stream_of_scores(::MyStreamPostprocessor, stream) = streamIf a postprocessor is attached to a stream whose corresponding postprocess_stream_of_* method is not implemented for it, a MethodError is raised at activation time. To pass a kind of stream through unchanged, simply return the input stream as shown above.
API reference
ReactiveMP.AbstractStreamPostprocessor — Type
AbstractStreamPostprocessorAbstract supertype for stream postprocessors — composable transformations applied to the reactive observables produced during graph activation.
A stream postprocessor wraps a Rocket.jl observable and returns a new observable of the same element type. The same postprocessor can be applied to three different kinds of streams produced by the inference engine, each with its own entry point:
ReactiveMP.postprocess_stream_of_outbound_messages— the stream of outboundMessages leaving a factor node interface (or a leg of anReactiveMP.EqualityChain).ReactiveMP.postprocess_stream_of_marginals— the stream ofMarginals emitted by aRandomVariableor by the local cluster of a factor node.ReactiveMP.postprocess_stream_of_scores— the stream of free-energy contributions.
Stream postprocessors are attached to an inference run via ReactiveMP.FactorNodeActivationOptions and ReactiveMP.RandomVariableActivationOptions. Multiple postprocessors can be chained with ReactiveMP.CompositeStreamPostprocessor.
Built-in implementations
ReactiveMP.ScheduleOnStreamPostprocessor— redirects the computation onto a custom Rocket.jl scheduler (e.g.PendingScheduler,AsyncScheduler).ReactiveMP.CompositeStreamPostprocessor— applies a sequence of postprocessors in order.
See also: ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, ReactiveMP.postprocess_stream_of_scores.
ReactiveMP.postprocess_stream_of_outbound_messages — Function
postprocess_stream_of_outbound_messages(postprocessor, stream)Apply postprocessor to a stream of outbound Messages and return the transformed stream. Called by ReactiveMP.activate! on every outbound message stream produced by a factor node interface.
The default fallback for ::Nothing returns stream unchanged. Subtypes of ReactiveMP.AbstractStreamPostprocessor may override this method to e.g. redirect emissions to a Rocket.jl scheduler.
ReactiveMP.postprocess_stream_of_marginals — Function
postprocess_stream_of_marginals(postprocessor, stream)Apply postprocessor to a stream of Marginals and return the transformed stream. Called by ReactiveMP.activate! on every marginal stream produced for a RandomVariable or for a local cluster of a factor node.
The default fallback for ::Nothing returns stream unchanged. Subtypes of ReactiveMP.AbstractStreamPostprocessor may override this method.
ReactiveMP.postprocess_stream_of_scores — Function
postprocess_stream_of_scores(postprocessor, stream)Apply postprocessor to a stream of free-energy score contributions and return the transformed stream.
The default fallback for ::Nothing returns stream unchanged. Subtypes of ReactiveMP.AbstractStreamPostprocessor may override this method.
ReactiveMP.CompositeStreamPostprocessor — Type
CompositeStreamPostprocessor{T} <: AbstractStreamPostprocessorA ReactiveMP.AbstractStreamPostprocessor that applies a sequence of inner postprocessors in order. The output of stage i is fed as the input of stage i + 1, for each of the three stream kinds independently.
Fields
stages::T— a tuple (or any iterable) of postprocessors to apply in order.
Example
composite = CompositeStreamPostprocessor((
ScheduleOnStreamPostprocessor(PendingScheduler()),
MyCustomPostprocessor(),
))See also: ReactiveMP.postprocess_stream_of_outbound_messages, ReactiveMP.postprocess_stream_of_marginals, ReactiveMP.postprocess_stream_of_scores.
ReactiveMP.ScheduleOnStreamPostprocessor — Type
ScheduleOnStreamPostprocessor{S} <: AbstractStreamPostprocessorA ReactiveMP.AbstractStreamPostprocessor that redirects every emission of the wrapped stream onto a Rocket.jl scheduler via the schedule_on(scheduler) operator. This is the standard way to control when downstream subscribers observe updates — for example, to batch a wave of inbound observations into a single propagation step using a PendingScheduler, or to move work onto a worker thread using an AsyncScheduler.
The same scheduler is applied to all three stream kinds (outbound messages, marginals, scores), which makes ScheduleOnStreamPostprocessor the direct successor of the v5/early-v6 ScheduleOnPipelineStage + node-level scheduler pair.
Fields
scheduler::S— a Rocket.jl scheduler. Must be compatible withRocket.schedule_on.
Releasing scheduled updates
If the wrapped scheduler buffers updates (e.g. PendingScheduler), call Rocket.release! on the postprocessor to flush them. release! is also defined for tuples and arrays of ScheduleOnStreamPostprocessors for convenience.
See also: ReactiveMP.AbstractStreamPostprocessor, ReactiveMP.CompositeStreamPostprocessor.