Variables
Variables are fundamental building blocks of a factor graph. Each variable represents either a latent quantity to be inferred, an observed data point, or a fixed constant. All variable types are subtypes of ReactiveMP.AbstractVariable.
Choosing the right variable type
There are three kinds of variables, each with a distinct role:
| Type | Constructor | Role | Can be updated? |
|---|---|---|---|
| ReactiveMP.RandomVariable | ReactiveMP.randomvar | Latent quantity to be inferred | No; inference updates its marginal |
| ReactiveMP.DataVariable | ReactiveMP.datavar | Observed quantity that receives data | Yes, via new_observation! |
| ReactiveMP.ConstVariable | ReactiveMP.constvar | Fixed constant, never changes | No; wired at construction time |
The choice of variable type affects how the engine allocates streams and handles messages:
- Use `randomvar` for any quantity you want to infer a posterior over.
- Use `datavar` for observations that may change between inference calls (e.g., in online or streaming settings).
- Use `constvar` for fixed hyperparameters, known constants, or any value that will never change.
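As a small illustration of the three constructors and the type predicates documented on this page, the sketch below creates one variable of each kind and queries its kind. It assumes a ReactiveMP.jl version whose API matches this page; the labels `:x`, `:y`, and `:c` and the constant `1.0` are arbitrary choices for the example.

```julia
using ReactiveMP

# One variable of each kind; labels are optional and only aid identification.
x = ReactiveMP.randomvar(label = :x)      # latent quantity to infer
y = ReactiveMP.datavar(label = :y)        # observation supplied later
c = ReactiveMP.constvar(1.0, label = :c)  # fixed constant

# Each predicate is expected to hold only for its own variable kind.
kinds = (ReactiveMP.israndom(x), ReactiveMP.isdata(y), ReactiveMP.isconst(c))
cross = (ReactiveMP.israndom(y), ReactiveMP.isdata(c), ReactiveMP.isconst(x))
```

In a full model these variables would usually be created by a higher-level model specification rather than by hand; constructing them directly is mainly useful for tests and for exploring the API.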
Variables as reactive streams
In ReactiveMP.jl, a variable is not a single value — it is a source of reactive streams. Each variable holds:
- A marginal stream (ReactiveMP.MarginalObservable) that emits updated `Marginal` beliefs as inference progresses.
- One message stream (ReactiveMP.MessageObservable) per connected factor node, carrying messages flowing between the variable and that node.
These streams are lazy: they are allocated during construction but carry no values until the graph is activated. After activation, feeding new data into a datavar triggers automatic propagation through the network, updating marginals reactively.
See Inference lifecycle for an overview of the construction → activation → observation flow.
ReactiveMP.AbstractVariable — Type
AbstractVariable
An abstract supertype for all variable types in the factor graph. Concrete subtypes include ReactiveMP.RandomVariable, ReactiveMP.DataVariable, and ReactiveMP.ConstVariable.
Common variable API
All variable types share a common interface for querying their kind, degree, and reactive streams.
Type predicates
ReactiveMP.israndom — Function
ReactiveMP.israndom(variable)
ReactiveMP.israndom(variables::AbstractArray)
Returns `true` if `variable` is a ReactiveMP.RandomVariable. For an array, returns `true` only if all elements are random variables.
ReactiveMP.isdata — Function
ReactiveMP.isdata(variable)
ReactiveMP.isdata(variables::AbstractArray)
Returns `true` if `variable` is a ReactiveMP.DataVariable. For an array, returns `true` only if all elements are data variables.
ReactiveMP.isconst — Function
ReactiveMP.isconst(variable)
ReactiveMP.isconst(variables::AbstractArray)
Returns `true` if `variable` is a ReactiveMP.ConstVariable. For an array, returns `true` only if all elements are const variables.
ReactiveMP.degree — Function
ReactiveMP.degree(variable)
Returns the number of factor nodes connected to `variable`, i.e. the number of message streams.
Message stream allocation
ReactiveMP.create_new_stream_of_inbound_messages! — Function
ReactiveMP.create_new_stream_of_inbound_messages!(variable)
Allocates a new per-connection ReactiveMP.MessageObservable for `variable` and registers it as an additional inbound message slot. Returns a tuple `(observable, index)`, where `observable` is the newly created stream and `index` is its position in the variable's internal `input_messages` collection.
Called once per factor node connection at graph construction time. The returned observable is stored as the outbound message stream of the corresponding ReactiveMP.NodeInterface — it is the outbound message from the node's perspective and the inbound message from the variable's perspective. All streams are unconnected (lazy) until ReactiveMP.activate! is called.
For ReactiveMP.ConstVariable the same shared observable is returned for every connection; no per-connection slot is allocated.
See also: ReactiveMP.MessageObservable, ReactiveMP.NodeInterface
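The allocation step can be observed directly on a random variable. The sketch below calls the function by hand, with each call standing in for one factor node connection; in normal use the call is made by the graph construction code, not by the user. It assumes a ReactiveMP.jl version whose API matches this page.

```julia
using ReactiveMP

x = ReactiveMP.randomvar(label = :x)
degree_before = ReactiveMP.degree(x)  # no factor node is connected yet

# Each call stands in for one factor node connecting to `x`.
stream1, index1 = ReactiveMP.create_new_stream_of_inbound_messages!(x)
stream2, index2 = ReactiveMP.create_new_stream_of_inbound_messages!(x)

degree_after = ReactiveMP.degree(x)   # one message stream per connection
```

The returned streams carry no values at this point, since nothing is connected until activation.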
Marginal and message streams
Every variable exposes a marginal stream — a reactive observable that emits updated Marginal values as inference progresses. The stream is accessed via ReactiveMP.get_stream_of_marginals and wired up during graph activation via ReactiveMP.set_stream_of_marginals!. Initial beliefs can be seeded before inference starts with ReactiveMP.set_initial_marginal!, and initial messages with ReactiveMP.set_initial_message!.
ReactiveMP.get_stream_of_marginals — Function
ReactiveMP.get_stream_of_marginals(variable)
Returns the marginal observable stream for `variable`. See also ReactiveMP.set_stream_of_marginals!, ReactiveMP.set_initial_marginal!.
ReactiveMP.set_stream_of_marginals! — Function
ReactiveMP.set_stream_of_marginals!(variable, stream)
Connects `stream` as the marginal observable for `variable`. See also ReactiveMP.get_stream_of_marginals.
ReactiveMP.set_initial_marginal! — Function
ReactiveMP.set_initial_marginal!(variable, marginal)
ReactiveMP.set_initial_marginal!(variables::AbstractArray, marginals)
Sets the initial marginal belief for `variable` by pushing `marginal` as an initial (non-clamped) value into the stream returned by ReactiveMP.get_stream_of_marginals. For arrays, applies element-wise. See also ReactiveMP.set_initial_message!.
ReactiveMP.set_initial_message! — Function
ReactiveMP.set_initial_message!(variable, message)
ReactiveMP.set_initial_message!(variables::AbstractArray, messages)
Sets the initial message for all interfaces of `variable` by pushing `message` into each outbound message stream. For arrays, applies element-wise. See also ReactiveMP.set_initial_marginal!.
Prediction streams
A prediction stream gives an estimate of what the variable's value would look like from the model's perspective — without conditioning on observed data for that variable. It is accessed via ReactiveMP.get_stream_of_predictions and connected during graph activation.
ReactiveMP.get_stream_of_predictions — Function
ReactiveMP.get_stream_of_predictions(variable)
Returns the prediction observable stream for `variable`. For a ReactiveMP.DataVariable, the prediction is the product of all inbound messages. See also ReactiveMP.set_stream_of_predictions!.
ReactiveMP.set_stream_of_predictions! — Function
ReactiveMP.set_stream_of_predictions!(variable, stream)
Connects `stream` as the prediction observable for `variable`. See also ReactiveMP.get_stream_of_predictions.
Random variables
Random variables represent latent (unobserved) quantities in the model. During inference, messages flow through them to update the marginal belief.
ReactiveMP.RandomVariable — Type
RandomVariable <: AbstractVariable
Represents a latent (unobserved) variable in the factor graph. Random variables collect incoming and outgoing messages from connected factor nodes and maintain a marginal belief. Use `randomvar` to create an instance.
See also: ReactiveMP.ConstVariable, ReactiveMP.DataVariable
ReactiveMP.randomvar — Function
randomvar(; label = nothing)
Creates a new ReactiveMP.RandomVariable with an optional `label` for identification.
Stream creation
A RandomVariable starts empty: its input_messages and output_messages collections are empty vectors. Each time a factor node connects to the variable, ReactiveMP.create_new_stream_of_inbound_messages! is called, which allocates a new MessageObservable{AbstractMessage}, appends it to input_messages, and returns it together with its index. The returned stream becomes the outbound message stream from the factor node's perspective (the message the node will send toward the variable). At this point, the degree equals the number of connected nodes. All streams are unconnected (lazy) until activation.
Activation
ReactiveMP.RandomVariableActivationOptions — Type
RandomVariableActivationOptions
Collects all configuration needed to activate a ReactiveMP.RandomVariable. Passed to ReactiveMP.activate!(::RandomVariable, ::RandomVariableActivationOptions).
Fields:
- `stream_postprocessor`: optional stream postprocessor applied to every created stream (see ReactiveMP.AbstractStreamPostprocessor)
- `prod_context_for_message_computation`: a ReactiveMP.MessageProductContext used when computing outbound messages (product of all-but-one inbound messages in the `EqualityChain`)
- `prod_context_for_marginal_computation`: a ReactiveMP.MessageProductContext used when computing the marginal (product of all inbound messages)
ReactiveMP.activate! — Method
ReactiveMP.activate!(randomvar::RandomVariable, options::RandomVariableActivationOptions)
Wires all reactive streams of a ReactiveMP.RandomVariable into the factor graph.
Activation proceeds in two steps:
- Outbound messages: resizes `output_messages` to match the number of connected nodes (the ReactiveMP.degree). If degree > 1, an `EqualityChain` is constructed: for each edge i, the outbound message stream emits the product of all inbound messages except the one arriving on edge i, implementing the standard sum-product or variational update. If degree == 1 (a leaf variable), the single outbound stream is connected to `never()` because there are no other messages to multiply.
- Marginal: `collectLatest` is called over all inbound ReactiveMP.MessageObservables. It waits for all inbound messages to have emitted at least once, then emits the product as a new `Marginal` via ReactiveMP.set_stream_of_marginals!, and re-emits only once all inbound messages have each updated again.
See also: ReactiveMP.RandomVariableActivationOptions, ReactiveMP.activate!(::DataVariable, ::DataVariableActivationOptions)
The prediction stream for a RandomVariable is identical to its marginal stream, since there is no dedicated prediction channel for latent variables.
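To make the two products computed during RandomVariable activation concrete, the following self-contained sketch works through them for scalar Gaussian messages. It does not use ReactiveMP: `ToyGaussian`, `prod_msgs`, `marginal`, and `outbound` are hypothetical names introduced for this illustration, and the product is taken up to normalization.

```julia
# Toy stand-in for a Gaussian message in natural parameters.
# Hypothetical type for illustration; not part of ReactiveMP.
struct ToyGaussian
    weighted_mean::Float64  # precision * mean
    precision::Float64      # 1 / variance
end

from_mean_var(m, v) = ToyGaussian(m / v, 1 / v)
mean_var(g::ToyGaussian) = (g.weighted_mean / g.precision, 1 / g.precision)

# The product of two Gaussian densities adds their natural parameters.
prod_msgs(a::ToyGaussian, b::ToyGaussian) =
    ToyGaussian(a.weighted_mean + b.weighted_mean, a.precision + b.precision)

# Marginal: product of all inbound messages.
marginal(inbound) = reduce(prod_msgs, inbound)

# Outbound on edge i: product of every inbound message except the i-th.
# Needs at least two inbound messages, mirroring the degree > 1 case.
function outbound(inbound, i)
    others = [m for (j, m) in enumerate(inbound) if j != i]
    return reduce(prod_msgs, others)
end

inbound = [from_mean_var(0.0, 1.0), from_mean_var(2.0, 1.0), from_mean_var(4.0, 0.5)]

q    = marginal(inbound)     # mean 2.5, variance 0.25
out1 = outbound(inbound, 1)  # combines messages 2 and 3
out3 = outbound(inbound, 3)  # combines messages 1 and 2: mean 1.0, variance 0.5
```

The message sent back along an edge deliberately excludes what arrived on that edge, so a node never receives its own information echoed back, while the marginal uses everything.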
Data variables
Data variables represent observed quantities. Their value is not fixed at creation time and can be updated later via new_observation!.
ReactiveMP.DataVariable — Type
DataVariable <: AbstractVariable
Represents an observed variable in the factor graph. Unlike ReactiveMP.ConstVariable, the data is not fixed at creation time and can be updated later via ReactiveMP.new_observation!. Use `datavar` to create an instance.
See also: ReactiveMP.RandomVariable, ReactiveMP.ConstVariable
ReactiveMP.datavar — Function
datavar(; label = nothing)
Creates a new ReactiveMP.DataVariable with an optional `label` for identification.
ReactiveMP.new_observation! — Function
new_observation!(datavar::DataVariable, data)
new_observation!(datavars::AbstractArray{<:DataVariable}, data::AbstractArray)
Provides a new observation to a ReactiveMP.DataVariable (or an array of data variables). The data is wrapped in a `PointMass` distribution and pushed as a new message. Pass `missing` to indicate that the observation is not available.
Stream creation
A DataVariable has two distinct directions of information flow:
- Outbound (observation) stream: a `RecentSubject{Message}` stored in `messageout`. Calling `new_observation!` pushes a new `Message(PointMass(value), false, false)` into this subject. Every factor node connected to the variable receives the same shared `messageout` stream as its inbound message source; ReactiveMP.get_stream_of_outbound_messages always returns `messageout` regardless of the connection index.
- Inbound (backward) messages: each connecting factor node gets its own `MessageObservable{AbstractMessage}` allocated in `input_messages` via ReactiveMP.create_new_stream_of_inbound_messages!, the same way as for `RandomVariable`. These carry messages flowing back from the graph toward the data edge.
All streams are unconnected (lazy) until activation.
Activation
ReactiveMP.DataVariableActivationOptions — Type
DataVariableActivationOptions
Collects all configuration needed to activate a ReactiveMP.DataVariable. Passed to ReactiveMP.activate!(::DataVariable, ::DataVariableActivationOptions).
Fields:
- `prediction::Bool`: if `true`, a prediction stream is built during activation as the product of all inbound (backward) messages
- `linked::Bool`: if `true`, the variable's observation stream is driven by a deterministic transformation of other variables' marginals rather than by direct ReactiveMP.new_observation! calls
- `transform`: the transformation function applied to the linked variables' marginals (used only when `linked = true`)
- `args`: the collection of linked variables or constants whose marginals are combined (used only when `linked = true`)
ReactiveMP.activate! — Method
ReactiveMP.activate!(datavar::DataVariable, options::DataVariableActivationOptions)
Wires all reactive streams of a ReactiveMP.DataVariable into the factor graph.
Activation proceeds in up to three steps:
- Prediction: if `options.prediction` is `true`, a prediction stream is built via `collectLatest` over all inbound (backward) ReactiveMP.MessageObservables. Once all backward messages have emitted, and again whenever all of them have updated, their product is emitted as the model's prior expectation for this variable.
- Linked variables: if `options.linked` is `true`, a subscription is created over a transformed combination of other variables' marginals. Each update is forwarded automatically to ReactiveMP.new_observation!, making the data variable's observation a deterministic function of those variables.
- Marginal: always wired. The marginal stream is `messageout |> map(as_marginal)`, so the marginal always equals the most recently pushed observation.
See also: ReactiveMP.DataVariableActivationOptions, ReactiveMP.activate!(::RandomVariable, ::RandomVariableActivationOptions)
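The emission rule described for the prediction stream (emit once every inbound stream has produced a value, then again only after every stream has updated) can be sketched without any reactive machinery. The toy below is a hypothetical stand-in written for this illustration; it is not the `collectLatest` operator itself, and plain numbers take the place of messages.

```julia
# Hypothetical toy combiner mimicking the emission rule described above.
mutable struct ToyCollectLatest
    latest::Vector{Float64}   # most recent value per inbound stream
    fresh::Vector{Bool}       # has stream i updated since the last emission?
    emitted::Vector{Float64}  # log of every combined value emitted so far
end

ToyCollectLatest(n::Int) = ToyCollectLatest(zeros(n), fill(false, n), Float64[])

# Record a new value on stream `i`; emit the product only when every
# stream has delivered a value since the previous emission.
function push_value!(c::ToyCollectLatest, i::Int, value::Float64)
    c.latest[i] = value
    c.fresh[i] = true
    if all(c.fresh)
        push!(c.emitted, prod(c.latest))
        fill!(c.fresh, false)
    end
    return c
end

c = ToyCollectLatest(2)
push_value!(c, 1, 2.0)  # stream 2 has not emitted yet: nothing is emitted
push_value!(c, 1, 3.0)  # still waiting on stream 2
push_value!(c, 2, 5.0)  # both streams are fresh: emits 3.0 * 5.0
push_value!(c, 2, 7.0)  # stream 1 has not updated since the last emission
push_value!(c, 1, 1.0)  # both updated again: emits 1.0 * 7.0
```

Gating on a complete round of updates means the prediction reflects a consistent set of backward messages, not a mix of old and new ones.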
Constant variables
Constant variables hold a fixed value, wrapped in a PointMass distribution. Messages from constant variables are always marked as clamped.
ReactiveMP.ConstVariable — Type
ConstVariable <: AbstractVariable
Represents a constant (clamped) variable in the factor graph. The value is fixed at creation time and wrapped in a `PointMass` distribution. Messages and marginals from this variable are always marked as clamped. Use `constvar` to create an instance.
See also: ReactiveMP.RandomVariable, ReactiveMP.DataVariable
ReactiveMP.constvar — Function
constvar(constant; label = nothing)
Creates a new ReactiveMP.ConstVariable with the given `constant` value and an optional `label` for identification.
Stream creation
Unlike RandomVariable and DataVariable, a ConstVariable wires up its streams at construction time, not during graph activation. The constructor immediately connects:
- `messageout` to `of(Message(PointMass(constant), true, false))`: a single-element observable that emits one clamped message and completes.
- `marginal` to `of(Marginal(PointMass(constant), true, false))`: similarly fixed and clamped.
When a factor node connects to a ConstVariable, ReactiveMP.create_new_stream_of_inbound_messages! increments the nconnected counter (which defines ReactiveMP.degree) and returns the same shared messageout stream for every connection. There are no per-connection inbound streams: ReactiveMP.get_stream_of_inbound_messages raises an error because a ConstVariable never receives messages from nodes. Calling ReactiveMP.set_stream_of_marginals! or ReactiveMP.set_stream_of_predictions! also raises an error, since the streams are fixed and cannot be rewired. Constant variables require no activation step.