Flows

Part 2


Introduction

In part 1, we learned about the fundamental building blocks of the flow API: observables, observers, and operators. After covering the basics, we also looked at publishers and streams.

So far, we have only seen linear flows. In this part, we will learn how to combine flows. We will also learn how to handle errors in flows and, last but not least, how to embed actor communication into flows.

Combining Flows

When dealing with multiple inputs, we often need to combine them into a single output. For example, we may have multiple sensors that provide temperature readings. We may want to combine these readings into a single stream so that we can process them together. This would be an example of merging flows, i.e., subscribing to multiple flows and combining their outputs into a single flow.

The second option to combine flows is to concatenate them. Instead of subscribing to all inputs all at once, we may want to consume the outputs in a particular order. This would mean that we subscribe to the first input, consume its output, then subscribe to the second input, consume its output, and so on. A simple example would be to open files on disk and concatenate their contents into a single stream, in the order in which they were opened.

The third option to combine flows is to zip them. This means that we consume the outputs of multiple flows in lockstep. For example, we may have two sensors that provide temperature and humidity readings. We may want to combine these readings into a single stream so that we can provide periodic updates of both readings.

Merging Flows

When merging flows, we subscribe to multiple flows at once and forward their outputs into a single flow as they arrive. Hence, the order of the combined outputs is not deterministic.
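To make the non-determinism concrete, here is a deliberately simplified plain C++ model, not CAF code. It assumes that each input is a list of (arrival time, value) pairs, and the hypothetical merge_by_arrival function forwards values in arrival order. In a real system, arrival times vary from run to run, and so does the output order.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// One event of an input flow: (arrival time in milliseconds, value).
using event = std::pair<int, int>;

// Hypothetical model of merge: the merged flow forwards every event as soon as
// it arrives. We fake "as soon as it arrives" by ordering all events by time.
std::vector<int> merge_by_arrival(const std::vector<std::vector<event>>& inputs) {
  std::vector<event> all;
  for (const auto& input : inputs)
    all.insert(all.end(), input.begin(), input.end());
  // stable_sort keeps the input order for events that arrive at the same time.
  std::stable_sort(all.begin(), all.end(), [](const event& a, const event& b) {
    return a.first < b.first;
  });
  std::vector<int> result;
  result.reserve(all.size());
  for (const auto& ev : all)
    result.push_back(ev.second);
  return result;
}
```

With inputs {(0, 1), (250, 2)} and {(100, 10), (200, 20)}, the model yields 1, 10, 20, 2. Delaying a single event changes the output order, even though the inputs themselves still emit their values in the same order.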

In our first example, we will merge flows of multiple sensors that provide temperature readings. We will combine these readings into a single stream so that we can process them together.

First, we implement an open_sensor function that creates a flow of temperature readings. The resulting flow will provide a random value once every 250 milliseconds.

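#include <chrono>
#include <cstdint>
#include <memory>
#include <random>

using namespace std::literals; // enables the 250ms literal below

// A simple random number generator for integer values.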
struct rng_t {
  rng_t(int min, int max) : dist(min, max) {
    engine.seed(std::random_device{}());
  }

  int next() {
    return dist(engine);
  }

  std::minstd_rand engine;
  std::uniform_int_distribution<int> dist;
};

// A sensor that provides temperature readings once every 250ms.
caf::flow::observable<int> open_sensor(caf::event_based_actor* self) {
  auto rng = std::make_shared<rng_t>(10, 40);
  return self->make_observable()
    .interval(250ms)
    .map([rng](int64_t) {
      // Ignore the timestamp and return a random temperature reading.
      return rng->next();
    })
    .as_observable();
}

The interval operator emits an ascending sequence of integers at regular intervals. We do not care about the actual values, so we ignore them and return a random temperature reading instead. As we learned in part 1, CAF uses operator fusion to combine multiple processing steps into a single operator. Hence, the result of map is a blueprint, not a concrete flow. Calling as_observable turns this blueprint into an observable<int>, which is what open_sensor returns.
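The idea behind operator fusion can be sketched in plain C++. The blueprint class below is a hypothetical, heavily simplified model, not CAF's implementation: each call to map composes the new step into one function instead of adding a separate processing stage, and the fused function only runs once materialize is called.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model of a fused blueprint: it stores one function that
// represents all processing steps added so far.
template <class In, class Out>
class blueprint {
public:
  explicit blueprint(std::function<Out(In)> fn) : fn_(std::move(fn)) {}

  // Composes `step` with the current function into a single fused function.
  // No new stage is created and nothing runs yet.
  template <class F>
  auto map(F step) const {
    using Next = decltype(step(std::declval<Out>()));
    return blueprint<In, Next>{[fn = fn_, step](In x) { return step(fn(x)); }};
  }

  // Runs the fused function exactly once per input element.
  std::vector<Out> materialize(const std::vector<In>& inputs) const {
    std::vector<Out> result;
    for (const auto& x : inputs)
      result.push_back(fn_(x));
    return result;
  }

private:
  std::function<Out(In)> fn_;
};
```

For example, starting from the identity function, mapping x * 2 and then x + 1 over {1, 2, 3} yields {3, 5, 7} with one fused call per element.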

Next, we implement an actor that creates three sensors and merges their outputs into a single stream. The actor will print the first ten readings to the console.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  // Create three flows for temperature readings and add an index to each value.
  auto with_index = [](int index) {
    return [index](int value) { return std::pair{index, value}; };
  };
  auto sensor1 = open_sensor(self).map(with_index(1));
  auto sensor2 = open_sensor(self).map(with_index(2));
  auto sensor3 = open_sensor(self).map(with_index(3));
  std::move(sensor1).merge(std::move(sensor2), std::move(sensor3))
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});

Output

sensor 1: 27 °C
sensor 2: 11 °C
sensor 3: 13 °C
sensor 1: 18 °C
sensor 2: 28 °C
sensor 3: 19 °C
sensor 1: 12 °C
sensor 2: 25 °C
sensor 3: 28 °C
sensor 1: 21 °C

After "opening" the sensors, we add an index to each value so that we can distinguish between the sensors. We then merge the three flows into a single stream by calling merge on the first flow and passing the other two flows as arguments.

Note that we need to use std::move here. Once more, the reason is operator fusion: the blueprints are move-only objects. Alternatively, we could call as_observable on each blueprint. It makes no difference in this case, since CAF calls as_observable internally when we pass the blueprints to merge.
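Why std::move is required can be illustrated with a plain C++ stand-in. The move_only_source class below is hypothetical: it owns its state through a std::unique_ptr, so it cannot be copied, and its combine member function is rvalue-qualified, similar in spirit to calling merge on std::move(sensor1). It simply appends the items of the other source and is not a model of merge itself.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a move-only blueprint: the std::unique_ptr member
// makes the implicitly generated copy constructor deleted.
class move_only_source {
public:
  explicit move_only_source(std::vector<int> items)
    : items_(std::make_unique<std::vector<int>>(std::move(items))) {}

  // Only callable on rvalues: the call consumes *this and takes `other` by
  // value, so the caller has to move both objects in.
  move_only_source combine(move_only_source other) && {
    items_->insert(items_->end(), other.items_->begin(), other.items_->end());
    return std::move(*this);
  }

  const std::vector<int>& items() const { return *items_; }

private:
  std::unique_ptr<std::vector<int>> items_;
};
```

In this model, writing a.combine(b) with two lvalues does not compile, while std::move(a).combine(std::move(b)) does.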

After calling merge, we can treat the result as a single flow. Of course, the output of this example is random and non-deterministic: each run prints different values to the console.

It is worth mentioning that there are two ways to merge flows. The first way is what we have seen in the example above: a.merge(b, c, ...). The second way is to use an observable builder instead: self->make_observable().merge(a, b, c, ...). The end result is the same.

Concatenating Flows

From an API perspective, concatenating flows is similar to merging them: we call concat instead of merge. The semantics, however, are different. Instead of consuming the outputs of all flows in parallel, we consume them one after another, in order. Since concat only subscribes to the next input after the current one has completed, every input except the last must be finite; otherwise, the remaining inputs would never be consumed.
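The finiteness requirement follows from how concat pulls its inputs. The following plain C++ sketch models each input as a hypothetical source function that returns std::nullopt once it completes. The concat_sources function drains one source before touching the next, and a budget caps the total number of pulls so that we can observe a source that never completes. None of these names are CAF API.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <vector>

// A hypothetical source: each call yields the next value, or std::nullopt once
// the source has completed.
using source = std::function<std::optional<int>()>;

// Models concat: pull from the current source until it completes, then switch
// to the next one. `budget` caps the total number of pulls.
std::vector<int> concat_sources(std::vector<source> sources, int budget) {
  std::vector<int> result;
  for (auto& src : sources) {
    while (budget > 0) {
      --budget;
      auto item = src();
      if (!item)
        break; // This source completed: move on to the next one.
      result.push_back(*item);
    }
  }
  return result;
}

// Emits begin, begin + 1, ..., begin + n - 1 and then completes.
source make_finite(int begin, int n) {
  return [next = begin, end = begin + n]() mutable -> std::optional<int> {
    if (next == end)
      return std::nullopt;
    return next++;
  };
}

// Emits `value` forever and never completes.
source make_endless(int value) {
  return [value]() -> std::optional<int> { return value; };
}
```

With two finite sources starting at 10 and 20, the result is 10, 11, 12, 20, 21, 22. With an endless first source, the second source is never consulted, no matter how large the budget is.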

In our example, we will concatenate the contents of three files into a single stream. We will open the files in a particular order and consume their contents in the same order. For the sake of simplicity, we simulate the file contents with a fixed sequence of integers.

First, we implement an open_file function that creates a flow of integers.

// A simple observable that emits an ascending sequence of 3 integers.
caf::flow::observable<int> open_file(caf::event_based_actor* self, int begin) {
  return self->make_observable()
    .iota(begin)
    .take(3)
    .as_observable();
}

The iota operator emits an ascending sequence of integers starting from a given value. We limit the sequence to three integers by calling take.

Next, we implement an actor that opens three files and concatenates their contents into a single stream. The actor will print the contents of the files to the console again.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  // Open three (simulated) files with different start values.
  auto file1 = open_file(self, 10);
  auto file2 = open_file(self, 20);
  auto file3 = open_file(self, 30);
  std::move(file1).concat(std::move(file2), std::move(file3))
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

10
11
12
20
21
22
30
31
32

This time, the output of the example is deterministic. The actor will always print the same numbers in the same order.

Again, we could write the same code using an observable builder, i.e., by calling self->make_observable().concat(a, b, c, ...) instead of a.concat(b, c, ...).

Zipping Flows

When zipping flows, we consume the outputs of multiple flows in lockstep: first the first output of each flow, then the second output of each flow, and so on. Hence, the zipped flow emits at most as many elements as the shortest input flow produces. As soon as one input flow completes, the zipped flow completes as well.
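A plain C++ sketch of the lockstep semantics for two finite inputs shows why the shortest input determines the length of the result. The zip_two helper is hypothetical and works on std::vector, not on CAF observables.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Combines element i of `xs` with element i of `ys` via `zipper`. The result
// ends as soon as the shorter input ends, just like a zipped flow completes
// when one of its inputs completes.
template <class A, class B, class Zipper>
auto zip_two(const std::vector<A>& xs, const std::vector<B>& ys, Zipper zipper) {
  using Out = decltype(zipper(xs[0], ys[0])); // unevaluated: safe for empty inputs
  std::vector<Out> result;
  for (std::size_t i = 0; i < xs.size() && i < ys.size(); ++i)
    result.push_back(zipper(xs[i], ys[i]));
  return result;
}
```

Zipping three temperature readings with two humidity readings produces only two pairs; the third temperature reading has no partner.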

When creating a zipped flow, we must provide a "zipper" function that combines the outputs of all input flows into a single output. The zipper function must accept the same number of arguments as there are input flows.

For our next example, we will use the open_sensor function from the merge example to create two flows. One sensor will provide temperature readings, and the other sensor will provide humidity readings. Our zipper function will simply combine the readings into a pair.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  auto to_pair = [](int temp, int hum) {
    return std::pair{temp, hum};
  };
  auto temps = open_sensor(self);
  auto hums = open_sensor(self);
  self->make_observable()
    .zip_with(to_pair, temps, hums)
    .take(5)
    .for_each([self](std::pair<int, int> reading) {
      auto [temp, hum] = reading;
      self->println("temperature: {} °C, humidity: {} %RH", temp, hum);
    });
});

Output

temperature: 10 °C, humidity: 17 %RH
temperature: 36 °C, humidity: 38 %RH
temperature: 34 °C, humidity: 24 %RH
temperature: 19 °C, humidity: 15 %RH
temperature: 37 °C, humidity: 26 %RH

Since we are using random values, the output of this example is also non-deterministic. Each run will print different values to the console.

This time, we used self->make_observable().zip_with to create the zipped flow. We could also call temps.zip_with(to_pair, hums) instead. Arguably, the former is a bit more readable here, because the zipper comes first and takes its arguments in the same order as the input flows that follow it.

Merging Flows Dynamically

In our first merge example, we had a fixed number of sensors. However, what if we do not know the number of sensors in advance? This is a very common scenario in real-world applications, and it is usually modeled as an observable that emits other observables.

Let us revisit our first example, but this time we add a twist to it. Instead of having a fixed number of sensors, we have a flow of indexes that we turn into a flow of sensors. We then merge all sensor flows into a single flow.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    // Create the indexes for our sensors: 1, 2, 3.
    .iota(1)
    .take(3)
    // Turn the indexes into sensor flows and add the index to each value.
    .map([self](int index) {
      return open_sensor(self)
        .map([index](int value) { return std::pair{index, value}; })
        .as_observable();
    })
    // Turn `observable<observable<T>>` into `observable<T>` via merging.
    .merge()
    // Same as before: print the first ten readings to the console.
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});

Output

sensor 1: 28 °C
sensor 2: 19 °C
sensor 3: 20 °C
sensor 1: 26 °C
sensor 2: 12 °C
sensor 3: 17 °C
sensor 1: 39 °C
sensor 2: 38 °C
sensor 3: 21 °C
sensor 1: 35 °C

Instead of having three variables for the sensors, we now produce a flow of sensor observables that we merge into a single flow. Calling merge without arguments on an observable<observable<T>> flattens it into an observable<T>. Just like in the first merge example, the output is non-deterministic.

Turning a sequence of values into a sequence of observables (e.g., by opening a file or connecting to a sensor for each value) and then merging them is such a common pattern that it has its own operator: flat_map. Let us rewrite the example one last time using flat_map.

Source Code

sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(3)
    .flat_map([self](int index) {
      return open_sensor(self)
        .map([index](int value) { return std::pair{index, value}; });
    })
    .take(10)
    .for_each([self](std::pair<int, int> reading) {
      auto [index, value] = reading;
      self->println("sensor {}: {} °C", index, value);
    });
});

Output

sensor 1: 17 °C
sensor 2: 35 °C
sensor 3: 28 °C
sensor 1: 20 °C
sensor 2: 28 °C
sensor 3: 19 °C
sensor 1: 12 °C
sensor 2: 25 °C
sensor 3: 24 °C
sensor 1: 40 °C

The two versions are equivalent. Using the flat_map operator is simply more concise and easier to read. We can also omit calling as_observable on the inner observables, because flat_map will do this automatically.

Concatenating Flows Dynamically

In the same way we can merge flows dynamically, we can also concatenate them dynamically. The pattern is the same: we have a flow of observables that we concatenate into a single flow. We can do this either by calling map followed by concat or by using the concat_map operator. We will use the latter for brevity.
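On finite inputs, concat_map boils down to map followed by concatenation in input order. The plain C++ sketch below models this; concat_map here is a hypothetical helper working on std::vector, and fake_file is a stand-in for open_file.

```cpp
#include <cassert>
#include <vector>

// Maps each input value to a sequence and appends the sequences in input order.
template <class T, class F>
auto concat_map(const std::vector<T>& inputs, F fn) {
  using Inner = decltype(fn(inputs[0])); // e.g. std::vector<int>
  Inner result;
  for (const auto& x : inputs) {
    auto inner = fn(x);
    result.insert(result.end(), inner.begin(), inner.end());
  }
  return result;
}

// Stand-in for open_file: the three values begin, begin + 1, begin + 2.
std::vector<int> fake_file(int begin) {
  return {begin, begin + 1, begin + 2};
}
```

Applied to the start values 10, 20, and 30, the sketch produces the same nine numbers as the example output.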

Source Code

sys.spawn([](caf::event_based_actor* self) {
  self->make_observable()
    // Create the start values for our files: 10, 20, 30.
    .iota(1)
    .take(3)
    .map([](int value) { return value * 10; })
    // Turn the start values into file flows.
    .concat_map([self](int begin) {
      return open_file(self, begin);
    })
    // Print the contents of the files to the console.
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

10
11
12
20
21
22
30
31
32

The output of this example is the same as before. The only difference is that we have generated our file observables dynamically.

One note on zip_with before we move on: there is no dynamic version of this operator. The reason is that zipping requires a zipper function that combines the outputs of all input flows into a single output. Without knowing the number of input flows in advance, we cannot provide such a zipper function since the number of arguments is unknown.
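The arity argument can be seen directly in C++. In a variadic sketch such as the hypothetical zip_all below, the number of inputs is sizeof...(Ts), a compile-time constant that must match the zipper's parameter list. An observable of observables only reveals how many inputs there are at run time, so there is no such signature to check against.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// The number of inputs is part of the function's type: the compiler checks
// that `zipper` accepts exactly sizeof...(Ts) arguments.
template <class Zipper, class... Ts>
auto zip_all(Zipper zipper, const std::vector<Ts>&... inputs) {
  static_assert(sizeof...(Ts) > 0, "zip needs at least one input");
  using Out = decltype(zipper(inputs[0]...));
  const std::size_t n = std::min({inputs.size()...}); // shortest input wins
  std::vector<Out> result;
  result.reserve(n);
  for (std::size_t i = 0; i < n; ++i)
    result.push_back(zipper(inputs[i]...));
  return result;
}
```

Passing a three-argument zipper together with three vectors compiles; passing it together with two or four vectors does not.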

Embedding Requests into Flows

So far, we did all the processing in the flow itself. However, sometimes we need to send requests to other actors to delegate some work. For example, we may want to send a request to a database actor to fetch some data. We can embed such requests directly into flows without breaking the chain of operators.

In our example, we will create a flow of integers and send a request to another actor to double each value. We will then print the doubled values to the console.

Source Code

// A simple actor that doubles an integer.
auto doubler = sys.spawn([]() -> caf::behavior {
  return {
    [](int value) {
      return value * 2;
    },
  };
});
// An actor that runs a flow of integers and doubles each value using `doubler`.
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>();
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

2
4
6
8
10

In this example, we have a simple worker called doubler. From our main actor, we send a request to doubler for each value in the flow. Calling as_observable<int> on the request turns its response into an observable<int>. Then, concat_map concatenates all of these response flows into a single flow, and we print the values to the console. If the order of the values does not matter, we could also use flat_map instead of concat_map.

When sending a request to a typed actor, we call as_observable without a template parameter, because the response type is already known.

Back to our example above: what if we pass the wrong type to as_observable? In that case, the type conversion fails, which brings us to our next topic: error handling in flows.

Error Handling

If we changed our last example to use as_observable<float> instead of as_observable<int>, the example would print... nothing. The reason is that the flow aborts as soon as an error occurs, and we have no error handling in place to do something about it.

In part 1, we learned about the four member functions that an observer needs to provide. Here is a quick reminder:

void on_subscribe(subscription sub);

void on_next(const T& item);

void on_complete();

void on_error(const error& what);

When using for_each, we only provide the on_next function. However, we can also inject callbacks for on_complete and on_error by using the do_on_complete and do_on_error operators, respectively. CAF also offers do_finally to inject a callback that runs in either case, i.e., when on_complete or on_error is called.
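The call order of these callbacks can be modeled in a few lines of plain C++. The callbacks struct and the run function are hypothetical and only mimic the roles of for_each, do_on_complete, do_on_error, and do_finally; they are not CAF API. The model delivers the items, then calls exactly one of on_complete or on_error, and calls the finally callback in both cases.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical callback bundle with no-op defaults.
struct callbacks {
  std::function<void(int)> on_next = [](int) {};
  std::function<void()> on_complete = [] {};
  std::function<void(const std::string&)> on_error = [](const std::string&) {};
  std::function<void()> finally = [] {}; // runs after on_complete or on_error
};

// Delivers `items`, then completes or, if `error` is non-empty, fails with it.
void run(const std::vector<int>& items, const std::string& error,
         const callbacks& cb) {
  for (int x : items)
    cb.on_next(x);
  if (error.empty())
    cb.on_complete();
  else
    cb.on_error(error);
  cb.finally();
}
```

Running the model with items 1 and 2 and an error "boom" records 1, 2, the error, and then the finally callback.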

Let us revisit the previous example, but this time we pass the wrong type to as_observable and provide a callback for the error.

Source Code

// A simple actor that doubles an integer.
auto doubler = sys.spawn([]() -> caf::behavior {
  return {
    [](int value) {
      return value * 2;
    },
  };
});
// An actor that runs a flow of integers and doubles each value using `doubler`.
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<float>();
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](float value) {
      self->println("{}", value);
    });
});

Output

error: unexpected_response(message(2))

This time, the example prints an unexpected_response error message to the console. We told CAF that we expect a float, but we received an int. Hence, CAF produces the error, calls our error callback, and aborts the flow.

With the do functions, we can inject callbacks at various points in the flow. However, we cannot recover from an error. Once an error occurs, the flow is aborted and no further values are processed. If we want to recover from an error, we need to use a different approach.

The simplest way to recover from an error is to ignore it. The on_error_complete operator does exactly that: if an error occurs, it calls on_complete instead of on_error, so the flow terminates normally. Where we apply the operator matters. In our previous example, the error occurred in a dynamically created observable that we concatenated into the main flow. If we apply on_error_complete to that inner observable, only the inner flow ends early, and concat_map continues with the next value.
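The effect of recovering per inner flow can be modeled in plain C++. The model below treats each response as a std::optional<int>, with std::nullopt standing in for an error, and uses a hypothetical ask_doubler function that fails for multiples of 3. The complete_on_error flag is our stand-in for calling on_error_complete on each inner flow; none of this is CAF API.

```cpp
#include <cassert>
#include <optional>
#include <vector>

// A response carries either a value or (as std::nullopt) an error.
using response = std::optional<int>;

// Hypothetical stand-in for the doubler: fails for multiples of 3.
response ask_doubler(int value) {
  if (value % 3 == 0)
    return std::nullopt;
  return value * 2;
}

// Models concat_map over the responses. Without recovery, the first error
// reaches the outer flow and aborts it. With recovery, a failed inner flow
// completes without emitting anything and the outer flow moves on.
std::vector<int> run_concat_map(const std::vector<int>& inputs,
                                bool complete_on_error) {
  std::vector<int> output;
  for (int x : inputs) {
    response r = ask_doubler(x);
    if (r)
      output.push_back(*r);
    else if (!complete_on_error)
      break; // the error aborts the outer flow
  }
  return output;
}
```

For the inputs 1 to 5, the model yields 2, 4, 8, 10 with recovery and stops after 2, 4 without it.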

Let us make the example a bit more interesting by raising an error in doubler whenever the input value is a multiple of 3 and then use on_error_complete to discard these errors.

caf::behavior doubler_impl() {
  return {
    [](int value) -> caf::result<int> {
      if (value % 3 == 0)
        return caf::make_error(caf::sec::invalid_argument);
      return value * 2;
    },
  };
}

We will reuse this implementation for the doubler actor in our next examples.

With the on_error_complete operator, we can now discard errors in each response flow before they reach the main flow, as shown in the following example.

Source Code

auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_complete();
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

2
4
8
10

This time, the example prints the values 2, 4, 8, and 10 to the console. The input value 3 causes an error in doubler (which is why 6 is missing), but on_error_complete discards the error and the flow continues with the next value.

If we want to use a default value instead of discarding the error, we can use the on_error_return_item operator. This operator emits the given default value in place of the error, so the error never reaches the main flow.

Source Code

auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_return_item(-1);
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

2
4
-1
8
10

With this implementation, the example prints the values 2, 4, -1, 8, and 10 to the console. Again, the input value 3 causes an error in doubler, but this time on_error_return_item replaces the error with -1.

The last operator we will look at is on_error_return. It takes a function object that receives the error and returns an expected<T>. This allows us to inspect the error and decide what to do: we can return a value to hide the error, return the original error, or return a different error.
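The decision logic of on_error_return can be modeled in plain C++ with std::variant. The flow_error enum, the handler alias, and the recover function below are hypothetical stand-ins for caf::error, the function object, and the operator; they only illustrate how a handler either replaces an error with a value or passes an error on.

```cpp
#include <cassert>
#include <functional>
#include <variant>

// Hypothetical error codes for this model (stand-ins for caf::sec values).
enum class flow_error { invalid_argument, timeout };

// A response carries either a value or an error, similar in spirit to expected<int>.
using response = std::variant<int, flow_error>;

// Receives the error and returns either a replacement value or an error
// (the original one or a different one).
using handler = std::function<response(flow_error)>;

// Applies the handler to one response. Values pass through unchanged.
response recover(const response& r, const handler& fn) {
  if (const flow_error* e = std::get_if<flow_error>(&r))
    return fn(*e);
  return r;
}
```

In this model, on_error_return_item(-1) corresponds to a handler that ignores the error and always returns -1, whereas a selective handler can map invalid_argument to -1 and pass every other error on unchanged.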

We will use this operator to return -1 for invalid arguments and the original error for all other errors.

Source Code

auto doubler = sys.spawn(doubler_impl);
sys.spawn([doubler](caf::event_based_actor* self) {
  self->make_observable()
    .iota(1)
    .take(5)
    .concat_map([self, doubler](int value) {
      return self->mail(value)
        .request(doubler, 1s)
        .as_observable<int>()
        .on_error_return([](const caf::error& what) -> caf::expected<int> {
          if (what == caf::sec::invalid_argument)
            return -1;
          return what;
        });
    })
    .do_on_error([self](const caf::error& what) {
      self->println("error: {}", what);
    })
    .for_each([self](int value) {
      self->println("{}", value);
    });
});

Output

2
4
-1
8
10

The output is the same as with on_error_return_item. However, this time we have selectively replaced the error for invalid arguments only. Any other error will still trigger the do_on_error callback and abort the flow.

Next Up

In this part, we learned how to combine flows by merging, concatenating, and zipping them. We also learned how to handle errors in flows and how to embed actor communication into flows.

In part 3, we will learn about observable variants such as single and connectable. Further, we will learn how to implement custom transformation steps and discuss how backpressure works in CAF.