Mercurial > code
changeset 452:7781bb45e749
Socket: experimental Stream(Server|Client)
author | David Demelier <markand@malikania.fr> |
---|---|
date | Mon, 02 Nov 2015 14:04:26 +0100 |
parents | 902b034df6e3 |
children | 2d95f0c8fd1d |
files | C++/modules/Socket/Sockets.h |
diffstat | 1 files changed, 410 insertions(+), 59 deletions(-) [+] |
line wrap: on
line diff
--- a/C++/modules/Socket/Sockets.h Mon Nov 02 13:46:56 2015 +0100 +++ b/C++/modules/Socket/Sockets.h Mon Nov 02 14:04:26 2015 +0100 @@ -2161,7 +2161,7 @@ inline void clear() { while (!m_table.empty()) { - remove(*m_table.begin()); + remove(m_table.begin()->first); } } @@ -2225,94 +2225,221 @@ /* }}} */ -#if 0 - -/* - * High level helpers coming soon. +/** + * @class Callback + * @brief Convenient signal owner that checks if the target is valid. + * + * This class also catch all errors thrown from signals to avoid interfering with our process. */ - -template <typename Address, typename Type> -class StreamServer; - +template <typename... Args> +class Callback : public std::function<void (Args...)> { +public: + /** + * Inherited constructors. + */ + using std::function<void (Args...)>::function; + + /** + * Execute the callback only if a target is set. + */ + void operator()(Args... args) const + { + if (*this) { + try { + std::function<void (Args...)>::operator()(args...); + } catch (...) { + } + } + } +}; + +/** + * @class StreamConnection + * @brief Connected client on the server side. + * + * This object is created from StreamServer when a new client is connected, it is the higher + * level object of sockets and completely asynchronous. + */ template <typename Address, typename Type> class StreamConnection { +public: + using WriteHandler = Callback<>; + private: + /* Signals */ + WriteHandler m_onWrite; + + /* Sockets and output buffer */ Socket<Address, Type> m_socket; std::string m_output; public: + /** + * Create the connection. + * + * @param s the socket + */ StreamConnection(Socket<Address, Type> s) : m_socket{std::move(s)} { + m_socket.setBlockMode(false); } + /** + * Access the underlying socket. + * + * @return the socket + * @warning use with care + */ inline Socket<Address, Type> &socket() noexcept { return m_socket; } - const std::string &output() const noexcept + /** + * Access the current output. + * + * @return the output + */ + inline const std::string &output() const noexcept + { + return m_output; + } + + /** + * Overloaded function + * + * @return the output + * @warning use with care, avoid modifying the output if you don't know what you're doing + */ + inline std::string &output() noexcept { return m_output; } + + /** + * Post some data to be sent asynchronously. + * + * @param str the data to append + */ + inline void send(std::string str) + { + m_output += str; + m_onWrite(); + } + + /** + * Kill the client. + */ + inline void close() + { + m_socket.close(); + } + + /** + * Set the write handler, the signal is emitted when the output has changed so that the StreamServer owner + * knows that there are some data to send. + * + * @param handler the handler + * @warning you usually never need to set this yourself + */ + inline void setWriteHandler(WriteHandler handler) + { + m_onWrite = std::move(handler); + } }; +/** + * @class StreamServer + * @brief Convenient stream server for TCP and TLS. + * + * This class does all the things for you as accepting new clients, listening for it and sending data. It works + * asynchronously without blocking to let you control your process workflow. + */ template <typename Address, typename Type> class StreamServer { +public: + using ConnectionHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &>; + using DisconnectionHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &>; + using ReadHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>; + using WriteHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>; + private: + using ClientMap = std::map<Handle, std::shared_ptr<StreamConnection<Address, Type>>>; + + /* Signals */ + ConnectionHandler m_onConnection; + DisconnectionHandler m_onDisconnection; + ReadHandler m_onRead; + WriteHandler m_onWrite; + + /* Sockets */ Socket<Address, Type> m_master; Listener<> m_listener; - std::map<Handle, StreamConnection<Address, Type>> m_clients; - - /* Callbacks */ - template <typename... Args> - using Callbacks = std::vector<std::function<void (Args...)>>; - - Callbacks<StreamConnection<Address, Type> &> m_onConnection; - Callbacks<StreamConnection<Address, Type> &> m_onDisconnection; - Callbacks<StreamConnection<Address, Type> &, const std::string &> m_onRead; - Callbacks<StreamConnection<Address, Type> &, const std::string &> m_onWrite; - - template <typename List, typename... Args> - void notify(const List &list, Args&&... args) - { - for (const auto &f : list) { - f(std::forward<Args>(args)...); - } - } + ClientMap m_clients; void processAccept() { - StreamConnection<Address, Type> connection{m_master.accept()}; + std::shared_ptr<StreamConnection<Address, Type>> connection = std::make_shared<StreamConnection<Address, Type>>(m_master.accept()); + std::weak_ptr<StreamConnection<Address, Type>> ptr{connection}; + + /* Update the listener write flag when output has changed */ + connection->setWriteHandler([this, ptr] () { + auto connection = ptr.lock(); + + if (connection && !connection->output().empty()) { + m_listener.set(connection->socket().handle(), FlagWrite); + } + }); /* Notify we have a new client */ - notify(m_onConnection, connection); - - m_listener.set(connection.socket().handle(), FlagRead); - m_clients.emplace(connection.socket().handle(), std::move(connection)); + m_onConnection(connection); + + m_listener.set(connection->socket().handle(), FlagRead); + m_clients.emplace(connection->socket().handle(), std::move(connection)); } void processSync(const ListenerStatus &status) { - auto &connection = m_clients.at(status.socket); + auto connection = m_clients.at(status.socket); + auto remove = false; try { if (status.flags & FlagRead) { - auto buffer = connection.socket().recv(512); + auto buffer = connection->socket().recv(512); /* Empty mean normal disconnection */ if (buffer.empty()) { - notify(m_onDisconnection, connection); - m_listener.remove(status.socket); - m_clients.erase(status.socket); + remove = true; } else { - notify(m_onRead, connection, buffer); + m_onRead(connection, buffer); } } else if (status.flags & FlagWrite) { - /* Process write */ + auto &output = connection->output(); + auto nsent = connection->socket().send(output); + + /* 1. Create a copy of content that has been sent */ + auto sent = output.substr(0, nsent); + + /* 2. Erase the content sent */ + output.erase(0, nsent); + + /* 3. Update listener */ + if (output.empty()) { + m_listener.unset(connection->socket().handle(), FlagWrite); + } + + /* 4. Notify user */ + m_onWrite(connection, sent); } } catch (const std::exception &ex) { - /* TODO: remove */ + remove = true; + } + + if (remove) { + m_onDisconnection(connection); + m_listener.remove(status.socket); + m_clients.erase(status.socket); } } @@ -2324,8 +2451,7 @@ * @param max the max number to listen * @throw Error on errors */ - template <typename RealAddress> - StreamServer(const RealAddress &address, int max = 128) + StreamServer(const Address &address, int max = 128) : m_master{address.domain(), SOCK_STREAM, 0} { m_master.set(SOL_SOCKET, SO_REUSEADDR, 1); @@ -2334,31 +2460,53 @@ m_listener.set(m_master.handle(), FlagRead); } - template <typename Func> - inline void addConnectionHandler(Func &&f) + /** + * Set the connection handler, called when a new client is connected. + * + * @param handler the handler + */ + inline void setConnectionHandler(ConnectionHandler handler) { - m_onConnection.push_back(std::forward<Func>(f)); + m_onConnection = std::move(handler); } - template <typename Func> - inline void addDisconnectionHandler(Func &&f) + /** + * Set the disconnection handler, called when a client died. + * + * @param handler the handler + */ + inline void setDisconnectionHandler(DisconnectionHandler handler) { - m_onDisconnection.push_back(std::forward<Func>(f)); + m_onDisconnection = std::move(handler); } - template <typename Func> - inline void addReadHandler(Func &&f) + /** + * Set the receive handler, called when a client has sent something. + * + * @param handler the handler + */ + inline void setReadHandler(ReadHandler handler) { - m_onRead.push_back(std::forward<Func>(f)); + m_onRead = std::move(handler); } - template <typename Func> - inline void addWriteHandler(Func &&f) + /** + * Set the writing handler, called when some data has been sent to a client. + * + * @param handler the handler + */ + inline void setWriteHandler(WriteHandler handler) { - m_onWrite.push_back(std::forward<Func>(f)); + m_onWrite = std::move(handler); } - void poll(int timeout) + /** + * Poll for the next event. + * + * @param timeout the timeout (-1 for indefinitely) + * @throw Error on errors + */ + void poll(int timeout = -1) { auto st = m_listener.wait(timeout); @@ -2369,13 +2517,216 @@ } } - inline int wait() + /** + * Run forever. + * + * @throw Error on errors + */ + void run() { - poll(-1); + for (;;) { + poll(); + } } }; -#endif +/** + * @class StreamClient + * @brief Client side connection to a server. + */ +template <typename Address, typename Type> +class StreamClient { +public: + using ConnectionHandler = Callback<>; + using ReadHandler = Callback<const std::string &>; + using WriteHandler = Callback<const std::string &>; + using DisconnectionHandler = Callback<>; + +private: + /* Signals */ + ConnectionHandler m_onConnection; + ReadHandler m_onRead; + WriteHandler m_onWrite; + DisconnectionHandler m_onDisconnection; + + /* Socket */ + Socket<Address, Type> m_socket; + Listener<> m_listener; + + /* Connection status and output buffer */ + bool m_connected{false}; + std::string m_output; + + void processConnect() + { + /* 1. Remove from listener the write flag */ + m_listener.unset(m_socket.handle(), FlagWrite); + + /* 2. Check for an error */ + if (m_socket.template get<int>(SOL_SOCKET, SO_ERROR) == Failure) { + // TODO: ????? m_onDisconnection(); + } + + /* 3. Listen for input */ + m_listener.set(m_socket.handle(), FlagRead); + + /* 4. Notify */ + m_connected = true; + m_onConnection(); + } + + void processSync(int flags) + { + try { + if (flags & FlagRead) { + m_onRead(m_socket.recv(512)); + } else { + /* 1. Make a copy of what has been sent */ + auto nsent = m_socket.send(m_output); + auto sent = m_output.substr(0, nsent); + + /* 2. Erase sent content */ + m_output.erase(0, nsent); + + /* 3. Update flags if needed */ + if (m_output.empty()) { + m_listener.unset(m_socket.handle(), FlagWrite); + } + + /* 4. Notify user */ + m_onWrite(sent); + } + } catch (const std::exception &ex) { + m_listener.remove(m_socket.handle()); + m_connected = false; + } + } + +public: + /** + * Create a client. The client is automatically marked as non-blocking. + * + * @param address the optional address + * @param type the type (Tcp or Tls) + * @throw net::Error on failures + */ + StreamClient(const Address &address = {}, Type type = {}) + : m_socket{address, std::move(type)} + { + m_socket.setBlockMode(false); + } + + /** + * Set the connection handler, called when the connection succeed. + * + * @param handler the handler + */ + inline void setConnectionHandler(ConnectionHandler handler) + { + m_onConnection = std::move(handler); + } + + /** + * Set the disconnection handler, called when the server closed the connection. + * + * @param handler the handler + */ + inline void setDisconnectionHandler(DisconnectionHandler handler) + { + m_onDisconnection = std::move(handler); + } + + /** + * Set the read handler, called when we received something. + * + * @param handler the handler + */ + inline void setReadHandler(ReadHandler handler) + { + m_onRead = std::move(handler); + } + + /** + * Set the write handler, called when we successfully sent data. + * + * @param handler the handler + */ + inline void setWriteHandler(WriteHandler handler) + { + m_onWrite = std::move(handler); + } + + /** + * Connect to a server, this function may connect immediately or not in any case the connection handler + * will be called when the connection completed. + * + * @param address the address to connect to + * @throw Error on failures + */ + void connect(const Address &address) + { + if (m_connected) { + return; + } + + try { + m_socket.connect(address); + m_connected = true; + m_onConnection(); + } catch (const Error &error) { + if (error.code() == Error::WouldBlockRead) { + m_listener.set(m_socket.handle(), FlagRead); + } else if (error.code() == Error::WouldBlockWrite) { + m_listener.set(m_socket.handle(), FlagWrite); + } else { + throw; + } + } + } + + /** + * Asynchronously send data to the server. + * + * @param str the data to append + */ + void send(std::string str) + { + m_output += str; + + if (!m_output.empty()) { + m_listener.set(m_socket.handle(), FlagWrite); + } + } + + /** + * Wait for the next event. + * + * @param timeout the time to wait in milliseconds + * @throw Error on errors + */ + void poll(int timeout = -1) + { + auto st = m_listener.wait(timeout); + + if (!m_connected) { + processConnect(); + } else { + processSync(st.flags); + } + } + + /** + * Run indefinitely. + * + * @throw Error on errors + */ + void run() + { + for (;;) { + poll(); + } + } +}; } // !net