Distributed Actors
Introduction
In this guide, we learn how the networking layer of CAF enables distributed applications and how CAF represents remote actors locally.
All actors in CAF live in an actor system, which provides a runtime environment for actors that implements message dispatching, error propagation, cooperative scheduling and networking. Connecting two or more actor systems results in a single, interconnected distributed actor system.
Distributed Actors
Consider this simple network of actors:
Blue circles represent actors, green boxes represent messages in a mailbox and arrows indicate a sends-to relationship.
This representation omits many details of the application. For example, how many OS processes are involved? Actors cannot exist in a vacuum. For simplicity, we assume that our actors were deployed on two nodes:
As we can learn from this figure, CAF connects its distributed nodes over a protocol called BASP, which CAF embeds into TCP packets. The protocol itself is an implementation detail that we can safely skip over.
However, this figure allows us to probe the design of CAF at a deeper level. How do messages travel from actors on node A to actors on node B and vice versa? How can CAF monitor the lifetimes of remote actors?
For answering these questions, we focus entirely on Node B and include components that CAF usually hides from the application:
Here, the grey circles are actor proxies that CAF creates on node B to represent remote actors. Instead of enqueueing into a mailbox, proxies forward all messages they receive to an actor (marked with MM) that belongs to the middleman. This actor ultimately manages the connection to node A and interacts with various system components.
When receiving messages for remote actors, the MM actor serializes each message using a binary_serializer. This serializer produces a byte sequence suitable for transient data only. The format follows no formal specification and omits most meta information of data types, since CAF assumes both communication endpoints to have access to the same type IDs and meta objects. Hence, we recommend not using the output of this serializer for persistence, since the format may change between CAF versions and breaks whenever type IDs change.
When deserializing messages from the wire, the MM actor looks up meta objects
from the Meta Objects Table. The meta objects then use a binary_deserializer
to convert a sequence of bytes back to C++ values for a particular type.
This is also where the Actor Registry comes into play.
On the wire, CAF represents actors as a node ID plus an actor ID. The former identifies a single CAF system in the network. The latter is basically an ascending integer that identifies a single actor inside an actor system. For translating an actor ID back to the actor handle, the deserializer accesses the Actor Registry. The middleman stores all actors that became visible to remote actors in the registry in order to query the handles by their ID later.
In source code, we can access IDs from the actor system as well as from actor handles:
auto hdl = sys.spawn([] {}); // Some dummy actor (terminates immediately).
auto nid = hdl.node(); // The globally unique caf::node_id for the system.
auto aid = hdl.id(); // The system-specific caf::actor_id for the dummy.
assert(nid == sys.node());
assert(aid != caf::invalid_actor_id);
Before querying the IDs from a handle, however, make sure the handle is valid first. The node ID of an actor system is invalid (default constructed) when running CAF without loading a networking module.
Connecting CAF Nodes
There are two APIs in the I/O module for connecting CAF nodes. However, before we discuss the API functions, we look at the handshake process first in order to better understand how the functions operate (and differ).
In all cases, one CAF node assumes the role of a server by opening a TCP port that others can connect to. A client then establishes a TCP connection. Once the TCP handshake has completed, CAF performs a connection handshake in order to make sure that the communication endpoints are compatible.
In the handshake, the two nodes exchange this information:
- The BASP version (and a magic number). This step simply makes sure that the nodes are running network-compatible CAF versions and the nodes disconnect immediately when finding incompatible versions.
- The node IDs. When connecting to the same node multiple times, CAF closes redundant connections after the handshake. Two CAF nodes always multiplex all actor communication over a single TCP connection in order to make sure that the ordering of messages remains intact.
- The application identifiers. This is a user-defined list of strings that users may override by setting caf.middleman.app-identifiers. During the handshake, CAF aborts a connection attempt if the two identifier lists are disjoint. For most use cases, we recommend setting only a single identifier throughout the system.
- The server optionally sends an actor ID along with a description of its interface. If present, this actor is the default actor of the server for nodes that connect to this port.
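For example, a caf-application.conf could pin the identifier list like this (the identifier string itself is made up):

```
caf {
  middleman {
    app-identifiers = ["my-app/1.0"]
  }
}
```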
After understanding the handshake process, explaining the difference between the
two APIs for connecting CAF nodes becomes straightforward. The middleman offers
an open
/connect
function pair for setting up a connection without the
optional default actor. In this case, clients retrieve the node ID of the server
on success.
When working with a default actor, users can use the function pair
publish
/remote_actor
instead. Here, clients retrieve an actor handle on
success.
Connections without Default Actor
When connecting two CAF nodes directly (without using default actors), the
server needs to open a local port for clients to connect to. To open a port, we
access the middleman of an actor system and call open
:
assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_port = mm.open(1234)) {
assert(*maybe_port == 1234);
// ...
} else {
sys.println("Failed to open port: {}", maybe_port.error());
}
The return type of open
is expected<uint16_t>
, which contains the opened
port on success and an error otherwise. When calling mm.open(0)
, the OS
chooses an available port and users need to inspect the return value to learn
which port was chosen. Otherwise, CAF simply returns the user-defined port on
success.
The function also has optional parameters for setting which address (name) CAF
passes to bind
and whether or not to set the SO_REUSEADDR
flag on the socket
(defaults to false
).
After opening a port on the server, we can connect clients by calling connect
.
assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_nid = mm.connect("my-host.my-domain", 1234)) {
auto nid = *maybe_nid; // ID of the connected node.
// ...
} else {
sys.println("Failed to connect: {}", maybe_nid.error());
}
After learning the node ID, stored as nid
in the example above, we can for
example spawn actors on the other node (see the manual section on
Remote Spawning of Actors)
or query the actor registry remotely by calling mm.remote_lookup("my-actor", nid)
. Please note that this is a blocking function call. Hence, use this
function only outside of actors or detach actors that call into this API.
Otherwise, a cooperatively scheduled actor blocks a scheduler thread until the
result arrives.
Connections with Default (Published) Actor
Many applications only need to expose a single actor at the server for clients.
The I/O module streamlines this use case by offering the
publish
/remote_actor
function pair.
When using this pair of functions, users "bind" an actor to the port on the server and clients retrieve the actor handle instead of the node ID.
At the server, we call publish
instead of open
and pass it the actor handle
we wish to "export" as the first argument:
assert(sys.has_middleman());
auto adder = sys.spawn([]() -> caf::behavior {
return {
[](caf::add_atom, int32_t x, int32_t y) {
return x + y;
},
};
});
auto& mm = sys.middleman();
if (auto maybe_port = mm.publish(adder, 1234)) {
assert(*maybe_port == 1234);
// ...
} else {
sys.println("Failed to open port: {}", maybe_port.error());
}
At the client, we use remote_actor instead of connect. This function returns an expected<Handle>, where Handle is the optional template parameter to remote_actor that denotes the type of actor we expect. The template parameter defaults to caf::actor. Hence, we don't need to pass an explicit template parameter in our example:
assert(sys.has_middleman());
auto& mm = sys.middleman();
if (auto maybe_adder = mm.remote_actor("my-host.my-domain", 1234)) {
auto adder = *maybe_adder; // Handle (caf::actor) to the published actor.
// ...
} else {
sys.println("Failed to connect: {}", maybe_adder.error());
}
When working with a typed actor handle, e.g., a statically typed actor handle
type called adder_actor
, we would call mm.remote_actor<adder_actor>(...)
instead.
We can call connect
to establish a connection to an endpoint that opened its
port via publish
. However, remote_actor
fails if the server called open
since there is no default actor available in this case.
Monitoring Remote Nodes and Actors
Whenever CAF creates a proxy for a remote actor, it also informs the other node that it started observing an actor. Once the remote actor fails, all nodes that observe it also receive the termination message and forward it as down or exit messages to all local actors that monitored or linked to the remote actor.
In a distributed system, there are of course more sources of errors. When losing the connection to another node unexpectedly, CAF emits down and exit messages for all proxies that can no longer reach their remote actor.
Actors, in particular actors that supervise other actors and remotely spawn
workers, may also monitor a node by calling self->monitor(nid)
. Once the node
goes down (or disconnects), the actor receives a node_down_msg
. This is a
system-level message. Event-based actors that wish to override the default
handler (that does nothing) can do so by calling
self->set_node_down_handler(...)
.
In a distributed setting, detecting node failures may take a long time if a node
simply stops responding. TCP connections usually time out after a couple of
minutes. In order to detect node failures faster, the I/O module offers two
timespan
configuration parameters:
- caf.middleman.heartbeat-interval for triggering periodic traffic between nodes.
- caf.middleman.connection-timeout for cutting off a connection if the remote node does not send anything for that amount of time.
Ideally, the connection timeout is a multiple of the heartbeat interval. For example, an aggressive timing for a local cluster deployment could set the heartbeat interval to 1s and the connection timeout to 4s. For well-connected nodes in a switched Ethernet, not receiving any message for 4s is a strong indicator for loss of connectivity or that the process (or machine) has failed (either by crashing or by becoming unresponsive).
This set of parameters fine-tunes CAF to the deployment. When in doubt, choose conservative (i.e., longer) times. Disconnecting nodes too aggressively can introduce 'phantom errors' or even render a system unusable.
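The aggressive cluster timing from the example above would look like this in a caf-application.conf (the values are illustrative, not a general recommendation):

```
caf {
  middleman {
    heartbeat-interval = 1s
    connection-timeout = 4s
  }
}
```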
Available Metrics
The I/O module adds a few extra metrics to CAF in order to provide insight into high-level performance indicators. All of these metrics are histograms.
For sampling the size (in bytes) of messages on the wire (i.e., before deserializing or after serializing them), the I/O module adds the two metrics caf.middleman.inbound-messages-size and caf.middleman.outbound-messages-size.
Large messages quickly clog up the wire, so latency-sensitive applications may
want to watch these metrics to make sure actors exchange small messages only.
The size of messages can only indicate issues related to the available bandwidth. However, the complexity of messages may also cause performance bottlenecks. The I/O module serializes messages directly into the output buffer and deserializes messages from input buffers. Hence, overly complex types (or poorly performing inspect implementations) may slow down the thread for socket I/O and thus reduce the throughput of the system even though the network is underutilized. The two metrics caf.middleman.deserialization-time and caf.middleman.serialization-time sample how long the middleman needs to serialize or deserialize messages to/from the wire. Latency-sensitive applications may also want to watch this pair of metrics. Observing times past a few microseconds even though the application sends only small messages may indicate performance issues during type inspection.
When exporting the metrics to Prometheus, CAF converts the names to the
Prometheus-typical underscore-separated encoding that includes the unit. For
example, caf.middleman.serialization-time
becomes
caf_middleman_serialization_time_seconds
.
Conclusion
Message passing in CAF is network transparent. Still, actors running on
different machines require some form of rendezvous process for setting up
communication paths and learning actor handles. The functions publish
and
remote_actor
streamline this rendezvous process for deployments that only need
to expose a small number of actors, ideally just one, per server.
More advanced use cases, e.g., remotely spawning actors, may instead use the more generic open and connect functions that operate on node IDs. After learning the ID of another node, applications can also query its actor registry remotely, which suits deployments that co-locate too many actors in the same process to justify reserving a port for each of them.
Even though developers can rely on the network transparency of actor communication, we hope that the high-level overview of the implementation details presented in this guide helps to better reason about performance implications and observed system behavior (e.g., from monitoring the system metrics).