Learning Resources

Monitoring and Linking


Introduction

As Carl Hewitt puts it, one actor is no actor. They come in systems where some actors are focused on handling incoming tasks while others focus on orchestration and supervision.

Some actors only exist to manage other actors. They detect failing workers in the distributed system via monitoring or linking. After detecting failing subordinates, they may spin up replacements or at least make sure their subsystem fails collectively to avoid partial errors. In the latter case, an even higher level actor may then jump to action.

Monitoring

The simplest way to observe the lifetime of another actor is by monitoring it. This mechanism is simple, but effective. The API is also straightforward: we call self->monitor(other, callback) to have CAF call the callback once the other actor goes down. The callback takes a caf::error as argument, which indicates the reason why the monitored actor terminated. A default-constructed error means "no error" and means that the actor simply completed its task and stopped, e.g. by calling self->quit().

The monitor function returns disposable object that we can use to cancel the monitoring if we are no longer interested in observing the lifetime of other. However, the returned disposable object can also be simply ignored if we have no intention of canceling the monitoring.

Calling monitor for the same actor twice will run each callback individually when the actor goes down. Before delving into more API details, let us look at our first example:

Source Code

void caf_main(caf::actor_system& sys) {
  auto worker = sys.spawn([] {
    // ... do something ...
  });
  auto observer = sys.spawn([worker](caf::event_based_actor* self) {
    self->println("observer monitors the worker");
    self->monitor(worker, [self](const caf::error& reason) {
      self->println("worker down: {}", reason);
      self->quit();
    });
    // Return a dummy handler to keep the actor alive.
    return caf::behavior{
      [] {
        // nop
      },
    };
  });
  // Block until the observer terminates. Otherwise, the observer would
  // terminate as a result of becoming unreachable.
  caf::scoped_actor self{sys};
  self->wait_for(observer);
}

Output

observer monitors the worker
worker down: none

Here, we have two actors: one worker and one observer. The worker does nothing, so it terminates pretty much immediately after spawning. The reason is a default-constructed caf::error, which simply indicates regular actor termination (no actual error). The observer monitors the worker and prints during initialization as well as when receiving the down message. The observer also shuts down in its callback. Lastly, we use a scoped actor to block caf_main from leaving until the observer terminates. Without calling wait_for, we could terminate the observer before it handles the down message. Actors are reference counted and CAF terminates actors if their strong reference count drops to 0 even if there still remain weak references to it.

This is an important point, because monitors do not keep either actor alive via strong references. Monitors only store weak references. To illustrate this point, we revisit our first example with two changes:

  1. We return a behavior from the initialization function for the worker. This keeps to worker waiting for incoming messages. We already used the same "trick" to keep the observer alive in the first example.
  2. We spawn the worker in the initialization function of the observer. This makes sure that there is no strong reference to the worker left in caf_main. Once the initialization function completes, the observer drops the last strong reference to the worker.

Source Code

caf::behavior worker_impl() {
  return caf::behavior{
    [] {
      // nop
    },
  };
}

void caf_main(caf::actor_system& sys) {
  auto observer = sys.spawn([](caf::event_based_actor* self) {
    self->println("observer spawns and monitors the worker");
    auto worker = self->spawn(worker_impl);
    self->monitor(worker, [self](const caf::error& reason) {
      self->println("worker down: {}", reason);
      self->quit();
    });
    return caf::behavior{
      [] {
        // nop
      },
    };
  });
  caf::scoped_actor self{sys};
  self->wait_for(observer);
}

Output

observer spawns and monitors the worker
worker down: unreachable

In this version, we get a different exit reason for the worker: unreachable. CAF uses this error code when terminating an actor that reached a strong reference count of 0. In our first version, the reason was none, indicating that the monitored actor performed an ordinary shutdown by calling quit() or that it terminated because it had no behavior.

Monitoring works in a network-transparent way. When monitoring remote actors, the local CAF node represents the remotely running actor with a local proxy object. This proxy object then mimics emits events if the remote actor terminates or if the connection fails.

In a distributed setting, actors can also monitor entire CAF nodes. Please read our article on Distributed Actors for more details on that.

Linking

As we have observed earlier, monitoring is unidirectional. In other words, a monitored actor is unaware of its observers. However, we often times group closely related actors together when designing actor systems and think of them as one functional entity or subsystem. For example, one actor (the server) may read requests from clients and forwards them to a set of workers in order to make efficient use of multiple cores. Here, the server needs to respond to worker failures. At the very least, we want to avoid forwarding more requests to failed workers. However, what about the workers if the server fails? There entire job description of a worker is performing tasks for the server, so once the server fails we usually want to shut down the workers as well. This use case is the motivation behind links.

Links are bidirectional: if either one of the actors terminates, it sends an exit message to the other. An exit messages in CAF is a simple data structure:

struct exit_msg {
  actor_addr source;
  error reason;
};

Because links can be established by other actors, we cannot provide a callback per link. Instead, CAF will send an exit_msg to the linked actors. The source field contains the address of the terminated actor, and the reason field contains the reason why the actor terminated (a default-constructed error signals regular actor shutdown, i.e., no-error).

We can choose to handle exit_msg explicitly in our behavior. Otherwise, CAF will call self->quit(reason) when receiving an exit message with a non-default-constructed error. This default behavior means the first error in a set of linked actors "cascades" and brings down the entire subsystem.

In our next example, we use links to bring down workers once the server fails:

Source Code

caf::behavior worker_impl(caf::event_based_actor* self, size_t num) {
  self->attach_functor([self, num] {
    self->println("worker {} down", num);
  });
  return {
    [](int32_t x, int32_t y) {
      return x + y;
    },
  };
}

struct server_state {
  caf::event_based_actor* self;
  std::vector<caf::actor> workers;
  size_t index = 0;

  static inline const char* name = "server";

  server_state(caf::event_based_actor* self) : self(self) {
    // nop
  }

  caf::behavior make_behavior() {
    for (size_t i = 0; i < 3; ++i)
      workers.emplace_back(self->spawn<caf::linked>(worker_impl, i));
    return {
      [this](int32_t x, int32_t y) {
        if (x % 2 == 1 || y % 2 == 1)
          throw std::runtime_error("odd number!");
        auto selected = workers[index];
        index = (index + 1) % workers.size();
        return self->mail(x, y).delegate(selected);
      },
    };
  }
};

void caf_main(caf::actor_system& sys) {
  using namespace std::literals;
  auto server =  sys.spawn(caf::actor_from_state<server_state>);
  sys.spawn([server](caf::event_based_actor* self) {
    using i32_pair = std::pair<int32_t, int32_t>;
    for (auto [x, y]: {i32_pair{2, 4}, i32_pair{7, 9}, i32_pair{40, 2}}) {
      self->mail(x, y).request(server, 1s).then(
        [self, x{x}, y{y}](int32_t res) {
          self->println("{} + {} = {}", x, y, res);
        },
        [self, x{x}, y{y}](const caf::error& reason) {
          self->println("failed to compute {} + {}: {}", x, y, reason);
        }
      );
    }
  });
}

Output

*** unhandled exception: [id: 6, name: server, exception: std.runtime_error]: odd number!
worker 0 down
worker 1 down
failed to compute 7 + 9: runtime_error("unhandled exception of type std.runtime_error: odd number!")
2 + 4 = 6
worker 2 down
failed to compute 40 + 2: runtime_error("unhandled exception of type std.runtime_error: odd number!")

The linked flag is a shorthand for spawning an actor and then calling self->link_to(new_actor). Unlike monitors, links never stack. Two actors can only have one link. Actors can break this link by calling self->unlink_from(other).

By using delegate from the server to the workers, we transfer the responsibility of handling the request to the worker. This way, the worker will respond to the original sender of the request.

By linking the server to the workers, we made sure that workers shut down if their server fails. Great! Right? Well, if we take a step back and think about how CAF handles exit messages, then we need to ask the question "what if a worker fails?" The way we have implemented our example, a single error in a worker would bring down the server as well as all of the other workers.

So what if we instead wish to only replace the failed worker but still terminate all workers if the server fails? Well, we provide a handler for exit_msg to override the default behavior.

For our second example on linking, we throw the exception in the worker instead and provide a handler for exit_msg in the server to spin up replacements for failed workers:

Source Code

caf::behavior worker_impl(caf::event_based_actor* self, size_t num) {
  self->attach_functor([self, num] {
    self->println("worker {} down", num);
  });
  return {
    [](int32_t x, int32_t y) {
      if (x % 2 == 1 || y % 2 == 1)
        throw std::runtime_error("odd number!");
      return x + y;
    },
  };
}

struct server_state {
  caf::event_based_actor* self;
  std::vector<caf::actor> workers;
  size_t index = 0;

  static inline const char* name = "server";

  server_state(caf::event_based_actor* self) : self(self) {
    // nop
  }

  caf::behavior make_behavior() {
    for (size_t i = 0; i < 3; ++i)
      workers.emplace_back(self->spawn<caf::linked>(worker_impl, i));
    return {
      [this](int32_t x, int32_t y) {
        auto selected = workers[index];
        index = (index + 1) % workers.size();
        return self->mail(x, y).delegate(selected);
      },
      [this](const caf::exit_msg& msg) {
        auto is_source = [&msg](const auto& hdl) {
          return hdl == msg.source;
        };
        auto i = std::find_if(workers.begin(), workers.end(), is_source);
        if (i != workers.end()) {
          auto n = std::distance(workers.begin(), i);
          self->println("lost worker {} -> spawn replacement", n);
          *i = self->spawn<caf::linked>(worker_impl, static_cast<size_t>(n));
        }
      },
    };
  }
};

void caf_main(caf::actor_system& sys) {
  using namespace std::literals;
  auto server =  sys.spawn(caf::actor_from_state<server_state>);
  sys.spawn([server](caf::event_based_actor* self) {
    using i32_pair = std::pair<int32_t, int32_t>;
    for (auto [x, y]: {i32_pair{2, 4}, i32_pair{7, 9}, i32_pair{40, 2}}) {
      self->mail(x, y).request(server, 1s).then(
        [self, x{x}, y{y}](int32_t res) {
          self->println("{} + {} = {}", x, y, res);
        },
        [self, x{x}, y{y}](const caf::error& reason) {
          self->println("failed to compute {} + {}: {}", x, y, reason);
        }
      );
    }
  });
}

Output

2 + 4 = 6
40 + 2 = 42
worker 2 down
worker 0 down
*** unhandled exception: [id: 9, name: user.scheduled-actor, exception: std.runtime_error]: odd number!
worker 1 down
failed to compute 7 + 9: runtime_error("unhandled exception of type std.runtime_error: odd number!")

This time, the server stays up and running after the worker fails.

Attachables

Before we conclude this article, we want to cover one more feature of CAF that serves as the foundation for monitors and links: attachables. An attachable is just a piece of code that users can attach (hence the name) to actors. This code runs during shutdown or immediately when trying to attach to an actor that already terminated.

With the attachables API, we can revisit our earlier example once more. This time, we drop the observer and simply attach a lambda to the worker that runs as soon as the worker terminates:

Source Code

void caf_main(caf::actor_system& sys) {
  auto worker = sys.spawn([] {
    return caf::behavior{
      [] {
        // nop
      },
    };
  });
  worker->attach_functor([&sys](const caf::error& reason) {
    sys.println("worker down: {}", reason);
  });
}

Output

worker down: unreachable

We once more see unreachable as exit reason. After adding the attachable to the worker, we leave caf_main. Hence, the last handle to the worker goes out of scope and CAF consequently destroys the actor. The wait_for function we have used earlier builds on the attachable API internally.

The attachable API is a very low-level synchronization mechanism in CAF, so use it with caution. We mention it here for the sake of completeness, but we recommend using monitors and links whenever possible and only falling back to attachables when the other two mechanisms do not suffice. Using attachables directly can lead to subtle bugs when not used carefully. For example, storing a strong actor reference in an attachable is generally a red flag since it can easily result in cyclic references.

Conclusion

Monitors and links are some of the most basic building blocks for actor-based systems. You can find them in one form or another in all implementations of the actor model and CAF is no exception. CAF also includes attachables as a low-level building block.

Linking enables software engineers to easily build subsystems that either run or fail collectively. Especially in a distributed system, this property makes composing large systems out of smaller systems much easier since it avoids partial failures and brings the system always to a defined state. By providing explicit handlers for exit_msg, higher-level actors are able to handle errors in lower-level actors gracefully. We showed this with the simple server actor, but in practice users should also put limits on re-spawns and fail the server eventually if the rate of errors exceeds some threshold. Otherwise, the system may spawn workers over and over again even if they always fail on the first request they receive, e.g., because the system receives nothing but bogus inputs due to an error in the component that generates the request.

Monitors allow actors to react to failures of some other part in the system without getting tightly coupled to it. A good use case is a client that needs to know when a server goes does down but not the other way around.