Mercurial > irccd
view libirccd/irccd/stream.hpp @ 932:0e11221c9bcc
man: split irccd-api into individual ones
author | David Demelier <markand@malikania.fr> |
---|---|
date | Tue, 05 Jan 2021 22:25:47 +0100 |
parents | 5e25439fe98d |
children |
line wrap: on
line source
/* * stream.hpp -- abstract stream interface * * Copyright (c) 2013-2020 David Demelier <markand@malikania.fr> * * Permission to use, copy, modify, and/or distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ #ifndef IRCCD_STREAM_HPP #define IRCCD_STREAM_HPP /** * \file stream.hpp * \brief Abstract stream interface. */ #include <irccd/sysconfig.hpp> #include <cassert> #include <cstddef> #include <functional> #include <ostream> #include <string> #include <system_error> #include <utility> #include <boost/asio.hpp> #if defined(IRCCD_HAVE_SSL) # include <boost/asio/ssl.hpp> #endif #include "json.hpp" namespace irccd { // {{{ stream /** * \brief Abstract stream interface * \ingroup core-streams * * Abstract I/O interface that allows reading/writing from a stream in an * asynchronous manner. * * The derived classes must implement non-blocking recv and send operations. */ class stream { public: /** * \brief Read completion handler. */ using recv_handler = std::function<void (std::error_code, nlohmann::json)>; /** * \brief Write completion handler. */ using send_handler = std::function<void (std::error_code)>; /** * Default constructor. */ stream() = default; /** * Virtual destructor defaulted. */ virtual ~stream() = default; /** * Start asynchronous read. * * \pre another read operation must not be running * \pre handler != nullptr * \param handler the handler */ virtual void recv(recv_handler handler) = 0; /** * Start asynchronous write. * * \pre json.is_object() * \pre another write operation must not be running * \pre handler != nullptr * \param json the JSON message * \param handler the handler */ virtual void send(const nlohmann::json& json, send_handler handler) = 0; }; // }}} // {{{ basic_socket_stream /** * \brief Complete implementation for basic sockets * \ingroup core-streams * \tparam Socket Boost.Asio socket (e.g. boost::asio::ip::tcp::socket) */ template <typename Socket> class basic_socket_stream : public stream { private: boost::asio::streambuf input_{2048}; boost::asio::streambuf output_; #if !defined(NDEBUG) bool is_receiving_{false}; bool is_sending_{false}; #endif void handle_recv(boost::system::error_code, std::size_t, recv_handler); void handle_send(boost::system::error_code, std::size_t, send_handler); Socket socket_; public: /** * Construct a socket stream. * * \param args the arguments to pass to the constructor */ template <typename... Args> basic_socket_stream(Args&&... args); /** * Get the underlying socket. * * \return the socket */ auto get_socket() const noexcept -> const Socket&; /** * Overloaded function. * * \return the socket */ auto get_socket() noexcept -> Socket&; /** * Default implementation for Boost.Asio sockets. * * \pre another read operation must not be running * \pre handler != nullptr * \param handler the handler */ void recv(recv_handler handler) override; /** * Default implementation for Boost.Asio sockets. * * \pre json.is_object() * \pre another write operation must not be running * \pre handler != nullptr * \param json the JSON message * \param handler the handler */ void send(const nlohmann::json& json, send_handler handler) override; }; template <typename Socket> template <typename... Args> inline basic_socket_stream<Socket>::basic_socket_stream(Args&&... args) : socket_(std::forward<Args>(args)...) { } template <typename Socket> inline auto basic_socket_stream<Socket>::get_socket() const noexcept -> const Socket& { return socket_; } template <typename Socket> inline auto basic_socket_stream<Socket>::get_socket() noexcept -> Socket& { return socket_; } template <typename Socket> inline void basic_socket_stream<Socket>::handle_recv(boost::system::error_code code, std::size_t xfer, recv_handler handler) { #if !defined(NDEBUG) is_receiving_ = false; #endif if (code == boost::asio::error::not_found) { handler(make_error_code(std::errc::argument_list_too_long), nullptr); return; } if (code == boost::asio::error::eof || xfer == 0) { handler(make_error_code(std::errc::connection_reset), nullptr); return; } if (code) { handler(std::move(code), nullptr); return; } // 1. Convert the buffer safely. std::string buffer; try { buffer = std::string( boost::asio::buffers_begin(input_.data()), boost::asio::buffers_begin(input_.data()) + xfer - /* \r\n\r\n */ 4 ); input_.consume(xfer); } catch (const std::bad_alloc&) { handler(make_error_code(std::errc::not_enough_memory), nullptr); return; } // 2. Convert to JSON. nlohmann::json doc; try { doc = nlohmann::json::parse(buffer); } catch (const std::exception&) { handler(make_error_code(std::errc::invalid_argument), nullptr); return; } if (!doc.is_object()) handler(make_error_code(std::errc::invalid_argument), nullptr); else handler(std::error_code(), std::move(doc)); } template <typename Socket> inline void basic_socket_stream<Socket>::handle_send(boost::system::error_code code, std::size_t xfer, send_handler handler) { #if !defined(NDEBUG) is_sending_ = false; #endif if (code == boost::asio::error::eof || xfer == 0) { handler(make_error_code(std::errc::connection_reset)); return; } else handler(std::move(code)); } template <typename Socket> inline void basic_socket_stream<Socket>::recv(recv_handler handler) { #if !defined(NDEBUG) assert(!is_receiving_); is_receiving_ = true; #endif assert(handler); async_read_until(socket_, input_, "\r\n\r\n", [this, handler] (auto code, auto xfer) { handle_recv(code, xfer, handler); }); } template <typename Socket> inline void basic_socket_stream<Socket>::send(const nlohmann::json& json, send_handler handler) { #if !defined(NDEBUG) assert(!is_sending_); is_sending_ = true; #endif assert(json.is_object()); assert(handler); std::ostream out(&output_); out << json.dump(0); out << "\r\n\r\n"; out << std::flush; async_write(socket_, output_, [this, handler] (auto code, auto xfer) { handle_send(code, xfer, handler); }); } // }}} // {{{ ip_stream /** * \brief Convenient alias for boost::asio::ip::tcp::socket * \ingroup core-streams */ using ip_stream = basic_socket_stream<boost::asio::ip::tcp::socket>; // }}} // {{{ local_stream #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) /** * \brief Convenient alias for boost::asio::local::stream_protocol::socket * \ingroup core-streams */ using local_stream = basic_socket_stream<boost::asio::local::stream_protocol::socket>; #endif // !BOOST_ASIO_HAS_LOCAL_SOCKETS // }}} // {{{ tls_stream #if defined(IRCCD_HAVE_SSL) /** * \brief TLS/SSL streams. * \ingroup core-streams * \tparam Socket the Boost.Asio compatible socket. */ template <typename Socket> class tls_stream : public basic_socket_stream<boost::asio::ssl::stream<Socket>> { private: std::shared_ptr<boost::asio::ssl::context> context_; using recv_handler = stream::recv_handler; using send_handler = stream::send_handler; public: /** * Constructor. * * \param service the I/O service * \param ctx the shared context */ tls_stream(boost::asio::io_context& service, std::shared_ptr<boost::asio::ssl::context> ctx); }; template <typename Socket> inline tls_stream<Socket>::tls_stream(boost::asio::io_context& service, std::shared_ptr<boost::asio::ssl::context> ctx) : basic_socket_stream<boost::asio::ssl::stream<Socket>>(service, *ctx) , context_(std::move(ctx)) { } /** * \brief Convenient alias. */ using tls_ip_stream = tls_stream<boost::asio::ip::tcp::socket>; #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) /** * \brief Convenient alias. */ using tls_local_stream = tls_stream<boost::asio::local::stream_protocol::socket>; #endif // !BOOST_ASIO_HAS_LOCAL_SOCKETS #endif // !IRCCD_HAVE_SSL // }}} } // !irccd #endif // !IRCCD_STREAM_HPP