changeset 462:9d53c536372e

Socket: - Remove run() in both StreamServer and StreamClient, - StreamServer understands new accept process, - StreamClient understands new connect process.
author David Demelier <markand@malikania.fr>
date Wed, 04 Nov 2015 10:50:23 +0100
parents 41d1a36cc461
children 214f03b47d4e
files C++/modules/Socket/Sockets.h
diffstat 1 files changed, 192 insertions(+), 114 deletions(-) [+]
line wrap: on
line diff
--- a/C++/modules/Socket/Sockets.h	Tue Nov 03 21:48:14 2015 +0100
+++ b/C++/modules/Socket/Sockets.h	Wed Nov 04 10:50:23 2015 +0100
@@ -2601,6 +2601,8 @@
 	using DisconnectionHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &>;
 	using ReadHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>;
 	using WriteHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>;
+	using ErrorHandler = Callback<const Error &>;
+	using TimeoutHandler = Callback<>;
 
 private:
 	using ClientMap = std::map<Handle, std::shared_ptr<StreamConnection<Address, Type>>>;
@@ -2610,51 +2612,92 @@
 	DisconnectionHandler m_onDisconnection;
 	ReadHandler m_onRead;
 	WriteHandler m_onWrite;
+	ErrorHandler m_onError;
+	TimeoutHandler m_onTimeout;
 
 	/* Sockets */
 	Socket<Address, Type> m_master;
 	Listener<> m_listener;
 	ClientMap m_clients;
 
+	/*
+	 * Continue accept process. Set ready to true to attempt a call to accept() (e.g. after a select()).
+	 */
+	void processAccept(std::shared_ptr<StreamConnection<Address, Type>> &client, bool ready)
+	{
+		try {
+			if (ready) {
+				client->socket().accept();
+			}
+
+			/* 1. First remove completely the client */
+			m_listener.remove(client->socket().handle());
+
+			switch (client->socket().state()) {
+			case State::AcceptingWrite:
+				m_listener.set(client->socket().handle(), FlagWrite);
+				break;
+			case State::Accepted:
+			case State::AcceptingRead:
+				m_listener.set(client->socket().handle(), FlagRead);
+				break;
+			default:
+				break;
+			}
+
+			/* 2. Successfully accepted? */
+			if (client->socket().state() == State::Accepted) {
+				m_onConnection(client);
+			}
+		} catch (const Error &error) {
+			m_clients.erase(client->socket().handle());
+			m_listener.remove(client->socket().handle());
+			m_onError(error);
+		}
+	}
+
+	/*
+	 * Process accept of master socket.
+	 */
 	void processAccept()
 	{
-		std::shared_ptr<StreamConnection<Address, Type>> connection = std::make_shared<StreamConnection<Address, Type>>(m_master.accept());
-		std::weak_ptr<StreamConnection<Address, Type>> ptr{connection};
-
-		/* Update the listener write flag when output has changed */
-		connection->setWriteHandler([this, ptr] () {
-			auto connection = ptr.lock();
-
-			if (connection && !connection->output().empty()) {
-				m_listener.set(connection->socket().handle(), FlagWrite);
+		// TODO: store address too.
+		std::shared_ptr<StreamConnection<Address, Type>> client = std::make_shared<StreamConnection<Address, Type>>(m_master.accept(nullptr));
+		std::weak_ptr<StreamConnection<Address, Type>> ptr{client};
+
+		/* 1. Register output changed to update listener */
+		client->setWriteHandler([this, ptr] () {
+			auto client = ptr.lock();
+
+			if (client && !client->output().empty()) {
+				m_listener.set(client->socket().handle(), FlagWrite);
 			}
 		});
 
-		/* Notify we have a new client */
-		m_onConnection(connection);
-
-		m_listener.set(connection->socket().handle(), FlagRead);
-		m_clients.emplace(connection->socket().handle(), std::move(connection));
+		/* 2. Do initial accept or update flags */
+		processAccept(client, false);
+
+		/* 3. Add it if everything worked well */
+		m_clients.insert(std::make_pair(client->socket().handle(), std::move(client)));
 	}
 
-	void processSync(const ListenerStatus &status)
+	void processSync(std::shared_ptr<StreamConnection<Address, Type>> &client, int flags)
 	{
-		auto connection = m_clients.at(status.socket);
-		auto remove = false;
+		bool remove{false};
 
 		try {
-			if (status.flags & FlagRead) {
-				auto buffer = connection->socket().recv(512);
+			if (flags & FlagRead) {
+				auto buffer = client->socket().recv(512);
 
 				/* Empty mean normal disconnection */
 				if (buffer.empty()) {
 					remove = true;
 				} else {
-					m_onRead(connection, buffer);
+					m_onRead(client, buffer);
 				}
-			} else if (status.flags & FlagWrite) {
-				auto &output = connection->output();
-				auto nsent = connection->socket().send(output);
+			} else if (flags & FlagWrite) {
+				auto &output = client->output();
+				auto nsent = client->socket().send(output);
 
 				/* 1. Create a copy of content that has been sent */
 				auto sent = output.substr(0, nsent);
@@ -2664,20 +2707,21 @@
 
 				/* 3. Update listener */
 				if (output.empty()) {
-					m_listener.unset(connection->socket().handle(), FlagWrite);
+					m_listener.unset(client->socket().handle(), FlagWrite);
 				}
 
 				/* 4. Notify user */
-				m_onWrite(connection, sent);
+				m_onWrite(client, sent);
 			}
-		} catch (const std::exception &ex) {
+		} catch (const Error &error) {
 			remove = true;
+			m_onError(error);
 		}
 
 		if (remove) {
-			m_onDisconnection(connection);
-			m_listener.remove(status.socket);
-			m_clients.erase(status.socket);
+			m_onDisconnection(client);
+			m_listener.remove(client->socket().handle());
+			m_clients.erase(client->socket().handle());
 		}
 	}
 
@@ -2739,6 +2783,26 @@
 	}
 
 	/**
+	 * Set the error handler, called when unrecoverable error has occured.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setErrorHandler(ErrorHandler handler)
+	{
+		m_onError = std::move(handler);
+	}
+
+	/**
+	 * Set the timeout handler, called when the selection has timeout.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setTimeoutHandler(TimeoutHandler handler)
+	{
+		m_onTimeout = std::move(handler);
+	}
+
+	/**
 	 * Poll for the next event.
 	 *
 	 * @param timeout the timeout (-1 for indefinitely)
@@ -2746,24 +2810,28 @@
 	 */
 	void poll(int timeout = -1)
 	{
-		auto st = m_listener.wait(timeout);
-
-		if (st.socket == m_master.handle()) {
-			processAccept();
-		} else {
-			processSync(st);
-		}
-	}
-
-	/**
-	 * Run forever.
-	 *
-	 * @throw Error on errors
-	 */
-	void run()
-	{
-		for (;;) {
-			poll();
+		try {
+			auto st = m_listener.wait(timeout);
+
+			if (st.socket == m_master.handle()) {
+				/* New client */
+				processAccept();
+			} else {
+				/* Recv / Send / Accept on a client */
+				auto client = m_clients[st.socket];
+
+				if (client->socket().state() == State::Accepted) {
+					processSync(client, st.flags);
+				} else {
+					processAccept(client, true);
+				}
+			}
+		} catch (const Error &error) {
+			if (error.code() == Error::Timeout) {
+				m_onTimeout();
+			} else {
+				m_onError(error);
+			}
 		}
 	}
 };
@@ -2780,6 +2848,7 @@
 	using WriteHandler = Callback<const std::string &>;
 	using DisconnectionHandler = Callback<>;
 	using ErrorHandler = Callback<const Error &>;
+	using TimeoutHandler = Callback<>;
 
 private:
 	/* Signals */
@@ -2788,62 +2857,76 @@
 	WriteHandler m_onWrite;
 	DisconnectionHandler m_onDisconnection;
 	ErrorHandler m_onError;
+	TimeoutHandler m_onTimeout;
 
 	/* Socket */
 	Socket<Address, Type> m_socket;
-	Address m_address;
 	Listener<> m_listener;
 
-	/* Connection status and output buffer */
-	bool m_connected{false};
+	/* Output buffer */
 	std::string m_output;
 
-	void processConnect()
+	/*
+	 * This is the generic connect helper, it will be used to both initiate the connection or to continue the
+	 * connection process if needed.
+	 *
+	 * Thus the template parameter is the appropriate function to call either, m_socket.connect(address) or
+	 * m_socket.connect().
+	 *
+	 * See poll() and connect() to understand.
+	 */
+	template <typename ConnectCall>
+	void processConnect(const ConnectCall &connectFunc)
 	{
-		try {
-			m_socket.connect(m_address);
-			m_connected = true;
+		/* Call m_socket.connect() or m_socket.connect(address) */
+		connectFunc();
+
+		/* Remove entirely */
+		m_listener.remove(m_socket.handle());
+
+		switch (m_socket.state()) {
+		case State::ConnectingWrite:
+			m_listener.set(m_socket.handle(), FlagWrite);
+			break;
+		case State::Connected:
+			/* Connection complete? */
 			m_onConnection();
 
-			if (m_output.empty()) {
-				m_listener.unset(m_socket.handle(), FlagWrite);
-			}
-		} catch (const Error &error) {
-			if (error.code() == Error::WouldBlockRead) {
-				m_listener.set(m_socket.handle(), FlagRead);
-			} else if (error.code() == Error::WouldBlockWrite) {
-				m_listener.set(m_socket.handle(), FlagWrite);
-			} else {
-				m_onError(error);
-			}
+			/* FALLTHROUGH */
+		case State::ConnectingRead:
+			m_listener.set(m_socket.handle(), FlagRead);
+			break;
+		default:
+			break;
 		}
 	}
 
 	void processSync(int flags)
 	{
-		try {
-			if (flags & FlagRead) {
-				m_onRead(m_socket.recv(512));
+		if (flags & FlagRead) {
+			auto received = m_socket.recv(512);
+
+			/* 0 means disconnection */
+			if (received.empty()) {
+				m_onDisconnection();
 			} else {
-				/* 1. Make a copy of what has been sent */
-				auto nsent = m_socket.send(m_output);
-				auto sent = m_output.substr(0, nsent);
-
-				/* 2. Erase sent content */
-				m_output.erase(0, nsent);
-
-				/* 3. Update flags if needed */
-				if (m_output.empty()) {
-					m_listener.unset(m_socket.handle(), FlagWrite);
-				}
-
-				/* 4. Notify user */
-				m_onWrite(sent);
+				m_onRead(received);
 			}
-		} catch (const Error &error) {
-			m_listener.remove(m_socket.handle());
-			m_connected = false;
-			m_onError(error);
+		} else {
+			/* 1. Make a copy of what has been sent */
+			auto nsent = m_socket.send(m_output);
+			auto sent = m_output.substr(0, nsent);
+
+			/* 2. Erase sent content */
+			m_output.erase(0, nsent);
+
+			/* 3. Update flags if needed */
+			if (m_output.empty()) {
+				m_listener.unset(m_socket.handle(), FlagWrite);
+			}
+
+			/* 4. Notify user */
+			m_onWrite(sent);
 		}
 	}
 
@@ -2919,15 +3002,11 @@
 	 * @param address the address to connect to
 	 * @throw Error on failures
 	 */
-	void connect(const Address &address)
+	void connect(const Address &address) noexcept
 	{
-		if (m_connected) {
-			return;
-		}
-
-		m_address = address;
-
-		processConnect();
+		assert(m_socket.state() == State::Open);
+
+		processConnect([&] () { m_socket.connect(address); });
 	}
 
 	/**
@@ -2939,7 +3018,7 @@
 	{
 		m_output += str;
 
-		if (!m_output.empty()) {
+		if (m_socket.state() == State::Connected && !m_output.empty()) {
 			m_listener.set(m_socket.handle(), FlagWrite);
 		}
 	}
@@ -2950,26 +3029,25 @@
 	 * @param timeout the time to wait in milliseconds
 	 * @throw Error on errors
 	 */
-	void poll(int timeout = -1)
+	void poll(int timeout = -1) noexcept
 	{
-		auto st = m_listener.wait(timeout);
-
-		if (!m_connected) {
-			processConnect();
-		} else {
-			processSync(st.flags);
-		}
-	}
-
-	/**
-	 * Run indefinitely.
-	 *
-	 * @throw Error on errors
-	 */
-	void run()
-	{
-		for (;;) {
-			poll();
+		try {
+			auto st = m_listener.wait(timeout);
+
+			if (m_socket.state() != State::Connected) {
+				/* Continue the connection */
+				processConnect([&] () { m_socket.connect(); });
+			} else {
+				/* Read / Write */
+				processSync(st.flags);
+			}
+		} catch (const Error &error) {
+			if (error.code() == Error::Timeout) {
+				m_onTimeout();
+			} else {
+				m_listener.remove(m_socket.handle());
+				m_onError(error);
+			}
 		}
 	}
 };