# HG changeset patch # User David Demelier # Date 1446630623 -3600 # Node ID 9d53c536372ee281225a84df14f7d1f9a5aba4eb # Parent 41d1a36cc4619bfbcc0b830363fc2a336fa4f213 Socket: - Remove run() in both StreamServer and StreamClient, - StreamServer understands new accept process, - StreamClient understands new connect process. diff -r 41d1a36cc461 -r 9d53c536372e C++/modules/Socket/Sockets.h --- a/C++/modules/Socket/Sockets.h Tue Nov 03 21:48:14 2015 +0100 +++ b/C++/modules/Socket/Sockets.h Wed Nov 04 10:50:23 2015 +0100 @@ -2601,6 +2601,8 @@ using DisconnectionHandler = Callback> &>; using ReadHandler = Callback> &, const std::string &>; using WriteHandler = Callback> &, const std::string &>; + using ErrorHandler = Callback; + using TimeoutHandler = Callback<>; private: using ClientMap = std::map>>; @@ -2610,51 +2612,92 @@ DisconnectionHandler m_onDisconnection; ReadHandler m_onRead; WriteHandler m_onWrite; + ErrorHandler m_onError; + TimeoutHandler m_onTimeout; /* Sockets */ Socket m_master; Listener<> m_listener; ClientMap m_clients; + /* + * Continue accept process. Set ready to true to attempt a call to accept() (e.g. after a select()). + */ + void processAccept(std::shared_ptr> &client, bool ready) + { + try { + if (ready) { + client->socket().accept(); + } + + /* 1. First remove completely the client */ + m_listener.remove(client->socket().handle()); + + switch (client->socket().state()) { + case State::AcceptingWrite: + m_listener.set(client->socket().handle(), FlagWrite); + break; + case State::Accepted: + case State::AcceptingRead: + m_listener.set(client->socket().handle(), FlagRead); + break; + default: + break; + } + + /* 2. Successfully accepted? */ + if (client->socket().state() == State::Accepted) { + m_onConnection(client); + } + } catch (const Error &error) { + m_clients.erase(client->socket().handle()); + m_listener.remove(client->socket().handle()); + m_onError(error); + } + } + + /* + * Process accept of master socket. + */ void processAccept() { - std::shared_ptr> connection = std::make_shared>(m_master.accept()); - std::weak_ptr> ptr{connection}; - - /* Update the listener write flag when output has changed */ - connection->setWriteHandler([this, ptr] () { - auto connection = ptr.lock(); - - if (connection && !connection->output().empty()) { - m_listener.set(connection->socket().handle(), FlagWrite); + // TODO: store address too. + std::shared_ptr> client = std::make_shared>(m_master.accept(nullptr)); + std::weak_ptr> ptr{client}; + + /* 1. Register output changed to update listener */ + client->setWriteHandler([this, ptr] () { + auto client = ptr.lock(); + + if (client && !client->output().empty()) { + m_listener.set(client->socket().handle(), FlagWrite); } }); - /* Notify we have a new client */ - m_onConnection(connection); - - m_listener.set(connection->socket().handle(), FlagRead); - m_clients.emplace(connection->socket().handle(), std::move(connection)); + /* 2. Do initial accept or update flags */ + processAccept(client, false); + + /* 3. Add it if everything worked well */ + m_clients.insert(std::make_pair(client->socket().handle(), std::move(client))); } - void processSync(const ListenerStatus &status) + void processSync(std::shared_ptr> &client, int flags) { - auto connection = m_clients.at(status.socket); - auto remove = false; + bool remove{false}; try { - if (status.flags & FlagRead) { - auto buffer = connection->socket().recv(512); + if (flags & FlagRead) { + auto buffer = client->socket().recv(512); /* Empty mean normal disconnection */ if (buffer.empty()) { remove = true; } else { - m_onRead(connection, buffer); + m_onRead(client, buffer); } - } else if (status.flags & FlagWrite) { - auto &output = connection->output(); - auto nsent = connection->socket().send(output); + } else if (flags & FlagWrite) { + auto &output = client->output(); + auto nsent = client->socket().send(output); /* 1. Create a copy of content that has been sent */ auto sent = output.substr(0, nsent); @@ -2664,20 +2707,21 @@ /* 3. Update listener */ if (output.empty()) { - m_listener.unset(connection->socket().handle(), FlagWrite); + m_listener.unset(client->socket().handle(), FlagWrite); } /* 4. Notify user */ - m_onWrite(connection, sent); + m_onWrite(client, sent); } - } catch (const std::exception &ex) { + } catch (const Error &error) { remove = true; + m_onError(error); } if (remove) { - m_onDisconnection(connection); - m_listener.remove(status.socket); - m_clients.erase(status.socket); + m_onDisconnection(client); + m_listener.remove(client->socket().handle()); + m_clients.erase(client->socket().handle()); } } @@ -2739,6 +2783,26 @@ } /** + * Set the error handler, called when unrecoverable error has occured. + * + * @param handler the handler + */ + inline void setErrorHandler(ErrorHandler handler) + { + m_onError = std::move(handler); + } + + /** + * Set the timeout handler, called when the selection has timeout. + * + * @param handler the handler + */ + inline void setTimeoutHandler(TimeoutHandler handler) + { + m_onTimeout = std::move(handler); + } + + /** * Poll for the next event. * * @param timeout the timeout (-1 for indefinitely) @@ -2746,24 +2810,28 @@ */ void poll(int timeout = -1) { - auto st = m_listener.wait(timeout); - - if (st.socket == m_master.handle()) { - processAccept(); - } else { - processSync(st); - } - } - - /** - * Run forever. - * - * @throw Error on errors - */ - void run() - { - for (;;) { - poll(); + try { + auto st = m_listener.wait(timeout); + + if (st.socket == m_master.handle()) { + /* New client */ + processAccept(); + } else { + /* Recv / Send / Accept on a client */ + auto client = m_clients[st.socket]; + + if (client->socket().state() == State::Accepted) { + processSync(client, st.flags); + } else { + processAccept(client, true); + } + } + } catch (const Error &error) { + if (error.code() == Error::Timeout) { + m_onTimeout(); + } else { + m_onError(error); + } } } }; @@ -2780,6 +2848,7 @@ using WriteHandler = Callback; using DisconnectionHandler = Callback<>; using ErrorHandler = Callback; + using TimeoutHandler = Callback<>; private: /* Signals */ @@ -2788,62 +2857,76 @@ WriteHandler m_onWrite; DisconnectionHandler m_onDisconnection; ErrorHandler m_onError; + TimeoutHandler m_onTimeout; /* Socket */ Socket m_socket; - Address m_address; Listener<> m_listener; - /* Connection status and output buffer */ - bool m_connected{false}; + /* Output buffer */ std::string m_output; - void processConnect() + /* + * This is the generic connect helper, it will be used to both initiate the connection or to continue the + * connection process if needed. + * + * Thus the template parameter is the appropriate function to call either, m_socket.connect(address) or + * m_socket.connect(). + * + * See poll() and connect() to understand. + */ + template + void processConnect(const ConnectCall &connectFunc) { - try { - m_socket.connect(m_address); - m_connected = true; + /* Call m_socket.connect() or m_socket.connect(address) */ + connectFunc(); + + /* Remove entirely */ + m_listener.remove(m_socket.handle()); + + switch (m_socket.state()) { + case State::ConnectingWrite: + m_listener.set(m_socket.handle(), FlagWrite); + break; + case State::Connected: + /* Connection complete? */ m_onConnection(); - if (m_output.empty()) { - m_listener.unset(m_socket.handle(), FlagWrite); - } - } catch (const Error &error) { - if (error.code() == Error::WouldBlockRead) { - m_listener.set(m_socket.handle(), FlagRead); - } else if (error.code() == Error::WouldBlockWrite) { - m_listener.set(m_socket.handle(), FlagWrite); - } else { - m_onError(error); - } + /* FALLTHROUGH */ + case State::ConnectingRead: + m_listener.set(m_socket.handle(), FlagRead); + break; + default: + break; } } void processSync(int flags) { - try { - if (flags & FlagRead) { - m_onRead(m_socket.recv(512)); + if (flags & FlagRead) { + auto received = m_socket.recv(512); + + /* 0 means disconnection */ + if (received.empty()) { + m_onDisconnection(); } else { - /* 1. Make a copy of what has been sent */ - auto nsent = m_socket.send(m_output); - auto sent = m_output.substr(0, nsent); - - /* 2. Erase sent content */ - m_output.erase(0, nsent); - - /* 3. Update flags if needed */ - if (m_output.empty()) { - m_listener.unset(m_socket.handle(), FlagWrite); - } - - /* 4. Notify user */ - m_onWrite(sent); + m_onRead(received); } - } catch (const Error &error) { - m_listener.remove(m_socket.handle()); - m_connected = false; - m_onError(error); + } else { + /* 1. Make a copy of what has been sent */ + auto nsent = m_socket.send(m_output); + auto sent = m_output.substr(0, nsent); + + /* 2. Erase sent content */ + m_output.erase(0, nsent); + + /* 3. Update flags if needed */ + if (m_output.empty()) { + m_listener.unset(m_socket.handle(), FlagWrite); + } + + /* 4. Notify user */ + m_onWrite(sent); } } @@ -2919,15 +3002,11 @@ * @param address the address to connect to * @throw Error on failures */ - void connect(const Address &address) + void connect(const Address &address) noexcept { - if (m_connected) { - return; - } - - m_address = address; - - processConnect(); + assert(m_socket.state() == State::Open); + + processConnect([&] () { m_socket.connect(address); }); } /** @@ -2939,7 +3018,7 @@ { m_output += str; - if (!m_output.empty()) { + if (m_socket.state() == State::Connected && !m_output.empty()) { m_listener.set(m_socket.handle(), FlagWrite); } } @@ -2950,26 +3029,25 @@ * @param timeout the time to wait in milliseconds * @throw Error on errors */ - void poll(int timeout = -1) + void poll(int timeout = -1) noexcept { - auto st = m_listener.wait(timeout); - - if (!m_connected) { - processConnect(); - } else { - processSync(st.flags); - } - } - - /** - * Run indefinitely. - * - * @throw Error on errors - */ - void run() - { - for (;;) { - poll(); + try { + auto st = m_listener.wait(timeout); + + if (m_socket.state() != State::Connected) { + /* Continue the connection */ + processConnect([&] () { m_socket.connect(); }); + } else { + /* Read / Write */ + processSync(st.flags); + } + } catch (const Error &error) { + if (error.code() == Error::Timeout) { + m_onTimeout(); + } else { + m_listener.remove(m_socket.handle()); + m_onError(error); + } } } };