Mercurial > malikania
changeset 157:f25f5fea66af
Common: implement network_stream class, closes #755 @1h
This class handles receive/send operations under the hood, it automatically does
the following things:
- parse/dump JSON message using '\r\n\r\n',
- detect errors/disconnections and mark them as network_down,
- flush receive/send after a successful operation.
Acked-by: Alexis Dörr
author | David Demelier <markand@malikania.fr> |
---|---|
date | Wed, 13 Dec 2017 17:18:53 +0100 |
parents | ca125345a9cf |
children | 4b292c20124c |
files | libcommon/malikania/network_stream.cpp libcommon/malikania/network_stream.hpp tests/libcommon/CMakeLists.txt tests/libcommon/network-stream/CMakeLists.txt tests/libcommon/network-stream/main.cpp tests/libcommon/network-stream/test.crt tests/libcommon/network-stream/test.key |
diffstat | 7 files changed, 453 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libcommon/malikania/network_stream.cpp Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,123 @@ +/* + * network_stream.cpp -- network socket + * + * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <cassert> + +#include "network_stream.hpp" + +namespace mlk { + +void network_stream::do_recv(network_recv_handler handler) +{ + boost::asio::async_read_until(socket_, rbuffer_, "\r\n\r\n", [this, handler] (auto code, auto xfer) { + if (code) + handler(std::move(code), nullptr); + else if (xfer == 0U) + handler(make_error_code(boost::system::errc::network_down), nullptr); + else { + std::string str( + boost::asio::buffers_begin(rbuffer_.data()), + boost::asio::buffers_begin(rbuffer_.data()) + xfer - 4 + ); + + // Remove early in case of errors. + rbuffer_.consume(xfer); + + // TODO: catch nlohmann::json::parse_error when 3.0.0 is released. + nlohmann::json message; + + try { + message = nlohmann::json::parse(str); + } catch (...) {} + + if (!message.is_object()) + handler(make_error_code(boost::system::errc::invalid_argument), nullptr); + else + handler(code, std::move(message)); + } + }); +} + +void network_stream::do_send(const std::string& str, network_send_handler handler) +{ + boost::asio::async_write(socket_, boost::asio::buffer(str), [handler] (auto code, auto xfer) { + if (xfer == 0U) + handler(make_error_code(boost::system::errc::network_down)); + else + handler(code); + }); +} + +void network_stream::rflush() +{ + if (rqueue_.empty()) + return; + + do_recv([this] (auto code, auto json) { + auto handler = rqueue_.front(); + + rqueue_.pop_front(); + handler(code, std::move(json)); + + if (!code) + rflush(); + }); +} + +void network_stream::sflush() +{ + if (squeue_.empty()) + return; + + do_send(squeue_.front().first, [this] (auto code) { + auto handler = squeue_.front().second; + + squeue_.pop_front(); + + if (handler) + handler(code); + if (!code) + sflush(); + }); +} + +void network_stream::recv(network_recv_handler handler) +{ + assert(handler); + + auto in_progress = !rqueue_.empty(); + + rqueue_.push_back(std::move(handler)); + + if (!in_progress) + rflush(); +} + +void network_stream::send(nlohmann::json json, network_send_handler handler) +{ + assert(json.is_object()); + + auto in_progress = !squeue_.empty(); + + squeue_.emplace_back(json.dump(0) + "\r\n\r\n", std::move(handler)); + + if (!in_progress) + sflush(); +} + +} // !mlk
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libcommon/malikania/network_stream.hpp Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,164 @@ +/* + * network_stream.hpp -- network socket + * + * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef MALIKANIA_COMMON_NETWORK_STREAM_HPP +#define MALIKANIA_COMMON_NETWORK_STREAM_HPP + +/** + * \file network_stream.cpp + * \brief Network socket. + */ + +#include "sysconfig.hpp" + +#include <deque> +#include <functional> +#include <string> +#include <utility> + +#include <boost/asio.hpp> +#include <boost/asio/ssl.hpp> + +#include <json.hpp> + +namespace mlk { + +/** + * Read handler. + * + * Call this function when a receive operation has finished on success or + * failure. + */ +using network_recv_handler = std::function<void (boost::system::error_code, nlohmann::json)>; + +/** + * Send handler. + * + * Call this function when a send operation has finished on success or failure. + */ +using network_send_handler = std::function<void (boost::system::error_code)>; + +/** + * \brief Base shared network stream. + * + * This class can be used to perform I/O over a networking socket, it is + * implemented as asynchronous operations over Boost.Asio. + * + * All recv/send operations are placed in a queue and performed when possible. + */ +class network_stream { +private: + using socket_t = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>; + using rbuffer_t = boost::asio::streambuf; + using rqueue_t = std::deque<network_recv_handler>; + using squeue_t = std::deque<std::pair<std::string, network_send_handler>>; + + socket_t socket_; + rbuffer_t rbuffer_; + rqueue_t rqueue_; + squeue_t squeue_; + + void rflush(); + void sflush(); + void do_recv(network_recv_handler); + void do_send(const std::string&, network_send_handler); + +public: + /** + * Construct the stream. + * + * \param service the IO service + * \param ctx the SSL context + */ + inline network_stream(boost::asio::io_service& service, + boost::asio::ssl::context& ctx) + : socket_(service, ctx) + { + } + + /** + * Get the underlying socket. + * + * \return the socket + */ + inline const socket_t& socket() const noexcept + { + return socket_; + } + + /** + * Overloaded function. + * + * \return the socket + */ + inline socket_t& socket() noexcept + { + return socket_; + } + + /** + * Tells if receive operations are pending. + * + * \return true if receiving is in progress + */ + inline bool is_receiving() const noexcept + { + return !rqueue_.empty(); + } + + /** + * Tells if send operations are pending. + * + * \return true if sending is in progress + */ + inline bool is_sending() const noexcept + { + return !squeue_.empty(); + } + + /** + * Tells if there are any I/O pending. + * + * \return true if sending is in progress + */ + inline bool is_active() const noexcept + { + return is_receiving() || is_sending(); + } + + /** + * Request a receive operation. + * + * \pre handler != nullptr + * \param handler the handler + */ + void recv(network_recv_handler); + + /** + * Request a send operation. + * + * \pre json.is_object() + * \param json the json message + * \param handler the optional handler + */ + void send(nlohmann::json json, network_send_handler = nullptr); +}; + +} // !mlk + +#endif // MALIKANIA_COMMON_NETWORK_STREAM_HPP
--- a/tests/libcommon/CMakeLists.txt Wed Dec 13 17:14:33 2017 +0100 +++ b/tests/libcommon/CMakeLists.txt Wed Dec 13 17:18:53 2017 +0100 @@ -16,6 +16,7 @@ # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # +add_subdirectory(network-stream) add_subdirectory(util) add_subdirectory(size) add_subdirectory(weak_array)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/libcommon/network-stream/CMakeLists.txt Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,23 @@ +# +# CMakeLists.txt -- CMake build system for malikania +# +# Copyright (c) 2013-2017 David Demelier <markand@malikania.fr> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +# + +malikania_create_test( + NAME network-stream + LIBRARIES libmlk-common + SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp +)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/libcommon/network-stream/main.cpp Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,114 @@ +/* + * main.cpp -- test network_stream + * + * Copyright (c) 2013-2017 David Demelier <markand@malikania.fr> + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#define BOOST_TEST_MODULE "network-stream" +#include <boost/test/unit_test.hpp> + +#include <malikania/network_stream.hpp> + +namespace mlk { + +class test { +protected: + boost::asio::io_service io_; + + boost::asio::ssl::context cctx_; + boost::asio::ssl::context sctx_; + + network_stream cstream_; + network_stream sstream_; + + boost::asio::ssl::context init() + { + boost::asio::ssl::context ctx(boost::asio::ssl::context::sslv23); + + ctx.use_private_key_file(CMAKE_CURRENT_SOURCE_DIR "/test.key", boost::asio::ssl::context::pem); + ctx.use_certificate_file(CMAKE_CURRENT_SOURCE_DIR "/test.crt", boost::asio::ssl::context::pem); + + return ctx; + } + + void pair() + { + boost::asio::ip::tcp::endpoint ep(boost::asio::ip::tcp::v4(), 0U); + boost::asio::ip::tcp::acceptor acceptor(io_, std::move(ep)); + + acceptor.async_accept(sstream_.socket().lowest_layer(), [] (auto code) { + if (code) + throw code; + }); + + cstream_.socket().lowest_layer().async_connect(acceptor.local_endpoint(), [] (auto code) { + if (code) + throw code; + }); + + io_.run(); + io_.reset(); + } + + void handshake() + { + sstream_.socket().async_handshake(boost::asio::ssl::stream_base::server, [] (auto code) { + if (code) + throw code; + }); + + cstream_.socket().async_handshake(boost::asio::ssl::stream_base::client, [] (auto code) { + if (code) + throw code; + }); + + io_.run(); + io_.reset(); + } + + test() + : cctx_(boost::asio::ssl::context::sslv23) + , sctx_(init()) + , cstream_(io_, cctx_) + , sstream_(io_, sctx_) + { + pair(); + handshake(); + } +}; + +BOOST_FIXTURE_TEST_SUITE(test_suite, test) + +BOOST_AUTO_TEST_CASE(simple_message) +{ + cstream_.send({{ "command", "hello" }}, [] (auto code) { + if (code) + throw code; + }); + sstream_.recv([] (auto code, auto message) { + if (code) + throw code; + + BOOST_TEST(message["command"].template get<std::string>() == "hello"); + }); + + io_.run(); +} + +// TODO: when error codes are implemented, add tests as well. + +BOOST_AUTO_TEST_SUITE_END() + +} // !mlk
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/libcommon/network-stream/test.crt Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIIB+TCCAWICCQDKnbj8OXYJ5TANBgkqhkiG9w0BAQsFADBBMQswCQYDVQQGEwJG +UjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0 +eSBMdGQwHhcNMTcxMjEzMTU1NTQ3WhcNMTgxMjEzMTU1NTQ3WjBBMQswCQYDVQQG +EwJGUjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRz +IFB0eSBMdGQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBANxztPLLrRWZqk70 +IXcVQwFHms3no7vj7YpNNJEn3BTq+bnxGuE7jxfxGN7YEuN4gpxFA+6Ckzr1220m +36tIwl7Xee6dechhg1+KYL3ktyOUM6eKpfJQ72JWfnt11VV2iel7BWd85Xa9x4EC +XYLAIwvYXakGLNp1aNJwfhSBemNtAgMBAAEwDQYJKoZIhvcNAQELBQADgYEAxvKc +AdB+e4mAV+OQi/C+ELS+3j6Z3PoLz1pTw4LPnwECYtibAC8pOAUll6hfIGthdDHM +zN8e6Dj9qnxLk4oAx0HcmfsFVV2ZoNyPcGYlDzlwqyq1OR5nQK30Fn55v/QPDBe5 +ES71j5QeVuoUr7tlBlUedGaipzMOiyOsSymmzlA= +-----END CERTIFICATE-----
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/libcommon/network-stream/test.key Wed Dec 13 17:18:53 2017 +0100 @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDcc7Tyy60VmapO9CF3FUMBR5rN56O74+2KTTSRJ9wU6vm58Rrh +O48X8Rje2BLjeIKcRQPugpM69dttJt+rSMJe13nunXnIYYNfimC95LcjlDOniqXy +UO9iVn57ddVVdonpewVnfOV2vceBAl2CwCML2F2pBizadWjScH4UgXpjbQIDAQAB +AoGAWok+gBQ7wko4knJaqBBYU1c38WY3bTu/W3Q3qYGINiMGamHlmyidrMR8ZVCx ++S+N3GXPpo2Dr8w20I4Nf8fC4vMuQhuC5Ex6Ewiw++vh9cGGvQUEFfJZyAgk6IXg +ZjYYz/taAWhYHJ7X3IBIODqx5XipR1705jqTPCTgqSBCS/UCQQD3C+kEwCxwn6ZV +I1hWdeRLQjAvhnNTLxwR/drn2EuOHGMASf0evZSVsZQNiEzLy9aJol4ersTZ1TVH +ecGLqaSHAkEA5HENoswyPxui4wq7p4SyN2PTj+HEa6OfV8EPi6Y8khBZw8BdHXRC +sD8ipaJFr+oNtMg1dCn2rnsdelAZ/z8pawJBAL2taVV6byRxj3Xi01pe3c9inDiB +FF6T1wuBSuejTC9qYCDCrhNCH6jnVPMm6T+325qFDZqlOQK/Dk84jn+62lUCQG6W +RBGhZFmkmU/r7DgGRvgFfW9TzfCFvyeOMGZcTUowXCQlRW5yz63egnlIew/T8Fqp +6SaZAfApbXW+vTcRbo0CQQCTdCpw2Erk07clnFYap1OaCMjYMuCqakyvp32fijIb +Bcy1ptHQmNPiqNkTEz9PlqAG8NdEnn8S3oqOcqKzEJL9 +-----END RSA PRIVATE KEY-----