Distributed Actors


Introduction

In this guide, we learn how the networking layer of CAF enables distributed applications and how CAF represents remote actors locally.

All actors in CAF live in an actor system, which provides a runtime environment implementing message dispatching, error propagation, cooperative scheduling, and networking. Connecting two or more actor systems results in a single, interconnected distributed actor system.

Distributed Actors

Consider this simple network of actors:

Figure: Some actors.

Blue circles represent actors, green boxes represent messages in a mailbox and arrows indicate a sends-to relationship.

This representation omits many details of the application. For example, how many OS processes are involved? Actors cannot exist in a vacuum. For simplicity, we assume that our actors were deployed on two nodes:

Figure: Some actors, distributed across two nodes.

As we can learn from this figure, CAF connects its distributed nodes over a protocol called BASP, which it transmits over TCP. The protocol itself is an implementation detail that we can safely skip over.

However, this figure allows us to probe the design of CAF at a deeper level. How do messages travel from actors on node A to actors on node B, and vice versa? How can CAF monitor the lifetimes of remote actors?

To answer these questions, we focus entirely on Node B and include components that CAF usually hides from the application:

Figure: View of the distributed system on Node B.

Here, the grey circles are actor proxies that CAF creates on node B to represent remote actors. Instead of enqueueing into a mailbox, proxies forward all messages they receive to an actor (marked with MM) that belongs to the middleman. This actor ultimately manages the connection to node A and interacts with various system components.

When receiving messages addressed to remote actors, the MM actor serializes them using a binary_serializer. This serializer produces a byte sequence suitable for transient data only. The format follows no formal specification and omits most meta information of data types, since CAF assumes both communication endpoints to have access to the same type IDs and meta objects. Hence, we recommend not using the output of this serializer for persistence, since the format may change between CAF versions and breaks whenever type IDs change.

When deserializing messages from the wire, the MM actor looks up meta objects in the Meta Objects Table. The meta objects then use a binary_deserializer to convert a sequence of bytes back to C++ values of a particular type. This is also where the Actor Registry comes into play.
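
Outside of the middleman, we can use this serializer and deserializer pair directly. The following is a minimal sketch of the round trip for a single int32_t value; the variable names and error handling are illustrative only:

int32_t original = 42;
caf::byte_buffer buf;
caf::binary_serializer sink{sys, buf};     // Writes bytes into buf.
if (!sink.apply(original))
  sys.println("failed to serialize: {}", caf::to_string(sink.get_error()));
int32_t copy = 0;
caf::binary_deserializer source{sys, buf}; // Reads bytes back from buf.
if (!source.apply(copy))
  sys.println("failed to deserialize: {}", caf::to_string(source.get_error()));
assert(copy == original);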

On the wire, CAF represents actors as a node ID plus an actor ID. The former identifies a single CAF system in the network. The latter is essentially an ascending integer that identifies a single actor inside an actor system. To translate an actor ID back to an actor handle, the deserializer accesses the Actor Registry. The middleman stores all actors that became visible to remote actors in this registry in order to query the handles by their ID later.

In source code, we can access IDs from the actor system as well as from actor handles:

auto hdl = sys.spawn([] {}); // Some dummy actor (terminates immediately).
auto nid = hdl.node();       // The globally unique caf::node_id for the system.
auto aid = hdl.id();         // The system-specific caf::actor_id for the dummy.
assert(nid == sys.node());
assert(aid != caf::invalid_actor_id);

Before querying the IDs from a handle, however, make sure that the handle is valid. The node ID of an actor system is invalid (default-constructed) when running CAF without loading a networking module.
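
For example, a defensive version of the snippet above checks the handle before reading its IDs (using to_string to render the node ID for printing):

if (hdl) {
  sys.println("node: {}, actor ID: {}", caf::to_string(hdl.node()), hdl.id());
}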

Connecting CAF Nodes

There are two APIs in the I/O module for connecting CAF nodes. Before we discuss the API functions, however, we look at the handshake process in order to better understand how the functions operate (and differ).

In all cases, one CAF node assumes the role of a server by opening a TCP port that others can connect to. A client then establishes a TCP connection. Once the TCP handshake has completed, CAF performs a connection handshake in order to make sure that the communication endpoints are compatible.

In the handshake, the two nodes exchange this information:

  • The BASP version (and a magic number). This step simply makes sure that the nodes run network-compatible CAF versions; the nodes disconnect immediately when detecting incompatible versions.
  • The node IDs. When connecting to the same node multiple times, CAF closes redundant connections after the handshake. Two CAF nodes always multiplex all actor communication over a single TCP connection in order to make sure that the ordering of messages remains intact.
  • The application identifiers. This is a user-defined list of strings that users may override by setting caf.middleman.app-identifiers (see the sketch after this list). During the handshake, CAF aborts the connection attempt if the two identifier lists are disjoint. For most use cases, we recommend setting only a single identifier throughout the system.
  • The server optionally sends an actor ID along with a description of its interface. If present, this actor is the default actor of the server for nodes that connect to this port.
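
As a minimal sketch, we could set a single identifier programmatically before constructing the actor system; the identifier string "my-app/v1" is made up for illustration:

caf::actor_system_config cfg;
// Nodes refuse connections from peers whose identifier list is disjoint.
cfg.set("caf.middleman.app-identifiers",
        std::vector<std::string>{"my-app/v1"});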

After understanding the handshake process, explaining the difference between the two APIs for connecting CAF nodes becomes straightforward. The middleman offers an open/connect function pair for setting up a connection without the optional default actor. In this case, clients retrieve the node ID of the server on success.

When working with a default actor, users can use the function pair publish/remote_actor instead. Here, clients retrieve an actor handle on success.

Connections without Default Actor

When connecting two CAF nodes directly (without using default actors), the server needs to open a local port for clients to connect to. To open a port, we access the middleman of an actor system and call open:

assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_port = mm.open(1234)) {
  assert(*maybe_port == 1234);
  // ...
} else {
  sys.println("Failed to open port: {}", maybe_port.error());
}

The return type of open is expected<uint16_t>, which contains the opened port on success and an error otherwise. When calling mm.open(0), the OS chooses an available port and users need to inspect the return value to learn which port was chosen. Otherwise, CAF simply returns the user-defined port on success.

The function also has optional parameters for setting which address (name) CAF passes to bind and whether or not to set the SO_REUSEADDR flag on the socket (defaults to false).
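
For example, a sketch that combines these options by letting the OS pick a port on all interfaces; the address string is illustrative:

// Port 0 lets the OS choose, "0.0.0.0" is the address passed to bind, and
// the last argument enables SO_REUSEADDR.
if (auto maybe_port = mm.open(0, "0.0.0.0", true)) {
  sys.println("listening on port {}", *maybe_port);
}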

After opening a port on the server, we can connect clients by calling connect:

assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_nid = mm.connect("my-host.my-domain", 1234)) {
  auto nid = *maybe_nid; // ID of the connected node.
  // ...
} else {
  sys.println("Failed to connect: {}", maybe_nid.error());
}

After learning the node ID, stored as nid in the example above, we can, for example, spawn actors on the other node (see the manual section on Remote Spawning of Actors) or query the actor registry remotely by calling mm.remote_lookup("my-actor", nid). Please note that remote_lookup is a blocking function call. Hence, use it only outside of actors, or detach actors that call into this API. Otherwise, a cooperatively scheduled actor blocks a scheduler thread until the result arrives.
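
A minimal sketch of such a lookup; the registry name "my-actor" is illustrative, and we convert the returned strong_actor_ptr to a regular handle via actor_cast:

// Blocks the caller until the remote registry answers.
if (auto ptr = mm.remote_lookup("my-actor", nid)) {
  auto hdl = caf::actor_cast<caf::actor>(ptr);
  // ...
} else {
  sys.println("no actor registered under that name");
}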

Connections with Default (Published) Actor

Many applications only need to expose a single actor at the server for clients. The I/O module streamlines this use case by offering the publish/remote_actor function pair.

When using this pair of functions, users "bind" an actor to the port on the server and clients retrieve the actor handle instead of the node ID.

At the server, we call publish instead of open and pass it the actor handle we wish to "export" as the first argument:

assert(sys.has_middleman());
auto adder = sys.spawn([]() -> caf::behavior {
  return {
    [](caf::add_atom, int32_t x, int32_t y) {
      return x + y;
    },
  };
});
auto& mm = sys.middleman();
if (auto maybe_port = mm.publish(adder, 1234)) {
  assert(*maybe_port == 1234);
  // ...
} else {
  sys.println("Failed to open port: {}", maybe_port.error());
}

At the client, we use remote_actor instead of connect. This function returns an expected<Handle>, where Handle is an optional template parameter that denotes the type of actor we expect. The template parameter defaults to caf::actor. Hence, we don't need to pass an explicit template parameter in our example:

assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_adder = mm.remote_actor("my-host.my-domain", 1234)) {
  auto adder = *maybe_adder; // Handle (caf::actor) to the published actor.
  // ...
} else {
  sys.println("Failed to connect: {}", maybe_adder.error());
}

When working with a typed actor handle, e.g., a statically typed actor handle type called adder_actor, we would call mm.remote_actor<adder_actor>(...) instead.
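
For example, a sketch assuming a typed interface that matches the adder from above; the alias adder_actor is made up for illustration, and the server must publish an actor implementing this interface:

using adder_actor = caf::typed_actor<
  caf::result<int32_t>(caf::add_atom, int32_t, int32_t)>;

// ...

if (auto maybe_adder = mm.remote_actor<adder_actor>("my-host.my-domain", 1234)) {
  adder_actor adder = *maybe_adder;
  // ...
}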

We can call connect to establish a connection to an endpoint that opened its port via publish. However, remote_actor fails if the server called open since there is no default actor available in this case.

Monitoring Remote Nodes and Actors

Whenever CAF creates a proxy for a remote actor, it also informs the other node that it started observing an actor. Once the remote actor fails, all nodes that observe it also receive the termination message and forward it as down or exit messages to all local actors that monitored or linked to the remote actor.
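
From the application's perspective, monitoring a remote actor thus looks exactly like monitoring a local one. A minimal sketch, assuming adder is a proxy handle obtained via remote_actor:

auto observer = sys.spawn([adder](caf::event_based_actor* self) -> caf::behavior {
  self->monitor(adder);
  self->set_down_handler([self](caf::down_msg& msg) {
    // Runs when the remote actor terminates or becomes unreachable.
    self->quit(msg.reason);
  });
  return {
    [](caf::ok_atom) { /* dummy handler to keep the observer alive */ },
  };
});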

In a distributed system, there are of course more sources of errors. When losing the connection to another node unexpectedly, CAF emits down and exit messages for all proxies that can no longer reach their remote actor.

Actors, in particular actors that supervise other actors and remotely spawn workers, may also monitor a node by calling self->monitor(nid). Once the node goes down (or disconnects), the actor receives a node_down_msg. This is a system-level message. Event-based actors that wish to override the default handler (that does nothing) can do so by calling self->set_node_down_handler(...).
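
A minimal sketch, assuming nid is the node ID from a prior connect:

auto watcher = sys.spawn([nid](caf::event_based_actor* self) -> caf::behavior {
  self->monitor(nid);
  self->set_node_down_handler([self](caf::node_down_msg& msg) {
    // Runs when the monitored node terminates or disconnects.
    self->quit(msg.reason);
  });
  return {
    [](caf::ok_atom) { /* dummy handler to keep the watcher alive */ },
  };
});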

In a distributed setting, detecting node failures may take a long time if a node simply stops responding. TCP connections usually time out after a couple of minutes. In order to detect node failures faster, the I/O module offers two timespan configuration parameters:

  • caf.middleman.heartbeat-interval for triggering periodic traffic between nodes.
  • caf.middleman.connection-timeout for cutting off a connection if the remote node does not send anything for that amount of time.

Ideally, the connection timeout is a multiple of the heartbeat interval. For example, an aggressive timing for a local cluster deployment could set the heartbeat interval to 1s and the connection timeout to 4s. For well-connected nodes in a switched Ethernet, not receiving any message for 4s is a strong indicator of connectivity loss or of a failed process or machine (either crashed or unresponsive).
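
A sketch of setting these values programmatically before constructing the actor system (they can also go into a configuration file):

caf::actor_system_config cfg;
cfg.set("caf.middleman.heartbeat-interval",
        caf::timespan{std::chrono::seconds{1}});
cfg.set("caf.middleman.connection-timeout",
        caf::timespan{std::chrono::seconds{4}});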

This set of parameters fine-tunes CAF to the deployment. When in doubt, choose conservative (i.e., longer) times. Disconnecting nodes too aggressively can introduce 'phantom errors' or even render a system unusable.

Available Metrics

The I/O module adds a few extra metrics to CAF in order to provide insight into high-level performance indicators. All of these metrics are histograms.

For sampling the size (in bytes) of messages on the wire (i.e., before deserializing or after serializing them), the I/O module adds the two metrics caf.middleman.inbound-messages-size and caf.middleman.outbound-messages-size. Large messages quickly clog up the wire, so latency-sensitive applications may want to watch these metrics to make sure actors exchange small messages only.

The size of messages can only indicate issues related to the available bandwidth. However, the complexity of messages may also cause performance bottlenecks. The I/O module serializes messages directly into the output buffer and deserializes messages from input buffers. Hence, overly complex types (or poorly performing inspect implementations) may slow down the thread for socket I/O and thus reduce the throughput of the system even though the network is underutilized. The two metrics caf.middleman.deserialization-time and caf.middleman.serialization-time sample how long the middleman needs to serialize or deserialize messages to/from the wire. Latency-sensitive applications may also want to watch this pair of metrics. Observing times past a few microseconds even though the application sends only small messages may indicate performance issues in the type inspection.

When exporting the metrics to Prometheus, CAF converts the names to the Prometheus-typical underscore-separated encoding that includes the unit. For example, caf.middleman.serialization-time becomes caf_middleman_serialization_time_seconds.

Conclusion

Message passing in CAF is network transparent. Still, actors running on different machines require some form of rendezvous process for setting up communication paths and learning actor handles. The functions publish and remote_actor streamline this rendezvous process for deployments that only need to expose a small number of actors, ideally just one, per server.

More advanced use cases, e.g., remotely spawning actors, may instead use the more generic open and connect functions that operate on node IDs. After learning the ID of another node, applications can also query remote actor registries, which suits deployments that co-locate too many actors in the same process to justify reserving a port for each of them.

Even though developers can rely on the network transparency of actor communication, we hope that the high-level overview of implementation details presented in this guide helps to better reason about performance implications and observed system behavior (e.g., from monitoring the system metrics).