changeset 350:5a1ec6603230

Socket: add new kqueue backend
author David Demelier <markand@malikania.fr>
date Tue, 07 Apr 2015 14:44:44 +0200
parents 3a1380b4428c
children 47a206e724f2
files C++/modules/Socket/SocketListener.cpp C++/modules/Socket/SocketListener.h C++/modules/Socket/SocketTcp.cpp C++/tests/Socket/main.cpp
diffstat 4 files changed, 233 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/C++/modules/Socket/SocketListener.cpp	Mon Apr 06 10:56:40 2015 +0200
+++ b/C++/modules/Socket/SocketListener.cpp	Tue Apr 07 14:44:44 2015 +0200
@@ -256,6 +256,128 @@
 	return sockets;
 }
 
-#endif // !_SOCKET_HAVE_POLL
+#endif // !SOCKET_HAVE_POLL
+
+/* --------------------------------------------------------
+ * Kqueue implementation
+ * -------------------------------------------------------- */
+
+#if defined(SOCKET_HAVE_KQUEUE)
+
+Kqueue::Kqueue()
+	: m_handle(nullptr, nullptr)
+{
+	int handle = kqueue();
+
+	if (handle < 0) {
+		throw SocketError(SocketError::System, "kqueue");
+	}
+
+	m_handle = std::unique_ptr<int, void (*)(int *)>(new int(handle), [] (int *p) {
+		(void)::close(*p);
+	});
+}
+
+std::vector<struct kevent>::iterator Kqueue::find(Socket &s)
+{
+	return std::find_if(m_list.begin(), m_list.end(), [&] (struct kevent &kv) -> bool {
+		return static_cast<Socket::Handle>(kv.ident) == s.handle();
+	});
+}
+
+void Kqueue::set(Socket &s, int direction)
+{
+	struct kevent ev;
+
+	if (direction == SocketListener::Read) {
+		EV_SET(&ev, s.handle(), EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &s);
+	} else if (direction == SocketListener::Write) {
+		EV_SET(&ev, s.handle(), EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &s);
+	}
+
+	auto it = find(s);
+
+	if (it == m_list.end()) {
+		m_list.push_back(ev);
+	} else {
+		*it = ev;
+	}
+
+	m_result.resize(m_list.size());
+}
+
+void Kqueue::unset(Socket &s, int direction)
+{
+	auto it = find(s);
+
+	if (it != m_list.end()) {
+		if (direction & SocketListener::Read) {
+			it->filter &= ~(EVFILT_READ);
+		}
+		if (direction & SocketListener::Write) {
+			it->filter &= ~(EVFILT_WRITE);
+		}
+
+		/* complete removal */
+		if ((it->filter & ~(direction)) == 0) {
+			m_list.erase(it);
+		}
+	}
+}
+
+void Kqueue::remove(Socket &s)
+{
+	auto it = find(s);
+
+	if (it != m_list.end()) {
+		m_list.erase(it);
+	}
+
+	m_result.resize(m_list.size());
+}
+
+void Kqueue::clear()
+{
+	m_list.clear();
+	m_result.clear();
+
+	m_list.resize(0);
+	m_result.resize(0);
+}
+
+SocketStatus Kqueue::wait(int ms)
+{
+	return waitMultiple(ms)[0];
+}
+
+std::vector<SocketStatus> Kqueue::waitMultiple(int ms)
+{
+	std::vector<SocketStatus> sockets;
+	timespec ts = { 0, 0 };
+	timespec *pts = (ms <= 0) ? nullptr : &ts;
+
+	ts.tv_sec = ms / 1000;
+	ts.tv_nsec = (ms % 1000) * 1000000;
+
+	int nevents = kevent(*m_handle, m_list.data(), m_list.size(), &m_result[0], m_result.capacity(), pts);
+
+	if (nevents == 0) {
+		throw SocketError(SocketError::Timeout, "kevent");
+	}
+	if (nevents < 0) { 
+		throw SocketError(SocketError::System, "kevent");
+	}
+
+	for (int i = 0; i < nevents; ++i) {
+		sockets.push_back(SocketStatus{
+			*static_cast<Socket *>(m_result[i].udata),
+			(m_result[i].filter & EVFILT_READ) ? SocketListener::Read : SocketListener::Write
+		});
+	}	
+
+	return sockets;
+}
+
+#endif // !SOCKET_HAVE_KQUEUE
 
 } // !backend
--- a/C++/modules/Socket/SocketListener.h	Mon Apr 06 10:56:40 2015 +0200
+++ b/C++/modules/Socket/SocketListener.h	Tue Apr 07 14:44:44 2015 +0200
@@ -72,9 +72,11 @@
    //#  define SOCKET_DEFAULT_BACKEND backend::Epoll
 #  define SOCKET_DEFAULT_BACKEND backend::Poll
 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__)
-   // TODO NOT READY YET
-   //#  define SOCKET_DEFAULT_BACKEND backend::Kqueue
-#  define SOCKET_DEFAULT_BACKEND backend::Poll
+#  include <sys/stat.h>
+#  include <sys/event.h>
+#  include <sys/time.h>
+
+#  define SOCKET_DEFAULT_BACKEND backend::Kqueue
 #else
 #  define SOCKET_DEFAULT_BACKEND backend::Select
 #endif
@@ -159,7 +161,27 @@
 #if defined(SOCKET_HAVE_KQUEUE)
 
 class Kqueue {
-	// TODO
+private:
+	std::vector<struct kevent> m_list;
+	std::vector<struct kevent> m_result;
+	std::unique_ptr<int, void (*)(int *)> m_handle;
+
+	Kqueue(const Kqueue &) = delete;
+	Kqueue &operator=(const Kqueue &) = delete;
+
+	std::vector<struct kevent>::iterator find(Socket &s);
+
+public:
+	Kqueue();
+	Kqueue(Kqueue &&) = default;
+	Kqueue &operator=(Kqueue &&) = default;
+
+	void set(Socket &s, int direction);
+	void unset(Socket &s, int direction);
+	void remove(Socket &sc);
+	void clear();
+	SocketStatus wait(int ms);
+	std::vector<SocketStatus> waitMultiple(int ms);
 };
 
 #endif
--- a/C++/modules/Socket/SocketTcp.cpp	Mon Apr 06 10:56:40 2015 +0200
+++ b/C++/modules/Socket/SocketTcp.cpp	Tue Apr 07 14:44:44 2015 +0200
@@ -136,7 +136,6 @@
 			listener.wait(timeout);
 
 			// Socket is writable? Check if there is an error
-
 			int error = get<int>(SOL_SOCKET, SO_ERROR);
 
 			if (error) {
--- a/C++/tests/Socket/main.cpp	Mon Apr 06 10:56:40 2015 +0200
+++ b/C++/tests/Socket/main.cpp	Tue Apr 07 14:44:44 2015 +0200
@@ -417,6 +417,87 @@
 }
 
 /* --------------------------------------------------------
+ * Listener: kqueue
+ * -------------------------------------------------------- */
+
+#if defined(SOCKET_HAVE_KQUEUE)
+
+class ListenerKqueueTest : public testing::Test {
+protected:
+	SocketListenerBase<backend::Kqueue> m_listener;
+	SocketTcp m_masterTcp{AF_INET, 0};
+	SocketTcp m_clientTcp{AF_INET, 0};
+
+	std::thread m_tserver;
+	std::thread m_tclient;
+
+public:
+	ListenerKqueueTest()
+	{
+		m_masterTcp.set(SOL_SOCKET, SO_REUSEADDR, 1);
+		m_masterTcp.bind(Internet("*", 16000, AF_INET));
+		m_masterTcp.listen();
+	}
+
+	~ListenerKqueueTest()
+	{
+		if (m_tserver.joinable()) {
+			m_tserver.join();
+		}
+		if (m_tclient.joinable()) {
+			m_tclient.join();
+		}
+	}
+};
+
+TEST_F(ListenerKqueueTest, accept)
+{
+	m_tserver = std::thread([this] () {
+		try {
+			m_listener.set(m_masterTcp, SocketListener::Read);
+			m_listener.wait();
+			m_masterTcp.accept();
+			m_masterTcp.close();
+		} catch (const std::exception &ex) {
+			FAIL() << ex.what();
+		}
+	});
+
+	std::this_thread::sleep_for(100ms);
+
+	m_tclient = std::thread([this] () {
+		m_clientTcp.connect(Internet("127.0.0.1", 16000, AF_INET));
+	});
+}
+
+TEST_F(ListenerKqueueTest, recv)
+{
+	m_tserver = std::thread([this] () {
+		try {
+			m_listener.set(m_masterTcp, SocketListener::Read);
+			m_listener.wait();
+
+			auto sc = m_masterTcp.accept();
+
+			ASSERT_EQ("hello", sc.recv(512));
+
+			m_masterTcp.close();
+		} catch (const std::exception &ex) {
+			FAIL() << ex.what();
+		}
+	});
+
+	std::this_thread::sleep_for(100ms);
+
+	m_tclient = std::thread([this] () {
+		m_clientTcp.connect(Internet("127.0.0.1", 16000, AF_INET));
+		m_clientTcp.send("hello");
+	});
+}
+
+#endif // !SOCKET_HAVE_KQUEUE
+
+/* --------------------------------------------------------
  * Non-blocking connect
  * -------------------------------------------------------- */
 
@@ -452,6 +533,8 @@
 	m_tserver = std::thread([this] () {
 		SocketTcp client = m_server.accept();
 
+		std::this_thread::sleep_for(100ms);
+
 		m_server.close();
 		client.close();
 	});
@@ -462,7 +545,7 @@
 		try {
 			m_client.waitConnect(Internet("127.0.0.1", 16000, AF_INET), 3000);
 		} catch (const SocketError &error) {
-			FAIL() << error.what();
+			FAIL() << error.function() << ": " << error.what();
 		}
 
 		ASSERT_EQ(SocketState::Connected, m_client.state());