Variables

Variables are fundamental building blocks of a factor graph. Each variable represents either a latent quantity to be inferred, an observed data point, or a fixed constant. All variable types are subtypes of ReactiveMP.AbstractVariable.

Choosing the right variable type

There are three kinds of variables, each with a distinct role:

TypeConstructorRoleCan be updated?
ReactiveMP.RandomVariableReactiveMP.randomvarLatent quantity to be inferredNo — inference updates its marginal
ReactiveMP.DataVariableReactiveMP.datavarObserved quantity that receives dataYes — via new_observation!
ReactiveMP.ConstVariableReactiveMP.constvarFixed constant, never changesNo — wired at construction time

The choice of variable type affects how the engine allocates streams and handles messages:

  • Use randomvar for any quantity you want to infer a posterior over.
  • Use datavar for observations that may change between inference calls (e.g., in online or streaming settings).
  • Use constvar for fixed hyperparameters, known constants, or any value that will never change.

Variables as reactive streams

In ReactiveMP.jl, a variable is not a single value — it is a source of reactive streams. Each variable holds:

These streams are lazy: they are allocated during construction but carry no values until the graph is activated. After activation, feeding new data into a datavar triggers automatic propagation through the network, updating marginals reactively.

See Inference lifecycle for an overview of the construction → activation → observation flow.

Common variable API

All variable types share a common interface for querying their kind, degree, and reactive streams.

Type predicates

ReactiveMP.isdataFunction
ReactiveMP.isdata(variable)
ReactiveMP.isdata(variables::AbstractArray)

Returns true if variable is a DataVariable. For an array, returns true only if all elements are data variables.

source
ReactiveMP.degreeFunction
ReactiveMP.degree(variable)

Returns the number of factor nodes connected to variable, i.e. the number of message streams.

source

Message stream allocation

ReactiveMP.create_new_stream_of_inbound_messages!Function
ReactiveMP.create_new_stream_of_inbound_messages!(variable)

Allocates a new per-connection ReactiveMP.MessageObservable for variable and registers it as an additional inbound message slot. Returns a tuple (observable, index) where observable is the newly created stream and index is its position in the variable's internal input_messages collection.

Called once per factor node connection at graph construction time. The returned observable is stored as the outbound message stream of the corresponding ReactiveMP.NodeInterface — it is the outbound message from the node's perspective and the inbound message from the variable's perspective. All streams are unconnected (lazy) until ReactiveMP.activate! is called.

For ReactiveMP.ConstVariable the same shared observable is returned for every connection; no per-connection slot is allocated.

See also: ReactiveMP.MessageObservable, ReactiveMP.NodeInterface

source

Marginal and message streams

Every variable exposes a marginal stream — a reactive observable that emits updated Marginal values as inference progresses. The stream is accessed via ReactiveMP.get_stream_of_marginals and wired up during graph activation via ReactiveMP.set_stream_of_marginals!. Initial beliefs can be seeded before inference starts with ReactiveMP.set_initial_marginal!, and initial messages with ReactiveMP.set_initial_message!.

ReactiveMP.set_initial_message!Function
ReactiveMP.set_initial_message!(variable, message)
ReactiveMP.set_initial_message!(variables::AbstractArray, messages)

Sets the initial message for all interfaces of variable by pushing message into each outbound message stream. For arrays, applies element-wise. See also ReactiveMP.set_initial_marginal!.

source

Prediction streams

A prediction stream gives an estimate of what the variable's value would look like from the model's perspective — without conditioning on observed data for that variable. It is accessed via ReactiveMP.get_stream_of_predictions and connected during graph activation.

Random variables

Random variables represent latent (unobserved) quantities in the model. During inference, messages flow through them to update the marginal belief.

Stream creation

A RandomVariable starts empty: its input_messages and output_messages collections are empty vectors. Each time a factor node connects to the variable, ReactiveMP.create_new_stream_of_inbound_messages! is called, which allocates a new MessageObservable{AbstractMessage}, appends it to input_messages, and returns it together with its index. The returned stream becomes the outbound message stream from the factor node's perspective (the message the node will send toward the variable). At this point, the degree equals the number of connected nodes. All streams are unconnected (lazy) until activation.

Activation

ReactiveMP.RandomVariableActivationOptionsType
RandomVariableActivationOptions

Collects all configuration needed to activate a ReactiveMP.RandomVariable. Passed to ReactiveMP.activate!(::RandomVariable, ::RandomVariableActivationOptions).

Fields:

source
ReactiveMP.activate!Method
ReactiveMP.activate!(randomvar::RandomVariable, options::RandomVariableActivationOptions)

Wires all reactive streams of a ReactiveMP.RandomVariable into the factor graph.

Activation proceeds in two steps:

  1. Outbound messages — resizes output_messages to match the number of connected nodes (the ReactiveMP.degree). If degree > 1, an EqualityChain is constructed: for each edge i the outbound message stream emits the product of all inbound messages except the one arriving on edge i, implementing the standard sum-product or variational update. If degree == 1 (a leaf variable), the single outbound stream is connected to never() because there are no other messages to multiply.

  2. MarginalcollectLatest is called over all inbound ReactiveMP.MessageObservables. It waits for all inbound messages to have emitted at least once, then emits the product as a new Marginal via ReactiveMP.set_stream_of_marginals!, and re-emits only once all inbound messages have each updated again.

See also: ReactiveMP.RandomVariableActivationOptions, ReactiveMP.activate!(::DataVariable, ::DataVariableActivationOptions)

source

The prediction stream for a RandomVariable is identical to its marginal stream, since there is no dedicated prediction channel for latent variables.

Data variables

Data variables represent observed quantities. Their value is not fixed at creation time and can be updated later via new_observation!.

ReactiveMP.new_observation!Function
new_observation!(datavar::DataVariable, data)
new_observation!(datavars::AbstractArray{<:DataVariable}, data::AbstractArray)

Provides a new observation to a ReactiveMP.DataVariable (or an array of data variables). The data is wrapped in a PointMass distribution and pushed as a new message. Pass missing to indicate that the observation is not available.

source

Stream creation

A DataVariable has two distinct directions of information flow:

  • Outbound (observation) stream — a RecentSubject{Message} stored in messageout. Calling new_observation! pushes a new Message(PointMass(value), false, false) into this subject. Every factor node connected to the variable receives the same shared messageout stream as its inbound message source; ReactiveMP.get_stream_of_outbound_messages always returns messageout regardless of the connection index.
  • Inbound (backward) messages — each connecting factor node gets its own MessageObservable{AbstractMessage} allocated in input_messages via ReactiveMP.create_new_stream_of_inbound_messages!, the same way as for RandomVariable. These carry messages flowing back from the graph toward the data edge.

All streams are unconnected (lazy) until activation.

Activation

ReactiveMP.DataVariableActivationOptionsType
DataVariableActivationOptions

Collects all configuration needed to activate a ReactiveMP.DataVariable. Passed to ReactiveMP.activate!(::DataVariable, ::DataVariableActivationOptions).

Fields:

  • prediction::Bool — if true, a prediction stream is built during activation as the product of all inbound (backward) messages
  • linked::Bool — if true, the variable's observation stream is driven by a deterministic transformation of other variables' marginals rather than by direct ReactiveMP.new_observation! calls
  • transform — the transformation function applied to the linked variables' marginals (used only when linked = true)
  • args — the collection of linked variables or constants whose marginals are combined (used only when linked = true)
source
ReactiveMP.activate!Method
ReactiveMP.activate!(datavar::DataVariable, options::DataVariableActivationOptions)

Wires all reactive streams of a ReactiveMP.DataVariable into the factor graph.

Activation proceeds in up to three steps:

  1. Prediction — if options.prediction is true, a prediction stream is built via collectLatest over all inbound (backward) ReactiveMP.MessageObservables: once all backward messages have emitted and again when all of them update, their product is emitted as the model's prior expectation for this variable.

  2. Linked variables — if options.linked is true, a subscription is created over a transformed combination of other variables' marginals. Each update is forwarded automatically to ReactiveMP.new_observation!, making the data variable's observation a deterministic function of those variables.

  3. Marginal — always wired: the marginal stream is messageout |> map(as_marginal), so the marginal always equals the most recently pushed observation.

See also: ReactiveMP.DataVariableActivationOptions, ReactiveMP.activate!(::RandomVariable, ::RandomVariableActivationOptions)

source

Constant variables

Constant variables hold a fixed value, wrapped in a PointMass distribution. Messages from constant variables are always marked as clamped.

Stream creation

Unlike RandomVariable and DataVariable, a ConstVariable wires up its streams at construction time, not during graph activation. The constructor immediately connects:

  • messageout to of(Message(PointMass(constant), true, false)) — a single-element observable that emits one clamped message and completes.
  • marginal to of(Marginal(PointMass(constant), true, false)) — similarly fixed and clamped.

When a factor node connects to a ConstVariable, ReactiveMP.create_new_stream_of_inbound_messages! increments the nconnected counter (which defines ReactiveMP.degree) and returns the same shared messageout stream for every connection. There are no per-connection inbound streams: ReactiveMP.get_stream_of_inbound_messages raises an error because a ConstVariable never receives messages from nodes. Calling ReactiveMP.set_stream_of_marginals! or ReactiveMP.set_stream_of_predictions! also raises an error, since the streams are fixed and cannot be rewired. Constant variables require no activation step.