Flows
Part 2
Introduction
In part 1, we learned about the fundamental building blocks of the flow API: observables, observers, and operators. After covering the basics, we also looked at publishers and streams.
So far, we have only seen linear flows. In this part, we will learn how to combine flows, how to embed actor communication into flows, and how to handle errors in flows.
Combining Flows
When dealing with multiple inputs, we often need to combine them into a single output. For example, we may have multiple sensors that provide temperature readings. We may want to combine these readings into a single stream so that we can process them together. This would be an example of merging flows, i.e., subscribing to multiple flows and combining their outputs into a single flow.
The second option to combine flows is to concatenate them. Instead of subscribing to all inputs at once, we may want to consume the outputs in a particular order. This means that we subscribe to the first input, consume its output, then subscribe to the second input, consume its output, and so on. A simple example would be to open files on disk and concatenate their contents into a single stream, in the order in which they were opened.
The third option to combine flows is to zip them. This means that we consume the outputs of multiple flows in lockstep. For example, we may have two sensors that provide temperature and humidity readings. We may want to combine these readings into a single stream so that we can provide periodic updates of both readings.
Merging Flows
When merging flows, we subscribe to multiple flows and combine their outputs into a single flow. The order in which the outputs are combined is not deterministic.
In our first example, we will merge flows of multiple sensors that provide temperature readings. We will combine these readings into a single stream so that we can process them together.
First, we implement an open_sensor function that creates a flow of temperature readings. The resulting flow will provide a random value once every 250 milliseconds.
// Headers and literals needed by this snippet.
#include <chrono>
#include <cstdint>
#include <memory>
#include <random>

#include "caf/event_based_actor.hpp"
#include "caf/scheduled_actor/flow.hpp"

using namespace std::literals; // enables the `250ms` literal

// A simple random number generator for integer values.
struct rng_t {
  rng_t(int min, int max) : dist(min, max) {
    engine.seed(std::random_device{}());
  }
  int next() {
    return dist(engine);
  }
  std::minstd_rand engine;
  std::uniform_int_distribution<int> dist;
};

// A sensor that provides temperature readings once every 250ms.
caf::flow::observable<int> open_sensor(caf::event_based_actor* self) {
  auto rng = std::make_shared<rng_t>(10, 40);
  return self->make_observable()
    .interval(250ms)
    .map([rng](int64_t) {
      // Ignore the sequence number and return a random temperature reading.
      return rng->next();
    })
    .as_observable();
}
The interval operator emits an ascending sequence of integers at regular intervals. We do not care about the actual values, so we ignore them and return a random temperature reading instead. As we have learned in the last part, CAF uses operator fusion to combine multiple processing steps into a single operator. Hence, the result of map is a blueprint, not a concrete flow. We need to call as_observable to create a flow that emits the mapped values.
Next, we implement an actor that creates three sensors and merges their outputs into a single stream. The actor will print the first ten readings to the console.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  // Create three flows for temperature readings and add an index to each value.
  auto with_index = [](int index) {
    return [index](int value) { return std::pair{index, value}; };
  };
  auto sensor1 = open_sensor(self).map(with_index(1));
  auto sensor2 = open_sensor(self).map(with_index(2));
  auto sensor3 = open_sensor(self).map(with_index(3));
  std::move(sensor1).merge(std::move(sensor2), std::move(sensor3))
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});
Output
sensor 1: 27 °C
sensor 2: 11 °C
sensor 3: 13 °C
sensor 1: 18 °C
sensor 2: 28 °C
sensor 3: 19 °C
sensor 1: 12 °C
sensor 2: 25 °C
sensor 3: 28 °C
sensor 1: 21 °C
After "opening" the sensors, we add an index to each value so that we can distinguish between the sensors. We then merge the three flows into a single stream by calling merge on the first flow and passing the other two flows as arguments.
Note that we need to use std::move here. The reason is once more operator fusion: the blueprints are move-only objects. Instead of moving them, we could also call as_observable on each object. It makes no difference in this case, since CAF will call as_observable internally when we pass the blueprints to merge.
After calling merge, we can treat the result as a single flow. Note that the output of this example is of course non-deterministic and random. Each run will print different values to the console.
It is worth mentioning that there are two ways to merge flows. The first way is what we have seen in the example above: a.merge(b, c, ...). The second way is to use an observable builder instead: self->make_observable().merge(a, b, c, ...). The end result is the same.
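To see why the merged order depends on timing, it can help to model merging without CAF. The following standard-library sketch is only an illustration and not how CAF implements merge: the reading struct, its arrival_ms field, and the merge_by_arrival function are names made up for this example. Each reading carries a simulated arrival time, and the merged output simply follows those times.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical model of a sensor reading (not a CAF type).
struct reading {
  int arrival_ms; // simulated arrival time in milliseconds
  int sensor;     // index of the sensor that produced the value
  int value;      // temperature in °C
};

// Model of merging: all inputs are consumed at once, so the output order
// follows the arrival times rather than the order of the inputs.
std::vector<std::pair<int, int>>
merge_by_arrival(const std::vector<std::vector<reading>>& inputs) {
  std::vector<reading> all;
  for (const auto& input : inputs)
    all.insert(all.end(), input.begin(), input.end());
  // stable_sort keeps the input order for readings with equal arrival time.
  std::stable_sort(all.begin(), all.end(),
                   [](const reading& a, const reading& b) {
                     return a.arrival_ms < b.arrival_ms;
                   });
  std::vector<std::pair<int, int>> result;
  for (const auto& r : all)
    result.emplace_back(r.sensor, r.value);
  return result;
}

// Usage: two sensors whose readings arrive interleaved.
void merge_example() {
  std::vector<reading> sensor1{{0, 1, 27}, {250, 1, 18}};
  std::vector<reading> sensor2{{10, 2, 11}, {260, 2, 28}};
  auto merged = merge_by_arrival({sensor1, sensor2});
  assert(merged.size() == 4);
  assert(merged[1] == std::make_pair(2, 11)); // sensor 2 arrived second
}
```

Shifting the arrival times of one sensor changes the output order, which mirrors what happens with real timers: small scheduling differences decide which reading comes first.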
Concatenating Flows
From an API perspective, concatenating flows is similar to merging them. The difference is that we call concat instead of merge. However, the semantics are of course different. Instead of consuming the outputs of all flows in parallel, we consume them in a particular order. This implies that each input flow should be finite: concat only subscribes to the next input after the previous one has completed, so an input that never completes would keep all later inputs from being consumed.
In our example, we will concatenate the contents of three files into a single stream. We will open the files in a particular order and consume their contents in the same order. For the sake of simplicity, we will simulate the file contents with a fixed sequence of integers.
First, we implement an open_file function that creates a flow of integers.
// A simple observable that emits an ascending sequence of 3 integers.
caf::flow::observable<int> open_file(caf::event_based_actor* self, int begin) {
  return self->make_observable()
    .iota(begin)
    .take(3)
    .as_observable();
}
The iota operator emits an ascending sequence of integers starting from a given value. We limit the sequence to three integers by calling take.
Next, we implement an actor that opens three files and concatenates their contents into a single stream. The actor will print the contents of the files to the console again.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  // Create three flows that simulate the contents of three files.
  auto file1 = open_file(self, 10);
  auto file2 = open_file(self, 20);
  auto file3 = open_file(self, 30);
  std::move(file1).concat(std::move(file2), std::move(file3))
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
10
11
12
20
21
22
30
31
32
This time, the output of the example is deterministic. The actor will always print the same numbers in the same order.
Again, we could write the same code using an observable builder, i.e., by calling self->make_observable().concat(a, b, c, ...) instead of a.concat(b, c, ...).
Zipping Flows
When zipping flows, we consume the outputs of multiple flows in lockstep. This means that we consume the first output of all flows, then the second output of all flows, and so on. Hence, all input flows should produce the same number of elements: if one flow completes before the others, the zipped flow also completes and the surplus elements of the other flows are never emitted.
When creating a zipped flow, we must provide a "zipper" function that combines the outputs of all input flows into a single output. The zipper function must accept the same number of arguments as there are input flows.
For our next example, we will use the open_sensor function from the merge example to create two flows. One sensor will provide temperature readings, and the other sensor will provide humidity readings. Our zipper function will simply combine the readings into a pair.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  auto to_pair = [](int temp, int hum) {
    return std::pair{temp, hum};
  };
  auto temps = open_sensor(self);
  auto hums = open_sensor(self);
  self->make_observable()
    .zip_with(to_pair, temps, hums)
    .take(5)
    .for_each([self](std::pair<int, int> reading) {
      auto [temp, hum] = reading;
      self->println("temperature: {} °C, humidity: {} %RH", temp, hum);
    });
});
Output
temperature: 10 °C, humidity: 17 %RH
temperature: 36 °C, humidity: 38 %RH
temperature: 34 °C, humidity: 24 %RH
temperature: 19 °C, humidity: 15 %RH
temperature: 37 °C, humidity: 26 %RH
Since we are using random values, the output of this example is also non-deterministic. Each run will print different values to the console.
This time, we have used the self->make_observable().zip_with method to create the zipped flow. We could also call temps.zip_with(to_pair, hums) instead. Arguably, the former is a bit more readable in this case, because the zipper is the first argument of the method and takes its arguments in the same order as the input flows.
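The lockstep behavior can be modeled on plain vectors. The sketch below is an illustration using only the standard library, not CAF's implementation; the free function zip_with defined here is a stand-in that merely shares its name with the CAF operator. It pairs up elements by position and stops once the shorter input is exhausted.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

// Model of zip_with for two inputs: combine elements pairwise and stop as
// soon as the shorter input runs out.
template <class F, class A, class B>
auto zip_with(F zipper, const std::vector<A>& xs, const std::vector<B>& ys) {
  using result_type = std::invoke_result_t<F&, const A&, const B&>;
  std::vector<result_type> result;
  const std::size_t n = std::min(xs.size(), ys.size());
  result.reserve(n);
  for (std::size_t i = 0; i < n; ++i)
    result.push_back(zipper(xs[i], ys[i]));
  return result;
}

// Usage: three temperature readings but only two humidity readings.
void zip_example() {
  std::vector<int> temps{10, 36, 34};
  std::vector<int> hums{17, 38};
  auto to_pair = [](int temp, int hum) { return std::make_pair(temp, hum); };
  auto zipped = zip_with(to_pair, temps, hums);
  assert(zipped.size() == 2); // the surplus temperature 34 is dropped
  assert(zipped[0] == std::make_pair(10, 17));
}
```

Note that the zipper takes exactly one parameter per input, so the number of inputs is fixed at compile time in this model.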
Merging Flows Dynamically
In our first example on merging, we had a fixed number of sensors. However, what if we cannot know the number of sensors in advance? In real-world applications, this is a very common scenario and usually modeled as observables that emit other observables.
Let us revisit our first example, but this time we add a twist to it. Instead of having a fixed number of sensors, we have a flow of indexes that we turn into a flow of sensors. We then merge all sensor flows into a single flow.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    // Create the indexes for our sensors: 1, 2, 3.
    .iota(1)
    .take(3)
    // Turn the indexes into sensor flows and add the index to each value.
    .map([self](int index) {
      return open_sensor(self)
        .map([index](int value) { return std::pair{index, value}; })
        .as_observable();
    })
    // Turn `observable<observable<T>>` into `observable<T>` via merging.
    .merge()
    // Same as before: print the first ten readings to the console.
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});
Output
sensor 1: 28 °C
sensor 2: 19 °C
sensor 3: 20 °C
sensor 1: 26 °C
sensor 2: 12 °C
sensor 3: 17 °C
sensor 1: 39 °C
sensor 2: 38 °C
sensor 3: 21 °C
sensor 1: 35 °C
Instead of having three variables for the sensors, we have now produced a flow of sensor observables that we merge into a single flow. If we have an observable<observable<T>>, we can call merge on it with no arguments to flatten it into an observable<T>. The output of this example is also non-deterministic, just like the first merge example.
Turning a sequence of values into a sequence of observables (e.g., by opening a file or connecting to some sensor) and then merging them is such a common pattern that it has its own operator: flat_map. Let us rewrite the example one last time using flat_map.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(3)
    .flat_map([self](int index) {
      return open_sensor(self)
        .map([index](int value) { return std::pair{index, value}; });
    })
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});
Output
sensor 1: 17 °C
sensor 2: 35 °C
sensor 3: 28 °C
sensor 1: 20 °C
sensor 2: 28 °C
sensor 3: 19 °C
sensor 1: 12 °C
sensor 2: 25 °C
sensor 3: 24 °C
sensor 1: 40 °C
The two versions are equivalent. Using the flat_map operator is simply more concise and easier to read. We can also omit calling as_observable on the inner observables, because flat_map will do this automatically.
Concatenating Flows Dynamically
In the same way we can merge flows dynamically, we can also concatenate them dynamically. The pattern is the same: we have a flow of observables that we concatenate into a single flow. We can do this either by calling map followed by concat or by using the concat_map operator. We will use the latter for brevity.
Source Code
sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    // Create the indexes for our files: 10, 20, 30.
    .iota(1)
    .take(3)
    .map([](int value) { return value * 10; })
    // Turn the indexes into file flows.
    .concat_map([self](int begin) {
      return open_file(self, begin);
    })
    // Print the contents of the files to the console.
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
10
11
12
20
21
22
30
31
32
The output of this example is the same as before. The only difference is that we have generated our file observables dynamically.
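The "map each value to a sequence, then append the sequences in order" idea behind concat_map can be modeled on plain vectors. This is a minimal standard-library sketch under simplifying assumptions (everything is synchronous and finite); fake_file and the free function concat_map are hypothetical stand-ins for open_file and the CAF operator.

```cpp
#include <cassert>
#include <vector>

// Stand-in for open_file: three ascending integers starting at `begin`.
std::vector<int> fake_file(int begin) {
  return {begin, begin + 1, begin + 2};
}

// Model of concat_map: turn each input value into a sequence and append the
// sequences in input order.
template <class F>
std::vector<int> concat_map(const std::vector<int>& inputs, F make_sequence) {
  std::vector<int> result;
  for (int input : inputs) {
    std::vector<int> inner = make_sequence(input);
    result.insert(result.end(), inner.begin(), inner.end());
  }
  return result;
}

// Usage: reproduces the output of the file example.
void concat_map_example() {
  auto out = concat_map({10, 20, 30}, fake_file);
  assert((out == std::vector<int>{10, 11, 12, 20, 21, 22, 30, 31, 32}));
}
```

Because each inner sequence is consumed completely before the next one starts, the result is the same on every run.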
One note on zip_with before we move on: there is no dynamic version of this operator. The reason is that zipping requires a zipper function that combines the outputs of all input flows into a single output. Without knowing the number of input flows in advance, we cannot provide such a zipper function since the number of arguments is unknown.
Embedding Requests into Flows
So far, we did all the processing in the flow itself. However, sometimes we need to send requests to other actors to delegate some work. For example, we may want to send a request to a database actor to fetch some data. We can embed such requests directly into flows without breaking the chain of operators.
In our example, we will create a flow of integers and send a request to another actor to double each value. We will then print the doubled values to the console.
Source Code
// A simple actor that doubles an integer.
auto doubler = sys.spawn([]() -> caf::behavior {
  return {
    [](int value) {
      return value * 2;
    },
  };
});
// An actor that runs a flow of integers and doubles each value using `doubler`.
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>();
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
2
4
6
8
10
In this example, we have a simple worker called doubler. From our main actor, we send a request to doubler for each value in the flow. We then use as_observable<T> to convert each response into an observable<T>. Finally, we dynamically concatenate all responses into a single flow and print the values to the console. We could also use flat_map instead of concat_map if the order of the values does not matter.
When sending a request to a typed actor, we call as_observable without a template parameter, because the response type is already known.
Back to our example above: what if we pass the wrong type to as_observable? In that case, the type conversion fails, which brings us to our next topic: error handling in flows.
Error Handling
If we changed our last example to use as_observable<float> instead of as_observable<int>, the example would print... nothing. The reason is that the flow aborts as soon as an error occurs and we have no error handling in place to do something about it.
In part 1, we learned about the four member functions that an observer needs to provide. Here is a quick reminder:
void on_subscribe(subscription sub);
void on_next(const T& item);
void on_complete();
void on_error(const error& what);
When using for_each, we only provide the on_next function. However, we can also inject callbacks for on_complete and on_error by using do_on_complete and do_on_error, respectively. CAF also offers do_finally to inject a callback that is called if either on_complete or on_error is called.
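To make the interplay of these callbacks concrete, here is a small model written with the standard library only. It is a sketch under simplifying assumptions, not CAF code: the callbacks and step structs and the run_source function are hypothetical names, and errors are plain strings instead of caf::error. The model delivers values until the first error, then invokes the error callback followed by the "finally" callback.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the injected callbacks.
struct callbacks {
  std::function<void(int)> on_next;
  std::function<void()> on_complete;
  std::function<void(const std::string&)> on_error;
  std::function<void()> finally; // runs after on_complete or on_error
};

// One step of a source: a value, or an error message if `error` is set.
struct step {
  int value;
  std::optional<std::string> error;
};

// Feeds the steps to the callbacks. The first error ends the run, so any
// later values are never delivered.
void run_source(const std::vector<step>& steps, const callbacks& cb) {
  for (const auto& s : steps) {
    if (s.error) {
      if (cb.on_error)
        cb.on_error(*s.error);
      if (cb.finally)
        cb.finally();
      return;
    }
    if (cb.on_next)
      cb.on_next(s.value);
  }
  if (cb.on_complete)
    cb.on_complete();
  if (cb.finally)
    cb.finally();
}

// Usage: the value after the error (3) never reaches on_next.
void callbacks_example() {
  std::vector<std::string> log;
  callbacks cb;
  cb.on_next = [&log](int x) { log.push_back("next " + std::to_string(x)); };
  cb.on_complete = [&log] { log.push_back("complete"); };
  cb.on_error = [&log](const std::string& what) {
    log.push_back("error " + what);
  };
  cb.finally = [&log] { log.push_back("finally"); };
  std::vector<step> steps;
  steps.push_back(step{1, std::nullopt});
  steps.push_back(step{0, std::string{"boom"}});
  steps.push_back(step{3, std::nullopt});
  run_source(steps, cb);
  assert((log == std::vector<std::string>{"next 1", "error boom", "finally"}));
}
```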
Let us use the previous example, but this time we will pass the wrong type to as_observable and provide a callback for the error.
Source Code
// A simple actor that doubles an integer.
auto doubler = sys.spawn([]() -> caf::behavior {
  return {
    [](int value) {
      return value * 2;
    },
  };
});
// An actor that runs a flow of integers and doubles each value using `doubler`.
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<float>();
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
error: unexpected_response(message(2))
This time, the example prints an unexpected_response error message to the console. We told CAF that we expect a float, but we received an int. Hence, CAF produces the error message, calls our error callback and aborts the flow.
With the do functions (do_on_complete, do_on_error, and do_finally), we can inject callbacks at various points in the flow. However, we cannot recover from an error. Once an error occurs, the flow is aborted and no further values are processed. If we want to recover from an error, we need to use a different approach.
The simplest way to recover from an error is to ignore it. We can do this by using the on_error_complete operator. This operator calls on_complete if an error occurs. This way, a flow terminates normally even if an error occurs. In our previous example, the error occurred in a dynamically created observable that we concatenated into the main flow. Hence, we apply on_error_complete to that inner observable, so that only the failed request ends early while the main flow keeps running.
Let us make the example a bit more interesting by raising an error in doubler whenever the input value is a multiple of 3 and then use on_error_complete to discard errors in the main flow.
caf::behavior doubler_impl() {
  return {
    [](int value) -> caf::result<int> {
      if (value % 3 == 0)
        return caf::make_error(caf::sec::invalid_argument);
      return value * 2;
    },
  };
}
We will re-use this implementation for the doubler actor in our next examples. With the on_error_complete operator, we can now simply discard errors in the main flow, as shown in the following example.
Source Code
auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_complete();
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
2
4
8
10
This time, the example will print the values 2, 4, 8, and 10 to the console. The input value 3 raises an error in doubler, so the result 6 never appears, but the error is discarded by on_error_complete. The flow continues with the next value.
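Where the recovery happens matters: the error is swallowed inside each inner flow, before it can reach the main flow. The following standard-library sketch models that difference. It is an illustration only; try_double stands in for a request to doubler, and a missing value (std::nullopt) models the error. All function names are made up for this example.

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Stand-in for a request to the doubler actor: no result for multiples of 3.
std::optional<int> try_double(int value) {
  if (value % 3 == 0)
    return std::nullopt; // models the invalid_argument error
  return value * 2;
}

// Model of recovering inside each inner flow: a failed request turns into an
// empty sequence, so concatenation simply moves on to the next input.
std::vector<int> double_all_skipping_errors(const std::vector<int>& inputs) {
  std::vector<int> result;
  for (int input : inputs)
    if (auto doubled = try_double(input))
      result.push_back(*doubled);
  return result;
}

// Model of leaving the error unhandled: the first failure ends the whole run.
std::vector<int> double_all_until_error(const std::vector<int>& inputs) {
  std::vector<int> result;
  for (int input : inputs) {
    auto doubled = try_double(input);
    if (!doubled)
      break;
    result.push_back(*doubled);
  }
  return result;
}

// Usage: the same inputs as in the example above.
void recovery_example() {
  std::vector<int> inputs{1, 2, 3, 4, 5};
  assert((double_all_skipping_errors(inputs) == std::vector<int>{2, 4, 8, 10}));
  assert((double_all_until_error(inputs) == std::vector<int>{2, 4}));
}
```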
If we want to use a default value instead of discarding the error, we can use the on_error_return_item operator. This operator replaces the error with a default value, after which the flow continues.
Source Code
auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_return_item(-1);
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
2
4
-1
8
10
With this implementation, the example will print the values 2, 4, -1, 8, and 10 to the console. Again, the input value 3 raises an error in doubler, but the error is replaced with -1 by on_error_return_item.
The last operator we will look at is on_error_return. This operator takes a function object that takes the error as an argument and returns an expected<T>. This allows us to inspect the error and decide what to do based on the error. We can either return a value to hide the error, simply return the original error, or return a different error.
We will use this operator to return -1 for invalid arguments and the original error for all other errors.
Source Code
auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_return([](const caf::error& what) -> caf::expected<int> {
          if (what == caf::sec::invalid_argument)
            return -1;
          return what;
        });
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});
Output
2
4
-1
8
10
The output is the same as with on_error_return_item. However, this time we have selectively replaced the error for invalid arguments only. Any other error will still trigger the do_on_error callback and abort the flow.
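The selective recovery can also be modeled without CAF. In the sketch below, std::variant plays the role of an "either a value or an error" type, loosely comparable to caf::expected<int>. The errc enum, the expected_int alias, and both functions are assumptions made up for this illustration; they are not part of CAF.

```cpp
#include <cassert>
#include <variant>
#include <vector>

// Hypothetical error codes for this sketch (not CAF's caf::sec).
enum class errc { invalid_argument, timeout };

// Either a value or an error.
using expected_int = std::variant<int, errc>;

// Handler in the spirit of the callback passed to on_error_return: hide
// invalid_argument behind -1 and pass every other error through.
expected_int recover(errc what) {
  if (what == errc::invalid_argument)
    return -1;
  return what;
}

// Applies `recover` to each failed result and stops at the first error that
// remains. Returns the emitted values; `failed` tells whether an error ended
// the run.
std::vector<int> run_with_recovery(const std::vector<expected_int>& results,
                                   bool& failed) {
  std::vector<int> emitted;
  failed = false;
  for (const auto& res : results) {
    expected_int handled = res;
    if (auto* err = std::get_if<errc>(&res))
      handled = recover(*err);
    if (auto* val = std::get_if<int>(&handled)) {
      emitted.push_back(*val);
    } else {
      failed = true;
      break;
    }
  }
  return emitted;
}

// Usage: invalid_argument is replaced, timeout still ends the run.
void selective_recovery_example() {
  bool failed = false;
  std::vector<expected_int> ok{2, 4, errc::invalid_argument, 8, 10};
  assert((run_with_recovery(ok, failed) == std::vector<int>{2, 4, -1, 8, 10}));
  assert(!failed);
  std::vector<expected_int> bad{2, errc::timeout, 6};
  assert((run_with_recovery(bad, failed) == std::vector<int>{2}));
  assert(failed);
}
```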
Next Up
In this part, we have learned how to combine flows. We have seen how to merge, concatenate, and zip flows. We have also learned how to handle errors in flows and how to embed actor communication into flows.
In part 3, we will learn about observable variants such as single and connectable. Further, we will learn how to implement custom transformation steps and discuss how backpressure works in CAF.