Learning Resources

Warehouse Backend


Introduction

In this tutorial, we are going to implement a simple backend service for a warehouse management system. The service will offer a REST API to manage products and use a database to store the current stock. The service also offers a control interface to manage the stock with a simple protocol over TCP. This simulates a scenario where the warehouse workers use a handheld device, such as a barcode scanner, to manage the stock.

The source code for this tutorial is available on GitHub at interance/warehouse-backend-example. We will focus on the main aspects of the implementation in this tutorial and omit parts of the source code in this tutorial to keep it concise. Please consult the full source code to fill in the gaps if needed.

The following diagram shows the architecture of the system:

Architecture

Our system has three communication channels:

  • the HTTP REST API
  • the TCP control interface (an octet stream)
  • a WebSocket endpoint to broadcast stock changes to connected clients

We will use JSON as exchange format on all communication channels. Internally, we have three main kinds of actors in the system:

  • the Database Actor manages the stock
  • the Controller Actor handles messages from the TCP control interface
  • the WS Worker forwards events to a WebSocket client.

In the implementation for this tutorial, we will use SQLite as the database engine because it requires no prior setup and is easy to use in a tutorial. However, the source code is structured in a way that makes it easy to swap the database engine with another one if needed.

Building the Source Code

To build the source code, you need to have SQLite and OpenSSL installed on your system. Our CMake setup in CMakeLists.txt for this project will automatically download and build CAF as part of the build process.

Building the project is straightforward and follows the usual CMake workflow:

cmake -S . -B build
cmake --build build

This will build the project and create an executable called warehouse-backend-example in the build directory.

Data Types

We will start by defining the data types used in the system. First, we define the item type to represent a product in the warehouse.

// from: item.hpp

struct item {
  int32_t id;
  int32_t price;
  int32_t available;
  std::string name;
};

template <class Inspector>
bool inspect(Inspector& f, item& x) {
  return f.object(x).fields(f.field("id", x.id), f.field("price", x.price),
                            f.field("available", x.available),
                            f.field("name", x.name));
}

Each item has a unique id, that we use as the primary key in the database. The price field represents the cost of the item, and the available field represents the current stock. Finally, the name field is a human-readable description of the product. In order to be able to serialize and deserialize the item type, we implement the inspect function that CAF uses to introspect the data types.

Of course, this data type is a simplification. It will serve our need for this tutorial, but a real-world application would require a more complex data model.

Database Actor

The database actor is responsible for managing the stock. It will offer a request-based interface to query and update the stock that we define as follows:

// from: database_actor.hpp

struct database_trait {
  using signatures = caf::type_list<
    // Retrieves an item from the database.
    caf::result<item>(get_atom, int32_t),
    // Adds a new item to the database.
    caf::result<void>(add_atom, int32_t, int32_t, std::string),
    // Increments the available count of an item.
    caf::result<int32_t>(inc_atom, int32_t, int32_t),
    // Decrements the available count of an item.
    caf::result<int32_t>(dec_atom, int32_t, int32_t),
    // Deletes an item from the database.
    caf::result<void>(del_atom, int32_t)>;
};

using database_actor = caf::typed_actor<database_trait>;

The first parameter of each function is an atom that identifies the operation (the atom types are defined in types.hpp). The second parameter is the id of the item to operate on. Remaining parameters are specific to each operation:

  • get: Retrieves an item from the database.
  • add: Adds a new item to the database with price and name.
  • inc: Increments the available count of an item by a given amount.
  • dec: Decrements the available count of an item by a given amount.
  • del: Deletes an item from the database.

To report errors in a meaningful way to the user, we also define an error type:

// from: ec.hpp

/// Application-specific error codes.
enum class ec : uint8_t {
  /// No error occurred.
  nil = 0,
  /// Indicates that a database query did not return any results.
  no_such_item,
  /// Indicates that a key already exists in the database.
  key_already_exists,
  /// Indicates that the database is not accessible.
  database_inaccessible,
  /// Indicates that a user-provided argument is invalid.
  invalid_argument,
  /// The number of error codes (must be last entry!).
  /// @note This value is not a valid error code.
  num_ec_codes,
};

The last entry in the enumeration is a sentinel value to indicate the number of error codes in the enum. This is useful for implementing boilerplate functions such as to_string and from_string. By registering this enum using CAF_ERROR_CODE_ENUM(ec), we enable CAF to use this enum as error code type in caf::error.

To separate our code for the database API from the code for the actor itself, we also define a database interface that the actor uses to interact with the database:

// from: database.hpp

/// A simple database interface for storing items.
class database {
public:
  database(std::string db_file) : db_file_(std::move(db_file)) {
    // nop
  }

  ~database();

  /// Opens the database file and creates the table if it does not exist.
  /// @returns `caf::error{}` on success, an error code otherwise.
  [[nodiscard]] caf::error open();

  /// Retrieves the number of items in the database.
  /// @returns the number of items in the database.
  [[nodiscard]] int count();

  /// Retrieves an item from the database.
  /// @returns the item if found, `std::nullopt` otherwise.
  [[nodiscard]] std::optional<item> get(int32_t id);

  /// Inserts a new item into the database.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec insert(const item& new_item);

  /// Increments the available count of an item.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec inc(int32_t id, int32_t amount);

  /// Decrements the available count of an item.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec dec(int32_t id, int32_t amount);

  /// Deletes an item from the database.
  /// @returns `ec::nil` on success, an error code otherwise.
  [[nodiscard]] ec del(int32_t id);

private:
  std::string db_file_;
  sqlite3* db_ = nullptr;
};

/// A smart pointer to an item database.
using database_ptr = std::shared_ptr<database>;

We omit the implementation of the database class in this tutorial, since it is a straightforward SQLite wrapper. You can find the full implementation in the source code repository.

Whenever we update an item, we want to notify the WebSocket clients about the change. To do so, we also define a publisher type:

// from: item.hpp

using item_event = std::shared_ptr<const item>;

using item_events = caf::async::publisher<item_event>;

With this definition, we can now have a flow of item events from the database to any interested party. In our case, we will subscribe the WebSocket clients to this publisher.

For the database actor itself, we define a function that spawns a new instance of the actor. This allows us to hide the full implementation of the actor in the source file.

// from: database_actor.hpp

std::pair<database_actor, item_events>
spawn_database_actor(caf::actor_system& sys, database_ptr db);

The function takes a reference to the actor system that will host the actor and a pointer to the database instance to manage the stock. It returns the actor itself, as well as a publisher for item events that we will use to broadcast stock changes to the WebSocket clients.

The implementation of the function is as follows:

// from: database_actor.cpp

std::pair<database_actor, item_events>
spawn_database_actor(caf::actor_system& sys, database_ptr db) {
  // Note: the actor uses a blocking API (SQLite3) and thus should run in its
  //       own thread.
  using caf::actor_from_state;
  using caf::detached;
  item_events events;
  auto hdl = sys.spawn<detached>(actor_from_state<database_actor_state>, db,
                                 &events);
  return {hdl, std::move(events)};
}

We spawn a new actor using the actor_from_state utility, which creates an actor from the state class that we will cover next. By passing detached to spawn, we tell CAF to run this actor in a separate thread. The actor state requires a pointer to the database instance and we pass a pointer to the local events variable that the actor initializes with the actual object.

For the implementation of the state, we will use a multicaster to broadcast stock changes to the WebSocket clients. Other than that, the implementation is very straightforward:

// from: database_actor.cpp

struct database_actor_state {
  database_actor_state(database_actor::pointer self_ptr, database_ptr db_ptr,
                       item_events* events)
    : self(self_ptr), db(db_ptr), mcast(self) {
    *events = mcast.as_observable().to_publisher();
  }

  database_actor::behavior_type make_behavior();

  database_actor::pointer self;
  database_ptr db;
  caf::flow::multicaster<item_event> mcast;
};

In make_behavior, we simply define one handler for each operation that the database actor supports. In each handler, we also call mcast.push to broadcast the stock changes to the WebSocket clients. For example, the handler for the add operation looks like this:

// from: database_actor.cpp

    [this](add_atom, int32_t id, int32_t price,
           const std::string& name) -> caf::result<void> {
      auto value = item{id, price, 0, std::move(name)};
      if (auto err = db->insert(value); err != ec::nil)
        return {caf::make_error(err)};
      mcast.push(std::make_shared<item>(std::move(value)));
      return {};
    },

One note on the mcast.push call: the multicaster will discard the event if no client is subscribed. This is the desired behavior for our use case, as we do not want to keep events in memory if no client is interested in them. If there are clients, we will always connect them using the on_backpressure_buffer operator to make sure that clients get disconnected when not able to keep up with the flow of events. In different scenarios, you might want to check if items pile up in the multicaster by calling mcast.buffered() and take appropriate action if the buffer grows too large.

Controller Actor

The controller actor handles incoming control messages over TCP. The protocol is very simple: each message is a JSON object with three fields:

  • type: A string that is either "inc" or "dec".
  • id: The ID of the item to operate on.
  • amount: The amount to increment or decrement the stock.

Each message is a single line, terminated by a newline character. The controller actor will parse the JSON object and send a request message to the database actor to perform the operation. The response message from the database actor is then also converted to a JSON object and sent back to the client.

To start a new controller, we provide a function that spawns a new controller actor:

// from: controller_actor.hpp

caf::actor
spawn_controller_actor(caf::actor_system& sys, database_actor db_actor,
                       caf::net::acceptor_resource<std::byte> events);

Before implementing this function, we implement a helper struct command to parse the incoming JSON messages:

// from: controller_actor.cpp

struct command {
  std::string type; // Either "inc" or "dec".
  int32_t id = 0;
  int32_t amount = 0;

  bool valid() const noexcept {
    return type == "inc" || type == "dec";
  }
};

template <class Insepctor>
bool inspect(Insepctor& f, command& x) {
  return f.object(x).fields(f.field("type", x.type), f.field("id", x.id),
                            f.field("amount", x.amount));
}

With this type, we can now use a caf::json_reader to parse the incoming JSON messages into C++ objects that are easier to work with. We could also parse the JSON into a caf::json_object and work with that, but using a C++ object is more convenient for our use case.

Our spawn function simply calls sys.spawn with a lambda that implements the controller logic and returns immediately.

However, the logic is a bit more involved. We break it down into several parts to make it easier to understand.

First, we monitor the database actor, which means the controller actor will terminate if the database actor terminates. Then, we open up our byte flow and convert it to UTF-8 strings:

// from: controller_actor.cpp

caf::actor
spawn_controller_actor(caf::actor_system& sys, database_actor db_actor,
                       caf::net::acceptor_resource<std::byte> events) {
  return sys.spawn([events, db_actor](caf::event_based_actor* self) mutable {
    // Stop if the database actor terminates.
    self->monitor(db_actor, [self](const caf::error& reason) {
      applog::info("controller lost the database actor: {}", reason);
      self->quit(reason);
    });
    // For each buffer pair, we create a new flow ...
    events.observe_on(self).for_each([self, db_actor](auto ev) {
      applog::info("controller added a new client");
      auto [pull, push] = ev.data();
      pull
        .observe_on(self)
        // ... that converts the lines to commands ...
        .transform(caf::flow::byte::split_as_utf8_at('\n'))

Our transformation step turns a flow of std::byte into a flow of caf::cow_string (copy-on-write strings). Each string should be a JSON object that we want to convert into a command object. Since we are in a data flow, we want to pass down objects that are cheap to copy, so we will wrap the command objects into a std::shared_ptr. This will also allow us to pass nullptr to the downstream stages to signal that we have received an invalid command:

.map([self](const caf::cow_string& line) {
          applog::debug("controller received line: {}", line.str());
          caf::json_reader reader;
          if (!reader.load(line.str())) {
            applog::error("controller failed to parse JSON: {}",
                          reader.get_error());
            return std::shared_ptr<command>{}; // Invalid JSON.
          }
          auto ptr = std::make_shared<command>();
          if (!reader.apply(*ptr))
            return std::shared_ptr<command>{}; // Not a command.
          return ptr;
        })

If the map step fails, we want to send an error message to the client. If we do have a valid command, we want to send it to the database actor and send the response back to the client. For the responses (and error messages), we will create observables, that we need to concatenate to the output for the client:

.concat_map([self, db_actor](std::shared_ptr<command> ptr) {
          // If the `map` step failed, inject an error message.
          if (ptr == nullptr || !ptr->valid()) {
            auto str = R"_({"error":"invalid command"})_"s;
            return self->make_observable()
              .just(caf::cow_string{std::move(str)})
              .as_observable();
          }
          // Send the command to the database actor and convert the
          // result message into an observable.
          caf::flow::observable<int32_t> result;
          if (ptr->type == "inc") {
            result = self->mail(inc_atom_v, ptr->id, ptr->amount)
                       .request(db_actor, 1s)
                       .as_observable();
          } else {
            result = self->mail(dec_atom_v, ptr->id, ptr->amount)
                       .request(db_actor, 1s)
                       .as_observable();
          }
          // On error, we return an error message to the client.
          return result
            .map([ptr](int32_t res) {
              applog::debug("controller received result for {} -> {}", *ptr,
                            res);
              auto str = R"_({"result":)_"s;
              str += std::to_string(res);
              str += '}';
              return caf::cow_string{std::move(str)};
            })
            .on_error_return([ptr](const caf::error& what) {
              applog::debug("controller received an error for {} -> {}", *ptr,
                            what);
              auto str = R"_({"error":")_"s;
              str += to_string(what);
              str += R"_("})_";
              auto res = caf::cow_string{std::move(str)};
              return caf::expected<caf::cow_string>{std::move(res)};
            })
            .as_observable();
        })

The last part of the implementation is straightforward. We use on_backpressure_buffer to give the client an upper limit of messages that we will buffer. If the client cannot keep up with the flow of messages, we will disconnect it. We also inject a bit of logging and convert the strings back to bytes so that we can send them back over the network:

// ... disconnects if the client is too slow ...
        .on_backpressure_buffer(32)
        // ... and pushes the results back to the client as bytes.
        .transform(caf::flow::string::to_chars("\n"))
        .do_finally(
          [] { applog::info("controller lost connection to a client"); })
        .map([](char ch) { return static_cast<std::byte>(ch); })
        .subscribe(push);
    });
  });
}

WS (WebSocket) Worker

The WebSocket worker is responsible for forwarding stock changes to a connected client. For communication with the WebSocket client, it will take an accept_event argument when spawned for creating input and output flows. For tapping into the item events from the database actor, we will pass it an item_events argument (an asynchronous publisher).

The first half of the implementation is straightforward. We create a new input flow for the frames we receive from the WebSocket client and simply discard any input by subscribing std::ignore to it:

// from: main.cpp

// The actor for handling a single WebSocket connection.
void ws_worker(caf::event_based_actor* self,
               caf::net::accept_event<ws::frame> new_conn, item_events events) {
  using frame = ws::frame;
  auto [pull, push] = new_conn.data();
  // We ignore whatever the client may send to us.
  pull.observe_on(self)
    .do_finally([] { applog::info("WebSocket client disconnected"); })
    .subscribe(std::ignore);

The second half subscribes to the item events and forwards them to the WebSocket client after converting them to JSON:

// Send all events as JSON objects to the client.
  auto writer = std::make_shared<caf::json_writer>();
  writer->skip_object_type_annotation(true);
  events.observe_on(self)
    .filter([](const item_event& item) { return item != nullptr; })
    .map([writer](const item_event& item) {
      writer->reset();
      if (!writer->apply(*item)) {
        applog::error("failed to serialize an item event: {}",
                      writer->get_error());
        return frame{};
      }
      return frame{writer->str()};
    })
    .filter([](const frame& item) { return !item.empty(); })
    .on_backpressure_buffer(default_max_pending_frames)
    .subscribe(push);
}

Again, we use on_backpressure_buffer to disconnect the client if it cannot keep up with the flow of events.

Note that this time, we don't write a spawn function for the WebSocket worker to hide implementation details, but instead implement the actor directly. This is because we will spawn ws_worker directly from the HTTP server callbacks that are defined in the same file.

Starting our Servers

Last but not least, we need to implement our servers in caf_main. For that, users can configure a port by setting the cmd-port variable in the config file or by passing the port as a command-line argument. CAF offers a convenient way to spawn servers that operate directly on TCP sockets with the caf::net::octet_stream API:

// from: main.cpp

  // Spin up the controller if configured.
  if (auto cmd_port = caf::get_as<uint16_t>(cfg, "cmd-port")) {
    auto addr = caf::get_or(cfg, "cmd-addr", "0.0.0.0"sv);
    auto cmd_server
      = caf::net::octet_stream::with(sys)
          // Bind to the user-defined port.
          .accept(*cmd_port, addr)
          // Stop the server if our database actor terminates.
          .monitor(db_actor)
          // When started, run our worker actor to handle incoming connections.
          .start([&sys, db_actor = db_actor](auto events) {
            spawn_controller_actor(sys, db_actor, std::move(events));
          });
    if (!cmd_server) {
      sys.println("*** failed to start command server: {}", cmd_server.error());
      return EXIT_FAILURE;
    }
  }

The HTTP server has more configuration options, so we will bind them to variables first to keep things clean:

// from: main.cpp

  // Read the configuration for the web server.
  auto port = caf::get_or(cfg, "http-port", default_port);
  auto pem = caf::net::ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  auto max_request_size = caf::get_or(cfg, "max-request-size",
                                      default_max_request_size);
  if (!key_file != !cert_file) {
    sys.println("*** inconsistent TLS config: declare neither file or both");
    return EXIT_FAILURE;
  }

If the user enables the TLS options, CAF will automatically spawn an HTTPS server instead of an HTTP server.

Just like for the octet streams, CAF offers a convenient API under caf::net::http for defining an HTTP server. The initial setup for the HTTP server is also similar to the controller server:

// from: main.cpp

  // Start the HTTP server.
  namespace ssl = caf::net::ssl;
  auto impl = std::make_shared<http_server>(db_actor);
  auto server
    = caf::net::http::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Limit the maximum request size.
        .max_request_size(max_request_size)
        // Stop the server if our database actor terminates.
        .monitor(db_actor)

Before starting the server itself, we create an instance of http_server. This is a small convenience class that wraps the communication to the database actor in order to keep the main function clean. The public interface of http_server is as follows:

// from: http_server.hpp

/// Bridges between HTTP requests and the database actor.
class http_server {
public:
  using responder = caf::net::http::responder;

  http_server(database_actor db_actor) : db_actor_(std::move(db_actor)) {
    writer_.skip_object_type_annotation(true);
  }

  static constexpr std::string_view json_mime_type = "application/json";

  void get(responder& res, int32_t key);

  void add(responder& res, int32_t key, const std::string& name, int32_t price);

  void add(responder& res, int32_t key);

  void inc(responder& res, int32_t key, int32_t amount);

  void dec(responder& res, int32_t key, int32_t amount);

  void del(responder& res, int32_t key);

  // ...
};

With this helper class, we can now define callbacks for the HTTP server:

// from: main.cpp

        // Route for retrieving an item from the database.
        .route("/item/<arg>", http::method::get,
               [impl](http::responder& res, int32_t key) {
                 applog::debug("GET /item/{}", key);
                 impl->get(res, key);
               })
        // Route for adding a new item to the database. The payload must be a
        // JSON object with the fields "name" and "price".
        .route("/item/<arg>", http::method::post,
               [impl](http::responder& res, int32_t key) {
                 applog::debug("POST /item/{}, body: {}", key, res.body());
                 impl->add(res, key);
               })
        // Route for incrementing the available amount of an item.
        .route("/item/<arg>/inc/<arg>", http::method::put,
               [impl](http::responder& res, int32_t key, int32_t amount) {
                 applog::debug("PUT /item/{}/inc/{}", key, amount);
                 impl->inc(res, key, amount);
               })
        // Route for decrementing the available amount of an item.
        .route("/item/<arg>/dec/<arg>", http::method::put,
               [impl](http::responder& res, int32_t key, int32_t amount) {
                 applog::debug("PUT /item/{}/dec/{}", key, amount);
                 impl->dec(res, key, amount);
               })
        // Route for deleting an item from the database.
        .route("/item/<arg>", http::method::del,
               [impl](http::responder& res, int32_t key) {
                 applog::debug("DELETE /item/{}", key);
                 impl->del(res, key);
               })

As we can see, the routes map one-to-one to the functions we defined in the http_server class. All that is left now is to add the endpoint for the WebSocket server and to call start:

// from: main.cpp

        // WebSocket route for subscribing to item events.
        .route("/events", http::method::get,
               ws::switch_protocol()
                 .on_request([](ws::acceptor<>& acc) { acc.accept(); })
                 .on_start([&sys, ev = events](auto res) {
                   // Spawn a server for the WebSocket connection that simply
                   // spawns new workers for each incoming connection.
                   sys.spawn([res, ev](caf::event_based_actor* self) {
                     res.observe_on(self).for_each([self, ev](auto new_conn) {
                       applog::info("WebSocket client connected");
                       self->spawn(ws_worker, new_conn, ev);
                     });
                   });
                 }))
        // Start the server.
        .start();

For accepting incoming WebSocket connections, we add a route to the HTTP server that upgrades an HTTP GET request to a WebSocket connection by using switch_protocol. For the WebSocket server itself, we need to define on_request (in our case, we simply accept any incoming connection) and on_start (where we spawn a server actor that in turns spawns a ws_worker for each connected client).

We have already seen the implementation of the ws_worker actor. While we use flows for the WebSocket server, the HTTP server internally uses request-response messages to the database actor. For example, http_server::get is implemented as follows:

// from: http_server.cpp

void http_server::get(responder& res, int32_t key) {
  auto* self = res.self();
  auto prom = std::move(res).to_promise();
  self->mail(get_atom_v, key)
    .request(db_actor_, 2s)
    .then(
      [this, prom](const item& value) mutable { //
        respond_with_item(prom, value);
      },
      [this, prom](const caf::error& what) mutable {
        if (what == ec::no_such_item) {
          respond_with_error(prom, "no_such_item");
          return;
        }
        if (what == caf::sec::request_timeout) {
          respond_with_error(prom, "timeout");
          return;
        }
        respond_with_error(prom, "unexpected_database_result");
      });
}

The respond_with_item and respond_with_error functions are simple helpers that convert the response from the database actor to a JSON object and send it back to the client by calling prom.respond(...) with an appropriate status code.

Interacting with the System

To interact with the system, you can use curl to send HTTP requests to the server. For example, to add a new item to the database, you can use the following command to add a new item with the ID 123 and a price of 10:

curl -X POST -d '{"price": 10, "name": "Item 1"}' http://localhost:8080/item/123

To read the item back from the database, you can use the following command:

curl http://localhost:8080/item/123

To increment or decrement the stock of an item, you can use the following commands:

curl -X PUT http://localhost:8080/item/123/inc/7
curl -X PUT http://localhost:8080/item/123/dec/3

To delete an item from the database, you can use the following command:

curl -X DELETE http://localhost:8080/item/123

To test the WebSocket client interface, we have included a trivial WebSocket client implementation written in Python. You can run it with the following command:

python3 client.py

It will simply connect to the WebSocket server and print out any messages it receives.

To interact with the controller, you can start the binary with --cmd-port 1234 (or any other port number) and send it commands such as {"type": "inc", "id": 123, "amount": 5}, e.g. by using telnet.

Conclusion

In this tutorial, we have implemented a simple backend service for a warehouse management system. We provide a REST API to manage products and use a database to store the current stock. The service also offers a control interface to manage the stock with a simple protocol over TCP. We have also implemented a WebSocket endpoint to broadcast stock changes to connected clients.

We hope this tutorial was helpful to you and will serve as a good starting point for your own projects.

If you have any questions or suggestions, please feel free to reach out to us by sending an email to feedback@interance.io!