Messages implementation

In the message passing framework, one of the most important concepts is the message. Given a factor graph, messages are arbitrary functions that flow along the edges of the graph and hold information about the part of the graph from which they originate.

Message as a distribution

Often, a message can be represented as a probability distribution, since a probability distribution can be identified with its probability density function. Representing messages as probability distributions is not only convenient but also matters for performance: for example, a univariate Gaussian distribution can be parameterized by just two numbers, which significantly reduces the amount of information that needs to be passed along the edges of the graph.

using StatsPlots, Distributions

plot(Normal(0.0, 1.0), label = "Univariate Gaussian distribution", fillalpha = 0.4, fill = 0)

Variational Message Passing

The message passing technique originates from exact Bayesian inference, where it is known as Belief Propagation, and is useful for finding the posterior distribution over certain parameters in a model. However, message passing can also be used to find approximate posteriors - a technique known as Variational Inference. The ReactiveMP.jl package implements Variational Message Passing, since it is more general than exact inference: the exact solution can be framed as an approximate solution subject to no constraints. Here are visual schematics of the differences between messages in Belief Propagation and Variational Inference.

Belief-Propagation (or Sum-Product) message

Belief propagation message

Variational message

Variational message with structured factorisation q(x, y)q(z) assumption

Message type

All messages are encoded with the type Message.

ReactiveMP.MessageType
Message(data, is_clamped, is_initial[, annotations])

An implementation of a message in the variational message passing framework.

Arguments

  • data::D: the data object associated with the message; usually a probability distribution, but can also be an arbitrary function
  • is_clamped::Bool: specifies if this message is the result of constant computations (e.g. clamped constants)
  • is_initial::Bool: specifies if this message was used for initialization
  • annotations::AnnotationDict: optional annotation dictionary carrying extra metadata (e.g. log-scale, input arguments). Defaults to an empty AnnotationDict().

Example

julia> distribution = Gamma(10.0, 2.0)
Distributions.Gamma{Float64}(α=10.0, θ=2.0)

julia> message = Message(distribution, false, true)
Message(Distributions.Gamma{Float64}(α=10.0, θ=2.0))

julia> mean(message)
20.0

julia> getdata(message)
Distributions.Gamma{Float64}(α=10.0, θ=2.0)

julia> is_clamped(message)
false

julia> is_initial(message)
true
source

From an implementation point of view, the Message structure does nothing but hold some data object and redirect most statistics-related functions to that data object. However, this object is used extensively in Julia's multiple dispatch. Our implementation also uses the extra is_initial and is_clamped fields to determine whether the product of two messages results in an is_initial or is_clamped posterior marginal. Each message also carries an AnnotationDict for optional metadata such as log-scale factors or computation history (see Annotations).

ReactiveMP.as_messageFunction
as_message(::AbstractMessage)

A function that converts an abstract message to an instance of Message.

source
using ReactiveMP, BayesBase, ExponentialFamily

distribution = ExponentialFamily.NormalMeanPrecision(0.0, 1.0)
message      = Message(distribution, false, true)
Message(ExponentialFamily.NormalMeanPrecision{Float64}(μ=0.0, w=1.0))
mean(message), precision(message)
(0.0, 1.0)
logpdf(message, 1.0)
-1.4189385332046727
is_clamped(message), is_initial(message)
(false, true)

Message observable

Within the reactive message passing framework, messages are not computed once and stored as values — instead each edge of the factor graph carries a stream that continuously emits updated messages as the inference iterates. MessageObservable is the container for such a stream.

ReactiveMP.MessageObservableType
ReactiveMP.MessageObservable{M <: AbstractMessage}

A lazy, connectable reactive stream for message values of type M <: AbstractMessage, used as the per-connection message stream of every variable in the factor graph.

Internally combines two Rocket.jl primitives:

  • a RecentSubject{M} that caches the most recently emitted value, so Rocket.getrecent always returns the latest message and late subscribers receive it immediately
  • a LazyObservable{M} that is the actual subscription target — initially unconnected, and wired to an upstream source during graph activation via ReactiveMP.connect!

connect!(observable, source) sets the lazy stream to source |> multicast(subject) |> ref_count(): all subscribers share one upstream subscription, and every emission is forwarded through the cached subject. Before the upstream is connected, ReactiveMP.set_initial_message! can push an initial message directly into the subject to seed the graph before inference begins.

Each variable-to-node connection owns one MessageObservable. For ReactiveMP.RandomVariable and ReactiveMP.DataVariable these are allocated on demand by ReactiveMP.create_new_stream_of_inbound_messages!; for ReactiveMP.ConstVariable a single shared instance is created at construction time.

See also: ReactiveMP.MarginalObservable, ReactiveMP.set_initial_message!

source

Each connection between a variable and a factor node owns one MessageObservable. From the variable's perspective it is an inbound message stream (a message arriving from a connected node); from the node's perspective the same object is the message that will eventually be used to compute the outbound message on another edge. The observable starts unconnected: its internal LazyObservable has no upstream source until the factor graph is activated. During activation, ReactiveMP.connect! wires the lazy stream to the result of the message update rule computation. After that point, every upstream change (a new observation, a changed prior, an iterated belief) propagates reactively through the MessageObservable to all its subscribers.

The internal RecentSubject ensures that:

  • any subscriber that joins after the first emission immediately receives the current message via Rocket.getrecent
  • ReactiveMP.set_initial_message! can seed a value before activation, so that rules that read an inbound message at iteration zero have something to read

All downstream subscriptions go through the LazyObservable, not the subject directly, so they see the full computed stream rather than only manually pushed values.
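The caching behaviour described above can be illustrated with plain Rocket.jl primitives. The following is a minimal sketch of the multicast pattern, not the actual ReactiveMP internals:

```julia
using Rocket

# A RecentSubject caches the most recently emitted value.
subject = RecentSubject(Int)

# Multicast a source through the subject; ref_count() connects the
# upstream on the first subscription and shares it among subscribers.
source = from([1, 2, 3]) |> multicast(subject) |> ref_count()

received = Int[]
subscribe!(source, lambda(on_next = (v) -> push!(received, v)))

# A late reader can still obtain the last cached value from the subject.
latest = Rocket.getrecent(subject)
```

In ReactiveMP the upstream source is wired in later via ReactiveMP.connect!, but the sharing and caching mechanics are the same as in this sketch.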

Product of messages

In the message passing framework, in order to compute a posterior we must compute a normalized product of two messages. For this purpose ReactiveMP.jl uses the ReactiveMP.MessageProductContext structure, together with the ReactiveMP.compute_product_of_messages and ReactiveMP.compute_product_of_two_messages functions. Both functions accept a ReactiveMP.AbstractVariable as the first argument to identify which variable the product is being computed for — this is useful for callbacks (e.g. ReactiveMP.BeforeProductOfTwoMessagesEvent). The ReactiveMP.compute_product_of_two_messages function internally uses the prod function defined in BayesBase.jl with various product strategies. We refer the interested reader to the documentation of the BayesBase.jl package for more information.
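The underlying distribution product can be sketched directly with BayesBase.prod. This is a hedged example, assuming the closed-form product rule for two NormalMeanPrecision distributions provided by ExponentialFamily.jl:

```julia
using BayesBase, ExponentialFamily

left  = NormalMeanPrecision(0.0, 1.0)
right = NormalMeanPrecision(1.0, 1.0)

# GenericProd selects an appropriate (closed-form, if available) product rule.
posterior = prod(GenericProd(), left, right)

# For Gaussians, precisions add and means are precision-weighted:
# w = 1 + 1 = 2, mean = (1 * 0.0 + 1 * 1.0) / 2 = 0.5
mean(posterior), precision(posterior)
```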

ReactiveMP.MessageProductContextType
MessageProductContext(kwargs...)

The structure that defines the context for the product of two messages within ReactiveMP. The product is executed with the ReactiveMP.compute_product_of_messages function and uses the BayesBase.prod under the hood. See BayesBase product API documentation for detailed description.

The following kwargs are supported:

See also: ReactiveMP.compute_product_of_messages, ReactiveMP.compute_product_of_two_messages

source
ReactiveMP.compute_product_of_two_messagesFunction
compute_product_of_two_messages(variable::AbstractVariable, context::MessageProductContext, left::Message, right::Message)

Computes the product of two messages left and right for a given variable using the provided context. Returns a new message with the result of the multiplication (not necessarily normalized). Applies context.form_constraint if context.form_constraint_check_strategy is set to ReactiveMP.FormConstraintCheckEach.

The variable argument identifies which variable this product is being computed for, which is useful for callbacks (see ReactiveMP.BeforeProductOfTwoMessagesEvent).

is_clamped and is_initial

The ReactiveMP.Message carries the is_clamped and is_initial flags. The rules for the product are the following:

  • If both messages are clamped, the result is clamped;
  • otherwise, if both messages are either clamped or initial, the result is initial;
  • otherwise, the result is neither clamped nor initial.

See: ReactiveMP.MessageProductContext, ReactiveMP.compute_product_of_messages

source
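The flag rules above can be captured in a small truth-table helper. This product_flags function is purely illustrative and not part of the ReactiveMP API:

```julia
# Hypothetical helper mirroring the documented is_clamped / is_initial rules.
function product_flags(left_clamped::Bool, left_initial::Bool,
                       right_clamped::Bool, right_initial::Bool)
    clamped = left_clamped && right_clamped
    initial = !clamped &&
              (left_clamped || left_initial) &&
              (right_clamped || right_initial)
    return (clamped, initial)
end

product_flags(true, false, true, false)   # (true, false): both clamped
product_flags(true, false, false, true)   # (false, true): clamped × initial
product_flags(false, false, false, true)  # (false, false): neither
```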
ReactiveMP.compute_product_of_messagesFunction
compute_product_of_messages(variable::AbstractVariable, context::MessageProductContext, messages)

Computes the product of a collection of messages for a given variable (as opposed to ReactiveMP.compute_product_of_two_messages, which handles exactly two messages). Uses context.fold_strategy to determine the order in which ReactiveMP.compute_product_of_two_messages is called. By default this is ReactiveMP.MessagesProductFromLeftToRight, but can be set to an arbitrary function that accepts variable, context and messages and which must call ReactiveMP.compute_product_of_two_messages under the hood.

See also: ReactiveMP.compute_product_of_two_messages, ReactiveMP.MessagesProductFromLeftToRight

source
compute_product_of_messages(f::Function, variable::AbstractVariable, context::MessageProductContext, messages)

Custom fold strategy for ReactiveMP.compute_product_of_messages. When context.fold_strategy is set to a Function, it will be called with variable, context and messages as arguments. The function must call ReactiveMP.compute_product_of_two_messages under the hood to compute the pairwise products.

source
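A custom fold strategy could look like the sketch below. The right-to-left order is a made-up example, and the calling convention is assumed from the docstring above:

```julia
using ReactiveMP

# Hypothetical fold strategy that folds the collection of messages from
# right to left. As required, it calls compute_product_of_two_messages
# for each pairwise product.
function messages_product_from_right_to_left(variable, context, messages)
    return foldr(messages) do left, right
        ReactiveMP.compute_product_of_two_messages(variable, context, left, right)
    end
end
```

It would then be passed to the context, e.g. as MessageProductContext(fold_strategy = messages_product_from_right_to_left) (assuming fold_strategy is accepted as a keyword argument).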

Deferred messages

ReactiveMP.DeferredMessageType

A special type of message, for which the actual message is not computed immediately, but later on demand (potentially never). To compute and get the actual message, call the as_message method.

source

Message mappings

A message mapping defines how messages are transformed or mapped during the propagation process — for example, when combining multiple incoming messages or applying specific transformation rules. This structure helps organize and reuse mapping logic across different inference algorithms.

ReactiveMP.MessageMappingType
MessageMapping

A callable structure representing a deferred computation of a message in the variational message passing framework. It stores all contextual information necessary to compute a message later, such as variable tags, constraints, annotations, and the associated factor node.

MessageMapping replaces the original lambda-based implementation to improve type stability and inference. When invoked as a function, it computes an outgoing Message from given input messages and marginals using the appropriate @rule.

See also: Message, DeferredMessage

source
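The design rationale, a concretely-typed callable struct instead of an anonymous closure, can be illustrated with a generic sketch that is unrelated to the actual MessageMapping fields:

```julia
# Generic illustration: a callable struct carries its captured state in
# concretely typed fields, which helps the compiler infer the call.
struct ScaledMapping{T}
    scale::T
end

# Make the struct callable, in the same spirit as MessageMapping.
(m::ScaledMapping)(x) = m.scale * x

mapping = ScaledMapping(2.0)
mapping(3.0)  # 6.0
```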