Warehouse Backend
Introduction
In this tutorial, we are going to implement a simple backend service for a warehouse management system. The service will offer a REST API to manage products and use a database to store the current stock. The service also offers a control interface to manage the stock with a simple protocol over TCP. This simulates a scenario where the warehouse workers use a handheld device, such as a barcode scanner, to manage the stock.
The source code for this tutorial is available on GitHub at interance/warehouse-backend-example. We will focus on the main aspects of the implementation and omit parts of the source code to keep this tutorial concise. Please consult the full source code to fill in the gaps if needed.
[Diagram: architecture of the system]
Our system has three communication channels:
- the HTTP REST API
- the TCP control interface (an octet stream)
- a WebSocket endpoint to broadcast stock changes to connected clients
We will use JSON as exchange format on all communication channels. Internally, we have three main kinds of actors in the system:
- the Database Actor manages the stock
- the Controller Actor handles messages from the TCP control interface
- the WS Worker forwards events to a WebSocket client.
In the implementation for this tutorial, we will use SQLite as the database engine because it requires no prior setup and is easy to use in a tutorial. However, the source code is structured in a way that makes it easy to swap the database engine with another one if needed.
Building the Source Code
To build the source code, you need to have SQLite and OpenSSL installed on your system. Our CMake setup in CMakeLists.txt for this project will automatically download and build CAF as part of the build process.
Building the project is straightforward and follows the usual CMake workflow:
cmake -S . -B build
cmake --build build
This will build the project and create an executable called warehouse-backend-example in the build directory.
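Once built, you can run the server directly from the build directory. For example, to start it with the TCP control interface enabled on port 1234 (the --cmd-port option is covered later in this tutorial):
./build/warehouse-backend-example --cmd-port 1234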
Data Types
We will start by defining the data types used in the system. First, we define the item type to represent a product in the warehouse.
// from: item.hpp
struct item {
  int32_t id;
  int32_t price;
  int32_t available;
  std::string name;
};

template <class Inspector>
bool inspect(Inspector& f, item& x) {
  return f.object(x).fields(f.field("id", x.id), f.field("price", x.price),
                            f.field("available", x.available),
                            f.field("name", x.name));
}
Each item has a unique id that we use as the primary key in the database. The price field represents the cost of the item, and the available field represents the current stock. Finally, the name field is a human-readable description of the product. In order to be able to serialize and deserialize the item type, we implement the inspect function that CAF uses to introspect the data types.
Of course, this data type is a simplification. It will serve our need for this tutorial, but a real-world application would require a more complex data model.
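With the inspect overload in place, CAF's serializers can process item values directly. As a minimal sketch using caf::json_writer (which we will meet again later in this tutorial):
// Sketch: serialize an item to JSON via the inspect overload.
caf::json_writer writer;
writer.skip_object_type_annotation(true);
item x{123, 10, 0, "Item 1"};
if (writer.apply(x))
  std::cout << writer.str() << '\n'; // e.g. {"id": 123, "price": 10, ...}
else
  std::cout << "error: " << to_string(writer.get_error()) << '\n';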
Database Actor
The database actor is responsible for managing the stock. It will offer a request-based interface to query and update the stock that we define as follows:
// from: database_actor.hpp
struct database_trait {
  using signatures = caf::type_list<
    // Retrieves an item from the database.
    caf::result<item>(get_atom, int32_t),
    // Adds a new item to the database.
    caf::result<void>(add_atom, int32_t, int32_t, std::string),
    // Increments the available count of an item.
    caf::result<int32_t>(inc_atom, int32_t, int32_t),
    // Decrements the available count of an item.
    caf::result<int32_t>(dec_atom, int32_t, int32_t),
    // Deletes an item from the database.
    caf::result<void>(del_atom, int32_t)>;
};

using database_actor = caf::typed_actor<database_trait>;
The first parameter of each function is an atom that identifies the operation (the atom types are defined in types.hpp). The second parameter is the id of the item to operate on. The remaining parameters are specific to each operation:
- get: Retrieves an item from the database.
- add: Adds a new item to the database with price and name.
- inc: Increments the available count of an item by a given amount.
- dec: Decrements the available count of an item by a given amount.
- del: Deletes an item from the database.
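To make the interface concrete, here is a sketch of how another actor could query the database actor once it is spawned (the 2-second timeout is an arbitrary choice for this example):
// Sketch: ask the database actor for item 123 from some actor `self`.
self->mail(get_atom_v, int32_t{123})
  .request(db_actor, 2s)
  .then(
    [](const item& x) {
      applog::info("item {} costs {}", x.id, x.price);
    },
    [](const caf::error& what) {
      applog::error("database request failed: {}", what);
    });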
To report errors in a meaningful way to the user, we also define an error type:
// from: ec.hpp
/// Application-specific error codes.
enum class ec : uint8_t {
  /// No error occurred.
  nil = 0,
  /// Indicates that a database query did not return any results.
  no_such_item,
  /// Indicates that a key already exists in the database.
  key_already_exists,
  /// Indicates that the database is not accessible.
  database_inaccessible,
  /// Indicates that a user-provided argument is invalid.
  invalid_argument,
  /// The number of error codes (must be last entry!).
  /// @note This value is not a valid error code.
  num_ec_codes,
};
The last entry in the enumeration is a sentinel value to indicate the number of error codes in the enum. This is useful for implementing boilerplate functions such as to_string and from_string. By registering this enum using CAF_ERROR_CODE_ENUM(ec), we enable CAF to use it as an error code type in caf::error.
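A short sketch of what this registration buys us: application code can create caf::error values from ec constants and compare against them directly:
// Sketch: ec values convert to and compare against caf::error seamlessly.
caf::error err = caf::make_error(ec::no_such_item);
if (err == ec::no_such_item)
  applog::info("lookup failed: {}", err);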
To separate our code for the database API from the code for the actor itself, we also define a database interface that the actor uses to interact with the database:
// from: database.hpp
/// A simple database interface for storing items.
class database {
public:
  database(std::string db_file) : db_file_(std::move(db_file)) {
    // nop
  }

  ~database();

  /// Opens the database file and creates the table if it does not exist.
  /// @returns `caf::error{}` on success, an error code otherwise.
  [[nodiscard]] caf::error open();

  /// Retrieves the number of items in the database.
  /// @returns the number of items in the database.
  [[nodiscard]] int count();

  /// Retrieves an item from the database.
  /// @returns the item if found, `std::nullopt` otherwise.
  [[nodiscard]] std::optional<item> get(int32_t id);

  /// Inserts a new item into the database.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec insert(const item& new_item);

  /// Increments the available count of an item.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec inc(int32_t id, int32_t amount);

  /// Decrements the available count of an item.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec dec(int32_t id, int32_t amount);

  /// Deletes an item from the database.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec del(int32_t id);

private:
  std::string db_file_;
  sqlite3* db_ = nullptr;
};

/// A smart pointer to an item database.
using database_ptr = std::shared_ptr<database>;
We omit the implementation of the database class in this tutorial, since it is a straightforward SQLite wrapper. You can find the full implementation in the source code repository.
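To give a flavor of what such a wrapper looks like, here is a hypothetical sketch of database::get using the SQLite C API. The table and column names are assumptions for illustration, not necessarily what the repository uses:
// Hypothetical sketch of database::get (table/column names are made up).
std::optional<item> database::get(int32_t id) {
  sqlite3_stmt* stmt = nullptr;
  auto* sql = "SELECT price, available, name FROM item WHERE id = ?;";
  if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK)
    return std::nullopt;
  sqlite3_bind_int(stmt, 1, id);
  std::optional<item> result;
  if (sqlite3_step(stmt) == SQLITE_ROW)
    result = item{id, sqlite3_column_int(stmt, 0), sqlite3_column_int(stmt, 1),
                  reinterpret_cast<const char*>(sqlite3_column_text(stmt, 2))};
  sqlite3_finalize(stmt);
  return result;
}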
Whenever we update an item, we want to notify the WebSocket clients about the change. To do so, we also define a publisher type:
// from: item.hpp
using item_event = std::shared_ptr<const item>;
using item_events = caf::async::publisher<item_event>;
With this definition, we can now have a flow of item events from the database to any interested party. In our case, we will subscribe the WebSocket clients to this publisher.
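As a quick illustration (this mirrors what the WebSocket worker does later in this tutorial), any actor can consume the publisher by moving it onto its own event loop:
// Sketch: observe item events from within some event-based actor `self`.
events.observe_on(self).for_each([](const item_event& ev) {
  if (ev) // guard against null events, mirroring the WS worker's filter
    applog::info("item {} changed: {} in stock", ev->id, ev->available);
});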
For the database actor itself, we define a function that spawns a new instance of the actor. This allows us to hide the full implementation of the actor in the source file.
// from: database_actor.hpp
std::pair<database_actor, item_events>
spawn_database_actor(caf::actor_system& sys, database_ptr db);
The function takes a reference to the actor system that will host the actor and a pointer to the database instance to manage the stock. It returns the actor itself, as well as a publisher for item events that we will use to broadcast stock changes to the WebSocket clients.
The implementation of the function is as follows:
// from: database_actor.cpp
std::pair<database_actor, item_events>
spawn_database_actor(caf::actor_system& sys, database_ptr db) {
  // Note: the actor uses a blocking API (SQLite3) and thus should run in its
  // own thread.
  using caf::actor_from_state;
  using caf::detached;
  item_events events;
  auto hdl = sys.spawn<detached>(actor_from_state<database_actor_state>, db,
                                 &events);
  return {hdl, std::move(events)};
}
We spawn a new actor using the actor_from_state utility, which creates an actor from the state class that we will cover next. By passing detached to spawn, we tell CAF to run this actor in a separate thread. The actor state requires a pointer to the database instance, and we pass a pointer to the local events variable, which the actor initializes with the actual publisher object.
For the implementation of the state, we will use a multicaster to broadcast stock changes to the WebSocket clients. Other than that, the implementation is very straightforward:
// from: database_actor.cpp
struct database_actor_state {
  database_actor_state(database_actor::pointer self_ptr, database_ptr db_ptr,
                       item_events* events)
    : self(self_ptr), db(db_ptr), mcast(self) {
    *events = mcast.as_observable().to_publisher();
  }

  database_actor::behavior_type make_behavior();

  database_actor::pointer self;
  database_ptr db;
  caf::flow::multicaster<item_event> mcast;
};
In make_behavior, we simply define one handler for each operation that the database actor supports. In each handler, we also call mcast.push to broadcast the stock changes to the WebSocket clients. For example, the handler for the add operation looks like this:
// from: database_actor.cpp
[this](add_atom, int32_t id, int32_t price,
       const std::string& name) -> caf::result<void> {
  auto value = item{id, price, 0, std::move(name)};
  if (auto err = db->insert(value); err != ec::nil)
    return {caf::make_error(err)};
  mcast.push(std::make_shared<item>(std::move(value)));
  return {};
},
One note on the mcast.push call: the multicaster will discard the event if no client is subscribed. This is the desired behavior for our use case, as we do not want to keep events in memory if no client is interested in them. If there are clients, we will always connect them using the on_backpressure_buffer operator to make sure that clients get disconnected when they are not able to keep up with the flow of events. In different scenarios, you might want to check whether items pile up in the multicaster by calling mcast.buffered() and take appropriate action if the buffer grows too large.
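A hypothetical guard inside a handler could look like this (the threshold is arbitrary):
// Sketch: shed load if too many events are waiting for slow subscribers.
static constexpr size_t max_buffered = 1'000;
if (mcast.buffered() > max_buffered)
  applog::info("dropping event: {} events already buffered", mcast.buffered());
else
  mcast.push(std::make_shared<item>(std::move(value)));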
Controller Actor
The controller actor handles incoming control messages over TCP. The protocol is very simple: each message is a JSON object with three fields:
- type: A string that is either "inc" or "dec".
- id: The ID of the item to operate on.
- amount: The amount by which to increment or decrement the stock.
Each message is a single line, terminated by a newline character. The controller actor will parse the JSON object and send a request message to the database actor to perform the operation. The response message from the database actor is then also converted to a JSON object and sent back to the client.
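For example, a client sends a command on one line and receives a single-line response, either a numeric result (the value shown here is hypothetical) or an error object:
{"type": "inc", "id": 123, "amount": 5}
{"result":12}
For malformed input, the controller answers with an error object such as {"error":"invalid command"}.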
To start a new controller, we provide a function that spawns a new controller actor:
// from: controller_actor.hpp
caf::actor
spawn_controller_actor(caf::actor_system& sys, database_actor db_actor,
                       caf::net::acceptor_resource<std::byte> events);
Before implementing this function, we implement a helper struct command to parse the incoming JSON messages:
// from: controller_actor.cpp
struct command {
  std::string type; // Either "inc" or "dec".
  int32_t id = 0;
  int32_t amount = 0;

  bool valid() const noexcept {
    return type == "inc" || type == "dec";
  }
};

template <class Inspector>
bool inspect(Inspector& f, command& x) {
  return f.object(x).fields(f.field("type", x.type), f.field("id", x.id),
                            f.field("amount", x.amount));
}
With this type, we can now use a caf::json_reader to parse the incoming JSON messages into C++ objects that are easier to work with. We could also parse the JSON into a caf::json_object and work with that, but using a C++ object is more convenient for our use case.
Our spawn function simply calls sys.spawn with a lambda that implements the controller logic and returns immediately.
However, the logic is a bit more involved. We break it down into several parts to make it easier to understand.
First, we monitor the database actor, which means the controller actor will terminate if the database actor terminates. Then, we open up our byte flow and convert it to UTF-8 strings:
// from: controller_actor.cpp
caf::actor
spawn_controller_actor(caf::actor_system& sys, database_actor db_actor,
                       caf::net::acceptor_resource<std::byte> events) {
  return sys.spawn([events, db_actor](caf::event_based_actor* self) mutable {
    // Stop if the database actor terminates.
    self->monitor(db_actor, [self](const caf::error& reason) {
      applog::info("controller lost the database actor: {}", reason);
      self->quit(reason);
    });
    // For each buffer pair, we create a new flow ...
    events.observe_on(self).for_each([self, db_actor](auto ev) {
      applog::info("controller added a new client");
      auto [pull, push] = ev.data();
      pull
        .observe_on(self)
        // ... that converts the lines to commands ...
        .transform(caf::flow::byte::split_as_utf8_at('\n'))
Our transformation step turns a flow of std::byte into a flow of caf::cow_string (copy-on-write strings). Each string should be a JSON object that we want to convert into a command object. Since we are in a data flow, we want to pass down objects that are cheap to copy, so we will wrap the command objects into a std::shared_ptr. This will also allow us to pass nullptr to the downstream stages to signal that we have received an invalid command:
        .map([self](const caf::cow_string& line) {
          applog::debug("controller received line: {}", line.str());
          caf::json_reader reader;
          if (!reader.load(line.str())) {
            applog::error("controller failed to parse JSON: {}",
                          reader.get_error());
            return std::shared_ptr<command>{}; // Invalid JSON.
          }
          auto ptr = std::make_shared<command>();
          if (!reader.apply(*ptr))
            return std::shared_ptr<command>{}; // Not a command.
          return ptr;
        })
If the map step fails, we want to send an error message to the client. If we do have a valid command, we want to send it to the database actor and send the response back to the client. For the responses (and error messages), we will create observables that we need to concatenate to the output for the client:
        .concat_map([self, db_actor](std::shared_ptr<command> ptr) {
          // If the `map` step failed, inject an error message.
          if (ptr == nullptr || !ptr->valid()) {
            auto str = R"_({"error":"invalid command"})_"s;
            return self->make_observable()
              .just(caf::cow_string{std::move(str)})
              .as_observable();
          }
          // Send the command to the database actor and convert the
          // result message into an observable.
          caf::flow::observable<int32_t> result;
          if (ptr->type == "inc") {
            result = self->mail(inc_atom_v, ptr->id, ptr->amount)
                       .request(db_actor, 1s)
                       .as_observable();
          } else {
            result = self->mail(dec_atom_v, ptr->id, ptr->amount)
                       .request(db_actor, 1s)
                       .as_observable();
          }
          // On error, we return an error message to the client.
          return result
            .map([ptr](int32_t res) {
              applog::debug("controller received result for {} -> {}", *ptr,
                            res);
              auto str = R"_({"result":)_"s;
              str += std::to_string(res);
              str += '}';
              return caf::cow_string{std::move(str)};
            })
            .on_error_return([ptr](const caf::error& what) {
              applog::debug("controller received an error for {} -> {}", *ptr,
                            what);
              auto str = R"_({"error":")_"s;
              str += to_string(what);
              str += R"_("})_";
              auto res = caf::cow_string{std::move(str)};
              return caf::expected<caf::cow_string>{std::move(res)};
            })
            .as_observable();
        })
The last part of the implementation is straightforward. We use on_backpressure_buffer to give the client an upper limit of messages that we will buffer. If the client cannot keep up with the flow of messages, we will disconnect it. We also inject a bit of logging and convert the strings back to bytes so that we can send them back over the network:
        // ... disconnects if the client is too slow ...
        .on_backpressure_buffer(32)
        // ... and pushes the results back to the client as bytes.
        .transform(caf::flow::string::to_chars("\n"))
        .do_finally(
          [] { applog::info("controller lost connection to a client"); })
        .map([](char ch) { return static_cast<std::byte>(ch); })
        .subscribe(push);
    });
  });
}
WS (WebSocket) Worker
The WebSocket worker is responsible for forwarding stock changes to a connected client. For communication with the WebSocket client, it takes an accept_event argument when spawned, which it uses to create its input and output flows. For tapping into the item events from the database actor, we pass it an item_events argument (an asynchronous publisher).
The first half of the implementation is straightforward. We create a new input flow for the frames we receive from the WebSocket client and simply discard any input by subscribing std::ignore to it:
// from: main.cpp
// The actor for handling a single WebSocket connection.
void ws_worker(caf::event_based_actor* self,
               caf::net::accept_event<ws::frame> new_conn, item_events events) {
  using frame = ws::frame;
  auto [pull, push] = new_conn.data();
  // We ignore whatever the client may send to us.
  pull.observe_on(self)
    .do_finally([] { applog::info("WebSocket client disconnected"); })
    .subscribe(std::ignore);
The second half subscribes to the item events and forwards them to the WebSocket client after converting them to JSON:
  // Send all events as JSON objects to the client.
  auto writer = std::make_shared<caf::json_writer>();
  writer->skip_object_type_annotation(true);
  events.observe_on(self)
    .filter([](const item_event& item) { return item != nullptr; })
    .map([writer](const item_event& item) {
      writer->reset();
      if (!writer->apply(*item)) {
        applog::error("failed to serialize an item event: {}",
                      writer->get_error());
        return frame{};
      }
      return frame{writer->str()};
    })
    .filter([](const frame& item) { return !item.empty(); })
    .on_backpressure_buffer(default_max_pending_frames)
    .subscribe(push);
}
Again, we use on_backpressure_buffer to disconnect the client if it cannot keep up with the flow of events.
Note that this time, we don't write a spawn function for the WebSocket worker to hide implementation details, but instead implement the actor directly. This is because we will spawn ws_worker directly from the HTTP server callbacks that are defined in the same file.
Starting our Servers
Last but not least, we need to implement our servers in caf_main. Users can configure the port for the control interface by setting the cmd-port variable in the config file or by passing the port as a command-line argument. CAF offers a convenient way to spawn servers that operate directly on TCP sockets with the caf::net::octet_stream API:
// from: main.cpp
// Spin up the controller if configured.
if (auto cmd_port = caf::get_as<uint16_t>(cfg, "cmd-port")) {
  auto addr = caf::get_or(cfg, "cmd-addr", "0.0.0.0"sv);
  auto cmd_server
    = caf::net::octet_stream::with(sys)
        // Bind to the user-defined port.
        .accept(*cmd_port, addr)
        // Stop the server if our database actor terminates.
        .monitor(db_actor)
        // When started, run our worker actor to handle incoming connections.
        .start([&sys, db_actor = db_actor](auto events) {
          spawn_controller_actor(sys, db_actor, std::move(events));
        });
  if (!cmd_server) {
    sys.println("*** failed to start command server: {}", cmd_server.error());
    return EXIT_FAILURE;
  }
}
The HTTP server has more configuration options, so we will bind them to variables first to keep things clean:
// from: main.cpp
// Read the configuration for the web server.
auto port = caf::get_or(cfg, "http-port", default_port);
auto pem = caf::net::ssl::format::pem;
auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
auto max_connections = caf::get_or(cfg, "max-connections",
                                   default_max_connections);
auto max_request_size = caf::get_or(cfg, "max-request-size",
                                    default_max_request_size);
// Require either both TLS files or none (the check is a logical XOR).
if (!key_file != !cert_file) {
  sys.println("*** inconsistent TLS config: declare both files or neither");
  return EXIT_FAILURE;
}
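For reference, a hypothetical configuration file that sets these options could look like this (all values are placeholders; CAF applications typically read a caf-application.conf file at startup):
# Hypothetical caf-application.conf (values are placeholders).
cmd-port = 7788
cmd-addr = "0.0.0.0"
http-port = 8080
max-connections = 64
max-request-size = 65536
tls {
  key-file = "/path/to/key.pem"
  cert-file = "/path/to/cert.pem"
}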
If the user enables the TLS options, CAF will automatically spawn an HTTPS server instead of an HTTP server.
Just like for the octet streams, CAF offers a convenient API under caf::net::http for defining an HTTP server. The initial setup for the HTTP server is also similar to the controller server:
// from: main.cpp
// Start the HTTP server.
namespace ssl = caf::net::ssl;
auto impl = std::make_shared<http_server>(db_actor);
auto server
  = caf::net::http::with(sys)
      // Optionally enable TLS.
      .context(ssl::context::enable(key_file && cert_file)
                 .and_then(ssl::emplace_server(ssl::tls::v1_2))
                 .and_then(ssl::use_private_key_file(key_file, pem))
                 .and_then(ssl::use_certificate_file(cert_file, pem)))
      // Bind to the user-defined port.
      .accept(port)
      // Limit how many clients may be connected at any given time.
      .max_connections(max_connections)
      // Limit the maximum request size.
      .max_request_size(max_request_size)
      // Stop the server if our database actor terminates.
      .monitor(db_actor)
Before starting the server itself, we create an instance of http_server. This is a small convenience class that wraps the communication with the database actor in order to keep the main function clean. The public interface of http_server is as follows:
// from: http_server.hpp
/// Bridges between HTTP requests and the database actor.
class http_server {
public:
  using responder = caf::net::http::responder;

  http_server(database_actor db_actor) : db_actor_(std::move(db_actor)) {
    writer_.skip_object_type_annotation(true);
  }

  static constexpr std::string_view json_mime_type = "application/json";

  void get(responder& res, int32_t key);
  void add(responder& res, int32_t key, const std::string& name, int32_t price);
  void add(responder& res, int32_t key);
  void inc(responder& res, int32_t key, int32_t amount);
  void dec(responder& res, int32_t key, int32_t amount);
  void del(responder& res, int32_t key);

  // ...
};
With this helper class, we can now define callbacks for the HTTP server:
// from: main.cpp
// Route for retrieving an item from the database.
.route("/item/<arg>", http::method::get,
       [impl](http::responder& res, int32_t key) {
         applog::debug("GET /item/{}", key);
         impl->get(res, key);
       })
// Route for adding a new item to the database. The payload must be a
// JSON object with the fields "name" and "price".
.route("/item/<arg>", http::method::post,
       [impl](http::responder& res, int32_t key) {
         applog::debug("POST /item/{}, body: {}", key, res.body());
         impl->add(res, key);
       })
// Route for incrementing the available amount of an item.
.route("/item/<arg>/inc/<arg>", http::method::put,
       [impl](http::responder& res, int32_t key, int32_t amount) {
         applog::debug("PUT /item/{}/inc/{}", key, amount);
         impl->inc(res, key, amount);
       })
// Route for decrementing the available amount of an item.
.route("/item/<arg>/dec/<arg>", http::method::put,
       [impl](http::responder& res, int32_t key, int32_t amount) {
         applog::debug("PUT /item/{}/dec/{}", key, amount);
         impl->dec(res, key, amount);
       })
// Route for deleting an item from the database.
.route("/item/<arg>", http::method::del,
       [impl](http::responder& res, int32_t key) {
         applog::debug("DELETE /item/{}", key);
         impl->del(res, key);
       })
As we can see, the routes map one-to-one to the functions we defined in the http_server class. All that is left now is to add the endpoint for the WebSocket server and to call start:
// from: main.cpp
// WebSocket route for subscribing to item events.
.route("/events", http::method::get,
       ws::switch_protocol()
         .on_request([](ws::acceptor<>& acc) { acc.accept(); })
         .on_start([&sys, ev = events](auto res) {
           // Spawn a server for the WebSocket connection that simply
           // spawns new workers for each incoming connection.
           sys.spawn([res, ev](caf::event_based_actor* self) {
             res.observe_on(self).for_each([self, ev](auto new_conn) {
               applog::info("WebSocket client connected");
               self->spawn(ws_worker, new_conn, ev);
             });
           });
         }))
// Start the server.
.start();
For accepting incoming WebSocket connections, we add a route to the HTTP server that upgrades an HTTP GET request to a WebSocket connection by using switch_protocol. For the WebSocket server itself, we need to define on_request (in our case, we simply accept any incoming connection) and on_start (where we spawn a server actor that in turn spawns a ws_worker for each connected client).
We have already seen the implementation of the ws_worker actor. While we use flows for the WebSocket server, the HTTP server internally uses request-response messaging with the database actor. For example, http_server::get is implemented as follows:
// from: http_server.cpp
void http_server::get(responder& res, int32_t key) {
  auto* self = res.self();
  auto prom = std::move(res).to_promise();
  self->mail(get_atom_v, key)
    .request(db_actor_, 2s)
    .then(
      [this, prom](const item& value) mutable { //
        respond_with_item(prom, value);
      },
      [this, prom](const caf::error& what) mutable {
        if (what == ec::no_such_item) {
          respond_with_error(prom, "no_such_item");
          return;
        }
        if (what == caf::sec::request_timeout) {
          respond_with_error(prom, "timeout");
          return;
        }
        respond_with_error(prom, "unexpected_database_result");
      });
}
The respond_with_item and respond_with_error functions are simple helpers that convert the response from the database actor to a JSON object and send it back to the client by calling prom.respond(...) with an appropriate status code.
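These helpers are omitted here; a hypothetical sketch of respond_with_error (the exact promise type and status mapping are assumptions) could look like this:
// Hypothetical sketch: send a JSON error object with a fitting status code.
template <class Promise>
void respond_with_error(Promise& prom, std::string_view code) {
  auto body = R"_({"error":")_"s;
  body += code;
  body += R"_("})_";
  prom.respond(caf::net::http::status::internal_server_error,
               "application/json", body);
}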
Interacting with the System
To interact with the system, you can use curl to send HTTP requests to the server. For example, the following command adds a new item with the ID 123 and a price of 10:
curl -X POST -d '{"price": 10, "name": "Item 1"}' http://localhost:8080/item/123
To read the item back from the database, you can use the following command:
curl http://localhost:8080/item/123
To increment or decrement the stock of an item, you can use the following commands:
curl -X PUT http://localhost:8080/item/123/inc/7
curl -X PUT http://localhost:8080/item/123/dec/3
To delete an item from the database, you can use the following command:
curl -X DELETE http://localhost:8080/item/123
To test the WebSocket client interface, we have included a trivial WebSocket client implementation written in Python. You can run it with the following command:
python3 client.py
It will simply connect to the WebSocket server and print out any messages it receives.
To interact with the controller, you can start the binary with --cmd-port 1234 (or any other port number) and send it commands such as {"type": "inc", "id": 123, "amount": 5}, e.g. by using telnet.
Conclusion
In this tutorial, we have implemented a simple backend service for a warehouse management system. We provide a REST API to manage products and use a database to store the current stock. The service also offers a control interface to manage the stock with a simple protocol over TCP. We have also implemented a WebSocket endpoint to broadcast stock changes to connected clients.
We hope this tutorial was helpful to you and will serve as a good starting point for your own projects.
If you have any questions or suggestions, please feel free to reach out to us by sending an email to feedback@interance.io!