Marginal implementation

Marginal type

All marginals are encoded with the type Marginal.

ReactiveMP.Marginal (Type)
Marginal(data, is_clamped, is_initial[, annotations])

An implementation of a marginal in the variational message passing framework.

Arguments

  • data::D: the data object held by the marginal, usually a probability distribution
  • is_clamped::Bool: specifies whether this marginal is the result of constant computations (e.g. clamped constants)
  • is_initial::Bool: specifies whether this marginal was used for initialization
  • annotations::AnnotationDict: optional annotation dictionary carrying extra metadata (e.g. log-scale, input arguments). Defaults to an empty AnnotationDict().

Example

julia> distribution = Gamma(10.0, 2.0)
Distributions.Gamma{Float64}(α=10.0, θ=2.0)

julia> marginal = Marginal(distribution, false, true)
Marginal(Distributions.Gamma{Float64}(α=10.0, θ=2.0))

julia> mean(marginal)
20.0

julia> getdata(marginal)
Distributions.Gamma{Float64}(α=10.0, θ=2.0)

julia> is_clamped(marginal)
false

julia> is_initial(marginal)
true

From an implementation point of view, the Marginal structure does nothing but hold a data object and redirect most statistics-related functions to it. However, the Marginal type is used extensively in Julia's multiple dispatch.
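The forwarding-and-dispatch pattern described above can be illustrated with a minimal, self-contained sketch. It does not reproduce ReactiveMP's source: ToyMarginal, ToyGamma, ToyNormal, toy_mean, toy_getdata, and describe are hypothetical names introduced only for illustration.

```julia
# Hypothetical stand-ins for distribution types; not part of ReactiveMP.
struct ToyGamma
    shape::Float64
    scale::Float64
end

struct ToyNormal
    mean::Float64
    variance::Float64
end

toy_mean(d::ToyGamma)  = d.shape * d.scale
toy_mean(d::ToyNormal) = d.mean

# A thin wrapper: a data object plus two flags.
struct ToyMarginal{D}
    data::D
    is_clamped::Bool
    is_initial::Bool
end

toy_getdata(m::ToyMarginal) = m.data

# Statistics are forwarded to the wrapped data object.
toy_mean(m::ToyMarginal) = toy_mean(toy_getdata(m))

# Multiple dispatch can select a method based on the wrapped data type.
describe(::ToyMarginal{ToyGamma})  = "marginal over a Gamma"
describe(::ToyMarginal{ToyNormal}) = "marginal over a Normal"
describe(::ToyMarginal)            = "marginal over something else"

gamma_marginal  = ToyMarginal(ToyGamma(10.0, 2.0), false, true)
normal_marginal = ToyMarginal(ToyNormal(0.0, 1.0), false, false)
other_marginal  = ToyMarginal("not a distribution", true, false)
```

Here toy_mean(gamma_marginal) is computed by the ToyGamma method, and describe picks its method from the wrapper's type parameter, so the wrapper itself stays free of distribution-specific logic.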

ReactiveMP.as_marginal (Function)
as_marginal(any)

A function that converts an instance of Message to an instance of Marginal. For a Marginal it returns the input unchanged. This is an internal function and is not meant to be used outside of the ReactiveMP package.

ReactiveMP.to_marginal (Function)
to_marginal(any)

Transforms an input into a proper marginal distribution. Called inside as_marginal. Some nodes do not use Distributions.jl and instead implement their own, more efficient equivalents for messages. to_marginal converts such an internal, efficient representation to a user-friendly equivalent (e.g. one from Distributions.jl). By default it returns its input unchanged, but some nodes override this behaviour (see, for example, Wishart and InverseWishart).

Note: This function is a part of the private API and is not intended to be used outside of the ReactiveMP package.
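The "identity by default, override per type" behaviour described above is a common Julia pattern, and a small sketch may make it concrete. The sketch is self-contained and is not ReactiveMP code: FastNormal, FriendlyNormal, and toy_to_marginal are hypothetical names, and the precision-to-variance conversion is only an assumed example of an internal representation.

```julia
# Hypothetical internal representation, parameterised by precision.
struct FastNormal
    mean::Float64
    precision::Float64
end

# Hypothetical user-facing representation, parameterised by variance.
struct FriendlyNormal
    mean::Float64
    variance::Float64
end

# Default method: return the input unchanged.
toy_to_marginal(x) = x

# Specialised method: convert the internal form to the user-facing one.
toy_to_marginal(d::FastNormal) = FriendlyNormal(d.mean, 1 / d.precision)

converted = toy_to_marginal(FastNormal(1.0, 4.0))
untouched = toy_to_marginal(FriendlyNormal(0.0, 1.0))
```

Only types that need a conversion add a method; every other input falls through to the generic definition.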

julia> using ReactiveMP, BayesBase, ExponentialFamily

julia> distribution = ExponentialFamily.NormalMeanPrecision(0.0, 1.0);

julia> marginal = Marginal(distribution, false, true)
Marginal(ExponentialFamily.NormalMeanPrecision{Float64}(μ=0.0, w=1.0))

julia> mean(marginal), precision(marginal)
(0.0, 1.0)

julia> logpdf(marginal, 1.0)
-1.4189385332046727

julia> is_clamped(marginal), is_initial(marginal)
(false, true)

Marginal observable

Within the reactive message passing framework, marginals are not computed once and stored as values — instead they live as streams that continuously emit updated beliefs as new messages arrive. MarginalObservable is the container for such a stream.

ReactiveMP.MarginalObservable (Type)
ReactiveMP.MarginalObservable

A lazy, connectable reactive stream for Marginal values, used as the marginal stream of every variable in the factor graph.

Internally combines two Rocket.jl primitives:

  • a RecentSubject{Marginal} that caches the most recently emitted value, so Rocket.getrecent always returns the latest belief and late subscribers receive it immediately
  • a LazyObservable{Marginal} that is the actual subscription target — initially unconnected, and wired to an upstream source during graph activation via ReactiveMP.connect!

connect!(observable, source) sets the lazy stream to source |> multicast(subject) |> ref_count(): all subscribers share one upstream subscription, and every emission is forwarded through the cached subject. Before the upstream is connected, ReactiveMP.set_initial_marginal! can push an initial belief directly into the subject to seed the graph before inference begins.

See also: ReactiveMP.MessageObservable, ReactiveMP.get_stream_of_marginals, ReactiveMP.set_initial_marginal!


Every ReactiveMP.AbstractVariable holds one MarginalObservable, accessed via ReactiveMP.get_stream_of_marginals. The observable starts unconnected: its internal LazyObservable has no upstream source until the factor graph is activated. During activation, ReactiveMP.connect! wires the lazy stream to a computed source (e.g. collectLatest over inbound messages for a ReactiveMP.RandomVariable, or the observation channel for a ReactiveMP.DataVariable). After that point, every message update propagates through the graph and the MarginalObservable emits a fresh Marginal.
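The wiring step can be sketched with plain Rocket.jl primitives. This is a simplified illustration rather than ReactiveMP's code: integers stand in for Marginal values, a Subject stands in for the computed upstream source, and the LazyObservable indirection is left out. The names cache, upstream, and shared are hypothetical.

```julia
using Rocket

# Cache that remembers the most recent value
# (stands in for the internal RecentSubject).
cache = RecentSubject(Int)

# Stand-in for the computed upstream source that activation would provide.
upstream = Subject(Int)

# Same operator chain as in the description of connect!: one shared upstream
# subscription, with every emission forwarded through the cache.
shared = upstream |> multicast(cache) |> ref_count()

first_seen  = Int[]
second_seen = Int[]
first_subscription  = subscribe!(shared, lambda(on_next = x -> push!(first_seen, x)))
second_subscription = subscribe!(shared, lambda(on_next = x -> push!(second_seen, x)))

# Each upstream emission reaches both subscribers and updates the cache.
next!(upstream, 1)
next!(upstream, 2)

latest = Rocket.getrecent(cache)

unsubscribe!(first_subscription)
unsubscribe!(second_subscription)
```

Both subscribers observe the same sequence of values, and the cache can be queried for the latest one without subscribing.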

The internal RecentSubject ensures that:

  • any subscriber that joins after the first emission immediately receives the current belief, which can also be read at any time via Rocket.getrecent
  • ReactiveMP.set_initial_marginal! can seed an initial value before activation, so that rules which depend on a marginal at iteration zero have something to read

All downstream subscriptions go through the LazyObservable, not the subject directly, so they see the full computed stream rather than only manually pushed values.
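The two guarantees provided by the cached subject can be tried in isolation with Rocket.jl. The sketch below is a simplification under stated assumptions: it subscribes to the subject directly (which ReactiveMP's downstream subscriptions do not do), uses Float64 values in place of Marginal, and pushes the seed with next! as a loose analogue of ReactiveMP.set_initial_marginal!. The names beliefs, seed, and received are hypothetical.

```julia
using Rocket

beliefs = RecentSubject(Float64)

# Seed a value before anything is connected.
next!(beliefs, 0.5)
seed = Rocket.getrecent(beliefs)

# A subscriber that joins late is handed the cached value right away.
received = Float64[]
subscription = subscribe!(beliefs, lambda(on_next = x -> push!(received, x)))

# Later emissions replace the cached value and reach the subscriber as usual.
next!(beliefs, 0.7)

unsubscribe!(subscription)
```

After this runs, received holds the seeded value followed by the later emission, which mirrors how a rule reading a marginal at iteration zero would first see the initial belief and then the computed updates.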