Proxy Observable

ProxyObservable is a building block for custom operators. It wraps the source, the actor, or both with proxied versions, which lets you add custom logic to on_subscribe! (through a source proxy) and/or to on_next!, on_error!, and on_complete! (through an actor proxy).

Rocket.proxy (Function)
proxy(::Type{L}, source, proxy) where L

Creation operator for the ProxyObservable with the given source and proxy objects.

Example

using Rocket

source = from(1:5)

# Proxy object: it only selects which actor_proxy! method is used
struct MyCustomProxy <: ActorProxy end

# Proxied actor: wraps the downstream actor and squares every value
struct MyCustomActor{A} <: Actor{Int}
    actor :: A
end

Rocket.on_next!(actor::MyCustomActor, data::Int) = next!(actor.actor, data ^ 2)
Rocket.on_error!(actor::MyCustomActor, err)      = error!(actor.actor, err)
Rocket.on_complete!(actor::MyCustomActor)        = complete!(actor.actor)

# Called on subscription to wrap the subscribing actor
Rocket.actor_proxy!(::Type{Int}, proxy::MyCustomProxy, actor::A) where A = MyCustomActor{A}(actor)

proxied = proxy(Int, source, MyCustomProxy())

subscribe!(proxied, logger())
;

# output

[LogActor] Data: 1
[LogActor] Data: 4
[LogActor] Data: 9
[LogActor] Data: 16
[LogActor] Data: 25
[LogActor] Completed

See also: ProxyObservable, ActorProxy, SourceProxy, ActorSourceProxy

Rocket.ActorProxy (Type)
ActorProxy

Can be used as a supertype for a common proxy object. Automatically specifies the ValidActorProxy trait behavior. Each ActorProxy must implement its own method of the actor_proxy!(::Type, proxy, actor) function, which has to return a valid actor object.

See also: proxy, actor_proxy!

Rocket.SourceProxy (Type)
SourceProxy

Can be used as a supertype for a common proxy object. Automatically specifies the ValidSourceProxy trait behavior. Each SourceProxy must implement its own method of the source_proxy!(::Type, proxy, source) function, which has to return a valid subscribable object.

See also: proxy, source_proxy!
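
As a minimal sketch of a source proxy, the snippet below counts how many times the proxied observable is subscribed to. The names CountingProxy and CountingSource are hypothetical and not part of Rocket.jl. The sketch assumes that a custom Subscribable hooks into subscription by adding a method to Rocket.on_subscribe! and forwards to the wrapped source with subscribe!.

```julia
using Rocket

# Hypothetical proxy object that carries a shared subscription counter
struct CountingProxy <: SourceProxy
    nsubs :: Base.RefValue{Int}
end

# Hypothetical wrapper around the original source
struct CountingSource{L, S} <: Subscribable{L}
    source :: S
    nsubs  :: Base.RefValue{Int}
end

# Custom on_subscribe! logic: bump the counter, then subscribe to the wrapped source
function Rocket.on_subscribe!(source::CountingSource, actor)
    source.nsubs[] += 1
    return subscribe!(source.source, actor)
end

# Wrap the source with its proxied version
Rocket.source_proxy!(::Type{L}, proxy::CountingProxy, source::S) where {L, S} = CountingSource{L, S}(source, proxy.nsubs)

subscriptions = Ref(0)
counted       = proxy(Int, from(1:3), CountingProxy(subscriptions))

subscribe!(counted, logger())
subscribe!(counted, logger())

println(subscriptions[])
```

Here the actor is left untouched, so the emitted values are unchanged and only the subscription step gains extra behavior.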

Rocket.ActorSourceProxy (Type)
ActorSourceProxy

Can be used as a supertype for a common proxy object. Automatically specifies the ValidActorSourceProxy trait behavior. Each ActorSourceProxy must implement its own method of the source_proxy!(::Type, proxy, source) function, which has to return a valid subscribable object, and its own method of the actor_proxy!(::Type, proxy, actor) function, which has to return a valid actor object.

See also: proxy, actor_proxy!, source_proxy!
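
The following sketch combines both hooks in one proxy, under the same assumptions as the actor example above. DoublingProxy, DoublingActor, and FlaggingSource are hypothetical names: the actor side doubles each value, and the source side records that a subscription happened. The keep(Int) actor is used only to collect the emitted values.

```julia
using Rocket

# Hypothetical proxy that wraps both the actor and the source
struct DoublingProxy <: ActorSourceProxy
    subscribed :: Base.RefValue{Bool}
end

# Actor side: forward every value multiplied by two
struct DoublingActor{A} <: Actor{Int}
    actor :: A
end

Rocket.on_next!(actor::DoublingActor, data::Int) = next!(actor.actor, 2 * data)
Rocket.on_error!(actor::DoublingActor, err)      = error!(actor.actor, err)
Rocket.on_complete!(actor::DoublingActor)        = complete!(actor.actor)

# Source side: set a flag when a subscription happens
struct FlaggingSource{L, S} <: Subscribable{L}
    source     :: S
    subscribed :: Base.RefValue{Bool}
end

function Rocket.on_subscribe!(source::FlaggingSource, actor)
    source.subscribed[] = true
    return subscribe!(source.source, actor)
end

# An ActorSourceProxy needs methods for both proxy functions
Rocket.actor_proxy!(::Type{Int}, proxy::DoublingProxy, actor::A) where A = DoublingActor{A}(actor)
Rocket.source_proxy!(::Type{L}, proxy::DoublingProxy, source::S) where {L, S} = FlaggingSource{L, S}(source, proxy.subscribed)

flag    = Ref(false)
doubled = proxy(Int, from(1:3), DoublingProxy(flag))
results = keep(Int)

subscribe!(doubled, results)

println(results.values)
println(flag[])
```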

Rocket.actor_proxy! (Function)
actor_proxy!(::Type, proxy, actor)

This function is used to wrap an actor with its proxied version given a particular proxy object. It must return another actor. Each valid ActorProxy and ActorSourceProxy must implement its own method of the actor_proxy! function. The first argument is the type of data of the connected proxy observable.

See also: proxy, ActorProxy, ActorSourceProxy
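
To illustrate the role of the first argument, here is a hedged sketch in which the proxy changes the data type. It assumes that the type passed to actor_proxy! is the one given to proxy, that is, the type of data emitted by the proxy observable (String here), while the proxied actor itself consumes the data of the original source (Int here). StringifyProxy and StringifyActor are hypothetical names.

```julia
using Rocket

# Hypothetical proxy that turns a stream of Int into a stream of String
struct StringifyProxy <: ActorProxy end

# The proxied actor listens to the original Int source
struct StringifyActor{A} <: Actor{Int}
    actor :: A
end

Rocket.on_next!(actor::StringifyActor, data::Int) = next!(actor.actor, string(data))
Rocket.on_error!(actor::StringifyActor, err)      = error!(actor.actor, err)
Rocket.on_complete!(actor::StringifyActor)        = complete!(actor.actor)

# The first argument matches the data type of the proxy observable
Rocket.actor_proxy!(::Type{String}, proxy::StringifyProxy, actor::A) where A = StringifyActor{A}(actor)

stringified = proxy(String, from(1:3), StringifyProxy())
collected   = keep(String)

subscribe!(stringified, collected)

println(collected.values)
```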

Rocket.source_proxy! (Function)
source_proxy!(::Type, proxy, source)

This function is used to wrap a source with its proxied version given a particular proxy object. It must return another Observable. Each valid SourceProxy and ActorSourceProxy must implement its own method of the source_proxy! function. The first argument is the type of data of the connected proxy observable.

See also: proxy, SourceProxy, ActorSourceProxy
