changeset 463:214f03b47d4e

Socket: - New action() and condition() function to check for pending events, - More documentation, - StreamServer, StreamClient can now complete recv/send operation for OpenSSL.
author David Demelier <markand@malikania.fr>
date Wed, 04 Nov 2015 18:01:22 +0100
parents 9d53c536372e
children 61a6f3518c55
files C++/examples/Socket/stream-client.cpp C++/examples/Socket/stream-server.cpp C++/examples/Socket/test.crt C++/examples/Socket/test.key C++/modules/Socket/Sockets.cpp C++/modules/Socket/Sockets.h
diffstat 6 files changed, 760 insertions(+), 223 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/C++/examples/Socket/stream-client.cpp	Wed Nov 04 18:01:22 2015 +0100
@@ -0,0 +1,66 @@
+/*
+ * stream-server -- example of stream server
+ *
+ * Options:
+ *   - WITH_PORT (int) which port to use (default: 12000)
+ *   - WITH_HOST (string literal) which host to connect (default: "localhost")
+ *   - WITH_SSL (bool) true to use SSL (default: false)
+ */
+
+#include <iostream>
+
+#include "Sockets.h"
+
+#if !defined(WITH_PORT)
+#  define WITH_PORT 12000
+#endif
+
+#if !defined(WITH_HOST)
+#  define WITH_HOST "localhost"
+#endif
+
+#if defined(WITH_SSL)
+using Client = net::StreamClient<net::Ipv4, net::Tls>;
+#else
+using Client = net::StreamClient<net::Ipv4, net::Tcp>;
+#endif
+
+int main()
+{
+	Client client;
+
+	/*
+	 * Unfortunately at the moment the socket state is not changed, this will be done
+	 * in the future.
+	 */
+	bool connected{true};
+
+	client.setConnectionHandler([&] () {
+		std::cout << "client: successfully connected" << std::endl;
+		client.send("Hello world!");
+	});
+	client.setDisconnectionHandler([&] () {
+		std::cout << "client: disconnected" << std::endl;
+		connected = false;
+	});
+	client.setErrorHandler([&] (const net::Error &error) {
+		std::cout << "client: error: " << error.function() << ": " << error.what() << std::endl;
+		connected = false;
+	});
+	client.setReadHandler([] (const std::string &data) {
+		std::cout << "client: received: " << data << std::endl;
+	});
+	client.setWriteHandler([] (const std::string &data) {
+		std::cout << "client: sent: " << data << std::endl;
+	});
+
+	client.connect(net::Ipv4{WITH_HOST, WITH_PORT});
+
+	while (connected) {
+		client.poll();
+	}
+
+	std::cout << "client: exiting" << std::endl;
+
+	return 0;
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/C++/examples/Socket/stream-server.cpp	Wed Nov 04 18:01:22 2015 +0100
@@ -0,0 +1,52 @@
+/*
+ * stream-server -- example of stream server
+ *
+ * Options:
+ *   - WITH_PORT (int) which port to use (default: 12000)
+ *   - WITH_SSL (bool) true to use SSL (default: false)
+ */
+
+#include <iostream>
+
+#include "Sockets.h"
+
+#if !defined(WITH_PORT)
+#  define WITH_PORT 12000
+#endif
+
+#if defined(WITH_SSL)
+using Server = net::StreamServer<net::Ipv4, net::Tls>;
+using Connection = net::StreamConnection<net::Ipv4, net::Tls>;
+#else
+using Server = net::StreamServer<net::Ipv4, net::Tcp>;
+using Connection = net::StreamConnection<net::Ipv4, net::Tcp>;
+#endif
+
+int main()
+{
+#if defined(WITH_SSL)
+	Server server{net::Ipv4{"*", WITH_PORT}, net::Tls{net::Tls::Tlsv1, false, "test.key", "test.crt"}};
+#else
+	Server server{net::Ipv4{"*", WITH_PORT}};
+#endif
+
+	server.setConnectionHandler([] (const std::shared_ptr<Connection> &client) {
+		std::cout << "server: new client connected" << std::endl;
+		client->send("Welcome to our server dude");
+	});
+	server.setReadHandler([] (const std::shared_ptr<Connection> &, const std::string &buffer) {
+		std::cout << "server: received: " << buffer << std::endl;
+	});
+	server.setWriteHandler([] (const std::shared_ptr<Connection> &, const std::string &buffer) {
+		std::cout << "server: sent: " << buffer << std::endl;
+	});
+	server.setDisconnectionHandler([] (const std::shared_ptr<Connection> &) {
+		std::cout << "server: client disconnected" << std::endl;
+	});
+
+	while (true) {
+		server.poll(-1);
+	}
+
+	return 0;
+}
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/C++/examples/Socket/test.crt	Wed Nov 04 18:01:22 2015 +0100
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICITCCAYoCCQCGm4grkVCohjANBgkqhkiG9w0BAQsFADBVMQswCQYDVQQGEwJG
+UjEPMA0GA1UECAwGRnJhbmNlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0
+eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEwMjYyMDM0NThaFw0yNTEw
+MjMyMDM0NThaMFUxCzAJBgNVBAYTAkZSMQ8wDQYDVQQIDAZGcmFuY2UxITAfBgNV
+BAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDp13OqVyOWyv5QWD4xr+Duw6SZ
+gU7D5huzsAOcneSI6JUhf+7Ecu6BQ2JGkFn4srIVkMWGQuImETJ8JCpSQH7rk+xO
+L9fTTK+TwhP2hW/Rf/b2gWedhJAS+gilqt4JNT7v2wFv+aTtRt/lpTXVSdtpLa/m
+Pdy219f6MAPgODJ/7QIDAQABMA0GCSqGSIb3DQEBCwUAA4GBAJSnn/IBn1ZblfzP
+rJO/lE1Jwpmx3B7+oR/e4fkZd6JR3s06umGYWr2H+TPl/5dj9x0gPokhoIL9zCGq
+SxCPnOeaxjBkw7yh3Ks6m3xKxmK4aMpAtBHtwmbfQyIcgz71/lfCzbJ3WcKpn1ig
+IZbByt5QSSPcFORRzJJa35eHBdfX
+-----END CERTIFICATE-----
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/C++/examples/Socket/test.key	Wed Nov 04 18:01:22 2015 +0100
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXAIBAAKBgQDp13OqVyOWyv5QWD4xr+Duw6SZgU7D5huzsAOcneSI6JUhf+7E
+cu6BQ2JGkFn4srIVkMWGQuImETJ8JCpSQH7rk+xOL9fTTK+TwhP2hW/Rf/b2gWed
+hJAS+gilqt4JNT7v2wFv+aTtRt/lpTXVSdtpLa/mPdy219f6MAPgODJ/7QIDAQAB
+AoGBANDt4ndQkgi56A1rOm50gVlzTg6lPPXFE/0xB5kYbcdxX0VmI7Q8KCMwTI9V
+jD2rk3e3OPSjr6FpfhzyxylkXMBz2BL5NRNPowCJbiMgZOUIzlcWPKo0tgf1bZJx
+YdB5U003ISGPPBjVOAjyizY7tJnaNvbpLQ0hbIAsvHPEAOnBAkEA9r3g8NQjPrvb
+oIr5SMIxM8HDJ1/q+MEBSFtRFzQpmur6P64Jsu96zCyencUYTxs0L/sottrj6dPC
+vjGCc6PjsQJBAPKdqK1knJv6Y95M2bnEwrymCFVdxCi7AxObStB+bg/+7mMCUqqX
+j2g71bfvhYakHV7CiaYrrORChwj6vTbimv0CQGpd2IZ5LOhyW2+N+YDgFg3Vzac/
+ti+eJEto8kAqgHUELvUctZmpmypBYe9pc91GQO0ePKL3IaE/ZIhRF4d6c0ECQH9A
+XiaD7PiKvjLs0A31u8ZCt4A+7BII9LYl73mntobBSbu4ji9Xyyn6qEAPa1ORZK49
+DwGPSuF2W2lESlYtSOkCQGrtczhx3IyJjk5e2Y1i/UddPKjviAysCSzcW6aVTNr9
+Y2L0sWmva2FKnkl9FDuEqxvmGr6OOkr5Ll7aWLzJri8=
+-----END RSA PRIVATE KEY-----
--- a/C++/modules/Socket/Sockets.cpp	Wed Nov 04 10:50:23 2015 +0100
+++ b/C++/modules/Socket/Sockets.cpp	Wed Nov 04 18:01:22 2015 +0100
@@ -217,7 +217,7 @@
 	}
 }
 
-Ip::Ip(const struct sockaddr_storage *ss, socklen_t length)
+Ip::Ip(const sockaddr_storage *ss, socklen_t length)
 	: m_length{length}
 	, m_domain{ss->ss_family}
 {
@@ -370,7 +370,7 @@
 	if (add) {
 		m_fds.push_back(pollfd{h, topoll(flags), 0});
 	} else {
-		auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const struct pollfd &pfd) {
+		auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const pollfd &pfd) {
 			return pfd.fd == h;
 		});
 
@@ -380,7 +380,7 @@
 
 void Poll::unset(const ListenerTable &, Handle h, int flags, bool remove)
 {
-	auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const struct pollfd &pfd) {
+	auto it = std::find_if(m_fds.begin(), m_fds.end(), [&] (const pollfd &pfd) {
 		return pfd.fd == h;
 	});
 
@@ -449,9 +449,9 @@
 
 void Epoll::update(Handle h, int op, int flags)
 {
-	struct epoll_event ev;
+	epoll_event ev;
 
-	std::memset(&ev, 0, sizeof (struct epoll_event));
+	std::memset(&ev, 0, sizeof (epoll_event));
 
 	ev.events = flags;
 	ev.data.fd = h;
@@ -546,7 +546,7 @@
 
 void Kqueue::update(Handle h, int filter, int flags)
 {
-	struct kevent ev;
+	kevent ev;
 
 	EV_SET(&ev, h, filter, flags, 0, 0, nullptr);
 
--- a/C++/modules/Socket/Sockets.h	Wed Nov 04 10:50:23 2015 +0100
+++ b/C++/modules/Socket/Sockets.h	Wed Nov 04 18:01:22 2015 +0100
@@ -23,7 +23,7 @@
  * @file Sockets.h
  * @brief Portable socket abstraction
  *
- * This file is a portable network library.
+ * This file is a portable networking library.
  *
  * ### User definable options
  *
@@ -313,8 +313,6 @@
 	 * @brief Which kind of error
 	 */
 	enum Code {
-		WouldBlockRead,		///!< The operation would block for reading
-		WouldBlockWrite,	///!< The operation would block for writing
 		Timeout,		///!< The action did timeout
 		System,			///!< There is a system error
 		Other			///!< Other custom error
@@ -396,22 +394,55 @@
 
 /**
  * @enum State
- * @brief Current socket state
+ * @brief Current socket state.
  */
 enum class State {
 	Open,			//!< Socket is open
 	Bound,			//!< Socket is bound to an address
-	ConnectingRead,		//!< Connection is in progress but requires to be readable
-	ConnectingWrite,	//!< Connection is in progress but requires to be writable
+	Connecting,		//!< The connection is in progress
 	Connected,		//!< Connection is complete
-	AcceptingRead,		//!< The socket is being accepted but requires to be readable
-	AcceptingWrite,		//!< The socket is being accepted but requires to be writable
 	Accepted,		//!< Socket has been accepted (client)
+	Accepting,		//!< The client acceptation is in progress
 	Closed,			//!< The socket has been closed
 };
 
 /* }}} */
 
+/**
+ * @enum Action
+ * @brief Define the current operation that must complete.
+ *
+ * Some operations like accept, connect, recv or send must sometimes do several round-trips to complete and the socket
+ * action is set with that enumeration. The user is responsible of calling accept, connect send or recv until the
+ * operation is complete.
+ *
+ * Note: the user must wait for the appropriate condition in Socket::condition to check if the required condition is
+ * met.
+ *
+ * It is important to complete the operation in the correct order because protocols like Tls may require to continue
+ * re-negociating when calling Socket::send or Socket::Recv.
+ */
+enum class Action {
+	None,		//!< No action is required, socket is ready
+	Accept,		//!< The socket is not yet accepted, caller must call accept() again
+	Connect,	//!< The socket is not yet connected, caller must call connect() again
+	Receive,	//!< The received operation has not succeeded yet, caller must call recv() or recvfrom() again
+	Send		//!< The send operation has not succeded yet, caller must call send() or sendto() again
+};
+
+/**
+ * @enum Condition
+ * @brief Define the required condition for the socket.
+ *
+ * As explained in Action enumeration, some operations required to be called several times, before calling these
+ * operations, the user must wait the socket to be readable or writable. This can be checked with Socket::condition.
+ */
+enum class Condition {
+	None,		//!< No condition is required
+	Readable,	//!< The socket must be readable
+	Writable	//!< The socket must be writable
+};
+
 /*
  * Base Socket class
  * ------------------------------------------------------------------
@@ -424,13 +455,15 @@
 
 /**
  * @class Socket
- * @brief Base socket class for socket operations
+ * @brief Base socket class for socket operations.
  */
 template <typename Address, typename Type>
 class Socket {
 private:
 	Type m_type;
 	State m_state{State::Closed};
+	Action m_action{Action::None};
+	Condition m_condition{Condition::None};
 
 protected:
 	/**
@@ -450,6 +483,8 @@
 	 * @param protocol the protocol
 	 * @param iface the implementation
 	 * @throw Error on failures
+	 * @post state is set to Open
+	 * @post handle is not set to Invalid
 	 */
 	Socket(int domain, int type, int protocol, Type iface = Type{})
 		: m_type(std::move(iface))
@@ -465,7 +500,10 @@
 
 		m_type.create(*this);
 		m_state = State::Open;
+
+		assert(m_handle != Invalid);
 	}
+
 	/**
 	 * This tries to create a socket.
 	 *
@@ -485,12 +523,16 @@
 	 * @param handle the native descriptor
 	 * @param state specify the socket state
 	 * @param type the type of socket implementation
+	 * @post action is set to None
+	 * @post condition is set to None
 	 */
 	explicit inline Socket(Handle handle, State state = State::Closed, Type type = Type{}) noexcept
 		: m_type(std::move(type))
 		, m_state{state}
 		, m_handle{handle}
 	{
+		assert(m_action == Action::None);
+		assert(m_condition == Condition::None);
 	}
 
 	/**
@@ -506,11 +548,15 @@
 	inline Socket(Socket &&other) noexcept
 		: m_type(std::move(other.m_type))
 		, m_state{other.m_state}
+		, m_action{other.m_action}
+		, m_condition{other.m_condition}
 		, m_handle{other.m_handle}
 	{
 		/* Invalidate other */
-		other.m_handle = -1;
+		other.m_handle = Invalid;
 		other.m_state = State::Closed;
+		other.m_action = Action::None;
+		other.m_condition = Condition::None;
 	}
 
 	/**
@@ -564,6 +610,49 @@
 	}
 
 	/**
+	 * Get the pending operation.
+	 *
+	 * @return the action to complete before continuing
+	 * @note usually only needed in non-blocking sockets
+	 */
+	inline Action action() const noexcept
+	{
+		return m_action;
+	}
+
+	/**
+	 * Change the pending operation.
+	 *
+	 * @param action the action
+	 * @warning you should not call this function yourself
+	 */
+	inline void setAction(Action action) noexcept
+	{
+		m_action = action;
+	}
+
+	/**
+	 * Get the condition to wait for.
+	 *
+	 * @return the condition
+	 */
+	inline Condition condition() const noexcept
+	{
+		return m_condition;
+	}
+
+	/**
+	 * Change the condition required.
+	 *
+	 * @param condition the condition
+	 * @warning you should not call this function yourself
+	 */
+	inline void setCondition(Condition condition) noexcept
+	{
+		m_condition = condition;
+	}
+
+	/**
 	 * Set an option for the socket.
 	 *
 	 * @param level the setting level
@@ -572,14 +661,11 @@
 	 * @throw Error on error
 	 */
 	template <typename Argument>
-	inline void set(int level, int name, const Argument &arg)
+	void set(int level, int name, const Argument &arg)
 	{
-#if defined(_WIN32)
-		if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) == Failure)
-#else
-		if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) < 0)
-#endif
+		if (setsockopt(m_handle, level, name, (ConstArg)&arg, sizeof (arg)) == Failure) {
 			throw Error{Error::System, "set"};
+		}
 	}
 
 	/**
@@ -590,17 +676,14 @@
 	 * @throw Error on error
 	 */
 	template <typename Argument>
-	inline Argument get(int level, int name)
+	Argument get(int level, int name)
 	{
 		Argument desired, result{};
 		socklen_t size = sizeof (result);
 
-#if defined(_WIN32)
-		if (getsockopt(m_handle, level, name, (Arg)&desired, &size) == Failure)
-#else
-		if (getsockopt(m_handle, level, name, (Arg)&desired, &size) < 0)
-#endif
+		if (getsockopt(m_handle, level, name, (Arg)&desired, &size) == Failure) {
 			throw Error{Error::System, "get"};
+		}
 
 		std::memcpy(&result, &desired, size);
 
@@ -657,7 +740,8 @@
 	 *
 	 * @param address the address
 	 * @param length the size
-	 * @pre state() must not be State::Bound
+	 * @pre state must not be Bound
+	 * @throw Error on errors
 	 */
 	void bind(const sockaddr *address, socklen_t length)
 	{
@@ -685,7 +769,8 @@
 	 * Listen for pending connection.
 	 *
 	 * @param max the maximum number
-	 * @pre state() must be Bound
+	 * @pre state must be Bound
+	 * @throw Error on errors
 	 */
 	inline void listen(int max = 128)
 	{
@@ -697,27 +782,37 @@
 	}
 
 	/**
-	 * Connect to the connection to the specified address. If the socket is marked non-blocking and the
-	 * connection cannot be established, the state is set to ConnectingRead or ConnectingWrite depending
-	 * on the underlying implementation. The user is then responsible to wait that the socket is writable
-	 * or readable using a listener and then call the overloaded connect() function which takes 0 argument.
+	 * Connect to the address.
+	 *
+	 * On non-blocking socket, if the connection cannot be established immediately then state is set to
+	 * State::Connecting, action is set to Action::Connect and condition is set to the required condition.
 	 *
+	 * User is then responsible of waiting for the condition to be checked and call the connect() overload
+	 * which takes 0 argument.
+	 *
+	 * @pre state must be State::Open
 	 * @param address the address
-	 * @param length the address length
-	 * @pre state() must be State::Open
+	 * @param length the the address length
+	 * @post if state is Connecting, action and condition are defined
 	 * @throw Error on errors
 	 */
 	void connect(const sockaddr *address, socklen_t length)
 	{
 		assert(m_state == State::Open);
 
+		m_action = Action::None;
+		m_condition = Condition::None;
+
 		m_type.connect(*this, address, length);
+
+		assert((m_state == State::Connected  && m_action == Action::None    && m_condition == Condition::None) ||
+		       (m_state == State::Connecting && m_action == Action::Connect && m_condition != Condition::None));
 	}
 
 	/**
 	 * Overloaded function.
 	 *
-	 * Calls connect(address.address(), address.length());
+	 * Effectively call connect(address.address(), address.length());
 	 *
 	 * @param address the address
 	 */
@@ -730,33 +825,45 @@
 	 * Continue the connection, only required with non-blocking sockets. Just like the initial connect() call,
 	 * this function can still be in progress.
 	 *
-	 * @pre state() must be State::ConnectingRead or State::ConnectingWrite
+	 * @pre state must be State::Connecting
 	 * @throw Error on errors
+	 * @post if connection is completed, state is set to State::Connected
+	 * @post if connection is still in progress, condition is set
 	 */
-	inline void connect()
+	void connect()
 	{
-		assert(m_state == State::ConnectingRead || m_state == State::ConnectingWrite);
+		assert(m_state == State::Connecting);
+
+		m_action = Action::None;
+		m_condition = Condition::None;
 
 		m_type.connect(*this);
+
+		assert((m_state == State::Connected  && m_action == Action::None    && m_condition == Condition::None) ||
+		       (m_state == State::Connecting && m_action == Action::Connect && m_condition != Condition::None));
 	}
 
 	/**
-	 * Accept a pending connection. If the socket is marked non-blocking and has no pending connection then an
-	 * error is thrown with the WouldBlockRead code.
+	 * Accept a new client. If there are no pending connection, throws an error.
 	 *
-	 * If the client is accepted but not yet ready, its state may be set to AcceptingRead or AcceptingWrite
-	 * and the user is responsible of waiting the socket to be readable or writable and then call the
-	 * overloaded accept() (with 0 arguments) until the connection is complete.
+	 * On non-blocking sockets, the client may not be completely accepted yet, if it's the case, the returned
+	 * socket state is set to State::Accepting and user is responsible of calling accept() on it when the required
+	 * condition is met.
 	 *
+	 * @pre state must be State::Bound
 	 * @param info the address where to store client's information (optional)
 	 * @return the new socket
-	 * @pre state() must be State::Bound
 	 * @throw Error on errors
+	 * @post the client socket state is either set to State::Accepting or State::Accepted
+	 * @post if the client state is set to State::Accepting, action and condition are defined
 	 */
-	inline Socket<Address, Type> accept(Address *info)
+	Socket<Address, Type> accept(Address *info)
 	{
 		assert(m_state == State::Bound);
 
+		m_action = Action::None;
+		m_condition = Condition::None;
+
 		sockaddr_storage storage;
 		socklen_t length = sizeof (storage);
 
@@ -766,20 +873,42 @@
 			*info = Address{&storage, length};
 		}
 
+		/* Master do not change */
+		assert(m_state == State::Bound);
+		assert(m_action == Action::None);
+		assert(m_condition == Condition::None);
+
+		/* Client */
+		assert(
+			(sc.state() == State::Accepting && sc.action() == Action::Accept && sc.condition() != Condition::None) ||
+			(sc.state() == State::Accepted  && sc.action() == Action::None   && sc.condition() == Condition::None)
+		);
+
 		return sc;
 	}
 
 	/**
-	 * Continue the accept process on this client.
+	 * Continue the accept process on this client. This function must be called only when the socket is
+	 * ready to be readable or writable! (see condition).
 	 *
-	 * @pre state() must be State::AcceptingRead or State::AcceptingWrite
+	 * @pre state must be State::Accepting
 	 * @throw Error on errors
+	 * @post if connection is complete, state is changed to State::Accepted, action and condition are unset
+	 * @post if connection is still in progress, condition is set
 	 */
-	inline void accept()
+	void accept()
 	{
-		assert(m_state == State::AcceptingRead || m_state == State::AcceptingWrite);
+		assert(m_state == State::Accepting);
+
+		m_action = Action::None;
+		m_condition = Condition::None;
 
 		m_type.accept(*this);
+
+		assert(
+			(m_state == State::Accepting && m_action == Action::Accept && m_condition != Condition::None) ||
+			(m_state == State::Accepted  && m_action == Action::None   && m_condition == Condition::None)
+		);
 	}
 
 	/**
@@ -806,13 +935,31 @@
 	/**
 	 * Receive some data.
 	 *
+	 * If the operation cannot be complete immediately, 0 is returned, action() is set to Action::Receive and
+	 * condition is set to Condition::Readableable or Condition::Writable, the user then must wait for the
+	 * appropriate condition and repeat the recv() call.
+	 *
+	 * If action() is set to None and result is set to 0, disconnection occured.
+	 *
 	 * @param data the destination buffer
 	 * @param length the buffer length
+	 * @pre action() must not be Action::Send
+	 * @return the number of bytes received or 0
 	 * @throw Error on error
 	 */
-	inline unsigned recv(void *data, unsigned length)
+	unsigned recv(void *data, unsigned length)
 	{
-		return m_type.recv(*this, data, length);
+		assert(m_action != Action::Send);
+
+		m_action = Action::None;
+		m_condition = Condition::None;
+
+		unsigned nbread = m_type.recv(*this, data, length);
+
+		assert((m_action == Action::None    && m_condition == Condition::None) ||
+		       (m_action != Action::Receive && m_condition != Condition::None));
+
+		return nbread;
 	}
 
 	/**
@@ -834,15 +981,29 @@
 	}
 
 	/**
-	 * Send some data.
+	 * Send some data. Just like recv(), the operation may not succeed immediately and requires to be
+	 * repeated.
 	 *
 	 * @param data the data buffer
 	 * @param length the buffer length
+	 * @return the number of bytes sent or 0
+	 * @pre action() must not be Flag::Receive
 	 * @throw Error on error
+	 * @see recv
 	 */
-	inline unsigned send(const void *data, unsigned length)
+	unsigned send(const void *data, unsigned length)
 	{
-		return m_type.send(*this, data, length);
+		assert(m_action != Action::Receive);
+
+		m_action = Action::None;
+		m_condition = Condition::None;
+
+		unsigned nbsent = m_type.send(*this, data, length);
+
+		assert((m_action == Action::None && m_condition == Condition::None) ||
+		       (m_action != Action::Send && m_condition != Condition::None));
+
+		return nbsent;
 	}
 
 	/**
@@ -857,6 +1018,8 @@
 		return send(data.c_str(), data.size());
 	}
 
+#if 0
+
 	/**
 	 * Send data to an end point.
 	 *
@@ -919,6 +1082,8 @@
 		return result;
 	}
 
+#endif
+
 	/**
 	 * Close the socket.
 	 *
@@ -933,8 +1098,11 @@
 			::close(m_handle);
 #endif
 			m_handle = Invalid;
-			m_state = State::Closed;
 		}
+
+		m_state = State::Closed;
+		m_action = Action::None;
+		m_condition = Condition::None;
 	}
 
 	/**
@@ -956,10 +1124,14 @@
 		m_handle = other.m_handle;
 		m_type = std::move(other.m_type);
 		m_state = other.m_state;
+		m_action = other.m_action;
+		m_condition = other.m_condition;
 
 		/* Invalidate other */
 		other.m_handle = Invalid;
 		other.m_state = State::Closed;
+		other.m_action = Action::None;
+		other.m_conditon = Condition::None;
 
 		return *this;
 	}
@@ -1062,9 +1234,6 @@
  */
 class Ip {
 private:
-	friend class Ipv6;
-	friend class Ipv4;
-
 	union {
 		sockaddr_in m_sin;
 		sockaddr_in6 m_sin6;
@@ -1097,7 +1266,7 @@
 	 * @param ss the storage
 	 * @param length the length
 	 */
-	Ip(const struct sockaddr_storage *ss, socklen_t length);
+	Ip(const sockaddr_storage *ss, socklen_t length);
 
 	/**
 	 * Get the domain (AF_INET or AF_INET6).
@@ -1180,7 +1349,7 @@
 	 * @param ss the storage
 	 * @param length the length
 	 */
-	inline Ipv6(const struct sockaddr_storage *ss, socklen_t length)
+	inline Ipv6(const sockaddr_storage *ss, socklen_t length)
 		: Ip{ss, length}
 	{
 	}
@@ -1218,7 +1387,7 @@
 	 * @param ss the storage
 	 * @param length the length
 	 */
-	inline Ipv4(const struct sockaddr_storage *ss, socklen_t length)
+	inline Ipv4(const sockaddr_storage *ss, socklen_t length)
 		: Ip{ss, length}
 	{
 	}
@@ -1355,13 +1524,17 @@
 			int error = WSAGetLastError();
 
 			if (error == WSAEWOULDBLOCK) {
-				sc.setState(State::ConnectingWrite);
+				sc.setState(State::Connecting);
+				sc.setAction(Action::Connect);
+				sc.setCondition(Condition::Writable);
 			} else {
 				throw Error{Error::System, "connect", error};
 			}
 #else
 			if (errno == EINPROGRESS) {
-				sc.setState(State::ConnectingWrite);
+				sc.setState(State::Connecting);
+				sc.setAction(Action::Connect);
+				sc.setCondition(Condition::Writable);
 			} else {
 				throw Error{Error::System, "connect"};
 			}
@@ -1403,21 +1576,7 @@
 		Handle handle = ::accept(sc.handle(), address, length);
 
 		if (handle == Invalid) {
-#if defined(_WIN32)
-			int error = WSAGetLastError();
-
-			if (error == WSAEWOULDBLOCK) {
-				throw Error{Error::WouldBlockRead, "accept", error};
-			}
-
-			throw Error{Error::System, "accept", error};
-#else
-			if (errno == EAGAIN || errno == EWOULDBLOCK) {
-				throw Error{Error::WouldBlockRead, "accept"};
-			}
-
 			throw Error{Error::System, "accept"};
-#endif
 		}
 
 		return Socket<Address, Type>{handle, State::Accepted};
@@ -1444,24 +1603,25 @@
 	template <typename Address>
 	unsigned recv(Socket<Address, Tcp> &sc, void *data, unsigned length)
 	{
-		int nbread;
-
-		nbread = ::recv(sc.handle(), (Arg)data, length, 0);
+		int nbread = ::recv(sc.handle(), (Arg)data, length, 0);
+
 		if (nbread == Failure) {
 #if defined(_WIN32)
 			int error = WSAGetLastError();
 
 			if (error == WSAEWOULDBLOCK) {
-				throw Error{Error::WouldBlockRead, "recv", error};
+				sc.setAction(Action::Receive);
+				sc.setCondition(Condition::Readable);
+			} else {
+				throw Error{Error::System, "recv", error};
 			}
-
-			throw Error{Error::System, "recv", error};
 #else
 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
-				throw Error{Error::WouldBlockRead, "recv"};
+				sc.setAction(Action::Receive);
+				sc.setCondition(Condition::Readable);
+			} else {
+				throw Error{Error::System, "recv"};
 			}
-
-			throw Error{Error::System, "recv"};
 #endif
 		}
 
@@ -1480,24 +1640,25 @@
 	template <typename Address>
 	unsigned send(Socket<Address, Tcp> &sc, const void *data, unsigned length)
 	{
-		int nbsent;
-
-		nbsent = ::send(sc.handle(), (ConstArg)data, length, 0);
+		int nbsent = ::send(sc.handle(), (ConstArg)data, length, 0);
+
 		if (nbsent == Failure) {
 #if defined(_WIN32)
 			int error = WSAGetLastError();
 
 			if (error == WSAEWOULDBLOCK) {
-				throw Error{Error::WouldBlockWrite, "send", error};
+				sc.setAction(Action::Send);
+				sc.setCondition(Condition::Writable);
+			} else {
+				throw Error{Error::System, "send", error};
 			}
-
-			throw Error{Error::System, "send", error};
 #else
 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
-				throw Error{Error::WouldBlockWrite, "send"};
+				sc.setAction(Action::Send);
+				sc.setCondition(Condition::Writable);
+			} else {
+				throw Error{Error::System, "send"};
 			}
-
-			throw Error{Error::System, "send"};
 #endif
 		}
 
@@ -1509,6 +1670,8 @@
 
 /* {{{ Udp */
 
+#if 0
+
 /**
  * @class Udp
  * @brief Clear UDP type.
@@ -1553,7 +1716,7 @@
 
 		/* Store information */
 		sockaddr_storage address;
-		socklen_t addrlen = sizeof (struct sockaddr_storage);
+		socklen_t addrlen = sizeof (sockaddr_storage);
 
 		nbread = ::recvfrom(sc.handle(), (Arg)data, length, 0, reinterpret_cast<sockaddr *>(&address), &addrlen);
 		info = Address{&address, addrlen};
@@ -1617,6 +1780,8 @@
 	}
 };
 
+#endif
+
 /* }}} */
 
 /* {{{ Tls */
@@ -1670,6 +1835,21 @@
 		return msg == nullptr ? "" : msg;
 	}
 
+	template <typename Address, typename Type>
+	inline void updateStates(Socket<Address, Type> &sc, State state, Action action, int code)
+	{
+		assert(code == SSL_ERROR_WANT_READ || code == SSL_ERROR_WANT_WRITE);
+
+		sc.setState(state);
+		sc.setAction(action);
+
+		if (code == SSL_ERROR_WANT_READ) {
+			sc.setCondition(Condition::Readable);
+		} else {
+			sc.setCondition(Condition::Writable);
+		}
+	}
+
 	/*
 	 * Continue the connect operation.
 	 */
@@ -1681,10 +1861,8 @@
 		if (ret <= 0) {
 			int no = SSL_get_error(m_ssl.get(), ret);
 
-			if (no == SSL_ERROR_WANT_READ) {
-				sc.setState(State::ConnectingRead);
-			} else if (no == SSL_ERROR_WANT_WRITE) {
-				sc.setState(State::ConnectingWrite);
+			if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) {
+				updateStates(sc, State::Connecting, Action::Connect, no);
 			} else {
 				throw Error{Error::System, "connect", error(no)};
 			}
@@ -1704,10 +1882,8 @@
 		if (ret <= 0) {
 			int no = SSL_get_error(m_ssl.get(), ret);
 
-			if (no == SSL_ERROR_WANT_READ) {
-				sc.setState(State::AcceptingRead);
-			} else if (no == SSL_ERROR_WANT_WRITE) {
-				sc.setState(State::AcceptingWrite);
+			if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) {
+				updateStates(sc, State::Accepting, Action::Accept, no);
 			} else {
 				throw Error(Error::System, "accept", error(no));
 			}
@@ -1738,7 +1914,8 @@
 	/**
 	 * Construct a specific Tls object.
 	 *
-	 * @param method the method to use
+	 * @param method the method to use#
+	 
 	 * @param verify true to verify the certificate
 	 * @param key the private key
 	 * @param certificate the certificate file
@@ -1829,16 +2006,17 @@
 	Socket<Address, Tls> accept(Socket<Address, Tls> &sc, sockaddr *address, socklen_t *length)
 	{
 		Socket<Address, Tls> client = Tcp::accept(sc, address, length);
+		Tls &type = client.type();
 
 		/* 1. Share the context */
-		client.type().m_context = sc.type().m_context;
+		type.m_context = m_context;
 	
 		/* 2. Create new SSL instance */
-		client.type().m_ssl = Ssl{SSL_new(m_context.get()), SSL_free};
-		SSL_set_fd(client.type().m_ssl.get(), client.handle());
-
-		/* 3. Try accept process */
-		processAccept(sc);
+		type.m_ssl = Ssl{SSL_new(m_context.get()), SSL_free};
+		SSL_set_fd(type.m_ssl.get(), client.handle());
+
+		/* 3. Try accept process on the **new** client */
+		type.processAccept(client);
 
 		return client;
 	}
@@ -1864,17 +2042,15 @@
 	 * @throw Error on errors
 	 */
 	template <typename Address>
-	unsigned recv(Socket<Address, Tls> &, void *data, unsigned len)
+	unsigned recv(Socket<Address, Tls> &sc, void *data, unsigned len)
 	{
 		auto nbread = SSL_read(m_ssl.get(), data, len);
 
 		if (nbread <= 0) {
 			auto no = SSL_get_error(m_ssl.get(), nbread);
 
-			if (no == SSL_ERROR_WANT_READ) {
-				throw Error{Error::WouldBlockRead, "recv", "Operation would block"};
-			} else if (no == SSL_ERROR_WANT_WRITE) {
-				throw Error{Error::WouldBlockWrite, "recv", "Operation would block"};
+			if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) {
+				updateStates(sc, sc.state(), Action::Receive, no);
 			} else {
 				throw Error{Error::System, "recv", error(no)};
 			}
@@ -1892,23 +2068,21 @@
 	 * @throw Error on errors
 	 */
 	template <typename Address>
-	unsigned send(Socket<Address, Tls> &, const void *data, unsigned len)
+	unsigned send(Socket<Address, Tls> &sc, const void *data, unsigned len)
 	{
-		auto nbread = SSL_write(m_ssl.get(), data, len);
-
-		if (nbread <= 0) {
-			auto no = SSL_get_error(m_ssl.get(), nbread);
-
-			if (no == SSL_ERROR_WANT_READ) {
-				throw Error{Error::WouldBlockRead, "send", "Operation would block"};
-			} else if (no == SSL_ERROR_WANT_WRITE) {
-				throw Error{Error::WouldBlockWrite, "send", "Operation would block"};
+		auto nbsent = SSL_write(m_ssl.get(), data, len);
+
+		if (nbsent <= 0) {
+			auto no = SSL_get_error(m_ssl.get(), nbsent);
+
+			if (no == SSL_ERROR_WANT_READ || no == SSL_ERROR_WANT_WRITE) {
+				updateStates(sc, sc.state(), Action::Send, no);
 			} else {
 				throw Error{Error::System, "send", error(no)};
 			}
 		}
 
-		return nbread;
+		return nbsent;
 	}
 };
 
@@ -1936,11 +2110,29 @@
 using SocketTcp = Socket<Address, Tcp>;
 
 /**
+ * Helper to create TCP/IP sockets.
+ */
+using SocketTcpIp = Socket<Ip, Tcp>;
+
+#if !defined(_WIN32)
+
+/**
+ * Helper to create TCP/Local sockets.
+ */
+using SocketTcpLocal = Socket<Local, Tcp>;
+
+#endif
+
+#if 0
+
+/**
  * Helper to create UDP sockets.
  */
 template <typename Address>
 using SocketUdp = Socket<Address, Udp>;
 
+#endif
+
 #if !defined(SOCKET_NO_SSL)
 
 /**
@@ -1949,6 +2141,11 @@
 template <typename Address>
 using SocketTls = Socket<Address, Tls>;
 
+/**
+ * Helper to create OpenSSL TCP/Ip sockets.
+ */
+using SocketTlsIp = Socket<Ip, Tls>;
+
 #endif // !SOCKET_NO_SSL
 
 /* }}} */
@@ -1963,7 +2160,7 @@
 /* {{{ Listener */
 
 /**
- * @struct ListenerStatus
+ * @class ListenerStatus
  * @brief Result of polling
  *
  * Result of a select call, returns the first ready socket found with its
@@ -2017,7 +2214,7 @@
 
 /**
  * @class Poll
- * @brief Implements poll(2)
+ * @brief Implements poll(2).
  *
  * Poll is widely supported and is better than select(2). It is still not the
  * best option as selecting the sockets is O(n).
@@ -2030,8 +2227,19 @@
 	int toflags(short &event) const noexcept;
 
 public:
-	void set(const ListenerTable &, Handle sc, int flags, bool add);
-	void unset(const ListenerTable &, Handle sc, int flags, bool remove);
+	/**
+	 * Set the handle.
+	 */
+	void set(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Unset the handle.
+	 */
+	void unset(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Wait for events.
+	 */
 	std::vector<ListenerStatus> wait(const ListenerTable &, int ms);
 
 	/**
@@ -2047,10 +2255,14 @@
 
 #if defined(SOCKET_HAVE_EPOLL)
 
+/**
+ * @class Epoll
+ * @brief Linux's epoll.
+ */
 class Epoll {
 private:
 	int m_handle;
-	std::vector<struct epoll_event> m_events;
+	std::vector<epoll_event> m_events;
 
 	Epoll(const Epoll &) = delete;
 	Epoll &operator=(const Epoll &) = delete;
@@ -2062,11 +2274,30 @@
 	void update(Handle sc, int op, int flags);
 
 public:
+	/**
+	 * Construct the epoll instance.
+	 */
 	Epoll();
+
+	/**
+	 * Close the epoll instance.
+	 */
 	~Epoll();
-	void set(const ListenerTable &, Handle sc, int flags, bool add);
-	void unset(const ListenerTable &, Handle sc, int flags, bool remove);
-	std::vector<ListenerStatus> wait(const ListenerTable &table, int ms);
+
+	/**
+	 * Set the handle.
+	 */
+	void set(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Unset the handle.
+	 */
+	void unset(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Wait for events.
+	 */
+	std::vector<ListenerStatus> wait(const ListenerTable &, int);
 
 	/**
 	 * Backend identifier
@@ -2083,14 +2314,14 @@
 
 /**
  * @class Kqueue
- * @brief Implements kqueue(2)
+ * @brief Implements kqueue(2).
  *
  * This implementation is available on all BSD and Mac OS X. It is better than
  * poll(2) because it's O(1), however it's a bit more memory consuming.
  */
 class Kqueue {
 private:
-	std::vector<struct kevent> m_result;
+	std::vector<kevent> m_result;
 	int m_handle;
 
 	Kqueue(const Kqueue &) = delete;
@@ -2101,12 +2332,30 @@
 	void update(Handle sc, int filter, int flags);
 
 public:
+	/**
+	 * Construct the kqueue instance.
+	 */
 	Kqueue();
+
+	/**
+	 * Destroy the kqueue instance.
+	 */
 	~Kqueue();
 
-	void set(const ListenerTable &, Handle sc, int flags, bool add);
-	void unset(const ListenerTable &, Handle sc, int flags, bool remove);
-	std::vector<ListenerStatus> wait(const ListenerTable &, int ms);
+	/**
+	 * Set the handle.
+	 */
+	void set(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Unset the handle.
+	 */
+	void unset(const ListenerTable &, Handle, int, bool);
+
+	/**
+	 * Wait for events.
+	 */
+	std::vector<ListenerStatus> wait(const ListenerTable &, int);
 
 	/**
 	 * Backend identifier
@@ -2593,6 +2842,8 @@
  *
  * This class does all the things for you as accepting new clients, listening for it and sending data. It works
  * asynchronously without blocking to let you control your process workflow.
+ *
+ * This class is not thread safe and you must not call any of the functions from different threads.
  */
 template <typename Address, typename Type>
 class StreamServer {
@@ -2621,33 +2872,42 @@
 	ClientMap m_clients;
 
 	/*
-	 * Continue accept process. Set ready to true to attempt a call to accept() (e.g. after a select()).
+	 * Update flags depending on the required condition.
 	 */
-	void processAccept(std::shared_ptr<StreamConnection<Address, Type>> &client, bool ready)
+	void updateFlags(std::shared_ptr<StreamConnection<Address, Type>> &client)
+	{
+		assert(client->socket().action() != Action::None);
+
+		m_listener.remove(client->socket().handle());
+
+		if (client->socket().condition() == Condition::Readable) {
+			m_listener.set(client->socket().handle(), FlagRead);
+		} else {
+			m_listener.set(client->socket().handle(), FlagWrite);
+		}
+	}
+
+	/*
+	 * Continue accept process.
+	 */
+	template <typename AcceptCall>
+	void processAccept(std::shared_ptr<StreamConnection<Address, Type>> &client, const AcceptCall &acceptFunc)
 	{
 		try {
-			if (ready) {
-				client->socket().accept();
-			}
+			/* Do the accept */
+			acceptFunc();
 
 			/* 1. First remove completely the client */
 			m_listener.remove(client->socket().handle());
 
-			switch (client->socket().state()) {
-			case State::AcceptingWrite:
-				m_listener.set(client->socket().handle(), FlagWrite);
-				break;
-			case State::Accepted:
-			case State::AcceptingRead:
+			/* 2. If accept is not finished, wait for the appropriate condition */
+			if (client->socket().state() == State::Accepted) {
+				/* 3. Client is accepted, notify the user */
 				m_listener.set(client->socket().handle(), FlagRead);
-				break;
-			default:
-				break;
-			}
-
-			/* 2. Successfully accepted? */
-			if (client->socket().state() == State::Accepted) {
 				m_onConnection(client);
+			} else {
+				/* Operation still in progress */
+				updateFlags(client);
 			}
 		} catch (const Error &error) {
 			m_clients.erase(client->socket().handle());
@@ -2657,9 +2917,10 @@
 	}
 
 	/*
-	 * Process accept of master socket.
+	 * Process initial accept of master socket, this is the initial accepting process. Except on errors, the
+	 * socket is stored but the user will be notified only once the socket is completely accepted.
 	 */
-	void processAccept()
+	void processInitialAccept()
 	{
 		// TODO: store address too.
 		std::shared_ptr<StreamConnection<Address, Type>> client = std::make_shared<StreamConnection<Address, Type>>(m_master.accept(nullptr));
@@ -2669,56 +2930,101 @@
 		client->setWriteHandler([this, ptr] () {
 			auto client = ptr.lock();
 
-			if (client && !client->output().empty()) {
+			/* Do not update the listener immediately if an action is pending */
+			if (client && client->socket().action() == Action::None && !client->output().empty()) {
 				m_listener.set(client->socket().handle(), FlagWrite);
 			}
 		});
 
-		/* 2. Do initial accept or update flags */
-		processAccept(client, false);
-
-		/* 3. Add it if everything worked well */
-		m_clients.insert(std::make_pair(client->socket().handle(), std::move(client)));
+		/* 2. Add the client */
+		m_clients.insert(std::make_pair(client->socket().handle(), client));
+
+		/*
+		 * 2. Do an initial check to set the listener flags, at this moment the socket may or not be
+		 *    completely accepted.
+		 */
+		processAccept(client, [&] () {});
 	}
 
+	/*
+	 * Read or complete the read operation.
+	 */
+	void processRead(std::shared_ptr<StreamConnection<Address, Type>> &client)
+	{
+		/*
+		 * Read because there is something to read or because the pending operation is
+		 * read and must complete.
+		 */
+		auto buffer = client->socket().recv(512);
+
+		/*
+		 * Now the receive operation may be completed, in that case, two possibilities:
+		 *
+		 * 1. The action is set to None (completed)
+		 * 2. The action is still not complete, update the flags
+		 */
+		if (client->socket().action() == Action::None) {
+			/* Empty mean normal disconnection */
+			if (buffer.empty()) {
+				m_listener.remove(client->socket().handle());
+				m_clients.erase(client->socket().handle());
+				m_onDisconnection(client);
+			} else {
+				/*
+				 * At this step, it is possible that we were completing a receive operation, in this
+				 * case the write flag may be removed, add it if required.
+				 */
+				if (!client->output().empty()) {
+					m_listener.set(client->socket().handle(), FlagWrite);
+				}
+
+				m_onRead(client, buffer);
+			}
+		} else {
+			/* Operation in progress */
+			updateFlags(client);
+		}
+	}
+
+	/*
+	 * Flush the output buffer.
+	 */
+	void processWrite(std::shared_ptr<StreamConnection<Address, Type>> &client)
+	{
+		auto &output = client->output();
+		auto nsent = client->socket().send(output);
+
+		if (client->socket().action() == Action::None) {
+			/* 1. Create a copy of content that has been sent */
+			auto sent = output.substr(0, nsent);
+
+			/* 2. Erase the content sent */
+			output.erase(0, nsent);
+
+			/* 3. Update listener */
+			if (output.empty()) {
+				m_listener.unset(client->socket().handle(), FlagWrite);
+			}
+
+			/* 4. Notify user */
+			m_onWrite(client, sent);
+		} else {
+			updateFlags(client);
+		}
+	}
+
+	
 	void processSync(std::shared_ptr<StreamConnection<Address, Type>> &client, int flags)
 	{
-		bool remove{false};
-
 		try {
-			if (flags & FlagRead) {
-				auto buffer = client->socket().recv(512);
-
-				/* Empty mean normal disconnection */
-				if (buffer.empty()) {
-					remove = true;
-				} else {
-					m_onRead(client, buffer);
-				}
+			auto action = client->socket().action();
+
+			if (action == Action::Receive || (action == Action::None && (flags & FlagRead))) {
+				processRead(client);
 			} else if (flags & FlagWrite) {
-				auto &output = client->output();
-				auto nsent = client->socket().send(output);
-
-				/* 1. Create a copy of content that has been sent */
-				auto sent = output.substr(0, nsent);
-
-				/* 2. Erase the content sent */
-				output.erase(0, nsent);
-
-				/* 3. Update listener */
-				if (output.empty()) {
-					m_listener.unset(client->socket().handle(), FlagWrite);
-				}
-
-				/* 4. Notify user */
-				m_onWrite(client, sent);
+				processWrite(client);
 			}
 		} catch (const Error &error) {
-			remove = true;
-			m_onError(error);
-		}
-
-		if (remove) {
 			m_onDisconnection(client);
 			m_listener.remove(client->socket().handle());
 			m_clients.erase(client->socket().handle());
@@ -2733,9 +3039,10 @@
 	 * @param max the max number to listen
 	 * @throw Error on errors
 	 */
-	StreamServer(const Address &address, int max = 128)
-		: m_master{address.domain(), SOCK_STREAM, 0}
+	StreamServer(const Address &address, Type type = {}, int max = 128)
+		: m_master{address, std::move(type)}
 	{
+		// TODO: m_onError
 		m_master.set(SOL_SOCKET, SO_REUSEADDR, 1);
 		m_master.bind(address);
 		m_master.listen(max);
@@ -2815,7 +3122,7 @@
 
 			if (st.socket == m_master.handle()) {
 				/* New client */
-				processAccept();
+				processInitialAccept();
 			} else {
 				/* Recv / Send / Accept on a client */
 				auto client = m_clients[st.socket];
@@ -2823,7 +3130,7 @@
 				if (client->socket().state() == State::Accepted) {
 					processSync(client, st.flags);
 				} else {
-					processAccept(client, true);
+					processAccept(client, [&] () { client->socket().accept(); });
 				}
 			}
 		} catch (const Error &error) {
@@ -2836,18 +3143,52 @@
 	}
 };
 
+/* }}} */
+
+/*
+ * StreamClient
+ * ------------------------------------------------------------------
+ */
+
+/* {{{ StreamClient */
+
 /**
  * @class StreamClient
  * @brief Client side connection to a server.
+ *
+ * This class is not thread safe and you must not call any of the functions from different threads.
  */
 template <typename Address, typename Type>
 class StreamClient {
 public:
+	/**
+	 * Handler when connection is complete.
+	 */
 	using ConnectionHandler = Callback<>;
+
+	/**
+	 * Handler when data has been received.
+	 */
 	using ReadHandler = Callback<const std::string &>;
+
+	/**
+	 * Handler when data has been sent correctly.
+	 */
 	using WriteHandler = Callback<const std::string &>;
+
+	/**
+	 * Handler when disconnected.
+	 */
 	using DisconnectionHandler = Callback<>;
+
+	/**
+	 * Handler on unrecoverable error.
+	 */
 	using ErrorHandler = Callback<const Error &>;
+
+	/**
+	 * Handler when timeout occured.
+	 */
 	using TimeoutHandler = Callback<>;
 
 private:
@@ -2867,6 +3208,23 @@
 	std::string m_output;
 
 	/*
+	 * Update the flags after an uncompleted operation. This function must only be called when the operation
+	 * has not complete (e.g. connect, recv, send).
+	 */
+	void updateFlags()
+	{
+		assert(m_socket.action() != Action::None);
+
+		m_listener.remove(m_socket.handle());
+
+		if (m_socket.condition() == Condition::Readable) {
+			m_listener.set(m_socket.handle(), FlagRead);
+		} else {
+			m_listener.set(m_socket.handle(), FlagWrite);
+		}
+	}
+
+	/*
 	 * This is the generic connect helper, it will be used to both initiate the connection or to continue the
 	 * connection process if needed.
 	 *
@@ -2884,37 +3242,54 @@
 		/* Remove entirely */
 		m_listener.remove(m_socket.handle());
 
-		switch (m_socket.state()) {
-		case State::ConnectingWrite:
-			m_listener.set(m_socket.handle(), FlagWrite);
-			break;
-		case State::Connected:
-			/* Connection complete? */
+		if (m_socket.state() == State::Connected) {
 			m_onConnection();
-
-			/* FALLTHROUGH */
-		case State::ConnectingRead:
 			m_listener.set(m_socket.handle(), FlagRead);
-			break;
-		default:
-			break;
+		} else {
+			/* Connection still in progress */
+			updateFlags();
 		}
 	}
 
-	void processSync(int flags)
+	/*
+	 * Receive or complete the receive command, if the command is not complete, the listener is updated
+	 * accordingly.
+	 */
+	void processRead()
 	{
-		if (flags & FlagRead) {
-			auto received = m_socket.recv(512);
-
+		auto received = m_socket.recv(512);
+
+		if (m_socket.action() == Action::None) {
 			/* 0 means disconnection */
 			if (received.empty()) {
 				m_onDisconnection();
 			} else {
+				/*
+				 * At this step, it is possible that we were completing a receive operation, in this
+				 * case the write flag may be removed, add it if required.
+				 */
+				if (!m_output.empty()) {
+					m_listener.set(m_socket.handle(), FlagWrite);
+				}
+
 				m_onRead(received);
 			}
 		} else {
+			/* Receive operation in progress */
+			updateFlags();
+		}
+	}
+
+	/*
+	 * Send or complete the send command, if the command is not complete, the listener is updated
+	 * accordingly.
+	 */
+	void processWrite()
+	{
+		auto nsent = m_socket.send(m_output);
+
+		if (m_socket.action() == Action::None) {
 			/* 1. Make a copy of what has been sent */
-			auto nsent = m_socket.send(m_output);
 			auto sent = m_output.substr(0, nsent);
 
 			/* 2. Erase sent content */
@@ -2927,6 +3302,21 @@
 
 			/* 4. Notify user */
 			m_onWrite(sent);
+		} else {
+			/* Send operation in progress */
+			updateFlags();
+		}
+	}
+
+	/*
+	 * Receive or send.
+	 */
+	void processSync(int flags)
+	{
+		if ((m_socket.action() == Action::Receive) || (m_socket.action() == Action::None && (flags & FlagRead))) {
+			processRead();
+		} else {
+			processWrite();
 		}
 	}
 
@@ -3000,7 +3390,6 @@
 	 * will be called when the connection completed.
 	 *
 	 * @param address the address to connect to
-	 * @throw Error on failures
 	 */
 	void connect(const Address &address) noexcept
 	{
@@ -3018,7 +3407,8 @@
 	{
 		m_output += str;
 
-		if (m_socket.state() == State::Connected && !m_output.empty()) {
+		/* Don't update the listener if there is a pending operation */
+		if (m_socket.state() == State::Connected && m_socket.action() == Action::None && !m_output.empty()) {
 			m_listener.set(m_socket.handle(), FlagWrite);
 		}
 	}