changeset 452:7781bb45e749

Socket: experimental Stream(Server|Client)
author David Demelier <markand@malikania.fr>
date Mon, 02 Nov 2015 14:04:26 +0100
parents 902b034df6e3
children 2d95f0c8fd1d
files C++/modules/Socket/Sockets.h
diffstat 1 files changed, 410 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- a/C++/modules/Socket/Sockets.h	Mon Nov 02 13:46:56 2015 +0100
+++ b/C++/modules/Socket/Sockets.h	Mon Nov 02 14:04:26 2015 +0100
@@ -2161,7 +2161,7 @@
 	inline void clear()
 	{
 		while (!m_table.empty()) {
-			remove(*m_table.begin());
+			remove(m_table.begin()->first);
 		}
 	}
 
@@ -2225,94 +2225,221 @@
 
 /* }}} */
 
-#if 0
-
-/*
- * High level helpers coming soon.
+/**
+ * @class Callback
+ * @brief Convenient signal owner that checks if the target is valid.
+ *
+ * This class also catch all errors thrown from signals to avoid interfering with our process.
  */
-
-template <typename Address, typename Type>
-class StreamServer;
-
+template <typename... Args>
+class Callback : public std::function<void (Args...)> {
+public:
+	/**
+	 * Inherited constructors.
+	 */
+	using std::function<void (Args...)>::function;
+
+	/**
+	 * Execute the callback only if a target is set.
+	 */
+	void operator()(Args... args) const
+	{
+		if (*this) {
+			try {
+				std::function<void (Args...)>::operator()(args...);
+			} catch (...) {
+			}
+		}
+	}
+};
+
+/**
+ * @class StreamConnection
+ * @brief Connected client on the server side.
+ *
+ * This object is created from StreamServer when a new client is connected, it is the higher
+ * level object of sockets and completely asynchronous.
+ */
 template <typename Address, typename Type>
 class StreamConnection {
+public:
+	using WriteHandler = Callback<>;
+
 private:
+	/* Signals */
+	WriteHandler m_onWrite;
+
+	/* Sockets and output buffer */
 	Socket<Address, Type> m_socket;
 	std::string m_output;
 
 public:
+	/**
+	 * Create the connection.
+	 *
+	 * @param s the socket
+	 */
 	StreamConnection(Socket<Address, Type> s)
 		: m_socket{std::move(s)}
 	{
+		m_socket.setBlockMode(false);
 	}
 
+	/**
+	 * Access the underlying socket.
+	 *
+	 * @return the socket
+	 * @warning use with care
+	 */
 	inline Socket<Address, Type> &socket() noexcept
 	{
 		return m_socket;
 	}
 
-	const std::string &output() const noexcept
+	/**
+	 * Access the current output.
+	 *
+	 * @return the output
+	 */
+	inline const std::string &output() const noexcept
+	{
+		return m_output;
+	}
+
+	/**
+	 * Overloaded function
+	 *
+	 * @return the output
+	 * @warning use with care, avoid modifying the output if you don't know what you're doing
+	 */
+	inline std::string &output() noexcept
 	{
 		return m_output;
 	}
+
+	/**
+	 * Post some data to be sent asynchronously.
+	 *
+	 * @param str the data to append
+	 */
+	inline void send(std::string str)
+	{
+		m_output += str;
+		m_onWrite();
+	}
+
+	/**
+	 * Kill the client.
+	 */
+	inline void close()
+	{
+		m_socket.close();
+	}
+
+	/**
+	 * Set the write handler, the signal is emitted when the output has changed so that the StreamServer owner
+	 * knows that there are some data to send.
+	 *
+	 * @param handler the handler
+	 * @warning you usually never need to set this yourself
+	 */
+	inline void setWriteHandler(WriteHandler handler)
+	{
+		m_onWrite = std::move(handler);
+	}
 };
 
+/**
+ * @class StreamServer
+ * @brief Convenient stream server for TCP and TLS.
+ *
+ * This class does all the things for you as accepting new clients, listening for it and sending data. It works
+ * asynchronously without blocking to let you control your process workflow.
+ */
 template <typename Address, typename Type>
 class StreamServer {
+public:
+	using ConnectionHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &>;
+	using DisconnectionHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &>;
+	using ReadHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>;
+	using WriteHandler = Callback<const std::shared_ptr<StreamConnection<Address, Type>> &, const std::string &>;
+
 private:
+	using ClientMap = std::map<Handle, std::shared_ptr<StreamConnection<Address, Type>>>;
+
+	/* Signals */
+	ConnectionHandler m_onConnection;
+	DisconnectionHandler m_onDisconnection;
+	ReadHandler m_onRead;
+	WriteHandler m_onWrite;
+
+	/* Sockets */
 	Socket<Address, Type> m_master;
 	Listener<> m_listener;
-	std::map<Handle, StreamConnection<Address, Type>> m_clients;
-
-	/* Callbacks */
-	template <typename... Args>
-	using Callbacks = std::vector<std::function<void (Args...)>>;
-
-	Callbacks<StreamConnection<Address, Type> &> m_onConnection;
-	Callbacks<StreamConnection<Address, Type> &> m_onDisconnection;
-	Callbacks<StreamConnection<Address, Type> &, const std::string &> m_onRead;
-	Callbacks<StreamConnection<Address, Type> &, const std::string &> m_onWrite;
-
-	template <typename List, typename... Args>
-	void notify(const List &list, Args&&... args)
-	{
-		for (const auto &f : list) {
-			f(std::forward<Args>(args)...);
-		}
-	}
+	ClientMap m_clients;
 
 	void processAccept()
 	{
-		StreamConnection<Address, Type> connection{m_master.accept()};
+		std::shared_ptr<StreamConnection<Address, Type>> connection = std::make_shared<StreamConnection<Address, Type>>(m_master.accept());
+		std::weak_ptr<StreamConnection<Address, Type>> ptr{connection};
+
+		/* Update the listener write flag when output has changed */
+		connection->setWriteHandler([this, ptr] () {
+			auto connection = ptr.lock();
+
+			if (connection && !connection->output().empty()) {
+				m_listener.set(connection->socket().handle(), FlagWrite);
+			}
+		});
 
 		/* Notify we have a new client */
-		notify(m_onConnection, connection);
-
-		m_listener.set(connection.socket().handle(), FlagRead);
-		m_clients.emplace(connection.socket().handle(), std::move(connection));
+		m_onConnection(connection);
+
+		m_listener.set(connection->socket().handle(), FlagRead);
+		m_clients.emplace(connection->socket().handle(), std::move(connection));
 	}
 
 	void processSync(const ListenerStatus &status)
 	{
-		auto &connection = m_clients.at(status.socket);
+		auto connection = m_clients.at(status.socket);
+		auto remove = false;
 
 		try {
 			if (status.flags & FlagRead) {
-				auto buffer = connection.socket().recv(512);
+				auto buffer = connection->socket().recv(512);
 
 				/* Empty mean normal disconnection */
 				if (buffer.empty()) {
-					notify(m_onDisconnection, connection);
-					m_listener.remove(status.socket);
-					m_clients.erase(status.socket);
+					remove = true;
 				} else {
-					notify(m_onRead, connection, buffer);
+					m_onRead(connection, buffer);
 				}
 			} else if (status.flags & FlagWrite) {
-				/* Process write */
+				auto &output = connection->output();
+				auto nsent = connection->socket().send(output);
+
+				/* 1. Create a copy of content that has been sent */
+				auto sent = output.substr(0, nsent);
+
+				/* 2. Erase the content sent */
+				output.erase(0, nsent);
+
+				/* 3. Update listener */
+				if (output.empty()) {
+					m_listener.unset(connection->socket().handle(), FlagWrite);
+				}
+
+				/* 4. Notify user */
+				m_onWrite(connection, sent);
 			}
 		} catch (const std::exception &ex) {
-			/* TODO: remove */
+			remove = true;
+		}
+
+		if (remove) {
+			m_onDisconnection(connection);
+			m_listener.remove(status.socket);
+			m_clients.erase(status.socket);
 		}
 	}
 
@@ -2324,8 +2451,7 @@
 	 * @param max the max number to listen
 	 * @throw Error on errors
 	 */
-	template <typename RealAddress>
-	StreamServer(const RealAddress &address, int max = 128)
+	StreamServer(const Address &address, int max = 128)
 		: m_master{address.domain(), SOCK_STREAM, 0}
 	{
 		m_master.set(SOL_SOCKET, SO_REUSEADDR, 1);
@@ -2334,31 +2460,53 @@
 		m_listener.set(m_master.handle(), FlagRead);
 	}
 
-	template <typename Func>
-	inline void addConnectionHandler(Func &&f)
+	/**
+	 * Set the connection handler, called when a new client is connected.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setConnectionHandler(ConnectionHandler handler)
 	{
-		m_onConnection.push_back(std::forward<Func>(f));
+		m_onConnection = std::move(handler);
 	}
 
-	template <typename Func>
-	inline void addDisconnectionHandler(Func &&f)
+	/**
+	 * Set the disconnection handler, called when a client died.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setDisconnectionHandler(DisconnectionHandler handler)
 	{
-		m_onDisconnection.push_back(std::forward<Func>(f));
+		m_onDisconnection = std::move(handler);
 	}
 
-	template <typename Func>
-	inline void addReadHandler(Func &&f)
+	/**
+	 * Set the receive handler, called when a client has sent something.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setReadHandler(ReadHandler handler)
 	{
-		m_onRead.push_back(std::forward<Func>(f));
+		m_onRead = std::move(handler);
 	}
 
-	template <typename Func>
-	inline void addWriteHandler(Func &&f)
+	/**
+	 * Set the writing handler, called when some data has been sent to a client.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setWriteHandler(WriteHandler handler)
 	{
-		m_onWrite.push_back(std::forward<Func>(f));
+		m_onWrite = std::move(handler);
 	}
 
-	void poll(int timeout)
+	/**
+	 * Poll for the next event.
+	 *
+	 * @param timeout the timeout (-1 for indefinitely)
+	 * @throw Error on errors
+	 */
+	void poll(int timeout = -1)
 	{
 		auto st = m_listener.wait(timeout);
 
@@ -2369,13 +2517,216 @@
 		}
 	}
 
-	inline int wait()
+	/**
+	 * Run forever.
+	 *
+	 * @throw Error on errors
+	 */
+	void run()
 	{
-		poll(-1);
+		for (;;) {
+			poll();
+		}
 	}
 };
 
-#endif
+/**
+ * @class StreamClient
+ * @brief Client side connection to a server.
+ */
+template <typename Address, typename Type>
+class StreamClient {
+public:
+	using ConnectionHandler = Callback<>;
+	using ReadHandler = Callback<const std::string &>;
+	using WriteHandler = Callback<const std::string &>;
+	using DisconnectionHandler = Callback<>;
+
+private:
+	/* Signals */
+	ConnectionHandler m_onConnection;
+	ReadHandler m_onRead;
+	WriteHandler m_onWrite;
+	DisconnectionHandler m_onDisconnection;
+
+	/* Socket */
+	Socket<Address, Type> m_socket;
+	Listener<> m_listener;
+
+	/* Connection status and output buffer */
+	bool m_connected{false};
+	std::string m_output;
+
+	void processConnect()
+	{
+		/* 1. Remove from listener the write flag */
+		m_listener.unset(m_socket.handle(), FlagWrite);
+
+		/* 2. Check for an error */
+		if (m_socket.template get<int>(SOL_SOCKET, SO_ERROR) == Failure) {
+			// TODO: ????? m_onDisconnection();
+		}
+
+		/* 3. Listen for input */
+		m_listener.set(m_socket.handle(), FlagRead);
+
+		/* 4. Notify */
+		m_connected = true;
+		m_onConnection();
+	}
+
+	void processSync(int flags)
+	{
+		try {
+			if (flags & FlagRead) {
+				m_onRead(m_socket.recv(512));
+			} else {
+				/* 1. Make a copy of what has been sent */
+				auto nsent = m_socket.send(m_output);
+				auto sent = m_output.substr(0, nsent);
+
+				/* 2. Erase sent content */
+				m_output.erase(0, nsent);
+
+				/* 3. Update flags if needed */
+				if (m_output.empty()) {
+					m_listener.unset(m_socket.handle(), FlagWrite);
+				}
+
+				/* 4. Notify user */
+				m_onWrite(sent);
+			}
+		} catch (const std::exception &ex) {
+			m_listener.remove(m_socket.handle());
+			m_connected = false;
+		}
+	}
+
+public:
+	/**
+	 * Create a client. The client is automatically marked as non-blocking.
+	 *
+	 * @param address the optional address
+	 * @param type the type (Tcp or Tls)
+	 * @throw net::Error on failures
+	 */
+	StreamClient(const Address &address = {}, Type type = {})
+		: m_socket{address, std::move(type)}
+	{
+		m_socket.setBlockMode(false);
+	}
+
+	/**
+	 * Set the connection handler, called when the connection succeed.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setConnectionHandler(ConnectionHandler handler)
+	{
+		m_onConnection = std::move(handler);
+	}
+
+	/**
+	 * Set the disconnection handler, called when the server closed the connection.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setDisconnectionHandler(DisconnectionHandler handler)
+	{
+		m_onDisconnection = std::move(handler);
+	}
+
+	/**
+	 * Set the read handler, called when we received something.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setReadHandler(ReadHandler handler)
+	{
+		m_onRead = std::move(handler);
+	}
+
+	/**
+	 * Set the write handler, called when we successfully sent data.
+	 *
+	 * @param handler the handler
+	 */
+	inline void setWriteHandler(WriteHandler handler)
+	{
+		m_onWrite = std::move(handler);
+	}
+
+	/**
+	 * Connect to a server, this function may connect immediately or not in any case the connection handler
+	 * will be called when the connection completed.
+	 *
+	 * @param address the address to connect to
+	 * @throw Error on failures
+	 */
+	void connect(const Address &address)
+	{
+		if (m_connected) {
+			return;
+		}
+
+		try {
+			m_socket.connect(address);
+			m_connected = true;
+			m_onConnection();
+		} catch (const Error &error) {
+			if (error.code() == Error::WouldBlockRead) {
+				m_listener.set(m_socket.handle(), FlagRead);
+			} else if (error.code() == Error::WouldBlockWrite) {
+				m_listener.set(m_socket.handle(), FlagWrite);
+			} else {
+				throw;
+			}
+		}
+	}
+
+	/**
+	 * Asynchronously send data to the server.
+	 *
+	 * @param str the data to append
+	 */
+	void send(std::string str)
+	{
+		m_output += str;
+
+		if (!m_output.empty()) {
+			m_listener.set(m_socket.handle(), FlagWrite);
+		}
+	}
+
+	/**
+	 * Wait for the next event.
+	 *
+	 * @param timeout the time to wait in milliseconds
+	 * @throw Error on errors
+	 */
+	void poll(int timeout = -1)
+	{
+		auto st = m_listener.wait(timeout);
+
+		if (!m_connected) {
+			processConnect();
+		} else {
+			processSync(st.flags);
+		}
+	}
+
+	/**
+	 * Run indefinitely.
+	 *
+	 * @throw Error on errors
+	 */
+	void run()
+	{
+		for (;;) {
+			poll();
+		}
+	}
+};
 
 } // !net