# HG changeset patch # User David Demelier # Date 1428410684 -7200 # Node ID 5a1ec660323028040bd8298482dd0aacb308a563 # Parent 3a1380b4428c5cf2b1c84f83e5941eba87684deb Socket: add new kqueue backend diff -r 3a1380b4428c -r 5a1ec6603230 C++/modules/Socket/SocketListener.cpp --- a/C++/modules/Socket/SocketListener.cpp Mon Apr 06 10:56:40 2015 +0200 +++ b/C++/modules/Socket/SocketListener.cpp Tue Apr 07 14:44:44 2015 +0200 @@ -256,6 +256,128 @@ return sockets; } -#endif // !_SOCKET_HAVE_POLL +#endif // !SOCKET_HAVE_POLL + +/* -------------------------------------------------------- + * Kqueue implementation + * -------------------------------------------------------- */ + +#if defined(SOCKET_HAVE_KQUEUE) + +Kqueue::Kqueue() + : m_handle(nullptr, nullptr) +{ + int handle = kqueue(); + + if (handle < 0) { + throw SocketError(SocketError::System, "kqueue"); + } + + m_handle = std::unique_ptr(new int(handle), [] (int *p) { + (void)::close(*p); + }); +} + +std::vector::iterator Kqueue::find(Socket &s) +{ + return std::find_if(m_list.begin(), m_list.end(), [&] (struct kevent &kv) -> bool { + return static_cast(kv.ident) == s.handle(); + }); +} + +void Kqueue::set(Socket &s, int direction) +{ + struct kevent ev; + + if (direction == SocketListener::Read) { + EV_SET(&ev, s.handle(), EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, &s); + } else if (direction == SocketListener::Write) { + EV_SET(&ev, s.handle(), EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, &s); + } + + auto it = find(s); + + if (it == m_list.end()) { + m_list.push_back(ev); + } else { + *it = ev; + } + + m_result.resize(m_list.size()); +} + +void Kqueue::unset(Socket &s, int direction) +{ + auto it = find(s); + + if (it != m_list.end()) { + if (direction & SocketListener::Read) { + it->filter &= ~(EVFILT_READ); + } + if (direction & SocketListener::Write) { + it->filter &= ~(EVFILT_WRITE); + } + + /* complete removal */ + if ((it->filter & ~(direction)) == 0) { + m_list.erase(it); + } + } +} + +void Kqueue::remove(Socket &s) +{ + auto it = find(s); + + if (it != m_list.end()) { + m_list.erase(it); + } + + m_result.resize(m_list.size()); +} + +void Kqueue::clear() +{ + m_list.clear(); + m_result.clear(); + + m_list.resize(0); + m_result.resize(0); +} + +SocketStatus Kqueue::wait(int ms) +{ + return waitMultiple(ms)[0]; +} + +std::vector Kqueue::waitMultiple(int ms) +{ + std::vector sockets; + timespec ts = { 0, 0 }; + timespec *pts = (ms <= 0) ? nullptr : &ts; + + ts.tv_sec = ms / 1000; + ts.tv_nsec = (ms % 1000) * 1000000; + + int nevents = kevent(*m_handle, m_list.data(), m_list.size(), &m_result[0], m_result.capacity(), pts); + + if (nevents == 0) { + throw SocketError(SocketError::Timeout, "kevent"); + } + if (nevents < 0) { + throw SocketError(SocketError::System, "kevent"); + } + + for (int i = 0; i < nevents; ++i) { + sockets.push_back(SocketStatus{ + *static_cast(m_result[i].udata), + (m_result[i].filter & EVFILT_READ) ? SocketListener::Read : SocketListener::Write + }); + } + + return sockets; +} + +#endif // !SOCKET_HAVE_KQUEUE } // !backend diff -r 3a1380b4428c -r 5a1ec6603230 C++/modules/Socket/SocketListener.h --- a/C++/modules/Socket/SocketListener.h Mon Apr 06 10:56:40 2015 +0200 +++ b/C++/modules/Socket/SocketListener.h Tue Apr 07 14:44:44 2015 +0200 @@ -72,9 +72,11 @@ //# define SOCKET_DEFAULT_BACKEND backend::Epoll # define SOCKET_DEFAULT_BACKEND backend::Poll #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__APPLE__) - // TODO NOT READY YET - //# define SOCKET_DEFAULT_BACKEND backend::Kqueue -# define SOCKET_DEFAULT_BACKEND backend::Poll +# include +# include +# include + +# define SOCKET_DEFAULT_BACKEND backend::Kqueue #else # define SOCKET_DEFAULT_BACKEND backend::Select #endif @@ -159,7 +161,27 @@ #if defined(SOCKET_HAVE_KQUEUE) class Kqueue { - // TODO +private: + std::vector m_list; + std::vector m_result; + std::unique_ptr m_handle; + + Kqueue(const Kqueue &) = delete; + Kqueue &operator=(const Kqueue &) = delete; + + std::vector::iterator find(Socket &s); + +public: + Kqueue(); + Kqueue(Kqueue &&) = default; + Kqueue &operator=(Kqueue &&) = default; + + void set(Socket &s, int direction); + void unset(Socket &s, int direction); + void remove(Socket &sc); + void clear(); + SocketStatus wait(int ms); + std::vector waitMultiple(int ms); }; #endif diff -r 3a1380b4428c -r 5a1ec6603230 C++/modules/Socket/SocketTcp.cpp --- a/C++/modules/Socket/SocketTcp.cpp Mon Apr 06 10:56:40 2015 +0200 +++ b/C++/modules/Socket/SocketTcp.cpp Tue Apr 07 14:44:44 2015 +0200 @@ -136,7 +136,6 @@ listener.wait(timeout); // Socket is writable? Check if there is an error - int error = get(SOL_SOCKET, SO_ERROR); if (error) { diff -r 3a1380b4428c -r 5a1ec6603230 C++/tests/Socket/main.cpp --- a/C++/tests/Socket/main.cpp Mon Apr 06 10:56:40 2015 +0200 +++ b/C++/tests/Socket/main.cpp Tue Apr 07 14:44:44 2015 +0200 @@ -417,6 +417,87 @@ } /* -------------------------------------------------------- + * Listener: kqueue + * -------------------------------------------------------- */ + +#if defined(SOCKET_HAVE_KQUEUE) + +class ListenerKqueueTest : public testing::Test { +protected: + SocketListenerBase m_listener; + SocketTcp m_masterTcp{AF_INET, 0}; + SocketTcp m_clientTcp{AF_INET, 0}; + + std::thread m_tserver; + std::thread m_tclient; + +public: + ListenerKqueueTest() + { + m_masterTcp.set(SOL_SOCKET, SO_REUSEADDR, 1); + m_masterTcp.bind(Internet("*", 16000, AF_INET)); + m_masterTcp.listen(); + } + + ~ListenerKqueueTest() + { + if (m_tserver.joinable()) { + m_tserver.join(); + } + if (m_tclient.joinable()) { + m_tclient.join(); + } + } +}; + +TEST_F(ListenerKqueueTest, accept) +{ + m_tserver = std::thread([this] () { + try { + m_listener.set(m_masterTcp, SocketListener::Read); + m_listener.wait(); + m_masterTcp.accept(); + m_masterTcp.close(); + } catch (const std::exception &ex) { + FAIL() << ex.what(); + } + }); + + std::this_thread::sleep_for(100ms); + + m_tclient = std::thread([this] () { + m_clientTcp.connect(Internet("127.0.0.1", 16000, AF_INET)); + }); +} + +TEST_F(ListenerKqueueTest, recv) +{ + m_tserver = std::thread([this] () { + try { + m_listener.set(m_masterTcp, SocketListener::Read); + m_listener.wait(); + + auto sc = m_masterTcp.accept(); + + ASSERT_EQ("hello", sc.recv(512)); + + m_masterTcp.close(); + } catch (const std::exception &ex) { + FAIL() << ex.what(); + } + }); + + std::this_thread::sleep_for(100ms); + + m_tclient = std::thread([this] () { + m_clientTcp.connect(Internet("127.0.0.1", 16000, AF_INET)); + m_clientTcp.send("hello"); + }); +} + +#endif // !SOCKET_HAVE_KQUEUE + +/* -------------------------------------------------------- * Non-blocking connect * -------------------------------------------------------- */ @@ -452,6 +533,8 @@ m_tserver = std::thread([this] () { SocketTcp client = m_server.accept(); + std::this_thread::sleep_for(100ms); + m_server.close(); client.close(); }); @@ -462,7 +545,7 @@ try { m_client.waitConnect(Internet("127.0.0.1", 16000, AF_INET), 3000); } catch (const SocketError &error) { - FAIL() << error.what(); + FAIL() << error.function() << ": " << error.what(); } ASSERT_EQ(SocketState::Connected, m_client.state());