changeset 280:91eb0583df52

Socket: * Fix select method that require fdset's to be rebuild at each iteration, * While here always create a m_interface Task: #306
author David Demelier <markand@malikania.fr>
date Fri, 24 Oct 2014 18:28:39 +0200
parents af630354610f
children 88f9d8b406c6 ea55a3886da0
files C++/Socket.cpp C++/Socket.h C++/SocketListener.cpp C++/Tests/Sockets/main.cpp
diffstat 4 files changed, 174 insertions(+), 82 deletions(-) [+]
line wrap: on
line diff
--- a/C++/Socket.cpp	Fri Oct 24 10:28:17 2014 +0200
+++ b/C++/Socket.cpp	Fri Oct 24 18:28:39 2014 +0200
@@ -202,7 +202,7 @@
 	// Usually accept works only with SOCK_STREAM
 	info = SocketAddress(address, addrlen);
 
-	return Socket(handle);
+	return Socket(handle, std::make_shared<Standard>());
 }
 
 void Standard::listen(Socket &s, int max)
@@ -316,8 +316,13 @@
  * Socket code
  * -------------------------------------------------------- */
 
+Socket::Socket()
+	: m_interface(std::make_shared<Standard>())
+{
+}
+
 Socket::Socket(int domain, int type, int protocol)
-	: m_interface(std::make_unique<Standard>())
+	: Socket()
 {
 	m_handle = socket(domain, type, protocol);
 
@@ -325,8 +330,8 @@
 		throw error::Failure("socket", syserror());
 }
 
-Socket::Socket(Handle handle)
-	: m_interface(std::make_shared<Standard>())
+Socket::Socket(Handle handle, std::shared_ptr<SocketInterface> interface)
+	: m_interface(std::move(interface))
 	, m_handle(handle)
 {
 }
--- a/C++/Socket.h	Fri Oct 24 10:28:17 2014 +0200
+++ b/C++/Socket.h	Fri Oct 24 18:28:39 2014 +0200
@@ -279,7 +279,7 @@
 	/**
 	 * Default constructor.
 	 */
-	Socket() = default;
+	Socket();
 
 	/**
 	 * Constructor to create a new socket.
@@ -295,8 +295,9 @@
 	 * Create a socket object with a already initialized socket.
 	 *
 	 * @param handle the handle
+	 * @param interface the interface to use
 	 */
-	Socket(Handle handle);
+	Socket(Handle handle, std::shared_ptr<SocketInterface> interface);
 
 	/**
 	 * Close the socket.
--- a/C++/SocketListener.cpp	Fri Oct 24 10:28:17 2014 +0200
+++ b/C++/SocketListener.cpp	Fri Oct 24 18:28:39 2014 +0200
@@ -19,7 +19,7 @@
 #include <algorithm>
 #include <map>
 #include <set>
-#include <tuple>
+#include <utility>
 #include <vector>
 
 #include "SocketListener.h"
@@ -36,17 +36,9 @@
  */
 class SelectMethod final : public SocketListener::Interface {
 private:
-	std::set<Socket> m_rsockets;
-	std::set<Socket> m_wsockets;
-	std::map<Socket::Handle, std::tuple<Socket, int>> m_lookup;
-
-	fd_set m_readset;
-	fd_set m_writeset;
-
-	Socket::Handle m_max { 0 };
+	std::map<Socket::Handle, std::pair<Socket, int>> m_table;
 
 public:
-	SelectMethod();
 	void add(Socket s, int direction) override;
 	void remove(const Socket &s, int direction) override;
 	void list(const SocketListener::MapFunc &func) override;
@@ -56,76 +48,39 @@
 	std::vector<SocketStatus> selectMultiple(int ms) override;
 };
 
-SelectMethod::SelectMethod()
-{
-	FD_ZERO(&m_readset);
-	FD_ZERO(&m_writeset);
-}
-
 void SelectMethod::add(Socket s, int direction)
 {
-	if (m_lookup.count(s.handle()) > 0)
-		std::get<1>(m_lookup[s.handle()]) |= direction;
+	if (m_table.count(s.handle()) > 0)
+		m_table[s.handle()].second |= direction;
 	else
-		m_lookup[s.handle()] = std::make_tuple(s, direction);
-
-	if (direction & Read) {
-		m_rsockets.insert(s);
-		FD_SET(s.handle(), &m_readset);
-	}
-	if (direction & Write) {
-		m_wsockets.insert(s);
-		FD_SET(s.handle(), &m_writeset);
-	}
-
-	if (s.handle() > m_max)
-		m_max = s.handle();
+		m_table[s.handle()] = { s, direction };
 }
 
 void SelectMethod::remove(const Socket &s, int direction)
 {
-	std::get<1>(m_lookup[s.handle()]) &= ~(direction);
+	if (m_table.count(s.handle()) != 0) {
+		m_table[s.handle()].second &= ~(direction);
 
-	if (static_cast<int>(std::get<1>(m_lookup[s.handle()])) == 0)
-		m_lookup.erase(s.handle());
-
-	if (direction & Read) {
-		m_rsockets.erase(s.handle());
-		FD_CLR(s.handle(), &m_readset);
+		// If no read, no write is requested, remove it
+		if (m_table[s.handle()].second == 0)
+			m_table.erase(s.handle());
 	}
-	if (direction & Write) {
-		m_wsockets.erase(s.handle());
-		FD_CLR(s.handle(), &m_writeset);
-	}
-
-	// Refind the max file descriptor
-	if (m_lookup.size() > 0) {
-		m_max = std::get<0>(std::max_element(m_lookup.begin(), m_lookup.end())->second).handle();
-	} else
-		m_max = 0;
 }
 
 void SelectMethod::list(const SocketListener::MapFunc &func)
 {
-	for (auto &s : m_lookup)
-		func(std::get<0>(s.second), std::get<1>(s.second));
+	for (auto &s : m_table)
+		func(s.second.first, s.second.second);
 }
 
 void SelectMethod::clear()
 {
-	m_rsockets.clear();
-	m_wsockets.clear();
-	m_lookup.clear();
-
-	FD_ZERO(&m_readset);
-	FD_ZERO(&m_writeset);
-
-	m_max = 0;
+	m_table.clear();
 }
 
 unsigned SelectMethod::size() const
 {
-	return m_lookup.size();
+	return m_table.size();
 }
 
 SocketStatus SelectMethod::select(int ms)
@@ -141,6 +96,23 @@
 std::vector<SocketStatus> SelectMethod::selectMultiple(int ms)
 {
 	timeval maxwait, *towait;
+	fd_set readset;
+	fd_set writeset;
+
+	FD_ZERO(&readset);
+	FD_ZERO(&writeset);
+
+	Socket::Handle max = 0;
+
+	for (auto &s : m_table) {
+		if (s.second.second & Read)
+			FD_SET(s.first, &readset);
+		if (s.second.second & Write)
+			FD_SET(s.first, &writeset);
+
+		if (s.first > max)
+			max = s.first;
+	}
 
 	maxwait.tv_sec = 0;
 	maxwait.tv_usec = ms * 1000;
@@ -148,19 +120,20 @@
 	// Set to nullptr for infinite timeout.
 	towait = (ms <= 0) ? nullptr : &maxwait;
 
-	auto error = ::select(m_max + 1, &m_readset, &m_writeset, NULL, towait);
+	auto error = ::select(max + 1, &readset, &writeset, nullptr, towait);
 	if (error == SOCKET_ERROR)
 		throw error::Failure("select", Socket::syserror());
 	if (error == 0)
 		throw error::Timeout("select");
 
 	std::vector<SocketStatus> sockets;
-	for (auto &c : m_lookup)
-		if (FD_ISSET(c.first, &m_readset))
-			sockets.push_back({ std::get<0>(c.second), Read });
-	for (auto &c : m_lookup)
-		if (FD_ISSET(c.first, &m_writeset))
-			sockets.push_back({ std::get<0>(c.second), Write });
+
+	for (auto &c : m_table) {
+		if (FD_ISSET(c.first, &readset))
+			sockets.push_back({ c.second.first, Read });
+		if (FD_ISSET(c.first, &writeset))
+			sockets.push_back({ c.second.first, Write });
+	}
 
 	return sockets;
 }
@@ -235,7 +208,7 @@
 void PollMethod::remove(const Socket &s, int direction)
 {
 	for (auto i = m_fds.begin(); i != m_fds.end();) {
-		if (i->fd == s) {
+		if (i->fd == s.handle()) {
 			i->events &= ~(topoll(direction));
 
 			if (i->events == 0) {
--- a/C++/Tests/Sockets/main.cpp	Fri Oct 24 10:28:17 2014 +0200
+++ b/C++/Tests/Sockets/main.cpp	Fri Oct 24 18:28:39 2014 +0200
@@ -30,6 +30,7 @@
 
 using namespace std::literals::chrono_literals;
 
+using namespace error;
 using namespace address;
 
 /* --------------------------------------------------------
@@ -86,6 +87,62 @@
  * Select tests
  * -------------------------------------------------------- */
 
+TEST(ListenerMethodSelect, timeout)
+{
+	std::thread server([] () {
+		Socket s, client;
+		SocketListener listener(Select);
+		bool running = true;
+		int tries = 0;
+
+		try {
+			s = { AF_INET, SOCK_STREAM, 0 };
+			s.set(SOL_SOCKET, SO_REUSEADDR, 1);
+			s.bind(Internet{"*", 10000, AF_INET});
+			s.listen(10);
+
+			listener.add(s, Read);
+
+			while (running) {
+				try {
+					listener.select(500ms);
+					client = s.accept();
+					running = false;
+
+					// Abort if no client connected
+					if (tries >= 10)
+						running = false;
+				} catch (const Timeout &) {
+					puts("DEBUG: TIMEOUT");
+				}
+			}
+		} catch (const std::exception &ex) {
+			std::cerr << "warning: " << ex.what() << std::endl;
+		}
+
+		s.close();
+		client.close();
+	});
+
+	std::thread client([] () {
+		std::this_thread::sleep_for(2s);
+
+		Socket s;
+
+		try {
+			s = { AF_INET, SOCK_STREAM, 0 };
+			s.connect(Internet{"localhost", 10000, AF_INET});
+		} catch (const std::exception &ex) {
+			std::cerr << "warning: " << ex.what() << std::endl;
+		}
+
+		s.close();
+	});
+
+	server.join();
+	client.join();
+}
+
 TEST(ListenerMethodSelect, add)
 {
 	Socket s, s2;
@@ -93,7 +150,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Select);
+		SocketListener listener(Select);
 
 		listener.add(s, Read);
 		listener.add(s2, Read);
@@ -114,7 +171,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Select);
+		SocketListener listener(Select);
 
 		listener.add(s, Read);
 		listener.add(s2, Read);
@@ -141,7 +198,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Select);
+		SocketListener listener(Select);
 
 		listener.add(s, Read | Write);
 		listener.add(s2, Read | Write);
@@ -179,7 +236,7 @@
 
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Select);
+		SocketListener listener(Select);
 
 		listener.add(s, Read);
 		ASSERT_EQ(1UL, listener.size());
@@ -216,6 +273,62 @@
  * Poll tests
  * -------------------------------------------------------- */
 
+TEST(ListenerMethodPoll, timeout)
+{
+	std::thread server([] () {
+		Socket s, client;
+		SocketListener listener(Poll);
+		bool running = true;
+		int tries = 0;
+
+		try {
+			s = { AF_INET, SOCK_STREAM, 0 };
+			s.set(SOL_SOCKET, SO_REUSEADDR, 1);
+			s.bind(Internet{"*", 10000, AF_INET});
+			s.listen(10);
+
+			listener.add(s, Read);
+
+			while (running) {
+				try {
+					listener.select(500ms);
+					client = s.accept();
+					running = false;
+
+					// Abort if no client connected
+					if (tries >= 10)
+						running = false;
+				} catch (const Timeout &) {
+					puts("DEBUG: TIMEOUT");
+				}
+			}
+		} catch (const std::exception &ex) {
+			std::cerr << "warning: " << ex.what() << std::endl;
+		}
+
+		s.close();
+		client.close();
+	});
+
+	std::thread client([] () {
+		std::this_thread::sleep_for(2s);
+
+		Socket s;
+
+		try {
+			s = { AF_INET, SOCK_STREAM, 0 };
+			s.connect(Internet{"localhost", 10000, AF_INET});
+		} catch (const std::exception &ex) {
+			std::cerr << "warning: " << ex.what() << std::endl;
+		}
+
+		s.close();
+	});
+
+	server.join();
+	client.join();
+}
+
 TEST(ListenerMethodPoll, add)
 {
 	Socket s, s2;
@@ -223,7 +336,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Poll);
+		SocketListener listener(Poll);
 
 		listener.add(s, Read);
 		listener.add(s2, Read);
@@ -244,7 +357,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Poll);
+		SocketListener listener(Poll);
 
 		listener.add(s, Read);
 		listener.add(s2, Read);
@@ -267,7 +380,7 @@
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
 		s2 = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Poll);
+		SocketListener listener(Poll);
 
 		listener.add(s, Read | Write);
 		listener.add(s2, Read | Write);
@@ -305,7 +418,7 @@
 
 	try {
 		s = { AF_INET, SOCK_STREAM, 0 };
-		SocketListener listener(SocketMethod::Poll);
+		SocketListener listener(Poll);
 
 		listener.add(s, Read);
 		ASSERT_EQ(1UL, listener.size());
@@ -612,7 +725,7 @@
 		Socket master;
 
 		try {
-			SocketListener masterListener(SocketMethod::Poll), clientListener(SocketMethod::Poll);
+			SocketListener masterListener(Poll), clientListener(Poll);
 			master = { AF_INET, SOCK_STREAM, 0 };
 
 			master.set(SOL_SOCKET, SO_REUSEADDR, 1);