changeset 364:d88c3644ebc8

Socket: fix kqueue implementation
author David Demelier <markand@malikania.fr>
date Tue, 28 Apr 2015 18:48:32 +0200
parents 3908306107d4
children 93c8f8a1fd3f
files C++/modules/Socket/SocketListener.cpp C++/modules/Socket/SocketListener.h
diffstat 2 files changed, 83 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/C++/modules/Socket/SocketListener.cpp	Tue Apr 28 14:40:17 2015 +0200
+++ b/C++/modules/Socket/SocketListener.cpp	Tue Apr 28 18:48:32 2015 +0200
@@ -17,10 +17,7 @@
  */
 
 #include <algorithm>
-#include <map>
 #include <set>
-#include <utility>
-#include <vector>
 
 #include "SocketListener.h"
 
@@ -269,88 +266,98 @@
 #if defined(SOCKET_HAVE_KQUEUE)
 
 Kqueue::Kqueue()
-	: m_handle(nullptr, nullptr)
+	: m_handle(kqueue())
 {
-	int handle = kqueue();
-
-	if (handle < 0) {
+	if (m_handle < 0) {
 		throw SocketError(SocketError::System, "kqueue");
 	}
-
-	m_handle = std::unique_ptr<int, void (*)(int *)>(new int(handle), [] (int *p) {
-		(void)::close(*p);
-	});
-}
-
-std::vector<kevent>::iterator Kqueue::find(const Socket &s) const
-{
-	return std::find_if(m_list.begin(), m_list.end(), [&] (const struct kevent &kv) -> bool {
-		return static_cast<Socket::Handle>(kv.ident) == s.handle();
-	});
 }
 
-void Kqueue::set(Socket &s, int flags)
+Kqueue::~Kqueue()
 {
-	auto it = find(s);
-	int filter = 0;
-
-	if (flags & SocketListener::Read) {
-		filter |= EVFILT_READ;
-	}
-	if (flags & SocketListener::Write) {
-		filter |= EVFILT_WRITE;
-	}
-
-	if (it == m_list.end()) {
-		struct kevent ev;
-
-		EV_SET(&ev, s.handle(), filter, EV_ADD | EV_ENABLE, 0, 0, &s);
-
-		m_list.push_back(ev);
-	} else {
-		it->filter |= filter;
-	}
-
-	m_result.resize(m_list.size());
+	close(m_handle);
 }
 
-void Kqueue::unset(Socket s, int flags)
+void Kqueue::update(Socket &sc, int filter, int flags)
 {
-	auto it = find(s);
+	struct kevent ev;
 
-	if (it != m_list.end()) {
-		if (flags & SocketListener::Read) {
-			it->filter &= ~(EVFILT_READ);
-		}
-		if (flags & SocketListener::Write) {
-			it->filter &= ~(EVFILT_WRITE);
-		}
+	EV_SET(&ev, sc.handle(), filter, flags, 0, 0, nullptr);
 
-		/* complete removal */
-		if (it->filter == 0) {
-			m_list.erase(it);
-		}
+	if (kevent(m_handle, &ev, 1, nullptr, 0, nullptr) < 0) {
+		throw SocketError(SocketError::System, "kevent");
 	}
 }
 
-void Kqueue::remove(Socket s)
+void Kqueue::set(Socket sc, int flags)
 {
-	auto it = find(s);
+	if (flags & SocketListener::Read) {
+		puts("About to set to add");
+		update(sc, EVFILT_READ, EV_ADD | EV_ENABLE);
+	}
+	if (flags & SocketListener::Write) {
+		puts("About to set to remove");
+		update(sc, EVFILT_WRITE, EV_ADD | EV_ENABLE);
+	}
 
-	if (it != m_list.end()) {
-		m_list.erase(it);
+	auto it = m_table.find(sc.handle());
+	if (it == m_table.end()) {
+		m_table.emplace(sc.handle(), std::make_pair(std::move(sc), flags));
+	} else {
+		it->second.second |= flags;
 	}
 
-	m_result.resize(m_list.size());
+	m_result.resize(m_table.size());
+}
+
+void Kqueue::unset(Socket sc, int flags)
+{
+	if (flags & SocketListener::Read) {
+		update(sc, EVFILT_READ, EV_DELETE);
+	}
+	if (flags & SocketListener::Write) {
+		update(sc, EVFILT_WRITE, EV_DELETE);
+	}
+
+	auto it = m_table.find(sc.handle());
+	if (it != m_table.end()) {
+		it->second.second &= ~(flags);
+
+		if (it->second.second == 0) {
+			m_table.erase(it);
+		}
+	}
+
+	m_result.resize(m_table.size());
+}
+
+void Kqueue::remove(Socket sc)
+{
+	auto it = m_table.find(sc.handle());
+
+	if (it != m_table.end()) {
+		if (it->second.second & SocketListener::Read) {
+			update(sc, EVFILT_READ, EV_DELETE);
+		}
+		if (it->second.second & SocketListener::Write) {
+			update(sc, EVFILT_WRITE, EV_DELETE);
+		}
+
+		m_table.erase(sc.handle());
+	}
+
+	m_result.resize(m_table.size());
 }
 
 void Kqueue::clear()
 {
-	m_list.clear();
-	m_result.clear();
+	for (auto &pair : m_table) {
+		update(pair.second.first, EVFILT_READ, EV_DELETE);
+		update(pair.second.first, EVFILT_WRITE, EV_DELETE);
+	}
 
-	m_list.resize(0);
-	m_result.resize(0);
+	m_table.clear();
+	m_result.resize(0U);
 }
 
 SocketStatus Kqueue::wait(int ms)
@@ -367,7 +374,7 @@
 	ts.tv_sec = ms / 1000;
 	ts.tv_nsec = (ms % 1000) * 1000000;
 
-	int nevents = kevent(*m_handle, m_list.data(), m_list.size(), &m_result[0], m_result.capacity(), pts);
+	int nevents = kevent(m_handle, nullptr, 0, &m_result[0], m_result.capacity(), pts);
 
 	if (nevents == 0) {
 		throw SocketError(SocketError::Timeout, "kevent");
@@ -377,10 +384,10 @@
 	}
 
 	for (int i = 0; i < nevents; ++i) {
-		sockets.push_back(SocketStatus{
-			*static_cast<Socket *>(m_result[i].udata),
-			(m_result[i].filter & EVFILT_READ) ? SocketListener::Read : SocketListener::Write
-		});
+		Socket sc = m_table.at(m_result[i].ident).first;
+		int flags = m_result[i].filter == EVFILT_READ ? SocketListener::Read : SocketListener::Write;
+
+		sockets.push_back(SocketStatus{sc, flags});
 	}
 
 	return sockets;
--- a/C++/modules/Socket/SocketListener.h	Tue Apr 28 14:40:17 2015 +0200
+++ b/C++/modules/Socket/SocketListener.h	Tue Apr 28 18:48:32 2015 +0200
@@ -176,23 +176,24 @@
  */
 class Kqueue {
 private:
-	std::vector<struct kevent> m_list;
+	std::map<Socket::Handle, std::pair<Socket, int>> m_table;
 	std::vector<struct kevent> m_result;
-	std::unique_ptr<int, void (*)(int *)> m_handle;
+	int m_handle;
 
 	Kqueue(const Kqueue &) = delete;
 	Kqueue &operator=(const Kqueue &) = delete;
+	Kqueue(Kqueue &&) = delete;
+	Kqueue &operator=(Kqueue &&) = delete;
 
-	std::vector<struct kevent>::iterator find(Socket &s) const;
+	void update(Socket &sc, int filter, int flags);
 
 public:
 	Kqueue();
-	Kqueue(Kqueue &&) = default;
-	Kqueue &operator=(Kqueue &&) = default;
+	~Kqueue();
 
-	void set(Socket &s, int flags);
-	void unset(Socket &s, int flags);
-	void remove(Socket &sc);
+	void set(Socket sc, int flags);
+	void unset(Socket sc, int flags);
+	void remove(Socket sc);
 	void clear();
 	SocketStatus wait(int ms);
 	std::vector<SocketStatus> waitMultiple(int ms);