
Flows

Part 1


Introduction

Data flows in CAF provide a high-level API for building processing pipelines in a data-oriented way. They complement the message-based actor communication and allow expressing complex data processing pipelines in a declarative way. Data flows can span multiple actors and may even connect actors to other, non-actor components. For example, actors can process frames from a WebSocket connection.

The flow API in CAF shares many similarities with ReactiveX-style APIs, but there is one noteworthy difference: CAF schedules flows on top of actors, which means observables cannot be passed around freely between threads and there is no analog to SubscribeOn.

In this guide, we learn how the flow API is structured and how to build processing pipelines using flow operators.

Observables and Observers

The major building block of the flow API is the observable. It represents a potentially unbounded sequence of items that an observer may subscribe to. What sets an observable apart from a regular container like a std::vector or a range is that its items may arrive asynchronously over time.

Once subscribed, an observer receives all items that the flow produces. A simple way to add an observer to an observable is to call for_each.

To create observables, actors can call make_observable, which returns a factory object of type caf::flow::observable_builder. The factory offers many member functions for generating items programmatically or for turning containers into observables. In the example below, we use from_container to turn a vector into an observable.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .from_container(std::vector{1, 2, 3, 4, 5})
    .for_each([self](int val) { self->println("{}", val); });
});

Output

1
2
3
4
5

Note: to use the flow API, make sure to include the header caf/scheduled_actor/flow.hpp. The flow API is not included by default to keep the core CAF headers small.

Operators

On their own, observables merely deliver a sequence of events asynchronously to their observer(s). The real power of the flow API comes from the operators.

An operator provides a declarative way to convert an input observable to a new output observable, e.g., by filtering or transforming data.

The following example calls iota to generate an observable that represents the infinite series starting at 1. Then, we apply operators to filter out all odd numbers and to select only the first five items.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .filter([](int value) { return value % 2 == 0; })
    .take(5)
    .for_each([self](int val) { self->println("{}", val); });
});

Output

2
4
6
8
10

The filter operator takes a predicate and only forwards items to the output observable if the predicate returns true. The take operator limits the number of items that the output observable will produce.

Note: a full list of implemented operators is available in the CAF manual.

Operator Fusion

When assembling observables with make_observable() by chaining operators, CAF usually does not produce an actual caf::flow::observable object. Instead, CAF returns a blueprint (a move-only factory type) for an observable that still allows users to add additional operators to it without actually instantiating an observable. This allows CAF to fuse multiple operators into a single object that lazily applies all steps of all fused operators to each item.

Operator fusion reduces the number of heap allocations for assembling flows and also makes processing much more efficient because it can eliminate unnecessary buffers between operators.
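To make the idea of fusion more concrete, the following plain C++ sketch (no CAF involved) folds the filter, take and map steps used in this section into a single object whose on_next applies all three steps to each item, with no buffer in between. The names fused_pipeline and run_fused are hypothetical and only model the concept; CAF's blueprint types are structured differently.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical model of a fused "filter -> take -> map" chain. Each item
// passes through all steps in one call; no buffers sit between the steps.
struct fused_pipeline {
  std::size_t remaining; // state of the "take" step

  explicit fused_pipeline(std::size_t limit) : remaining(limit) {}

  // Returns the transformed item, or std::nullopt if a step drops it.
  std::optional<int> on_next(int value) {
    if (value % 2 != 0) // filter: keep only even numbers
      return std::nullopt;
    if (remaining == 0) // take: stop after the limit
      return std::nullopt;
    --remaining;
    return value * 3;   // map: multiply by three
  }

  bool done() const { return remaining == 0; }
};

// Drives the fused pipeline with an iota-like source starting at 1.
std::vector<int> run_fused(std::size_t limit) {
  fused_pipeline pipeline{limit};
  std::vector<int> out;
  for (int i = 1; !pipeline.done(); ++i)
    if (auto item = pipeline.on_next(i))
      out.push_back(*item);
  assert(out.size() == limit); // an infinite source always fills the quota
  return out;
}
```

Calling run_fused(5) produces 6, 12, 18, 24 and 30. The state of every step lives in one object, which is the kind of saving fusion aims for: one allocation and one call per item instead of a chain of objects with buffers in between.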

Calling as_observable on a blueprint forces CAF to turn it into an actual observable, which allocates an object on the heap that implements the blueprint. This is useful for subscribing to the observable at some later time or for applying additional operators later on, as shown below. Without as_observable, CAF converts the blueprint to an observable only when subscribing to it.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  auto items = self->make_observable()
    .iota(1)
    .filter([](int value) { return value % 2 == 0; })
    .take(5)
    .as_observable();
  // ...some time later ...
  items
    .map([](int x) { return x * 3; })
    .for_each([self](auto val) { self->println("{}", val); });
});

Output

6
12
18
24
30

Everything we call before as_observable is added to the blueprint. This means that CAF can lazily evaluate iota, filter and take and fuse them into a single operator. The call to as_observable forces CAF to materialize the observable. After this step, the blueprint is no longer available and we can only use the resulting observable. To benefit from operator fusion, we recommend chaining as many operators as possible before calling as_observable.

Subscriptions

When subscribing to an observable, the observer receives a subscription object. The subscription object allows the observer to ask for more items by calling request and to unsubscribe from the observable by calling dispose.

Reference Counting

In CAF, caf::flow::observable, caf::flow::observer and caf::flow::subscription are all handle types. This means they only store a (smart) pointer to the underlying object. Copying a handle does not copy the underlying object, it only copies the pointer to it. This makes handles very cheap to copy. All copies refer to the same, reference-counted object. When the last copy goes out of scope, CAF destroys the underlying object.
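The copy-and-lifetime semantics of such handles can be illustrated with std::shared_ptr. This is a hypothetical model (handle and counted_object are made-up names): CAF uses its own intrusive reference counting, and the sketch says nothing about the threading rules discussed next.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the object behind a handle. The destructor
// sets a flag so we can observe when the last handle lets go.
struct counted_object {
  bool* destroyed;
  explicit counted_object(bool* flag) : destroyed(flag) {}
  ~counted_object() { *destroyed = true; }
};

// A handle only stores a (smart) pointer; copying it copies the pointer.
// (Models the semantics only; CAF does not use std::shared_ptr here.)
class handle {
public:
  explicit handle(bool* flag) : ptr_(std::make_shared<counted_object>(flag)) {}
  long use_count() const { return ptr_.use_count(); }
  const counted_object* get() const { return ptr_.get(); }

private:
  std::shared_ptr<counted_object> ptr_;
};

// Returns true if the object was destroyed exactly when the last copy of
// the handle went out of scope.
bool demo_handle_semantics() {
  bool destroyed = false;
  {
    handle a{&destroyed};
    {
      handle b = a;               // copies the pointer, not the object
      assert(a.get() == b.get()); // both refer to the same object
      assert(a.use_count() == 2);
    }
    assert(!destroyed);           // `a` still keeps the object alive
    assert(a.use_count() == 1);
  }
  return destroyed;               // last handle gone: object destroyed
}
```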

However, despite being reference counted, the handle types are not thread-safe. Users must ensure that all operations on handles are performed by the same actor (or on the same thread when using the flow API in a non-actor context).

Protocol between Observables, Observers and Subscriptions

The protocol between observables, observers and subscriptions is designed to enable asynchronous communication.

The interface for the observable consists of only one function:

disposable subscribe(observer<T> what);

The observable returns a disposable object that allows users to cancel the subscription at any time. Most of the time, however, users can simply ignore this object. A disposable represents a subscription, resource or task that can be canceled by calling dispose.

The interface for the observer consists of four functions:

void on_subscribe(subscription sub);

void on_next(const T& item);

void on_complete();

void on_error(const error& what);

When subscribing an observer to an observable, the observable calls on_subscribe on the observer. In case of an error, the observable calls on_error instead and does not generate a subscription object.

Before looking at the other two functions, let's first look at the interface for the subscription:

void request(size_t n);

The subscription object allows the observer to ask for items by calling request. The observable may not call on_next until the observer has signaled demand by calling request.

This demand signaling is important because it allows observers to control the rate at which they receive items. If an observable produced items faster than an observer could consume them, unprocessed items would pile up until the system eventually runs out of memory. To prevent this, CAF implements backpressure with the help of the request function: an observer only receives as many items as it has requested. In other words, backpressure allows observers to slow down observables.

In case no errors occur, the protocol between observables and observers is straightforward:

[Diagram: Interaction]

In the diagram above, the steps are as follows:

  1. The user calls subscribe on the observable, passing the observer as argument.
  2. The observable creates a new subscription object.
  3. The observable calls on_subscribe on the observer, passing the new subscription object as argument.
  4. The observer calls request on the subscription object to signal demand.
  5. The subscription calls on_next on the observer for each item.

Steps 4 and 5 are repeated until the subscription has no more items to produce. Then, the subscription calls on_complete on the observer to signal that no more items will be produced.

If an error occurs at any point, the observable or the subscription calls on_error on the observer.
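The error-free interaction can be traced with a small, single-threaded C++ model. It is a hypothetical sketch that mirrors the names from the text (observer, subscription, request, on_next, on_complete) but is not CAF's implementation; errors and dispose are left out. The observer asks for a fixed number of items at a time and only asks again once that demand is used up.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, single-threaded model of the protocol described above.
struct observer;

// Emits the integers [first, last], but never more than requested.
class subscription {
public:
  subscription(int first, int last, observer* out)
    : next_(first), last_(last), out_(out) {}

  void request(std::size_t n);

private:
  int next_;
  int last_;
  observer* out_;
  std::size_t demand_ = 0; // requested but not yet delivered
  bool emitting_ = false;  // guards against re-entrant request() calls
  bool done_ = false;
};

struct observer {
  std::size_t batch;        // how many items to request at a time
  std::size_t pending = 0;  // items still expected from the last request
  std::size_t requests = 0; // number of request() calls, for illustration
  std::vector<int> items;
  bool completed = false;
  subscription* sub = nullptr;

  void ask() {
    pending = batch;
    ++requests;
    sub->request(batch);            // step 4: signal demand
  }
  void on_subscribe(subscription* s) { // step 3
    sub = s;
    ask();
  }
  void on_next(int item) {          // step 5
    items.push_back(item);
    if (--pending == 0)             // demand used up: ask for more
      ask();
  }
  void on_complete() { completed = true; }
};

void subscription::request(std::size_t n) {
  demand_ += n;
  if (emitting_)
    return;                         // the loop below picks up the new demand
  emitting_ = true;
  while (demand_ > 0 && next_ <= last_) {
    --demand_;
    out_->on_next(next_++);
  }
  emitting_ = false;
  if (next_ > last_ && !done_) {
    done_ = true;
    out_->on_complete();            // no more items to produce
  }
}

// Steps 1 and 2: subscribing creates a subscription for the observer. All
// calls are synchronous here, so the subscription can live on the stack.
observer subscribe_range(int first, int last, std::size_t batch) {
  assert(batch > 0);                // zero demand would never make progress
  observer obs{batch};
  subscription sub{first, last, &obs};
  obs.on_subscribe(&sub);
  return obs;
}
```

With subscribe_range(1, 5, 2), the observer receives 1 through 5 in order, calls request three times and then receives on_complete. The emitting_ flag is a design choice of this sketch: it keeps a request issued from inside on_next from starting a second, nested emit loop.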

Hot and Cold Observables

As discussed earlier, observables represent a sequence of items. Sometimes, these items can be produced on demand. In our previous examples, we have generated items lazily with the iota factory. An observable of this kind is called cold. It does nothing until an observer subscribes to it. Moreover, observers that subscribe at different times all see the exact same values, because the subscription object generates the values on demand for its observer.

The other kind of observables are called hot. They usually represent external data sources such as incoming data feeds or hardware events like keyboard strokes and mouse movements. Hot observables are always active and will produce items even if no observer is subscribed to them. Moreover, observers that subscribe to a hot observable at different times will see different values.
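The difference can be shown with two toy sources in plain C++. Both types are hypothetical and use simple callbacks instead of CAF observers: the cold source generates a fresh sequence for each subscriber, while the hot source only forwards values to whoever is subscribed at the moment a value is emitted.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Callback that receives one item (stands in for an observer).
using sink = std::function<void(int)>;

// Hypothetical cold source: generates a fresh sequence 1..n for every
// subscriber, at the time of subscription.
struct cold_source {
  int n;

  void subscribe(const sink& out) const {
    assert(n >= 0);
    for (int i = 1; i <= n; ++i)
      out(i);
  }
};

// Hypothetical hot source: emits values whether or not anyone listens.
// Subscribers only see values emitted after they subscribed.
struct hot_source {
  std::vector<sink> subscribers;

  void subscribe(sink out) { subscribers.push_back(std::move(out)); }

  void emit(int value) {
    for (auto& out : subscribers)
      out(value);
  }
};
```

If a hot_source emits 1, then gains a first subscriber, emits 2, gains a second subscriber and emits 3, the first subscriber sees 2 and 3 while the second sees only 3; the value 1 is lost. Two subscribers of a cold_source with n = 3 both see 1, 2, 3.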

Concurrency

Unlike in ReactiveX, CAF does not allow sharing observables or observers between threads. Observables and observers are not thread-safe. They must remain local to their owner, usually an actor.

However, an observable can be converted to a publisher by calling to_publisher(), as shown in the example below. A publisher is a thread-safe handle that allows multiple actors and threads to observe its items.

Source Code

// Spawn a new actor to produce items asynchronously.
sys.spawn([](caf::event_based_actor* producer) {
  auto items = producer->make_observable().iota(1).take(5).to_publisher();
  // Spawn a new actor to consume the items asynchronously.
  producer->spawn([items](caf::event_based_actor* consumer) {
    items
      .observe_on(consumer)
      .map([](int value) { return value * 2; })
      .for_each([consumer](int x) { consumer->println("{}", x); });
  });
});

Output

2
4
6
8
10

Asynchronous Buffers

Asynchronous buffers in CAF are uni-directional channels that connect a single producer with a single consumer, hence their short name: SPSC (single-producer, single-consumer) buffers. Buffers provide a mechanism for connecting an actor to other asynchronous components, e.g., to a separate thread that reads from a file or a network socket.

[Diagram: SPSC Buffer]

The diagram above shows a schematic view with an SPSC buffer. The producer can send items to the buffer and the consumer can receive items from the buffer. The buffer itself is asynchronous and thread-safe.
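To illustrate the single-producer, single-consumer idea in isolation, here is a classic bounded lock-free ring buffer in standard C++. It is a generic sketch, not CAF's buffer: CAF's actual implementation differs and is normally hidden behind resources and publishers. The names spsc_ring and transfer are hypothetical. The buffer is only safe if exactly one thread pushes and exactly one thread pops.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical bounded SPSC ring buffer (not CAF's implementation).
template <class T>
class spsc_ring {
public:
  explicit spsc_ring(std::size_t capacity) : slots_(capacity + 1) {
    assert(capacity > 0);
  }

  // Producer side: returns false if the buffer is full.
  bool push(const T& item) {
    auto tail = tail_.load(std::memory_order_relaxed);
    auto next = (tail + 1) % slots_.size();
    if (next == head_.load(std::memory_order_acquire))
      return false;                               // full
    slots_[tail] = item;
    tail_.store(next, std::memory_order_release); // publish the item
    return true;
  }

  // Consumer side: returns std::nullopt if the buffer is empty.
  std::optional<T> pop() {
    auto head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire))
      return std::nullopt;                        // empty
    T item = slots_[head];
    head_.store((head + 1) % slots_.size(), std::memory_order_release);
    return item;
  }

private:
  std::vector<T> slots_;             // one slot stays free: full != empty
  std::atomic<std::size_t> head_{0}; // next slot to read (consumer)
  std::atomic<std::size_t> tail_{0}; // next slot to write (producer)
};

// Usage sketch: one producer thread, the calling thread as the consumer.
std::vector<int> transfer(int count, std::size_t capacity) {
  spsc_ring<int> buf{capacity};
  std::thread producer{[&] {
    for (int i = 1; i <= count; ++i)
      while (!buf.push(i))
        std::this_thread::yield();   // full: wait for the consumer
  }};
  std::vector<int> received;
  while (static_cast<int>(received.size()) < count) {
    if (auto item = buf.pop())
      received.push_back(*item);
    else
      std::this_thread::yield();     // empty: wait for the producer
  }
  producer.join();
  return received;
}
```

A full buffer makes push return false, so the producer has to wait; this is a crude form of the backpressure discussed earlier. Items arrive in the order they were pushed.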

When using publishers, CAF creates SPSC buffers automatically to connect actors with each other. However, buffers can also be managed directly via buffer resources. CAF uses buffer resources in places where only a single producer or a single consumer is allowed. In general, we recommend sticking to the publisher API and only using buffer resources directly when necessary.

To get a better understanding of how SPSC buffers work, let's look at a more concrete example with two actors that are connected via an SPSC buffer.

Simple Pipeline with Two Actors

Source Code

// Our buffer resource: returns a read and a write end of an SPSC buffer.
auto [pull, push] = caf::async::make_spsc_buffer_resource<int>();
// The first actor pushes items into the buffer.
sys.spawn([push = std::move(push)](caf::event_based_actor* self) {
  self->make_observable().iota(1).take(5).subscribe(push);
});
// The second actor pulls items from the buffer.
sys.spawn([pull = std::move(pull)](caf::event_based_actor* self) {
  pull
    .observe_on(self)
    .map([](int value) { return value * 2; })
    .for_each([self](int x) { self->println("{}", x); });
});

Output

2
4
6
8
10

As we can see, CAF provides a factory function make_spsc_buffer_resource that returns a pair of resource handles that represent read and write access. The actual buffer is managed by CAF and is usually not accessible. The buffer is also automatically destroyed when the last handle to it goes out of scope.

When setting up our flows, we can use either pull.observe_on(self) or self->make_observable().from_resource(pull). Both are equivalent; in fact, observe_on is just a convenience wrapper around from_resource. In both cases, we create an observable from the read end of the buffer, and the observable pulls items from the buffer as needed. Conversely, we can subscribe an observable to the write end of the buffer, and the observable then pushes its items into the buffer automatically.

The resources returned by make_spsc_buffer_resource are (thread-safe) handle types. They can be copied and moved around. All instances of a consumer (pull) or producer (push) resource point to the same underlying object. This underlying object is a small synchronization primitive that manages the buffer. CAF "opens" a resource by calling try_open on the resource handle. This function returns the SPSC buffer if the resource has not been opened yet. Otherwise, it fails and returns null. Hence, a resource can only be opened once, because the buffer only supports a single consumer and a single producer. The resource also closes the read or write end of the buffer (depending on the type of the resource) automatically when the last handle goes out of scope and try_open has never been called.

Usually, we do not need to worry about the details of the resource handles. The important takeaway is that the resources make sure that only a single consumer and a single producer can access the buffer. If we pass the same resource handle to multiple actors, only one of them is able to open the resource. The others fail to open it and produce an error.
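The "open at most once" rule can be modeled with a shared control block and an atomic flag. The sketch below is hypothetical (resource, buffer and count_successful_opens are made-up names, and CAF's resource types work differently internally): every copy of the handle points to the same control block, and only the first try_open call on any copy obtains the buffer.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for the SPSC buffer behind a resource.
struct buffer {
  int capacity;
};

// Hypothetical model of the "open at most once" rule. All copies share one
// control block; only the first try_open() on any copy gets the buffer.
class resource {
public:
  resource() : ctrl_(std::make_shared<control>()) {}

  // Returns the buffer on the first call and nullptr on every later call.
  std::shared_ptr<buffer> try_open() {
    if (ctrl_->opened.exchange(true))
      return nullptr;              // some copy already opened it
    assert(ctrl_->buf != nullptr); // the first opener always finds a buffer
    return std::move(ctrl_->buf);  // hand over the buffer exactly once
  }

private:
  struct control {
    std::atomic<bool> opened{false};
    std::shared_ptr<buffer> buf = std::make_shared<buffer>(buffer{64});
  };
  std::shared_ptr<control> ctrl_;
};

// Simulates passing copies of one handle to several consumers.
int count_successful_opens(const resource& res, int consumers) {
  int opened = 0;
  for (int i = 0; i < consumers; ++i) {
    resource copy = res;           // each consumer gets its own copy
    if (copy.try_open())
      ++opened;
  }
  return opened;
}
```

No matter how many copies exist, count_successful_opens reports exactly one successful open. Using an atomic exchange means the rule also holds if copies are opened from different threads.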

Declarative Setup of Flows with Inactive Actors

In our previous example, we called make_spsc_buffer_resource manually and then connected two actors to the buffer. The observe_on operator can do this for us. This function takes the self pointer of an actor that is not yet running. It performs the exact same steps we have done previously: it creates an SPSC buffer, pushes all items from the input observable into the buffer, and then creates a new observable that reads from the buffer. The new observable from observe_on then runs on the new actor. Basically, observe_on allows us to switch mid-pipeline from one actor to another.

How do we create an actor that is not yet running? The actor system provides the function spawn_inactive for exactly this use case. It returns the self pointer to the new actor and a function object to launch the actor. However, we usually do not call this function object directly: once it goes out of scope, it launches the actor implicitly. Because the actor is not yet running, we may access the self pointer and set up any flows we want. Once the actor is running, we must not access the self pointer anymore, because the actor then runs asynchronously.
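The "launch when it goes out of scope" behavior is an instance of the RAII idiom. The following hypothetical C++ sketch (launcher and demo_launch_order are made-up names, not CAF types) runs a stored function either when called explicitly or, at the latest, in its destructor, and never twice.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a launcher: runs `launch_` when called or, at the
// latest, when it goes out of scope. Not CAF's implementation.
class launcher {
public:
  explicit launcher(std::function<void()> launch)
    : launch_(std::move(launch)) {}
  launcher(const launcher&) = delete;
  launcher& operator=(const launcher&) = delete;
  ~launcher() { (*this)(); }

  void operator()() {
    if (launch_) {
      auto fn = std::move(launch_);
      launch_ = nullptr; // make sure we launch at most once
      fn();
    }
  }

private:
  std::function<void()> launch_;
};

// Records the order of events: setup happens before the launch.
std::vector<std::string> demo_launch_order() {
  std::vector<std::string> log;
  {
    launcher run{[&] { log.push_back("launched"); }};
    log.push_back("flows set up"); // the only window for using `self`
  }                                // `run` goes out of scope: launch
  assert(log.size() == 2);
  return log;
}
```

The scope in demo_launch_order corresponds to the window in which setting up flows through the self pointer is allowed; once the launcher fires, that window closes.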

Source Code

auto [src, run_src] = sys.spawn_inactive();
auto [snk, run_snk] = sys.spawn_inactive();
src->make_observable()
  .iota(1)
  .take(5)
  .observe_on(snk)
  .map([](int value) { return value * 2;})
  .for_each([snk = snk](int x) { snk->println("{}", x); });

Output

2
4
6
8
10

The observe_on operator is a powerful tool for setting up flows in a declarative manner. However, keep in mind that you may access the self pointer of another actor only in this very specific case and not after launching it.

Since actors started with spawn_inactive have no behavior, they will terminate automatically once the flow is completed or an error occurs.

Streams

Publishers and buffer resources are powerful tools. However, they are limited to a single process. What if we want to stream data in a distributed system? This is where streams come into play. Streams are a general abstraction for connecting multiple actors to a single observable.

Streams do not use SPSC buffers. Instead, they use a message-based protocol to communicate between actors. Because they use regular actor messages, streams can also be used to connect actors across processes or even across machines.

Let us revisit the previous example with SPSC buffers. We had an observable on Actor A created with iota(1).take(5). We then connected this observable to a buffer, which in turn was connected to an observable on Actor B. With streams, the picture changes as follows:

[Diagram: Simple Stream with Two Actors]

This time, there is no SPSC buffer between the two actors. Instead, the actors communicate via a message-based protocol that starts with a two-step handshake. First, Actor B sends a stream_open_msg to Actor A to inform it that Actor B wants to subscribe to the stream. On success, Actor A sends a stream_ack_msg to Actor B to confirm the subscription. After that, Actor B can request items (values) from the stream by sending a stream_demand_msg to Actor A. As long as Actor B has signaled demand that is not yet fulfilled, Actor A sends stream_batch_msg messages with the requested items to Actor B.

If, at any point, Actor B wants to unsubscribe from the stream, it sends a stream_cancel_msg to Actor A. In case of an error, Actor A sends a stream_abort_msg to Actor B. Finally, if Actor A has no more items to send, it sends a stream_close_msg.
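To see how demand and batches interact on the producer side, here is a hypothetical, synchronous C++ model (stream_producer is a made-up name; the real messages carry more data and CAF's logic is more involved). It acknowledges an open request, converts each demand message into credit, ships as many batches as the credit and the maximum batch size allow, and closes the stream once all items are sent. It deliberately ignores the batch delay, cancellation and errors.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, synchronous model of the producer side of the protocol.
class stream_producer {
public:
  stream_producer(std::vector<int> items, std::size_t max_batch_size)
    : items_(std::move(items)), max_batch_size_(max_batch_size) {
    assert(max_batch_size_ > 0);
  }

  // Handles stream_open_msg: confirm the subscription.
  void on_open() { trace.push_back("stream_ack_msg"); }

  // Handles stream_demand_msg: add credit, then ship as many batches as the
  // credit allows. Closes the stream once all items have been sent.
  void on_demand(std::size_t n) {
    credit_ += n;
    while (credit_ > 0 && pos_ < items_.size()) {
      auto size = std::min({credit_, max_batch_size_, items_.size() - pos_});
      trace.push_back("stream_batch_msg(" + std::to_string(size) + ")");
      pos_ += size;
      credit_ -= size;
    }
    if (pos_ == items_.size() && !closed_) {
      closed_ = true;
      trace.push_back("stream_close_msg");
    }
  }

  std::vector<std::string> trace; // messages sent to the consumer

private:
  std::vector<int> items_;
  std::size_t max_batch_size_;
  std::size_t pos_ = 0;    // index of the next item to send
  std::size_t credit_ = 0; // demand received but not yet served
  bool closed_ = false;
};
```

With five items, a maximum batch size of 2 and two demand messages for 3 and 10 items, this model sends an acknowledgement, batches of 2, 1 and 2 items, and finally a close message. The producer never sends more items than the consumer has asked for.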

CAF takes care of all the stream messages automatically. However, it is important to be aware of the protocol because it has a few implications. Most importantly, the protocol uses batching to reduce the number of messages. As a consequence, items may arrive in batches and with some delay.

While CAF takes care of the stream protocol, we can still fine-tune the behavior of the stream. For this, we first declare a few constants that we will use later.

using namespace std::literals;
static constexpr auto max_batch_delay = 50ms;
static constexpr auto max_batch_size = 10u;
static constexpr auto max_buffered = 50u;
static constexpr auto demand_threshold = 5u;

Batching limits the number of messages exchanged between actors, which is important for performance. However, it also increases latency. The stream API lets us control the trade-off between throughput and latency by defining the maximum batch delay (max_batch_delay) as well as the maximum batch size (max_batch_size). With the constants above, we tell CAF to send a batch whenever it has ten items to send. If it has fewer than ten items, it waits for up to 50 milliseconds and then sends the batch regardless of the number of items.
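The rule described in the previous paragraph can be written as a small decision function. This is a hypothetical sketch of the described behavior (should_flush is a made-up name, and "waited" is assumed to be the time the oldest buffered item has been waiting); CAF's internal timing logic may differ in details.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using namespace std::literals;

// Hypothetical flush rule: ship a batch as soon as it is full, or once the
// oldest buffered item has waited for max_batch_delay.
bool should_flush(std::size_t buffered, std::chrono::milliseconds waited,
                  std::size_t max_batch_size,
                  std::chrono::milliseconds max_batch_delay) {
  assert(max_batch_size > 0);
  if (buffered == 0)
    return false; // nothing to send
  return buffered >= max_batch_size || waited >= max_batch_delay;
}
```

With a maximum batch size of 10 and a delay of 50ms, ten buffered items are sent right away, three items that have waited 20ms stay in the buffer, and the same three items are sent once they have waited 50ms.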

The last two constants are for backpressure. The constant max_buffered defines how many items the consumer buffers at most. If the consumer does not request items from the producer, the producer eventually stops sending items. The constant demand_threshold defines how many items the consumer must be able to buffer (i.e., how many free slots it needs) before it requests more items from the producer. With a low threshold, the consumer requests items more often by sending stream_demand_msg messages to the producer. With a high threshold, it sends fewer stream_demand_msg messages. In the worst case, a high demand threshold leads to start-stop behavior if the threshold is close to the maximum number of buffered items. In this case, the consumer has to wait until its buffer is (nearly) empty before it can request more items again.
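The effect of the threshold can be captured in a simplified consumer-side credit rule. This is a hypothetical model (next_demand is a made-up name, and CAF's bookkeeping may differ): the consumer counts the items it has buffered plus the items it has requested but not yet received, and only sends a new demand message once at least demand_threshold slots are free.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical consumer-side credit rule. Returns how many items to request;
// 0 means "do not send a stream_demand_msg yet".
std::size_t next_demand(std::size_t buffered, std::size_t in_flight,
                        std::size_t max_buffered,
                        std::size_t demand_threshold) {
  assert(demand_threshold > 0 && demand_threshold <= max_buffered);
  assert(buffered + in_flight <= max_buffered);
  auto free_slots = max_buffered - buffered - in_flight;
  return free_slots >= demand_threshold ? free_slots : 0;
}
```

With max_buffered = 50 and demand_threshold = 5, an empty consumer requests 50 items, while a consumer with 30 buffered and 17 in-flight items (3 free slots) waits. If the threshold equals max_buffered, a single buffered item already blocks new demand, which is the start-stop behavior described above.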

With these constants in place, we can define our actor for producing items:

caf::behavior source(caf::event_based_actor* self) {
  return {
    [self](caf::get_atom) {
      return self->make_observable()
        .iota(1)
        .take(5)
        .to_stream("int-flow", max_batch_delay, max_batch_size);
    }
  };
}

The actor has a single message handler that responds to get messages. When receiving such a message, the actor creates an observable that generates the items 1 to 5 and then converts the observable into a stream by calling to_stream. The stream is named int-flow, and we use the constants defined earlier to control the batching behavior.

With the source actor in place, we can now implement actors that consume the stream:

Source Code

void caf_main(caf::actor_system& sys) {
  // Get the stream handle from the source actor.
  caf::scoped_actor self{sys};
  auto maybe_items = self->mail(caf::get_atom_v)
                         .request(sys.spawn(source), caf::infinite)
                         .receive<caf::stream>();
  if (!maybe_items) {
    self->println("failed to get a valid stream handle: {}", maybe_items.error());
    return;
  }
  auto items = *maybe_items;

  // Subscribe an actor to the stream and wait for it to complete.
  auto snk1 = sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe_as<int>(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2;})
      .for_each([snk](int x) { snk->println("source-1: {}", x); });
  });

  // Wait for the first actor to complete to have a deterministic output.
  self->wait_for(snk1);

  // Subscribe another actor to the stream.
  sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe_as<int>(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2;})
      .for_each([snk](int x) { snk->println("source-2: {}", x); });
  });
}

CAF_MAIN()

Output

source-1: 2
source-1: 4
source-1: 6
source-1: 8
source-1: 10
source-2: 2
source-2: 4
source-2: 6
source-2: 8
source-2: 10

In our example, we first obtain the stream handle from the source actor. Then, we spawn two actors that subscribe to the stream. Converting a stream into an observable is done with the observe_as function. The function requires a template parameter that specifies the type of the items in the stream. In our case, we use int. The function then uses the type information to convert the stream into an observable with the correct item type. The remaining arguments to observe_as are the tuning parameters that we have discussed when introducing the constants. Once we have an observable, we can use the map and for_each operators to transform and consume the items.

Note that we use wait_for in this example only to get a deterministic output. The two actors could also run in parallel.

Typed Streams

Just like actor handles, streams come in two flavors: statically typed and dynamically typed. We have seen the dynamic variant in the previous example. The statically typed variant is called typed_stream, and its API is very similar to the dynamic variant. The main difference is that we need to assign a type ID to each typed stream handle type that we want to use in messages.

// Same constants as before.
using namespace std::literals;
static constexpr auto max_batch_delay = 50ms;
static constexpr auto max_batch_size = 10u;
static constexpr auto max_buffered = 50u;
static constexpr auto demand_threshold = 5u;

// Make caf::typed_stream<int32_t> available to the serialization framework.
CAF_BEGIN_TYPE_ID_BLOCK(example_project, first_custom_type_id)
  CAF_ADD_TYPE_ID(example_project, (caf::typed_stream<int32_t>))
CAF_END_TYPE_ID_BLOCK(example_project)

Once we have added the type ID, we can use the stream type in messages. The source actor looks almost identical to the untyped variant:

caf::behavior source(caf::event_based_actor* self) {
  return {
    [self](caf::get_atom) {
      return self->make_observable()
        .iota(int32_t{1})
        .take(5)
        .to_typed_stream("int-flow", max_batch_delay, max_batch_size);
    }
  };
}

The only differences are that we use to_typed_stream instead of to_stream and that we force int32_t as item type by passing int32_t{1} to iota. The rest of the code is identical. Our remaining code is also very similar to the untyped variant. The only differences are that we use the handle type typed_stream<int32_t> instead of stream and that we use observe instead of observe_as:

Source Code

void caf_main(caf::actor_system& sys) {
  // Get the stream handle from the source actor.
  using stream_t = caf::typed_stream<int32_t>;
  caf::scoped_actor self{sys};
  auto maybe_items = self->mail(caf::get_atom_v)
                         .request(sys.spawn(source), caf::infinite)
                         .receive<stream_t>();
  if (!maybe_items) {
    self->println("failed to get a valid stream handle: {}", maybe_items.error());
    return;
  }
  auto items = *maybe_items;

  // Subscribe an actor to the stream and wait for it to complete.
  auto snk1 = sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2;})
      .for_each([snk](int x) { snk->println("source-1: {}", x); });
  });

  // Wait for the first actor to complete to have a deterministic output.
  self->wait_for(snk1);

  // Subscribe another actor to the stream.
  sys.spawn([items](caf::event_based_actor* snk) {
    snk->observe(items, max_buffered, demand_threshold)
      .map([](int value) { return value * 2;})
      .for_each([snk](int x) { snk->println("source-2: {}", x); });
  });
}

CAF_MAIN(caf::id_block::example_project)

Output

source-1: 2
source-1: 4
source-1: 6
source-1: 8
source-1: 10
source-2: 2
source-2: 4
source-2: 6
source-2: 8
source-2: 10

Next Up

We hope that part 1 of this guide leaves you with a good understanding of the basic concepts of the CAF flow API. In part 2, we will look at error handling, merging and how to interface flows with regular actor messages.