Flows
Part 1
Introduction
Data flows in CAF provide a high-level API for building processing pipelines in a data-oriented way. They complement the message-based actor communication and allow expressing complex data processing pipelines in a declarative way. Data flows can span multiple actors and may even connect actors to other, non-actor components. For example, actors can process frames from a WebSocket connection.
The flow API in CAF shares many similarities with ReactiveX-style APIs, but there is one noteworthy difference: CAF schedules flows on top of actors, which means observables cannot be passed around freely between threads and there is no analog to SubscribeOn.
In this guide, we learn how the flow API is structured and how to build processing pipelines using flow operators.
Observables and Observers
The major building block of the flow API is the observable. It represents a potentially unbounded sequence of items that an observer may subscribe to. What makes an observable different from a regular container like a std::vector or a range is that items may arrive asynchronously over time.
Once subscribed, subscribers receive all items that the flow produces. A simple way to add an observer to an observable is to call for_each.
To create observables, actors can call make_observable, which creates a factory object of type caf::flow::observable_builder. The factory offers many member functions to create items programmatically or to turn containers into observables. In the example below, we use from_container to turn a vector into an observable.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .from_container(std::vector{1, 2, 3, 4, 5})
    .for_each([self](int val) { self->println("{}", val); });
});
Output
1
2
3
4
5
Note: to use the flow API, make sure to include the header caf/scheduled_actor/flow.hpp. The flow API is not included by default to keep the core CAF headers small.
Operators
On their own, observables just provide an asynchronous way to provide a sequence of events to their observer(s). The real power of the flow API comes from the operators.
An operator provides a declarative way to convert an input observable to a new output observable, e.g., by filtering or transforming data.
The following example generates an observable that represents the infinite series starting at 1 by calling iota. We then apply operators to filter out all odd numbers and to select only the first five items.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .filter([](int value) { return value % 2 == 0; })
    .take(5)
    .for_each([self](int val) { self->println("{}", val); });
});
Output
2
4
6
8
10
The filter operator takes a predicate and only forwards items to the output observable if the predicate returns true. The take operator limits the number of items that the output observable will produce.
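To make the semantics of filter and take concrete without depending on CAF, the following plain C++ sketch models the pipeline iota(1).filter(...).take(5) as a single loop. The helper name take_filtered_iota is hypothetical and not part of CAF; it only mirrors which values such a pipeline yields, not how CAF evaluates it.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not a CAF API): models iota(start).filter(pred).take(n)
// with a plain loop. Items are generated one at a time, so the "infinite"
// source never produces more values than the pipeline needs.
// Note: the loop only terminates if `pred` accepts at least `n` values.
template <class Predicate>
std::vector<int> take_filtered_iota(int start, Predicate pred, std::size_t n) {
  std::vector<int> result;
  for (int value = start; result.size() < n; ++value) {
    if (pred(value)) // filter: drop items the predicate rejects
      result.push_back(value);
  } // take: the loop ends once n items have passed the filter
  return result;
}
```

Calling take_filtered_iota(1, [](int v) { return v % 2 == 0; }, 5) yields the same five values as the example: 2, 4, 6, 8 and 10.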
Note: a full list of implemented operators is available in the CAF manual.
Operator Fusion
When assembling observables with make_observable() by chaining operators, CAF usually does not produce an actual caf::flow::observable object. Instead, CAF returns a blueprint (a move-only factory type) for an observable that still allows users to add additional operators to it without actually instantiating an observable. This allows CAF to fuse multiple operators into a single object that lazily applies all steps of all fused operators to each item.
Operator fusion reduces the number of heap allocations for assembling flows and also makes processing much more efficient because it can eliminate unnecessary buffers between operators.
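The effect of fusion can be illustrated with ordinary loops. The sketch below is an analogy in plain C++ and makes no claim about how CAF implements fusion internally: the function names unfused and fused are hypothetical. The first version materializes a buffer after every step, the second applies both steps to each item in one pass.

```cpp
#include <vector>

// Unfused: every step materializes its own intermediate buffer.
std::vector<int> unfused(const std::vector<int>& input) {
  std::vector<int> evens; // buffer between "filter" and "map"
  for (int x : input)
    if (x % 2 == 0)
      evens.push_back(x);
  std::vector<int> tripled; // buffer after "map"
  for (int x : evens)
    tripled.push_back(x * 3);
  return tripled;
}

// Fused: both steps run back-to-back on each item, so no intermediate
// buffer is needed between them.
std::vector<int> fused(const std::vector<int>& input) {
  std::vector<int> out;
  for (int x : input)
    if (x % 2 == 0)         // filter step
      out.push_back(x * 3); // map step
  return out;
}
```

Both functions compute the same result; the fused variant simply avoids the extra allocation and the second traversal.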
Calling as_observable on a blueprint forces CAF to turn it into an actual observable (this allocates an object on the heap that implements the blueprint). This can be useful to subscribe to an observable at some later time or to apply additional operators later on, as shown below. Without a call to as_observable, CAF converts the blueprint to an observable only when subscribing to it.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  auto items = self->make_observable()
                 .iota(1)
                 .filter([](int value) { return value % 2 == 0; })
                 .take(5)
                 .as_observable();
  // ...some time later ...
  items
    .map([](int x) { return x * 3; })
    .for_each([self](auto val) { self->println("{}", val); });
});
Output
6
12
18
24
30
Everything we call before as_observable is added to the blueprint. This means that CAF can lazily evaluate iota, filter and take and fuse them into a single operator. The call to as_observable forces CAF to materialize the observable. After this step, the blueprint is no longer available and we can only use the resulting observable. To benefit from operator fusion, we recommend chaining as many operators as possible before calling as_observable.
Subscriptions
When subscribing to an observable, the observer receives a subscription object. The subscription object allows the observer to ask for more items by calling request and to unsubscribe from the observable by calling dispose.
Reference Counting
In CAF, caf::flow::observable, caf::flow::observer and caf::flow::subscription are all handle types. This means they only store a (smart) pointer to the underlying object. Copying a handle does not copy the underlying object; it only copies the pointer to it. This makes handles very cheap to copy. All copies refer to the same, reference-counted object. When the last copy goes out of scope, CAF destroys the underlying object.
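Handle semantics can be sketched with std::shared_ptr as a stand-in. This is only an analogy: toy_impl and toy_handle are hypothetical names, and the smart pointer type that CAF uses internally may differ. The point is that a copy of the handle shares the underlying object instead of duplicating it.

```cpp
#include <memory>

// Stand-in for the object behind a handle (hypothetical type).
struct toy_impl {
  int value = 0;
};

// Hypothetical handle type: stores nothing but a smart pointer.
class toy_handle {
public:
  toy_handle() : ptr_(std::make_shared<toy_impl>()) {}

  void set(int x) { ptr_->value = x; }
  int get() const { return ptr_->value; }

  // Number of handles that currently refer to the shared object.
  long use_count() const { return ptr_.use_count(); }

private:
  std::shared_ptr<toy_impl> ptr_;
};
```

After copying a toy_handle, a call to set on the copy is visible through the original, and use_count reports two references to the same object.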
However, despite being reference counted, the handle types are not thread-safe. Users must ensure that all operations on handles are performed by the same actor (or on the same thread when using the flow API in a non-actor context).
Protocol between Observables, Observers and Subscriptions
The protocol between observables, observers and subscriptions is designed to enable asynchronous communication.
The interface for the observable consists of only one function:
disposable subscribe(observer<T> what);
The observable returns a disposable object that allows users to cancel the subscription at any time. Most of the time, however, users can simply ignore this object. A disposable represents a subscription, resource or task that can be canceled by calling dispose.
The interface for the observer consists of four functions:
void on_subscribe(subscription sub);
void on_next(const T& item);
void on_complete();
void on_error(const error& what);
When subscribing an observer to an observable, the observable calls on_subscribe on the observer. In case of an error, the observable calls on_error instead and does not generate a subscription object.
Before looking at the other two functions, let's first look at the interface for the subscription:
void request(size_t n);
The subscription object allows the observer to ask for items by calling request. The observable must not call on_next until the observer has signaled demand by calling request.
This demand signaling is important because it allows observers to control the rate at which they receive items. Without it, an observable that produces items faster than an observer can consume them would have to buffer the excess and could eventually run out of memory. To prevent this, CAF implements backpressure with the help of the request function. Essentially, backpressure is a mechanism that allows observers to slow down observables.
In case no errors occur, the protocol between observables and observers is straightforward:
In the diagram above, the steps are as follows:
1. The user calls subscribe on the observable, passing the observer as argument.
2. The observable creates a new subscription object.
3. The observable calls on_subscribe on the observer, passing the new subscription object as argument.
4. The observer calls request on the subscription object to signal demand.
5. The subscription calls on_next on the observer for each item.
Steps 4 and 5 are repeated until the subscription has no more items to produce. Then, the subscription calls on_complete on the observer to signal that no more items will be produced.
If an error occurs at any point, the observable or the subscription calls on_error on the observer.
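The error-free sequence of calls can be traced in a small, synchronous model. The sketch below is plain C++ and does not use CAF: toy_subscription, toy_observer and subscribe_range are hypothetical names, on_error is omitted, and everything runs on a single thread. It only illustrates that on_next never runs ahead of the demand signaled through request.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct toy_observer; // defined below

// Toy subscription that produces the integers in [first, last].
class toy_subscription {
public:
  toy_subscription(int first, int last, toy_observer* out)
    : next_(first), last_(last), out_(out) {}

  // Called by the observer to signal demand for `n` more items.
  void request(std::size_t n);

private:
  int next_;
  int last_;
  toy_observer* out_;
  std::size_t demand_ = 0;
  bool running_ = false; // guards against re-entrant calls from on_next
  bool done_ = false;
};

// Toy observer that requests `batch` items at a time (batch must be > 0).
struct toy_observer {
  explicit toy_observer(std::size_t batch_size) : batch(batch_size) {}

  void on_subscribe(std::shared_ptr<toy_subscription> new_sub) {
    sub = std::move(new_sub);
    ask_for_more();
  }
  void on_next(int item) {
    received.push_back(item);
    if (--pending == 0) // all requested items arrived: signal new demand
      ask_for_more();
  }
  void on_complete() { completed = true; }

  void ask_for_more() {
    pending = batch;
    ++request_calls;
    sub->request(batch);
  }

  std::size_t batch;
  std::size_t pending = 0;
  std::size_t request_calls = 0;
  bool completed = false;
  std::vector<int> received;
  std::shared_ptr<toy_subscription> sub;
};

void toy_subscription::request(std::size_t n) {
  demand_ += n;
  if (running_ || done_)
    return; // the loop further up the call stack picks up the new demand
  running_ = true;
  while (demand_ > 0 && next_ <= last_) {
    --demand_;
    out_->on_next(next_++); // never emits more than was requested
  }
  running_ = false;
  if (next_ > last_) {
    done_ = true;
    out_->on_complete();
  }
}

// Toy "observable": creates the subscription and hands it to the observer.
void subscribe_range(int first, int last, toy_observer& out) {
  out.on_subscribe(std::make_shared<toy_subscription>(first, last, &out));
}
```

With toy_observer obs{2} and subscribe_range(1, 5, obs), the observer calls request three times, receives the items 1 through 5 in order and is then notified via on_complete.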
Hot and Cold Observables
As we have discovered earlier, observables represent a sequence of items. Sometimes, these items can be produced on demand. In our previous examples, we have generated items lazily with the iota factory. An observable of this kind is called cold. It will do nothing until an observer subscribes to the values. Moreover, observers that subscribe to the values at different times will all see the exact same values, because the values are generated by the subscription object on demand.
Observables of the other kind are called hot. They usually represent external data sources such as incoming data feeds or hardware events like keyboard strokes and mouse movements. Hot observables are always active and will produce items even if no observer is subscribed to them. Moreover, observers that subscribe to a hot observable at different times will see different values.
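The difference between the two kinds can be sketched with callbacks in plain C++. The types toy_cold_range and toy_hot_source are hypothetical and unrelated to CAF's classes; the model is synchronous and ignores demand signaling entirely.

```cpp
#include <functional>
#include <utility>
#include <vector>

using toy_callback = std::function<void(int)>;

// Cold: every subscriber triggers a fresh run of the same sequence.
struct toy_cold_range {
  int first;
  int last;

  void subscribe(const toy_callback& on_next) const {
    for (int i = first; i <= last; ++i)
      on_next(i);
  }
};

// Hot: items are emitted whether or not anyone listens. Late subscribers
// miss everything that was emitted before they joined.
struct toy_hot_source {
  std::vector<toy_callback> subscribers;

  void subscribe(toy_callback on_next) {
    subscribers.push_back(std::move(on_next));
  }
  void emit(int item) {
    for (auto& on_next : subscribers)
      on_next(item);
  }
};
```

Two subscribers of a toy_cold_range{1, 3} both see 1, 2 and 3. For a toy_hot_source that emits 1, 2 and 3, a subscriber that joins after the first emit call only sees 2 and 3.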
Concurrency
Unlike ReactiveX, CAF does not allow sharing observers between threads. Observables and observers are not thread-safe. They must remain local to their owner, usually an actor.
However, an observable can be turned into a publisher by calling to_publisher(). A publisher is a thread-safe handle that allows multiple actors and threads to observe the items, as shown in the example below.
Source Code
// Spawn a new actor to produce items asynchronously.
sys.spawn([](caf::event_based_actor* producer) {
  auto items = producer->make_observable().iota(1).take(5).to_publisher();
  // Spawn a new actor to consume the items asynchronously.
  producer->spawn([items](caf::event_based_actor* consumer) {
    items
      .observe_on(consumer)
      .map([](int value) { return value * 2; })
      .for_each([consumer](int x) { consumer->println("{}", x); });
  });
});
Output
2
4
6
8
10
Asynchronous Buffers
Asynchronous buffers in CAF are uni-directional channels that connect a single producer with a single consumer. Thus, they are called SPSC buffers for short. Buffers provide a mechanism for connecting an actor to other asynchronous components, e.g., to a separate thread that reads from a file or a network socket.
The diagram above shows a schematic view with an SPSC buffer. The producer can send items to the buffer and the consumer can receive items from the buffer. The buffer itself is asynchronous and thread-safe.
When using publishers, CAF will create SPSC buffers automatically to connect actors with each other. However, buffers can also be managed directly via buffer resources. CAF uses buffer resources in places where only a single producer or a single consumer is allowed. In general, we recommend sticking to the publisher API and using buffer resources directly only when necessary.
To get a better understanding of how SPSC buffers work, let's look at a more concrete example with two actors that are connected via an SPSC buffer.
Source Code
// Our buffer resource: returns a read and a write end of an SPSC buffer.
auto [pull, push] = caf::async::make_spsc_buffer_resource<int>();
// The first actor pushes items into the buffer.
sys.spawn([push = std::move(push)](caf::event_based_actor* self) {
  self->make_observable().iota(1).take(5).subscribe(push);
});
// The second actor pulls items from the buffer.
sys.spawn([pull = std::move(pull)](caf::event_based_actor* self) {
  pull
    .observe_on(self)
    .map([](int value) { return value * 2; })
    .for_each([self](int x) { self->println("{}", x); });
});
Output
2
4
6
8
10
As we can see, CAF provides a factory function make_spsc_buffer_resource that returns a pair of resource handles that represent read and write access. The actual buffer is managed by CAF and is usually not accessible. The buffer is also automatically destroyed when the last handle to it goes out of scope.
When setting up our flows, we can either use pull.observe_on(self) or self->make_observable().from_resource(pull). They are equivalent. In fact, the observe_on function is just a convenience wrapper around from_resource. In both cases, we create an observable from the read end of the buffer. The observable will pull items from the buffer as needed. The write end of the buffer can be used to subscribe an observable to it. The observable will push items into the buffer automatically.
The resources returned by make_spsc_buffer_resource are (thread-safe) handle types. They can be copied and moved around. All instances of a consumer (pull) or producer (push) resource point to the same underlying object. This underlying object is a small synchronization primitive that manages the buffer. CAF "opens" a resource by calling try_open on the resource handle. This function will return the SPSC buffer if it is not already open. Otherwise, it will fail and return null. Hence, a resource can only be opened once. This is because the buffer only supports a single consumer and a single producer. The resource will also close the read or write end of the buffer (depending on the type of the resource) automatically when the last handle goes out of scope and try_open was never called.
Usually, we do not need to worry about the details of the resource handles. The important takeaway is that the resources make sure that only a single consumer and a single producer can access the buffer. If we pass the same resource handle to multiple actors, then only one of them will be able to open the resource. The others will fail to open the resource and produce an error.
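The open-once rule can be modeled with an atomic flag. The following plain C++ sketch is an assumption about the general idea, not CAF's implementation: toy_buffer and toy_resource are hypothetical types, and the toy buffer itself is not thread-safe. Only the name try_open is taken from the text above.

```cpp
#include <atomic>
#include <memory>
#include <vector>

// Stand-in for the buffer (hypothetical and, unlike the real one, not
// thread-safe).
struct toy_buffer {
  std::vector<int> items;
};

// Hypothetical model of a resource handle: copies share one control block,
// and only the first try_open() call across all copies obtains the buffer.
class toy_resource {
public:
  toy_resource() : ctrl_(std::make_shared<control_block>()) {}

  std::shared_ptr<toy_buffer> try_open() {
    // exchange() returns the previous value of the flag, so exactly one
    // caller observes `false` and wins the race.
    if (ctrl_->opened.exchange(true))
      return nullptr;
    return ctrl_->buffer;
  }

private:
  struct control_block {
    std::atomic<bool> opened{false};
    std::shared_ptr<toy_buffer> buffer = std::make_shared<toy_buffer>();
  };

  std::shared_ptr<control_block> ctrl_;
};
```

If two copies of the same toy_resource call try_open, the first call returns the buffer and the second returns a null pointer, which corresponds to the case where a second actor fails to open the resource.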
Declarative Setup of Flows with Inactive Actors
In our previous example, we have called make_spsc_buffer_resource manually and then connected two actors to the buffer. There is an operator that can do this for us: observe_on. This function takes the self pointer of an actor that is not yet running. The observe_on function does the exact same steps we have done previously: it creates an SPSC buffer, pushes all items from the input observable into the buffer, and then creates a new observable that reads from the buffer. The new observable from observe_on then runs on the new actor. Basically, observe_on allows us to switch mid-pipeline from one actor to another.
How do we create an actor that is not yet running? The actor system provides a function spawn_inactive for exactly this use case. It returns the self pointer to the new actor and a function object to launch the actor. However, we usually do not call this function object directly. Once it goes out of scope, it will launch the actor implicitly. Because the actor is not yet running, we may access the self pointer and set up any flows we want. Once the actor is running, we must not access the self pointer anymore because the actor then runs asynchronously.
Source Code
auto [src, run_src] = sys.spawn_inactive();
auto [snk, run_snk] = sys.spawn_inactive();
src->make_observable()
  .iota(1)
  .take(5)
  .observe_on(snk)
  .map([](int value) { return value * 2; })
  .for_each([snk = snk](int x) { snk->println("{}", x); });
Output
2
4
6
8
10
The observe_on operator is a powerful tool for setting up flows in a declarative manner. However, keep in mind that you may access the self pointer of another actor only in this very specific case and not after launching it. Since actors started with spawn_inactive have no behavior, they will terminate automatically once the flow is completed or an error occurs.
Streams
Publishers and buffer resources are powerful tools. However, they are limited to a single process. What if we want to stream data in a distributed system? This is where streams come into play. Streams are a general abstraction for connecting multiple actors to a single observable.
Streams do not use SPSC buffers. Instead, they use a message-based protocol to communicate between actors. Because they use regular actor messages, streams can also be used to connect actors across processes or even across machines.
Let us revisit the previous example with SPSC buffers. We had an observable on Actor A created with iota(1).take(5). We then connected this observable to a buffer. The buffer was then connected to an observable on Actor B. With streams, the picture changes as follows:
This time, there is no SPSC buffer between the two actors. Instead, we have a message-based protocol with two steps. First, Actor B sends a stream_open_msg to Actor A. This message informs Actor A that Actor B wants to subscribe to the stream. On success, Actor A then sends a stream_ack_msg to Actor B to confirm the subscription. After that, Actor B can request items (values) from the stream by sending a stream_demand_msg to Actor A. As long as Actor A has demand, it will send stream_batch_msg messages to Actor B with the requested items.
If, at any point, Actor B wants to unsubscribe from the stream, it sends a stream_cancel_msg to Actor A. In case of an error, Actor A sends a stream_abort_msg to Actor B. Finally, if Actor A has no more items to send, it sends a stream_close_msg.
CAF will take care of all the stream messages automatically. However, it is important to be aware of the protocol because it has a few implications. Most importantly, the protocol uses batching in order to reduce the number of messages. However, this means that the items may be delivered in batches with some delay.
While CAF takes care of the stream protocol, we can still fine-tune the behavior of the stream. For this, we first declare a few constants that we will use later.
using namespace std::literals;
static constexpr auto max_batch_delay = 50ms;
static constexpr auto max_batch_size = 10u;
static constexpr auto max_buffered = 50u;
static constexpr auto demand_threshold = 5u;
Batching limits the number of messages exchanged between actors, which is important for performance. However, it also increases latency. The stream API lets us control the trade-off between throughput and latency by defining the maximum batch delay as well as the maximum batch size. The former is defined by the constant max_batch_delay. The latter is defined by the constant max_batch_size. With these constants, we can tell CAF to send a batch whenever it has ten items to send. If it has fewer than ten items, it will wait for up to 50 milliseconds before sending the batch regardless of the number of items.
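The interplay of the two batching limits can be sketched with simulated timestamps. The model below is plain C++ and rests on assumptions that the text does not confirm: the delay timer starts when the first item of a batch arrives, and a timer that fires at the same instant as an arrival is handled first. The names toy_arrival, toy_batch and make_batches are hypothetical.

```cpp
#include <chrono>
#include <cstddef>
#include <utility>
#include <vector>

using ms = std::chrono::milliseconds;

// One item together with its (simulated) arrival time at the sender.
struct toy_arrival {
  ms at;
  int item;
};

// One batch together with the (simulated) time it was sent.
struct toy_batch {
  ms sent_at;
  std::vector<int> items;
};

// Hypothetical model, not CAF code. Assumes `arrivals` is sorted by time and
// that the delay timer starts when the first item of a batch arrives.
std::vector<toy_batch> make_batches(const std::vector<toy_arrival>& arrivals,
                                    ms max_batch_delay,
                                    std::size_t max_batch_size) {
  std::vector<toy_batch> sent;
  std::vector<int> pending;
  ms deadline{0};
  auto flush = [&](ms when) {
    sent.push_back(toy_batch{when, std::move(pending)});
    pending.clear(); // put the moved-from vector into a defined, empty state
  };
  for (const auto& arrival : arrivals) {
    // The timer fired before this item arrived: send the partial batch.
    if (!pending.empty() && arrival.at >= deadline)
      flush(deadline);
    if (pending.empty())
      deadline = arrival.at + max_batch_delay;
    pending.push_back(arrival.item);
    // The batch is full: send it right away.
    if (pending.size() >= max_batch_size)
      flush(arrival.at);
  }
  if (!pending.empty())
    flush(deadline);
  return sent;
}
```

With a delay of 50 ms and a batch size of 10, twelve items arriving at 0 ms produce one full batch immediately. The two leftover items wait for the timer and are sent at 50 ms together with anything that arrived in the meantime.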
The last two constants are for backpressure. The constant max_buffered defines how many items the consumer will buffer at most. If the consumer does not request items from the producer, the producer will eventually stop sending items. The constant demand_threshold defines how many items the consumer must be able to buffer before it will request more items from the producer. If we choose a low threshold, the consumer will request items more often by sending stream_demand_msg messages to the producer. If we choose a high threshold, we send fewer stream_demand_msg messages. In the worst case, a high demand threshold will lead to a start-stop behavior if the threshold is close to the maximum number of buffered items. In this case, the consumer has to wait until the buffer is empty before it can request more items again.
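The effect of the demand threshold on message traffic can be estimated with a small counting model. The sketch below is plain C++ under simplifying assumptions that may not match CAF's actual bookkeeping: the producer always has items available, the consumer processes one item per step, and each demand message asks for all currently free buffer slots. The function name count_demand_messages is hypothetical.

```cpp
#include <cstddef>

// Hypothetical model, not CAF code. Counts how many demand messages a toy
// consumer sends while processing `total` items.
// Precondition: 1 <= demand_threshold <= max_buffered.
std::size_t count_demand_messages(std::size_t total, std::size_t max_buffered,
                                  std::size_t demand_threshold) {
  std::size_t messages = 0;
  std::size_t outstanding = 0; // items requested but not yet processed
  for (std::size_t processed = 0; processed < total; ++processed) {
    std::size_t free_slots = max_buffered - outstanding;
    if (free_slots >= demand_threshold) {
      outstanding += free_slots; // ask for as much as the buffer can hold
      ++messages;
    }
    --outstanding; // process one item, which frees one buffer slot
  }
  return messages;
}
```

For 100 items and max_buffered = 50, this model sends 20 demand messages with a threshold of 5 but only 2 with a threshold of 50. In the latter case the consumer drains its buffer completely before asking again, which is the start-stop behavior described above.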
With these constants in place, we can define our actor for producing items:
caf::behavior source(caf::event_based_actor* self) {
  return {
    [self](caf::get_atom) {
      return self->make_observable()
        .iota(1)
        .take(5)
        .to_stream("int-flow", max_batch_delay, max_batch_size);
    }
  };
}
The actor has a single behavior that responds to get messages. When receiving such a message, the actor creates an observable that generates items from 1 to 5 and then converts the observable into a stream by calling to_stream. The stream is named int-flow and we use the constants we have defined earlier to control the batching behavior.
With the source actor in place, we can now implement actors that consume the stream:
Source Code
void caf_main(caf::actor_system& sys) {
  // Get the stream handle from the source actor.
  caf::scoped_actor self{sys};
  auto maybe_items = self->mail(caf::get_atom_v)
                       .request(sys.spawn(source), caf::infinite)
                       .receive<caf::stream>();
  if (!maybe_items) {
    self->println("failed to get a valid stream handle: {}", maybe_items.error());
    return;
  }
  auto items = *maybe_items;
  // Subscribe an actor to the stream and wait for it to complete.
  auto snk1 = sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe_as<int>(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2; })
      .for_each([snk](int x) { snk->println("source-1: {}", x); });
  });
  // Wait for the first actor to complete to have a deterministic output.
  self->wait_for(snk1);
  // Subscribe another actor to the stream.
  sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe_as<int>(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2; })
      .for_each([snk](int x) { snk->println("source-2: {}", x); });
  });
}
CAF_MAIN()
Output
source-1: 2
source-1: 4
source-1: 6
source-1: 8
source-1: 10
source-2: 2
source-2: 4
source-2: 6
source-2: 8
source-2: 10
In our example, we first obtain the stream handle from the source actor. Then, we spawn two actors that subscribe to the stream. Converting a stream into an observable is done with the observe_as function. The function requires a template parameter that specifies the type of the items in the stream. In our case, we use int. The function then uses the type information to convert the stream into an observable with the correct item type. The remaining arguments to observe_as are the tuning parameters that we have discussed when introducing the constants. Once we have an observable, we can use the map and for_each operators to transform and consume the items.
Note that we use wait_for in this example only to get a deterministic output. The two actors could also run in parallel.
Typed Streams
Just like actor handles, streams come in two flavors: statically typed and dynamically typed. We have seen the dynamic variant in the previous example. The statically typed variant is called typed_stream and the API is very similar to the dynamic variant. The main difference is that we need to assign a type ID to typed handle types that we want to use in messages.
// Same constants as before.
using namespace std::literals;
static constexpr auto max_batch_delay = 50ms;
static constexpr auto max_batch_size = 10u;
static constexpr auto max_buffered = 50u;
static constexpr auto demand_threshold = 5u;
// Make caf::typed_stream<int32_t> available to the serialization framework.
CAF_BEGIN_TYPE_ID_BLOCK(example_project, first_custom_type_id)
CAF_ADD_TYPE_ID(example_project, (caf::typed_stream<int32_t>))
CAF_END_TYPE_ID_BLOCK(example_project)
Once we have added the type ID, we can use the stream type in messages. The source actor looks almost identical to the untyped variant:
caf::behavior source(caf::event_based_actor* self) {
  return {
    [self](caf::get_atom) {
      return self->make_observable()
        .iota(int32_t{1})
        .take(5)
        .to_typed_stream("int-flow", max_batch_delay, max_batch_size);
    }
  };
}
The only difference is that we use to_typed_stream instead of to_stream and force int32_t as item type. The rest of the source actor is identical. Our remaining code is also very similar to the untyped variant. The only differences are that we use the handle type typed_stream<int32_t> instead of stream and that we use observe instead of observe_as:
Source Code
void caf_main(caf::actor_system& sys) {
  // Get the stream handle from the source actor.
  using stream_t = caf::typed_stream<int32_t>;
  caf::scoped_actor self{sys};
  auto maybe_items = self->mail(caf::get_atom_v)
                       .request(sys.spawn(source), caf::infinite)
                       .receive<stream_t>();
  if (!maybe_items) {
    self->println("failed to get a valid stream handle: {}", maybe_items.error());
    return;
  }
  auto items = *maybe_items;
  // Subscribe an actor to the stream and wait for it to complete.
  auto snk1 = sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2; })
      .for_each([snk](int x) { snk->println("source-1: {}", x); });
  });
  // Wait for the first actor to complete to have a deterministic output.
  self->wait_for(snk1);
  // Subscribe another actor to the stream.
  sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2; })
      .for_each([snk](int x) { snk->println("source-2: {}", x); });
  });
}
CAF_MAIN(caf::id_block::example_project)
Output
source-1: 2
source-1: 4
source-1: 6
source-1: 8
source-1: 10
source-2: 2
source-2: 4
source-2: 6
source-2: 8
source-2: 10
Next Up
We hope part 1 on the CAF flow API leaves you with a good understanding of the basic concepts. In part 2, we will look at error handling, merging and how to interface flows with regular actor messages.