changeset 157:f25f5fea66af

Common: implement network_stream class, closes #755 @1h This class handles receive/send operations under the hood, it automatically does the following things: - parse/dump JSON message using '\r\n\r\n', - detect errors/disconnections and mark them as network_down, - flush receive/send after a successful operation. Acked-by: Alexis Dörr
author David Demelier <markand@malikania.fr>
date Wed, 13 Dec 2017 17:18:53 +0100
parents ca125345a9cf
children 4b292c20124c
files libcommon/malikania/network_stream.cpp libcommon/malikania/network_stream.hpp tests/libcommon/CMakeLists.txt tests/libcommon/network-stream/CMakeLists.txt tests/libcommon/network-stream/main.cpp tests/libcommon/network-stream/test.crt tests/libcommon/network-stream/test.key
diffstat 7 files changed, 453 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libcommon/malikania/network_stream.cpp	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,123 @@
+/*
+ * network_stream.cpp -- network socket
+ *
+ * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <cassert>
+
+#include "network_stream.hpp"
+
+namespace mlk {
+
+void network_stream::do_recv(network_recv_handler handler)
+{
+    boost::asio::async_read_until(socket_, rbuffer_, "\r\n\r\n", [this, handler] (auto code, auto xfer) {
+        if (code)
+            handler(std::move(code), nullptr);
+        else if (xfer == 0U)
+            handler(make_error_code(boost::system::errc::network_down), nullptr);
+        else {
+            std::string str(
+                boost::asio::buffers_begin(rbuffer_.data()),
+                boost::asio::buffers_begin(rbuffer_.data()) + xfer - 4
+            );
+
+            // Remove early in case of errors.
+            rbuffer_.consume(xfer);
+
+            // TODO: catch nlohmann::json::parse_error when 3.0.0 is released.
+            nlohmann::json message;
+
+            try {
+                message = nlohmann::json::parse(str);
+            } catch (...) {}
+
+            if (!message.is_object())
+                handler(make_error_code(boost::system::errc::invalid_argument), nullptr);
+            else
+                handler(code, std::move(message));
+        }
+    });
+}
+
+void network_stream::do_send(const std::string& str, network_send_handler handler)
+{
+    boost::asio::async_write(socket_, boost::asio::buffer(str), [handler] (auto code, auto xfer) {
+        if (xfer == 0U)
+            handler(make_error_code(boost::system::errc::network_down));
+        else
+            handler(code);
+    });
+}
+
+void network_stream::rflush()
+{
+    if (rqueue_.empty())
+        return;
+
+    do_recv([this] (auto code, auto json) {
+        auto handler = rqueue_.front();
+
+        rqueue_.pop_front();
+        handler(code, std::move(json));
+
+        if (!code)
+            rflush();
+    });
+}
+
+void network_stream::sflush()
+{
+    if (squeue_.empty())
+        return;
+
+    do_send(squeue_.front().first, [this] (auto code) {
+        auto handler = squeue_.front().second;
+
+        squeue_.pop_front();
+
+        if (handler)
+            handler(code);
+        if (!code)
+            sflush();
+    });
+}
+
+void network_stream::recv(network_recv_handler handler)
+{
+    assert(handler);
+
+    auto in_progress = !rqueue_.empty();
+
+    rqueue_.push_back(std::move(handler));
+
+    if (!in_progress)
+        rflush();
+}
+
+void network_stream::send(nlohmann::json json, network_send_handler handler)
+{
+    assert(json.is_object());
+
+    auto in_progress = !squeue_.empty();
+
+    squeue_.emplace_back(json.dump(0) + "\r\n\r\n", std::move(handler));
+
+    if (!in_progress)
+        sflush();
+}
+
+} // !mlk
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libcommon/malikania/network_stream.hpp	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,164 @@
+/*
+ * network_stream.hpp -- network socket
+ *
+ * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef MALIKANIA_COMMON_NETWORK_STREAM_HPP
+#define MALIKANIA_COMMON_NETWORK_STREAM_HPP
+
+/**
+ * \file network_stream.cpp
+ * \brief Network socket.
+ */
+
+#include "sysconfig.hpp"
+
+#include <deque>
+#include <functional>
+#include <string>
+#include <utility>
+
+#include <boost/asio.hpp>
+#include <boost/asio/ssl.hpp>
+
+#include <json.hpp>
+
+namespace mlk {
+
+/**
+ * Read handler.
+ *
+ * Call this function when a receive operation has finished on success or
+ * failure.
+ */
+using network_recv_handler = std::function<void (boost::system::error_code, nlohmann::json)>;
+
+/**
+ * Send handler.
+ *
+ * Call this function when a send operation has finished on success or failure.
+ */
+using network_send_handler = std::function<void (boost::system::error_code)>;
+
+/**
+ * \brief Base shared network stream.
+ *
+ * This class can be used to perform I/O over a networking socket, it is
+ * implemented as asynchronous operations over Boost.Asio.
+ *
+ * All recv/send operations are placed in a queue and performed when possible.
+ */
+class network_stream {
+private:
+    using socket_t = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
+    using rbuffer_t = boost::asio::streambuf;
+    using rqueue_t = std::deque<network_recv_handler>;
+    using squeue_t = std::deque<std::pair<std::string, network_send_handler>>;
+
+    socket_t socket_;
+    rbuffer_t rbuffer_;
+    rqueue_t rqueue_;
+    squeue_t squeue_;
+
+    void rflush();
+    void sflush();
+    void do_recv(network_recv_handler);
+    void do_send(const std::string&, network_send_handler);
+
+public:
+    /**
+     * Construct the stream.
+     *
+     * \param service the IO service
+     * \param ctx the SSL context
+     */
+    inline network_stream(boost::asio::io_service& service,
+                          boost::asio::ssl::context& ctx)
+        : socket_(service, ctx)
+    {
+    }
+
+    /**
+     * Get the underlying socket.
+     *
+     * \return the socket
+     */
+    inline const socket_t& socket() const noexcept
+    {
+        return socket_;
+    }
+
+    /**
+     * Overloaded function.
+     *
+     * \return the socket
+     */
+    inline socket_t& socket() noexcept
+    {
+        return socket_;
+    }
+
+    /**
+     * Tells if receive operations are pending.
+     *
+     * \return true if receiving is in progress
+     */
+    inline bool is_receiving() const noexcept
+    {
+        return !rqueue_.empty();
+    }
+
+    /**
+     * Tells if send operations are pending.
+     *
+     * \return true if sending is in progress
+     */
+    inline bool is_sending() const noexcept
+    {
+        return !squeue_.empty();
+    }
+
+    /**
+     * Tells if there are any I/O pending.
+     *
+     * \return true if sending is in progress
+     */
+    inline bool is_active() const noexcept
+    {
+        return is_receiving() || is_sending();
+    }
+
+    /**
+     * Request a receive operation.
+     *
+     * \pre handler != nullptr
+     * \param handler the handler
+     */
+    void recv(network_recv_handler);
+
+    /**
+     * Request a send operation.
+     *
+     * \pre json.is_object()
+     * \param json the json message
+     * \param handler the optional handler
+     */
+    void send(nlohmann::json json, network_send_handler = nullptr);
+};
+
+} // !mlk
+
+#endif // MALIKANIA_COMMON_NETWORK_STREAM_HPP
--- a/tests/libcommon/CMakeLists.txt	Wed Dec 13 17:14:33 2017 +0100
+++ b/tests/libcommon/CMakeLists.txt	Wed Dec 13 17:18:53 2017 +0100
@@ -16,6 +16,7 @@
 # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 #
 
+add_subdirectory(network-stream)
 add_subdirectory(util)
 add_subdirectory(size)
 add_subdirectory(weak_array)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/libcommon/network-stream/CMakeLists.txt	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,23 @@
+#
+# CMakeLists.txt -- CMake build system for malikania
+#
+# Copyright (c) 2013-2017 David Demelier <markand@malikania.fr>
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+#
+
+malikania_create_test(
+    NAME network-stream
+    LIBRARIES libmlk-common
+    SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
+)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/libcommon/network-stream/main.cpp	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,114 @@
+/*
+ * main.cpp -- test network_stream
+ *
+ * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#define BOOST_TEST_MODULE "network-stream"
+#include <boost/test/unit_test.hpp>
+
+#include <malikania/network_stream.hpp>
+
+namespace mlk {
+
+class test {
+protected:
+    boost::asio::io_service io_;
+
+    boost::asio::ssl::context cctx_;
+    boost::asio::ssl::context sctx_;
+
+    network_stream cstream_;
+    network_stream sstream_;
+
+    boost::asio::ssl::context init()
+    {
+        boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23);
+
+        ctx.use_private_key_file(CMAKE_CURRENT_SOURCE_DIR "/test.key", boost::asio::ssl::context::pem);
+        ctx.use_certificate_file(CMAKE_CURRENT_SOURCE_DIR "/test.crt", boost::asio::ssl::context::pem);
+
+        return ctx;
+    }
+
+    void pair()
+    {
+        boost::asio::ip::tcp::endpoint ep(boost::asio::ip::tcp::v4(), 0U);
+        boost::asio::ip::tcp::acceptor acceptor(io_, std::move(ep));
+
+        acceptor.async_accept(sstream_.socket().lowest_layer(), [] (auto code) {
+            if (code)
+                throw code;
+        });
+
+        cstream_.socket().lowest_layer().async_connect(acceptor.local_endpoint(), [] (auto code) {
+            if (code)
+                throw code;
+        });
+
+        io_.run();
+        io_.reset();
+    }
+
+    void handshake()
+    {
+        sstream_.socket().async_handshake(boost::asio::ssl::stream_base::server, [] (auto code) {
+            if (code)
+                throw code;
+        });
+
+        cstream_.socket().async_handshake(boost::asio::ssl::stream_base::client, [] (auto code) {
+            if (code)
+                throw code;
+        });
+
+        io_.run();
+        io_.reset();
+    }
+
+    test()
+        : cctx_(boost::asio::ssl::context::sslv23)
+        , sctx_(init())
+        , cstream_(io_, cctx_)
+        , sstream_(io_, sctx_)
+    {
+        pair();
+        handshake();
+    }
+};
+
+BOOST_FIXTURE_TEST_SUITE(test_suite, test)
+
+BOOST_AUTO_TEST_CASE(simple_message)
+{
+    cstream_.send({{ "command", "hello" }}, [] (auto code) {
+        if (code)
+            throw code;
+    });
+    sstream_.recv([] (auto code, auto message) {
+        if (code)
+            throw code;
+
+        BOOST_TEST(message["command"].template get<std::string>() == "hello");
+    });
+
+    io_.run();
+}
+
+// TODO: when error codes are implemented, add tests as well.
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // !mlk
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/libcommon/network-stream/test.crt	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,13 @@
+-----BEGIN CERTIFICATE-----
+MIIB+TCCAWICCQDKnbj8OXYJ5TANBgkqhkiG9w0BAQsFADBBMQswCQYDVQQGEwJG
+UjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0
+eSBMdGQwHhcNMTcxMjEzMTU1NTQ3WhcNMTgxMjEzMTU1NTQ3WjBBMQswCQYDVQQG
+EwJGUjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRz
+IFB0eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBANxztPLLrRWZqk70
+IXcVQwFHms3no7vj7YpNNJEn3BTq+bnxGuE7jxfxGN7YEuN4gpxFA+6Ckzr1220m
+36tIwl7Xee6dechhg1+KYL3ktyOUM6eKpfJQ72JWfnt11VV2iel7BWd85Xa9x4EC
+XYLAIwvYXakGLNp1aNJwfhSBemNtAgMBAAEwDQYJKoZIhvcNAQELBQADgYEAxvKc
+AdB+e4mAV+OQi/C+ELS+3j6Z3PoLz1pTw4LPnwECYtibAC8pOAUll6hfIGthdDHM
+zN8e6Dj9qnxLk4oAx0HcmfsFVV2ZoNyPcGYlDzlwqyq1OR5nQK30Fn55v/QPDBe5
+ES71j5QeVuoUr7tlBlUedGaipzMOiyOsSymmzlA=
+-----END CERTIFICATE-----
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/libcommon/network-stream/test.key	Wed Dec 13 17:18:53 2017 +0100
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQDcc7Tyy60VmapO9CF3FUMBR5rN56O74+2KTTSRJ9wU6vm58Rrh
+O48X8Rje2BLjeIKcRQPugpM69dttJt+rSMJe13nunXnIYYNfimC95LcjlDOniqXy
+UO9iVn57ddVVdonpewVnfOV2vceBAl2CwCML2F2pBizadWjScH4UgXpjbQIDAQAB
+AoGAWok+gBQ7wko4knJaqBBYU1c38WY3bTu/W3Q3qYGINiMGamHlmyidrMR8ZVCx
++S+N3GXPpo2Dr8w20I4Nf8fC4vMuQhuC5Ex6Ewiw++vh9cGGvQUEFfJZyAgk6IXg
+ZjYYz/taAWhYHJ7X3IBIODqx5XipR1705jqTPCTgqSBCS/UCQQD3C+kEwCxwn6ZV
+I1hWdeRLQjAvhnNTLxwR/drn2EuOHGMASf0evZSVsZQNiEzLy9aJol4ersTZ1TVH
+ecGLqaSHAkEA5HENoswyPxui4wq7p4SyN2PTj+HEa6OfV8EPi6Y8khBZw8BdHXRC
+sD8ipaJFr+oNtMg1dCn2rnsdelAZ/z8pawJBAL2taVV6byRxj3Xi01pe3c9inDiB
+FF6T1wuBSuejTC9qYCDCrhNCH6jnVPMm6T+325qFDZqlOQK/Dk84jn+62lUCQG6W
+RBGhZFmkmU/r7DgGRvgFfW9TzfCFvyeOMGZcTUowXCQlRW5yz63egnlIew/T8Fqp
+6SaZAfApbXW+vTcRbo0CQQCTdCpw2Erk07clnFYap1OaCMjYMuCqakyvp32fijIb
+Bcy1ptHQmNPiqNkTEz9PlqAG8NdEnn8S3oqOcqKzEJL9
+-----END RSA PRIVATE KEY-----