Mercurial > code
changeset 463:214f03b47d4e
Socket:
- New action() and condition() function to check for pending events,
- More documentation,
- StreamServer, StreamClient can now complete recv/send operation for OpenSSL.
author | David Demelier <markand@malikania.fr> |
---|---|
date | Wed, 04 Nov 2015 18:01:22 +0100 |
parents | 9d53c536372e |
children | 61a6f3518c55 |
files | C++/examples/Socket/stream-client.cpp C++/examples/Socket/stream-server.cpp C++/examples/Socket/test.crt C++/examples/Socket/test.key C++/modules/Socket/Sockets.cpp C++/modules/Socket/Sockets.h |
diffstat | 6 files changed, 760 insertions(+), 223 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/C++/examples/Socket/stream-client.cpp Wed Nov 04 18:01:22 2015 +0100 @@ -0,0 +1,66 @@ +/* + * stream-server -- example of stream server + * + * Options: + * - WITH_PORT (int) which port to use (default: 12000) + * - WITH_HOST (string literal) which host to connect (default: "localhost") + * - WITH_SSL (bool) true to use SSL (default: false) + */ + +#include <iostream> + +#include "Sockets.h" + +#if !defined(WITH_PORT) +# define WITH_PORT 12000 +#endif + +#if !defined(WITH_HOST) +# define WITH_HOST "localhost" +#endif + +#if defined(WITH_SSL) +using Client = net::StreamClient<net::Ipv4, net::Tls>; +#else +using Client = net::StreamClient<net::Ipv4, net::Tcp>; +#endif + +int main() +{ + Client client; + + /* + * Unfortunately at the moment the socket state is not changed, this will be done + * in the future. + */ + bool connected{true}; + + client.setConnectionHandler([&] () { + std::cout << "client: successfully connected" << std::endl; + client.send("Hello world!"); + }); + client.setDisconnectionHandler([&] () { + std::cout << "client: disconnected" << std::endl; + connected = false; + }); + client.setErrorHandler([&] (const net::Error &error) { + std::cout << "client: error: " << error.function() << ": " << error.what() << std::endl; + connected = false; + }); + client.setReadHandler([] (const std::string &data) { + std::cout << "client: received: " << data << std::endl; + }); + client.setWriteHandler([] (const std::string &data) { + std::cout << "client: sent: " << data << std::endl; + }); + + client.connect(net::Ipv4{WITH_HOST, WITH_PORT}); + + while (connected) { + client.poll(); + } + + std::cout << "client: exiting" << std::endl; + + return 0; +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/C++/examples/Socket/stream-server.cpp Wed Nov 04 18:01:22 2015 +0100 @@ -0,0 +1,52 @@ +/* + * stream-server -- example of stream server + * + * Options: + * - WITH_PORT (int) which port to use (default: 12000) + * - WITH_SSL (bool) true to use SSL (default: false) + */ + +#include <iostream> + +#include "Sockets.h" + +#if !defined(WITH_PORT) +# define WITH_PORT 12000 +#endif + +#if defined(WITH_SSL) +using Server = net::StreamServer<net::Ipv4, net::Tls>; +using Connection = net::StreamConnection<net::Ipv4, net::Tls>; +#else +using Server = net::StreamServer<net::Ipv4, net::Tcp>; +using Connection = net::StreamConnection<net::Ipv4, net::Tcp>; +#endif + +int main() +{ +#if defined(WITH_SSL) + Server server{net::Ipv4{"*", WITH_PORT}, net::Tls{net::Tls::Tlsv1, false, "test.key", "test.crt"}}; +#else + Server server{net::Ipv4{"*", WITH_PORT}}; +#endif + + server.setConnectionHandler([] (const std::shared_ptr<Connection> &client) { + std::cout << "server: new client connected" << std::endl; + client->send("Welcome to our server dude"); + }); + server.setReadHandler([] (const std::shared_ptr<Connection> &, const std::string &buffer) { + std::cout << "server: received: " << buffer << std::endl; + }); + server.setWriteHandler([] (const std::shared_ptr<Connection> &, const std::string &buffer) { + std::cout << "server: sent: " << buffer << std::endl; + }); + server.setDisconnectionHandler([] (const std::shared_ptr<Connection> &) { + std::cout << "server: client disconnected" << std::endl; + }); + + while (true) { + server.poll(-1); + } + + return 0; +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/C++/examples/Socket/test.crt Wed Nov 04 18:01:22 2015 +0100 @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICITCCAYoCCQCGm4grkVCohjANBgkqhkiG9w0BAQsFADBVMQswCQYDVQQGEwJG +UjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0 +eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEwMjYyMDM0NThaFw0yNTEw +MjMyMDM0NThaMFUxCzAJBgNVBAYTAkZSMQ8wDQYDVQQIDAZGcmFuY2UxITAfBgNV +BAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0 +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDp13OqVyOWyv5QWD4xr+Duw6SZ +gU7D5huzsAOcneSI6JUhf+7Ecu6BQ2JGkFn4srIVkMWGQuImETJ8JCpSQH7rk+xO +L9fTTK+TwhP2hW/Rf/b2gWedhJAS+gilqt4JNT7v2wFv+aTtRt/lpTXVSdtpLa/m +Pdy219f6MAPgODJ/7QIDAQABMA0GCSqGSIb3DQEBCwUAA4GBAJSnn/IBn1ZblfzP +rJO/lE1Jwpmx3B7+oR/e4fkZd6JR3s06umGYWr2H+TPl/5dj9x0gPokhoIL9zCGq +SxCPnOeaxjBkw7yh3Ks6m3xKxmK4aMpAtBHtwmbfQyIcgz71/lfCzbJ3WcKpn1ig +IZbByt5QSSPcFORRzJJa35eHBdfX +-----END CERTIFICATE-----
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/C++/examples/Socket/test.key Wed Nov 04 18:01:22 2015 +0100 @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQDp13OqVyOWyv5QWD4xr+Duw6SZgU7D5huzsAOcneSI6JUhf+7E +cu6BQ2JGkFn4srIVkMWGQuImETJ8JCpSQH7rk+xOL9fTTK+TwhP2hW/Rf/b2gWed +hJAS+gilqt4JNT7v2wFv+aTtRt/lpTXVSdtpLa/mPdy219f6MAPgODJ/7QIDAQAB +AoGBANDt4ndQkgi56A1rOm50gVlzTg6lPPXFE/0xB5kYbcdxX0VmI7Q8KCMwTI9V +jD2rk3e3OPSjr6FpfhzyxylkXMBz2BL5NRNPowCJbiMgZOUIzlcWPKo0tgf1bZJx +YdB5U003ISGPPBjVOAjyizY7tJnaNvbpLQ0hbIAsvHPEAOnBAkEA9r3g8NQjPrvb +oIr5SMIxM8HDJ1/q+MEBSFtRFzQpmur6P64Jsu96zCyencUYTxs0L/sottrj6dPC +vjGCc6PjsQJBAPKdqK1knJv6Y95M2bnEwrymCFVdxCi7AxObStB+bg/+7mMCUqqX +j2g71bfvhYakHV7CiaYrrORChwj6vTbimv0CQGpd2IZ5LOhyW2+N+YDgFg3Vzac/ +ti+eJEto8kAqgHUELvUctZmpmypBYe9pc91GQO0ePKL3IaE/ZIhRF4d6c0ECQH9A +XiaD7PiKvjLs0A31u8ZCt4A+7BII9LYl73mntobBSbu4ji9Xyyn6qEAPa1ORZK49 +DwGPSuF2W2lESlYtSOkCQGrtczhx3IyJjk5e2Y1i/UddPKjviAysCSzcW6aVTNr9 +Y2L0sWmva2FKnkl9FDuEqxvmGr6OOkr5Ll7aWLzJri8= +-----END RSA PRIVATE KEY-----
--- a/C++/modules/Socket/Sockets.cpp Wed Nov 04 10:50:23 2015 +0100 +++ b/C++/modules/Socket/Sockets.cpp Wed Nov 04 18:01:22 2015 +0100 @@ -217,7 +217,7 @@ } } -Ip::Ip(const struct sockaddr_storage *ss, socklen_t length) +Ip::Ip(const sockaddr_storage *ss, socklen_t length) : m_length{length} , m_domain{ss->ss_family} { @@ -370,7 +370,7 @@ if (add) { m_fds.push_back(pollfd{h, topoll(flags), 0}); } else { - auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const struct pollfd &pfd) { + auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const pollfd &pfd) { return pfd.fd == h; }); @@ -380,7 +380,7 @@ void Poll::unset(const ListenerTable &, Handle h, int flags, bool remove) { - auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const struct pollfd &pfd) { + auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const pollfd &pfd) { return pfd.fd == h; }); @@ -449,9 +449,9 @@ void Epoll::update(Handle h, int op, int flags) { - struct epoll_event ev; + epoll_event ev; - std::memset(&ev, 0, sizeof (struct epoll_event)); + std::memset(&ev, 0, sizeof (epoll_event)); ev.events = flags; ev.data.fd = h; @@ -546,7 +546,7 @@ void Kqueue::update(Handle h, int filter, int flags) { - struct kevent ev; + kevent ev; EV_SET(&ev, h, filter, flags, 0, 0, nullptr);
--- a/C++/modules/Socket/Sockets.h Wed Nov 04 10:50:23 2015 +0100 +++ b/C++/modules/Socket/Sockets.h Wed Nov 04 18:01:22 2015 +0100 @@ -23,7 +23,7 @@ * @file Sockets.h * @brief Portable socket abstraction * - * This file is a portable network library. + * This file is a portable networking library. * * ### User definable options * @@ -313,8 +313,6 @@ * @brief Which kind of error */ enum Code { - WouldBlockRead, ///!< The operation would block for reading - WouldBlockWrite, ///!< The operation would block for writing Timeout, ///!< The action did timeout System, ///!< There is a system error Other ///!< Other custom error @@ -396,22 +394,55 @@ /** * @enum State - * @brief Current socket state + * @brief Current socket state. */ enum class State { Open, //!< Socket is open Bound, //!< Socket is bound to an address - ConnectingRead, //!< Connection is in progress but requires to be readable - ConnectingWrite, //!< Connection is in progress but requires to be writable + Connecting, //!< The connection is in progress Connected, //!< Connection is complete - AcceptingRead, //!< The socket is being accepted but requires to be readable - AcceptingWrite, //!< The socket is being accepted but requires to be writable Accepted, //!< Socket has been accepted (client) + Accepting, //!< The client acceptation is in progress Closed, //!< The socket has been closed }; /* }}} */ +/** + * @enum Action + * @brief Define the current operation that must complete. + * + * Some operations like accept, connect, recv or send must sometimes do several round-trips to complete and the socket + * action is set with that enumeration. The user is responsible of calling accept, connect send or recv until the + * operation is complete. + * + * Note: the user must wait for the appropriate condition in Socket::condition to check if the required condition is + * met. + * + * It is important to complete the operation in the correct order because protocols like Tls may require to continue + * re-negociating when calling Socket::send or Socket::Recv. + */ +enum class Action { + None, //!< No action is required, socket is ready + Accept, //!< The socket is not yet accepted, caller must call accept() again + Connect, //!< The socket is not yet connected, caller must call connect() again + Receive, //!< The received operation has not succeeded yet, caller must call recv() or recvfrom() again + Send //!< The send operation has not succeded yet, caller must call send() or sendto() again +}; + +/** + * @enum Condition + * @brief Define the required condition for the socket. + * + * As explained in Action enumeration, some operations required to be called several times, before calling these + * operations, the user must wait the socket to be readable or writable. This can be checked with Socket::condition. + */ +enum class Condition { + None, //!< No condition is required + Readable, //!< The socket must be readable + Writable //!< The socket must be writable +}; + /* * Base Socket class * ------------------------------------------------------------------ @@ -424,13 +455,15 @@ /** * @class Socket - * @brief Base socket class for socket operations + * @brief Base socket class for socket operations. */ template <typename Address, typename Type> class Socket { private: Type m_type; State m_state{State::Closed}; + Action m_action{Action::None}; + Condition m_condition{Condition::None}; protected: /** @@ -450,6 +483,8 @@ * @param protocol the protocol * @param iface the implementation * @throw Error on failures + * @post state is set to Open + * @post handle is not set to Invalid */ Socket(int domain, int type, int protocol, Type iface = Type{}) : m_type(std::move(iface)) @@ -465,7 +500,10 @@ m_type.create(*this); m_state = State::Open; + + assert(m_handle != Invalid); } + /** * This tries to create a socket. * @@ -485,12 +523,16 @@ * @param handle the native descriptor * @param state specify the socket state * @param type the type of socket implementation + * @post action is set to None + * @post condition is set to None */ explicit inline Socket(Handle handle, State state = State::Closed, Type type = Type{}) noexcept : m_type(std::move(type)) , m_state{state} , m_handle{handle} { + assert(m_action == Action::None); + assert(m_condition == Condition::None); } /** @@ -506,11 +548,15 @@ inline Socket(Socket &&other) noexcept : m_type(std::move(other.m_type)) , m_state{other.m_state} + , m_action{other.m_action} + , m_condition{other.m_condition} , m_handle{other.m_handle} { /* Invalidate other */ - other.m_handle = -1; + other.m_handle = Invalid; other.m_state = State::Closed; + other.m_action = Action::None; + other.m_condition = Condition::None; } /** @@ -564,6 +610,49 @@ } /** + * Get the pending operation. + * + * @return the action to complete before continuing + * @note usually only needed in non-blocking sockets + */ + inline Action action() const noexcept + { + return m_action; + } + + /** + * Change the pending operation. + * + * @param action the action + * @warning you should not call this function yourself + */ + inline void setAction(Action action) noexcept + { + m_action = action; + } + + /** + * Get the condition to wait for. + * + * @return the condition + */ + inline Condition condition() const noexcept + { + return m_condition; + } + + /** + * Change the condition required. + * + * @param condition the condition + * @warning you should not call this function yourself + */ + inline void setCondition(Condition condition) noexcept + { + m_condition = condition; + } + + /** * Set an option for the socket. * * @param level the setting level @@ -572,14 +661,11 @@ * @throw Error on error */ template <typename Argument> - inline void set(int level, int name, const Argument &arg) + void set(int level, int name, const Argument &arg) { -#if defined(_WIN32) - if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) == Failure) -#else - if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) < 0) -#endif + if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) == Failure) { throw Error{Error::System, "set"}; + } } /** @@ -590,17 +676,14 @@ * @throw Error on error */ template <typename Argument> - inline Argument get(int level, int name) + Argument get(int level, int name) { Argument desired, result{}; socklen_t size = sizeof (result); -#if defined(_WIN32) - if (getsockopt(m_handle, level, name, (Arg)&desired, &size) == Failure) -#else - if (getsockopt(m_handle, level, name, (Arg)&desired, &size) < 0) -#endif + if (getsockopt(m_handle, level, name, (Arg)&desired, &size) == Failure) { throw Error{Error::System, "get"}; + } std::memcpy(&result, &desired, size); @@ -657,7 +740,8 @@ * * @param address the address * @param length the size - * @pre state() must not be State::Bound + * @pre state must not be Bound + * @throw Error on errors */ void bind(const sockaddr *address, socklen_t length) { @@ -685,7 +769,8 @@ * Listen for pending connection. * * @param max the maximum number - * @pre state() must be Bound + * @pre state must be Bound + * @throw Error on errors */ inline void listen(int max = 128) { @@ -697,27 +782,37 @@ } /** - * Connect to the connection to the specified address. If the socket is marked non-blocking and the - * connection cannot be established, the state is set to ConnectingRead or ConnectingWrite depending - * on the underlying implementation. The user is then responsible to wait that the socket is writable - * or readable using a listener and then call the overloaded connect() function which takes 0 argument. + * Connect to the address. + * + * On non-blocking socket, if the connection cannot be established immediately then state is set to + * State::Connecting, action is set to Action::Connect and condition is set to the required condition. * + * User is then responsible of waiting for the condition to be checked and call the connect() overload + * which takes 0 argument. + * + * @pre state must be State::Open * @param address the address - * @param length the address length - * @pre state() must be State::Open + * @param length the the address length + * @post if state is Connecting, action and condition are defined * @throw Error on errors */ void connect(const sockaddr *address, socklen_t length) { assert(m_state == State::Open); + m_action = Action::None; + m_condition = Condition::None; + m_type.connect(*this, address, length); + + assert((m_state == State::Connected && m_action == Action::None && m_condition == Condition::None) || + (m_state == State::Connecting && m_action == Action::Connect && m_condition != Condition::None)); } /** * Overloaded function. * - * Calls connect(address.address(), address.length()); + * Effectively call connect(address.address(), address.length()); * * @param address the address */ @@ -730,33 +825,45 @@ * Continue the connection, only required with non-blocking sockets. Just like the initial connect() call, * this function can still be in progress. * - * @pre state() must be State::ConnectingRead or State::ConnectingWrite + * @pre state must be State::Connecting * @throw Error on errors + * @post if connection is completed, state is set to State::Connected + * @post if connection is still in progress, condition is set */ - inline void connect() + void connect() { - assert(m_state == State::ConnectingRead || m_state == State::ConnectingWrite); + assert(m_state == State::Connecting); + + m_action = Action::None; + m_condition = Condition::None; m_type.connect(*this); + + assert((m_state == State::Connected && m_action == Action::None && m_condition == Condition::None) || + (m_state == State::Connecting && m_action == Action::Connect && m_condition != Condition::None)); } /** - * Accept a pending connection. If the socket is marked non-blocking and has no pending connection then an - * error is thrown with the WouldBlockRead code. + * Accept a new client. If there are no pending connection, throws an error. * - * If the client is accepted but not yet ready, its state may be set to AcceptingRead or AcceptingWrite - * and the user is responsible of waiting the socket to be readable or writable and then call the - * overloaded accept() (with 0 arguments) until the connection is complete. + * On non-blocking sockets, the client may not be completely accepted yet, if it's the case, the returned + * socket state is set to State::Accepting and user is responsible of calling accept() on it when the required + * condition is met. * + * @pre state must be State::Bound * @param info the address where to store client's information (optional) * @return the new socket - * @pre state() must be State::Bound * @throw Error on errors + * @post the client socket state is either set to State::Accepting or State::Accepted + * @post if the client state is set to State::Accepting, action and condition are defined */ - inline Socket<Address, Type> accept(Address *info) + Socket<Address, Type> accept(Address *info) { assert(m_state == State::Bound); + m_action = Action::None; + m_condition = Condition::None; + sockaddr_storage storage; socklen_t length = sizeof (storage); @@ -766,20 +873,42 @@ *info = Address{&storage, length}; } + /* Master do not change */ + assert(m_state == State::Bound); + assert(m_action == Action::None); + assert(m_condition == Condition::None); + + /* Client */ + assert( + (sc.state() == State::Accepting && sc.action() == Action::Accept && sc.condition() != Condition::None) || + (sc.state() == State::Accepted && sc.action() == Action::None && sc.condition() == Condition::None) + ); + return sc; } /** - * Continue the accept process on this client. + * Continue the accept process on this client. This function must be called only when the socket is + * ready to be readable or writable! (see condition). * - * @pre state() must be State::AcceptingRead or State::AcceptingWrite + * @pre state must be State::Accepting * @throw Error on errors + * @post if connection is complete, state is changed to State::Accepted, action and condition are unset + * @post if connection is still in progress, condition is set */ - inline void accept() + void accept() { - assert(m_state == State::AcceptingRead || m_state == State::AcceptingWrite); + assert(m_state == State::Accepting); + + m_action = Action::None; + m_condition = Condition::None; m_type.accept(*this); + + assert( + (m_state == State::Accepting && m_action == Action::Accept && m_condition != Condition::None) || + (m_state == State::Accepted && m_action == Action::None && m_condition == Condition::None) + ); } /** @@ -806,13 +935,31 @@ /** * Receive some data. * + * If the operation cannot be complete immediately, 0 is returned, action() is set to Action::Receive and + * condition is set to Condition::Readableable or Condition::Writable, the user then must wait for the + * appropriate condition and repeat the recv() call. + * + * If action() is set to None and result is set to 0, disconnection occured. + * * @param data the destination buffer * @param length the buffer length + * @pre action() must not be Action::Send + * @return the number of bytes received or 0 * @throw Error on error */ - inline unsigned recv(void *data, unsigned length) + unsigned recv(void *data, unsigned length) { - return m_type.recv(*this, data, length); + assert(m_action != Action::Send); + + m_action = Action::None; + m_condition = Condition::None; + + unsigned nbread = m_type.recv(*this, data, length); + + assert((m_action == Action::None && m_condition == Condition::None) || + (m_action != Action::Receive && m_condition != Condition::None)); + + return nbread; } /** @@ -834,15 +981,29 @@ } /** - * Send some data. + * Send some data. Just like recv(), the operation may not succeed immediately and requires to be + * repeated. * * @param data the data buffer * @param length the buffer length + * @return the number of bytes sent or 0 + * @pre action() must not be Flag::Receive * @throw Error on error + * @see recv */ - inline unsigned send(const void *data, unsigned length) + unsigned send(const void *data, unsigned length) { - return m_type.send(*this, data, length); + assert(m_action != Action::Receive); + + m_action = Action::None; + m_condition = Condition::None; + + unsigned nbsent = m_type.send(*this, data, length); + + assert((m_action == Action::None && m_condition == Condition::None) || + (m_action != Action::Send && m_condition != Condition::None)); + + return nbsent; } /** @@ -857,6 +1018,8 @@ return send(data.c_str(), data.size()); } +#if 0 + /** * Send data to an end point. * @@ -919,6 +1082,8 @@ return result; } +#endif + /** * Close the socket. * @@ -933,8 +1098,11 @@ ::close(m_handle); #endif m_handle = Invalid; - m_state = State::Closed; } + + m_state = State::Closed; + m_action = Action::None; + m_condition = Condition::None; } /** @@ -956,10 +1124,14 @@ m_handle = other.m_handle; m_type = std::move(other.m_type); m_state = other.m_state; + m_action = other.m_action; + m_condition = other.m_condition; /* Invalidate other */ other.m_handle = Invalid; other.m_state = State::Closed; + other.m_action = Action::None; + other.m_conditon = Condition::None; return *this; } @@ -1062,9 +1234,6 @@ */ class Ip { private: - friend class Ipv6; - friend class Ipv4; - union { sockaddr_in m_sin; sockaddr_in6 m_sin6; @@ -1097,7 +1266,7 @@ * @param ss the storage * @param length the length */ - Ip(const struct sockaddr_storage *ss, socklen_t length); + Ip(const sockaddr_storage *ss, socklen_t length); /** * Get the domain (AF_INET or AF_INET6). @@ -1180,7 +1349,7 @@ * @param ss the storage * @param length the length */ - inline Ipv6(const struct sockaddr_storage *ss, socklen_t length) + inline Ipv6(const sockaddr_storage *ss, socklen_t length) : Ip{ss, length} { } @@ -1218,7 +1387,7 @@ * @param ss the storage * @param length the length */ - inline Ipv4(const struct sockaddr_storage *ss, socklen_t length) + inline Ipv4(const sockaddr_storage *ss, socklen_t length) : Ip{ss, length} { } @@ -1355,13 +1524,17 @@ int error = WSAGetLastError(); if (error == WSAEWOULDBLOCK) { - sc.setState(State::ConnectingWrite); + sc.setState(State::Connecting); + sc.setAction(Action::Connect); + sc.setCondition(Condition::Writable); } else { throw Error{Error::System, "connect", error}; } #else if (errno == EINPROGRESS) { - sc.setState(State::ConnectingWrite); + sc.setState(State::Connecting); + sc.setAction(Action::Connect); + sc.setCondition(Condition::Writable); } else { throw Error{Error::System, "connect"}; } @@ -1403,21 +1576,7 @@ Handle handle = ::accept(sc.handle(), address, length); if (handle == Invalid) { -#if defined(_WIN32) - int error = WSAGetLastError(); - - if (error == WSAEWOULDBLOCK) { - throw Error{Error::WouldBlockRead, "accept", error}; - } - - throw Error{Error::System, "accept", error}; -#else - if (errno == EAGAIN || errno == EWOULDBLOCK) { - throw Error{Error::WouldBlockRead, "accept"}; - } - throw Error{Error::System, "accept"}; -#endif } return Socket<Address, Type>{handle, State::Accepted}; @@ -1444,24 +1603,25 @@ template <typename Address> unsigned recv(Socket<Address, Tcp> &sc, void *data, unsigned length) { - int nbread; - - nbread = ::recv(sc.handle(), (Arg)data, length, 0); + int nbread = ::recv(sc.handle(), (Arg)data, length, 0); + if (nbread == Failure) { #if defined(_WIN32) int error = WSAGetLastError(); if (error == WSAEWOULDBLOCK) { - throw Error{Error::WouldBlockRead, "recv", error}; + sc.setAction(Action::Receive); + sc.setCondition(Condition::Readable); + } else { + throw Error{Error::System, "recv", error}; } - - throw Error{Error::System, "recv", error}; #else if (errno == EAGAIN || errno == EWOULDBLOCK) { - throw Error{Error::WouldBlockRead, "recv"}; + sc.setAction(Action::Receive); + sc.setCondition(Condition::Readable); + } else { + throw Error{Error::System, "recv"}; } - - throw Error{Error::System, "recv"}; #endif } @@ -1480,24 +1640,25 @@ template <typename Address> unsigned send(Socket<Address, Tcp> &sc, const void *data, unsigned length) { - int nbsent; - - nbsent = ::send(sc.handle(), (ConstArg)data, length, 0); + int nbsent = ::send(sc.handle(), (ConstArg)data, length, 0); + if (nbsent == Failure) { #if defined(_WIN32) int error = WSAGetLastError(); if (error == WSAEWOULDBLOCK) { - throw Error{Error::WouldBlockWrite, "send", error}; + sc.setAction(Action::Send); + sc.setCondition(Condition::Writable); + } else { + throw Error{Error::System, "send", error}; } - - throw Error{Error::System, "send", error}; #else if (errno == EAGAIN || errno == EWOULDBLOCK) { - throw Error{Error::WouldBlockWrite, "send"}; + sc.setAction(Action::Send); + sc.setCondition(Condition::Writable); + } else { + throw Error{Error::System, "send"}; } - - throw Error{Error::System, "send"}; #endif } @@ -1509,6 +1670,8 @@ /* {{{ Udp */ +#if 0 + /** * @class Udp * @brief Clear UDP type. @@ -1553,7 +1716,7 @@ /* Store information */ sockaddr_storage address; - socklen_t addrlen = sizeof (struct sockaddr_storage); + socklen_t addrlen = sizeof (sockaddr_storage); nbread = ::recvfrom(sc.handle(), (Arg)data, length, 0, reinterpret_cast<sockaddr *>(&address), &addrlen); info = Address{&address, addrlen}; @@ -1617,6 +1780,8 @@ } }; +#endif + /* }}} */ /* {{{ Tls */ @@ -1670,6 +1835,21 @@ return msg == nullptr ? "" : msg; } + template <typename Address, typename Type> + inline void updateStates(Socket<Address, Type> &sc, State state, Action action, int code) + { + assert(code == SSL_ERROR_WANT_READ || code == SSL_ERROR_WANT_WRITE); + + sc.setState(state); + sc.setAction(action); + + if (code == SSL_ERROR_WANT_READ) { + sc.setCondition(Condition::Readable); + } else { + sc.setCondition(Condition::Writable); + } + } + /* * Continue the connect operation. */ @@ -1681,10 +1861,8 @@ if (ret <= 0) { int no = SSL_get_error(m_ssl.get(), ret); - if (no == SSL_ERROR_WANT_READ) { - sc.setState(State::ConnectingRead); - } else if (no == SSL_ERROR_WANT_WRITE) { - sc.setState(State::ConnectingWrite); + if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) { + updateStates(sc, State::Connecting, Action::Connect, no); } else { throw Error{Error::System, "connect", error(no)}; } @@ -1704,10 +1882,8 @@ if (ret <= 0) { int no = SSL_get_error(m_ssl.get(), ret); - if (no == SSL_ERROR_WANT_READ) { - sc.setState(State::AcceptingRead); - } else if (no == SSL_ERROR_WANT_WRITE) { - sc.setState(State::AcceptingWrite); + if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) { + updateStates(sc, State::Accepting, Action::Accept, no); } else { throw Error(Error::System, "accept", error(no)); } @@ -1738,7 +1914,8 @@ /** * Construct a specific Tls object. * - * @param method the method to use + * @param method the method to use# + * @param verify true to verify the certificate * @param key the private key * @param certificate the certificate file @@ -1829,16 +2006,17 @@ Socket<Address, Tls> accept(Socket<Address, Tls> &sc, sockaddr *address, socklen_t *length) { Socket<Address, Tls> client = Tcp::accept(sc, address, length); + Tls &type = client.type(); /* 1. Share the context */ - client.type().m_context = sc.type().m_context; + type.m_context = m_context; /* 2. Create new SSL instance */ - client.type().m_ssl = Ssl{SSL_new(m_context.get()), SSL_free}; - SSL_set_fd(client.type().m_ssl.get(), client.handle()); - - /* 3. Try accept process */ - processAccept(sc); + type.m_ssl = Ssl{SSL_new(m_context.get()), SSL_free}; + SSL_set_fd(type.m_ssl.get(), client.handle()); + + /* 3. Try accept process on the **new** client */ + type.processAccept(client); return client; } @@ -1864,17 +2042,15 @@ * @throw Error on errors */ template <typename Address> - unsigned recv(Socket<Address, Tls> &, void *data, unsigned len) + unsigned recv(Socket<Address, Tls> &sc, void *data, unsigned len) { auto nbread = SSL_read(m_ssl.get(), data, len); if (nbread <= 0) { auto no = SSL_get_error(m_ssl.get(), nbread); - if (no == SSL_ERROR_WANT_READ) { - throw Error{Error::WouldBlockRead, "recv", "Operation would block"}; - } else if (no == SSL_ERROR_WANT_WRITE) { - throw Error{Error::WouldBlockWrite, "recv", "Operation would block"}; + if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) { + updateStates(sc, sc.state(), Action::Receive, no); } else { throw Error{Error::System, "recv", error(no)}; } @@ -1892,23 +2068,21 @@ * @throw Error on errors */ template <typename Address> - unsigned send(Socket<Address, Tls> &, const void *data, unsigned len) + unsigned send(Socket<Address, Tls> &sc, const void *data, unsigned len) { - auto nbread = SSL_write(m_ssl.get(), data, len); - - if (nbread <= 0) { - auto no = SSL_get_error(m_ssl.get(), nbread); - - if (no == SSL_ERROR_WANT_READ) { - throw Error{Error::WouldBlockRead, "send", "Operation would block"}; - } else if (no == SSL_ERROR_WANT_WRITE) { - throw Error{Error::WouldBlockWrite, "send", "Operation would block"}; + auto nbsent = SSL_write(m_ssl.get(), data, len); + + if (nbsent <= 0) { + auto no = SSL_get_error(m_ssl.get(), nbsent); + + if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) { + updateStates(sc, sc.state(), Action::Send, no); } else { throw Error{Error::System, "send", error(no)}; } } - return nbread; + return nbsent; } }; @@ -1936,11 +2110,29 @@ using SocketTcp = Socket<Address, Tcp>; /** + * Helper to create TCP/IP sockets. + */ +using SocketTcpIp = Socket<Ip, Tcp>; + +#if !defined(_WIN32) + +/** + * Helper to create TCP/Local sockets. + */ +using SocketTcpLocal = Socket<Local, Tcp>; + +#endif + +#if 0 + +/** * Helper to create UDP sockets. */ template <typename Address> using SocketUdp = Socket<Address, Udp>; +#endif + #if !defined(SOCKET_NO_SSL) /** @@ -1949,6 +2141,11 @@ template <typename Address> using SocketTls = Socket<Address, Tls>; +/** + * Helper to create OpenSSL TCP/Ip sockets. + */ +using SocketTlsIp = Socket<Ip, Tls>; + #endif // !SOCKET_NO_SSL /* }}} */ @@ -1963,7 +2160,7 @@ /* {{{ Listener */ /** - * @struct ListenerStatus + * @class ListenerStatus * @brief Result of polling * * Result of a select call, returns the first ready socket found with its @@ -2017,7 +2214,7 @@ /** * @class Poll - * @brief Implements poll(2) + * @brief Implements poll(2). * * Poll is widely supported and is better than select(2). It is still not the * best option as selecting the sockets is O(n). @@ -2030,8 +2227,19 @@ int toflags(short &event) const noexcept; public: - void set(const ListenerTable &, Handle sc, int flags, bool add); - void unset(const ListenerTable &, Handle sc, int flags, bool remove); + /** + * Set the handle. + */ + void set(const ListenerTable &, Handle, int, bool); + + /** + * Unset the handle. + */ + void unset(const ListenerTable &, Handle, int, bool); + + /** + * Wait for events. + */ std::vector<ListenerStatus> wait(const ListenerTable &, int ms); /** @@ -2047,10 +2255,14 @@ #if defined(SOCKET_HAVE_EPOLL) +/** + * @class Epoll + * @brief Linux's epoll. + */ class Epoll { private: int m_handle; - std::vector<struct epoll_event> m_events; + std::vector<epoll_event> m_events; Epoll(const Epoll &) = delete; Epoll &operator=(const Epoll &) = delete; @@ -2062,11 +2274,30 @@ void update(Handle sc, int op, int flags); public: + /** + * Construct the epoll instance. + */ Epoll(); + + /** + * Close the epoll instance. + */ ~Epoll(); - void set(const ListenerTable &, Handle sc, int flags, bool add); - void unset(const ListenerTable &, Handle sc, int flags, bool remove); - std::vector<ListenerStatus> wait(const ListenerTable &table, int ms); + + /** + * Set the handle. + */ + void set(const ListenerTable &, Handle, int, bool); + + /** + * Unset the handle. + */ + void unset(const ListenerTable &, Handle, int, bool); + + /** + * Wait for events. + */ + std::vector<ListenerStatus> wait(const ListenerTable &, int); /** * Backend identifier @@ -2083,14 +2314,14 @@ /** * @class Kqueue - * @brief Implements kqueue(2) + * @brief Implements kqueue(2). * * This implementation is available on all BSD and Mac OS X. It is better than * poll(2) because it's O(1), however it's a bit more memory consuming. */ class Kqueue { private: - std::vector<struct kevent> m_result; + std::vector<kevent> m_result; int m_handle; Kqueue(const Kqueue &) = delete; @@ -2101,12 +2332,30 @@ void update(Handle sc, int filter, int flags); public: + /** + * Construct the kqueue instance. + */ Kqueue(); + + /** + * Destroy the kqueue instance. + */ ~Kqueue(); - void set(const ListenerTable &, Handle sc, int flags, bool add); - void unset(const ListenerTable &, Handle sc, int flags, bool remove); - std::vector<ListenerStatus> wait(const ListenerTable &, int ms); + /** + * Set the handle. + */ + void set(const ListenerTable &, Handle, int, bool); + + /** + * Unset the handle. + */ + void unset(const ListenerTable &, Handle, int, bool); + + /** + * Wait for events. + */ + std::vector<ListenerStatus> wait(const ListenerTable &, int); /** * Backend identifier @@ -2593,6 +2842,8 @@ * * This class does all the things for you as accepting new clients, listening for it and sending data. It works * asynchronously without blocking to let you control your process workflow. + * + * This class is not thread safe and you must not call any of the functions from different threads. */ template <typename Address, typename Type> class StreamServer { @@ -2621,33 +2872,42 @@ ClientMap m_clients; /* - * Continue accept process. Set ready to true to attempt a call to accept() (e.g. after a select()). + * Update flags depending on the required condition. */ - void processAccept(std::shared_ptr<StreamConnection<Address, Type>> &client, bool ready) + void updateFlags(std::shared_ptr<StreamConnection<Address, Type>> &client) + { + assert(client->socket().action() != Action::None); + + m_listener.remove(client->socket().handle()); + + if (client->socket().condition() == Condition::Readable) { + m_listener.set(client->socket().handle(), FlagRead); + } else { + m_listener.set(client->socket().handle(), FlagWrite); + } + } + + /* + * Continue accept process. + */ + template <typename AcceptCall> + void processAccept(std::shared_ptr<StreamConnection<Address, Type>> &client, const AcceptCall &acceptFunc) { try { - if (ready) { - client->socket().accept(); - } + /* Do the accept */ + acceptFunc(); /* 1. First remove completely the client */ m_listener.remove(client->socket().handle()); - switch (client->socket().state()) { - case State::AcceptingWrite: - m_listener.set(client->socket().handle(), FlagWrite); - break; - case State::Accepted: - case State::AcceptingRead: + /* 2. If accept is not finished, wait for the appropriate condition */ + if (client->socket().state() == State::Accepted) { + /* 3. Client is accepted, notify the user */ m_listener.set(client->socket().handle(), FlagRead); - break; - default: - break; - } - - /* 2. Successfully accepted? */ - if (client->socket().state() == State::Accepted) { m_onConnection(client); + } else { + /* Operation still in progress */ + updateFlags(client); } } catch (const Error &error) { m_clients.erase(client->socket().handle()); @@ -2657,9 +2917,10 @@ } /* - * Process accept of master socket. + * Process initial accept of master socket, this is the initial accepting process. Except on errors, the + * socket is stored but the user will be notified only once the socket is completely accepted. */ - void processAccept() + void processInitialAccept() { // TODO: store address too. std::shared_ptr<StreamConnection<Address, Type>> client = std::make_shared<StreamConnection<Address, Type>>(m_master.accept(nullptr)); @@ -2669,56 +2930,101 @@ client->setWriteHandler([this, ptr] () { auto client = ptr.lock(); - if (client && !client->output().empty()) { + /* Do not update the listener immediately if an action is pending */ + if (client && client->socket().action() == Action::None && !client->output().empty()) { m_listener.set(client->socket().handle(), FlagWrite); } }); - /* 2. Do initial accept or update flags */ - processAccept(client, false); - - /* 3. Add it if everything worked well */ - m_clients.insert(std::make_pair(client->socket().handle(), std::move(client))); + /* 2. Add the client */ + m_clients.insert(std::make_pair(client->socket().handle(), client)); + + /* + * 2. Do an initial check to set the listener flags, at this moment the socket may or not be + * completely accepted. + */ + processAccept(client, [&] () {}); } + /* + * Read or complete the read operation. + */ + void processRead(std::shared_ptr<StreamConnection<Address, Type>> &client) + { + /* + * Read because there is something to read or because the pending operation is + * read and must complete. + */ + auto buffer = client->socket().recv(512); + + /* + * Now the receive operation may be completed, in that case, two possibilities: + * + * 1. The action is set to None (completed) + * 2. The action is still not complete, update the flags + */ + if (client->socket().action() == Action::None) { + /* Empty mean normal disconnection */ + if (buffer.empty()) { + m_listener.remove(client->socket().handle()); + m_clients.erase(client->socket().handle()); + m_onDisconnection(client); + } else { + /* + * At this step, it is possible that we were completing a receive operation, in this + * case the write flag may be removed, add it if required. + */ + if (!client->output().empty()) { + m_listener.set(client->socket().handle(), FlagWrite); + } + + m_onRead(client, buffer); + } + } else { + /* Operation in progress */ + updateFlags(client); + } + } + + /* + * Flush the output buffer. + */ + void processWrite(std::shared_ptr<StreamConnection<Address, Type>> &client) + { + auto &output = client->output(); + auto nsent = client->socket().send(output); + + if (client->socket().action() == Action::None) { + /* 1. Create a copy of content that has been sent */ + auto sent = output.substr(0, nsent); + + /* 2. Erase the content sent */ + output.erase(0, nsent); + + /* 3. Update listener */ + if (output.empty()) { + m_listener.unset(client->socket().handle(), FlagWrite); + } + + /* 4. Notify user */ + m_onWrite(client, sent); + } else { + updateFlags(client); + } + } + + void processSync(std::shared_ptr<StreamConnection<Address, Type>> &client, int flags) { - bool remove{false}; - try { - if (flags & FlagRead) { - auto buffer = client->socket().recv(512); - - /* Empty mean normal disconnection */ - if (buffer.empty()) { - remove = true; - } else { - m_onRead(client, buffer); - } + auto action = client->socket().action(); + + if (action == Action::Receive || (action == Action::None && (flags & FlagRead))) { + processRead(client); } else if (flags & FlagWrite) { - auto &output = client->output(); - auto nsent = client->socket().send(output); - - /* 1. Create a copy of content that has been sent */ - auto sent = output.substr(0, nsent); - - /* 2. Erase the content sent */ - output.erase(0, nsent); - - /* 3. Update listener */ - if (output.empty()) { - m_listener.unset(client->socket().handle(), FlagWrite); - } - - /* 4. Notify user */ - m_onWrite(client, sent); + processWrite(client); } } catch (const Error &error) { - remove = true; - m_onError(error); - } - - if (remove) { m_onDisconnection(client); m_listener.remove(client->socket().handle()); m_clients.erase(client->socket().handle()); @@ -2733,9 +3039,10 @@ * @param max the max number to listen * @throw Error on errors */ - StreamServer(const Address &address, int max = 128) - : m_master{address.domain(), SOCK_STREAM, 0} + StreamServer(const Address &address, Type type = {}, int max = 128) + : m_master{address, std::move(type)} { + // TODO: m_onError m_master.set(SOL_SOCKET, SO_REUSEADDR, 1); m_master.bind(address); m_master.listen(max); @@ -2815,7 +3122,7 @@ if (st.socket == m_master.handle()) { /* New client */ - processAccept(); + processInitialAccept(); } else { /* Recv / Send / Accept on a client */ auto client = m_clients[st.socket]; @@ -2823,7 +3130,7 @@ if (client->socket().state() == State::Accepted) { processSync(client, st.flags); } else { - processAccept(client, true); + processAccept(client, [&] () { client->socket().accept(); }); } } } catch (const Error &error) { @@ -2836,18 +3143,52 @@ } }; +/* }}} */ + +/* + * StreamClient + * ------------------------------------------------------------------ + */ + +/* {{{ StreamClient */ + /** * @class StreamClient * @brief Client side connection to a server. + * + * This class is not thread safe and you must not call any of the functions from different threads. */ template <typename Address, typename Type> class StreamClient { public: + /** + * Handler when connection is complete. + */ using ConnectionHandler = Callback<>; + + /** + * Handler when data has been received. + */ using ReadHandler = Callback<const std::string &>; + + /** + * Handler when data has been sent correctly. + */ using WriteHandler = Callback<const std::string &>; + + /** + * Handler when disconnected. + */ using DisconnectionHandler = Callback<>; + + /** + * Handler on unrecoverable error. + */ using ErrorHandler = Callback<const Error &>; + + /** + * Handler when timeout occured. + */ using TimeoutHandler = Callback<>; private: @@ -2867,6 +3208,23 @@ std::string m_output; /* + * Update the flags after an uncompleted operation. This function must only be called when the operation + * has not complete (e.g. connect, recv, send). + */ + void updateFlags() + { + assert(m_socket.action() != Action::None); + + m_listener.remove(m_socket.handle()); + + if (m_socket.condition() == Condition::Readable) { + m_listener.set(m_socket.handle(), FlagRead); + } else { + m_listener.set(m_socket.handle(), FlagWrite); + } + } + + /* * This is the generic connect helper, it will be used to both initiate the connection or to continue the * connection process if needed. * @@ -2884,37 +3242,54 @@ /* Remove entirely */ m_listener.remove(m_socket.handle()); - switch (m_socket.state()) { - case State::ConnectingWrite: - m_listener.set(m_socket.handle(), FlagWrite); - break; - case State::Connected: - /* Connection complete? */ + if (m_socket.state() == State::Connected) { m_onConnection(); - - /* FALLTHROUGH */ - case State::ConnectingRead: m_listener.set(m_socket.handle(), FlagRead); - break; - default: - break; + } else { + /* Connection still in progress */ + updateFlags(); } } - void processSync(int flags) + /* + * Receive or complete the receive command, if the command is not complete, the listener is updated + * accordingly. + */ + void processRead() { - if (flags & FlagRead) { - auto received = m_socket.recv(512); - + auto received = m_socket.recv(512); + + if (m_socket.action() == Action::None) { /* 0 means disconnection */ if (received.empty()) { m_onDisconnection(); } else { + /* + * At this step, it is possible that we were completing a receive operation, in this + * case the write flag may be removed, add it if required. + */ + if (!m_output.empty()) { + m_listener.set(m_socket.handle(), FlagWrite); + } + m_onRead(received); } } else { + /* Receive operation in progress */ + updateFlags(); + } + } + + /* + * Send or complete the send command, if the command is not complete, the listener is updated + * accordingly. + */ + void processWrite() + { + auto nsent = m_socket.send(m_output); + + if (m_socket.action() == Action::None) { /* 1. Make a copy of what has been sent */ - auto nsent = m_socket.send(m_output); auto sent = m_output.substr(0, nsent); /* 2. Erase sent content */ @@ -2927,6 +3302,21 @@ /* 4. Notify user */ m_onWrite(sent); + } else { + /* Send operation in progress */ + updateFlags(); + } + } + + /* + * Receive or send. + */ + void processSync(int flags) + { + if ((m_socket.action() == Action::Receive) || (m_socket.action() == Action::None && (flags & FlagRead))) { + processRead(); + } else { + processWrite(); } } @@ -3000,7 +3390,6 @@ * will be called when the connection completed. * * @param address the address to connect to - * @throw Error on failures */ void connect(const Address &address) noexcept { @@ -3018,7 +3407,8 @@ { m_output += str; - if (m_socket.state() == State::Connected && !m_output.empty()) { + /* Don't update the listener if there is a pending operation */ + if (m_socket.state() == State::Connected && m_socket.action() == Action::None && !m_output.empty()) { m_listener.set(m_socket.handle(), FlagWrite); } }