Mercurial > code
changeset 280:91eb0583df52
Socket:
* Fix select method that require fdset's to be rebuild at each iteration,
* While here always create a m_interface
Task: #306
author | David Demelier <markand@malikania.fr> |
---|---|
date | Fri, 24 Oct 2014 18:28:39 +0200 |
parents | af630354610f |
children | 88f9d8b406c6 ea55a3886da0 |
files | C++/Socket.cpp C++/Socket.h C++/SocketListener.cpp C++/Tests/Sockets/main.cpp |
diffstat | 4 files changed, 174 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/C++/Socket.cpp Fri Oct 24 10:28:17 2014 +0200 +++ b/C++/Socket.cpp Fri Oct 24 18:28:39 2014 +0200 @@ -202,7 +202,7 @@ // Usually accept works only with SOCK_STREAM info = SocketAddress(address, addrlen); - return Socket(handle); + return Socket(handle, std::make_shared<Standard>()); } void Standard::listen(Socket &s, int max) @@ -316,8 +316,13 @@ * Socket code * -------------------------------------------------------- */ +Socket::Socket() + : m_interface(std::make_shared<Standard>()) +{ +} + Socket::Socket(int domain, int type, int protocol) - : m_interface(std::make_unique<Standard>()) + : Socket() { m_handle = socket(domain, type, protocol); @@ -325,8 +330,8 @@ throw error::Failure("socket", syserror()); } -Socket::Socket(Handle handle) - : m_interface(std::make_shared<Standard>()) +Socket::Socket(Handle handle, std::shared_ptr<SocketInterface> interface) + : m_interface(std::move(interface)) , m_handle(handle) { }
--- a/C++/Socket.h Fri Oct 24 10:28:17 2014 +0200 +++ b/C++/Socket.h Fri Oct 24 18:28:39 2014 +0200 @@ -279,7 +279,7 @@ /** * Default constructor. */ - Socket() = default; + Socket(); /** * Constructor to create a new socket. @@ -295,8 +295,9 @@ * Create a socket object with a already initialized socket. * * @param handle the handle + * @param interface the interface to use */ - Socket(Handle handle); + Socket(Handle handle, std::shared_ptr<SocketInterface> interface); /** * Close the socket.
--- a/C++/SocketListener.cpp Fri Oct 24 10:28:17 2014 +0200 +++ b/C++/SocketListener.cpp Fri Oct 24 18:28:39 2014 +0200 @@ -19,7 +19,7 @@ #include <algorithm> #include <map> #include <set> -#include <tuple> +#include <utility> #include <vector> #include "SocketListener.h" @@ -36,17 +36,9 @@ */ class SelectMethod final : public SocketListener::Interface { private: - std::set<Socket> m_rsockets; - std::set<Socket> m_wsockets; - std::map<Socket::Handle, std::tuple<Socket, int>> m_lookup; - - fd_set m_readset; - fd_set m_writeset; - - Socket::Handle m_max { 0 }; + std::map<Socket::Handle, std::pair<Socket, int>> m_table; public: - SelectMethod(); void add(Socket s, int direction) override; void remove(const Socket &s, int direction) override; void list(const SocketListener::MapFunc &func) override; @@ -56,76 +48,39 @@ std::vector<SocketStatus> selectMultiple(int ms) override; }; -SelectMethod::SelectMethod() -{ - FD_ZERO(&m_readset); - FD_ZERO(&m_writeset); -} - void SelectMethod::add(Socket s, int direction) { - if (m_lookup.count(s.handle()) > 0) - std::get<1>(m_lookup[s.handle()]) |= direction; + if (m_table.count(s.handle()) > 0) + m_table[s.handle()].second |= direction; else - m_lookup[s.handle()] = std::make_tuple(s, direction); - - if (direction & Read) { - m_rsockets.insert(s); - FD_SET(s.handle(), &m_readset); - } - if (direction & Write) { - m_wsockets.insert(s); - FD_SET(s.handle(), &m_writeset); - } - - if (s.handle() > m_max) - m_max = s.handle(); + m_table[s.handle()] = { s, direction }; } void SelectMethod::remove(const Socket &s, int direction) { - std::get<1>(m_lookup[s.handle()]) &= ~(direction); + if (m_table.count(s.handle()) != 0) { + m_table[s.handle()].second &= ~(direction); - if (static_cast<int>(std::get<1>(m_lookup[s.handle()])) == 0) - m_lookup.erase(s.handle()); - - if (direction & Read) { - m_rsockets.erase(s.handle()); - FD_CLR(s.handle(), &m_readset); + // If no read, no write is requested, remove it + if (m_table[s.handle()].second == 0) + m_table.erase(s.handle()); } - if (direction & Write) { - m_wsockets.erase(s.handle()); - FD_CLR(s.handle(), &m_writeset); - } - - // Refind the max file descriptor - if (m_lookup.size() > 0) { - m_max = std::get<0>(std::max_element(m_lookup.begin(), m_lookup.end())->second).handle(); - } else - m_max = 0; } void SelectMethod::list(const SocketListener::MapFunc &func) { - for (auto &s : m_lookup) - func(std::get<0>(s.second), std::get<1>(s.second)); + for (auto &s : m_table) + func(s.second.first, s.second.second); } void SelectMethod::clear() { - m_rsockets.clear(); - m_wsockets.clear(); - m_lookup.clear(); - - FD_ZERO(&m_readset); - FD_ZERO(&m_writeset); - - m_max = 0; + m_table.clear(); } unsigned SelectMethod::size() const { - return m_lookup.size(); + return m_table.size(); } SocketStatus SelectMethod::select(int ms) @@ -141,6 +96,23 @@ std::vector<SocketStatus> SelectMethod::selectMultiple(int ms) { timeval maxwait, *towait; + fd_set readset; + fd_set writeset; + + FD_ZERO(&readset); + FD_ZERO(&writeset); + + Socket::Handle max = 0; + + for (auto &s : m_table) { + if (s.second.second & Read) + FD_SET(s.first, &readset); + if (s.second.second & Write) + FD_SET(s.first, &writeset); + + if (s.first > max) + max = s.first; + } maxwait.tv_sec = 0; maxwait.tv_usec = ms * 1000; @@ -148,19 +120,20 @@ // Set to nullptr for infinite timeout. towait = (ms <= 0) ? nullptr : &maxwait; - auto error = ::select(m_max + 1, &m_readset, &m_writeset, NULL, towait); + auto error = ::select(max + 1, &readset, &writeset, nullptr, towait); if (error == SOCKET_ERROR) throw error::Failure("select", Socket::syserror()); if (error == 0) throw error::Timeout("select"); std::vector<SocketStatus> sockets; - for (auto &c : m_lookup) - if (FD_ISSET(c.first, &m_readset)) - sockets.push_back({ std::get<0>(c.second), Read }); - for (auto &c : m_lookup) - if (FD_ISSET(c.first, &m_writeset)) - sockets.push_back({ std::get<0>(c.second), Write }); + + for (auto &c : m_table) { + if (FD_ISSET(c.first, &readset)) + sockets.push_back({ c.second.first, Read }); + if (FD_ISSET(c.first, &writeset)) + sockets.push_back({ c.second.first, Write }); + } return sockets; } @@ -235,7 +208,7 @@ void PollMethod::remove(const Socket &s, int direction) { for (auto i = m_fds.begin(); i != m_fds.end();) { - if (i->fd == s) { + if (i->fd == s.handle()) { i->events &= ~(topoll(direction)); if (i->events == 0) {
--- a/C++/Tests/Sockets/main.cpp Fri Oct 24 10:28:17 2014 +0200 +++ b/C++/Tests/Sockets/main.cpp Fri Oct 24 18:28:39 2014 +0200 @@ -30,6 +30,7 @@ using namespace std::literals::chrono_literals; +using namespace error; using namespace address; /* -------------------------------------------------------- @@ -86,6 +87,62 @@ * Select tests * -------------------------------------------------------- */ +TEST(ListenerMethodSelect, timeout) +{ + std::thread server([] () { + Socket s, client; + SocketListener listener(Select); + bool running = true; + int tries = 0; + + try { + s = { AF_INET, SOCK_STREAM, 0 }; + s.set(SOL_SOCKET, SO_REUSEADDR, 1); + s.bind(Internet{"*", 10000, AF_INET}); + s.listen(10); + + listener.add(s, Read); + + while (running) { + try { + listener.select(500ms); + client = s.accept(); + running = false; + + // Abort if no client connected + if (tries >= 10) + running = false; + } catch (const Timeout &) { + puts("DEBUG: TIMEOUT"); + } + } + } catch (const std::exception &ex) { + std::cerr << "warning: " << ex.what() << std::endl; + } + + s.close(); + client.close(); + }); + + std::thread client([] () { + std::this_thread::sleep_for(2s); + + Socket s; + + try { + s = { AF_INET, SOCK_STREAM, 0 }; + s.connect(Internet{"localhost", 10000, AF_INET}); + } catch (const std::exception &ex) { + std::cerr << "warning: " << ex.what() << std::endl; + } + + s.close(); + }); + + server.join(); + client.join(); +} + TEST(ListenerMethodSelect, add) { Socket s, s2; @@ -93,7 +150,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Select); + SocketListener listener(Select); listener.add(s, Read); listener.add(s2, Read); @@ -114,7 +171,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Select); + SocketListener listener(Select); listener.add(s, Read); listener.add(s2, Read); @@ -141,7 +198,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Select); + SocketListener listener(Select); listener.add(s, Read | Write); listener.add(s2, Read | Write); @@ -179,7 +236,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Select); + SocketListener listener(Select); listener.add(s, Read); ASSERT_EQ(1UL, listener.size()); @@ -216,6 +273,62 @@ * Poll tests * -------------------------------------------------------- */ +TEST(ListenerMethodPoll, timeout) +{ + std::thread server([] () { + Socket s, client; + SocketListener listener(Poll); + bool running = true; + int tries = 0; + + try { + s = { AF_INET, SOCK_STREAM, 0 }; + s.set(SOL_SOCKET, SO_REUSEADDR, 1); + s.bind(Internet{"*", 10000, AF_INET}); + s.listen(10); + + listener.add(s, Read); + + while (running) { + try { + listener.select(500ms); + client = s.accept(); + running = false; + + // Abort if no client connected + if (tries >= 10) + running = false; + } catch (const Timeout &) { + puts("DEBUG: TIMEOUT"); + } + } + } catch (const std::exception &ex) { + std::cerr << "warning: " << ex.what() << std::endl; + } + + s.close(); + client.close(); + }); + + std::thread client([] () { + std::this_thread::sleep_for(2s); + + Socket s; + + try { + s = { AF_INET, SOCK_STREAM, 0 }; + s.connect(Internet{"localhost", 10000, AF_INET}); + } catch (const std::exception &ex) { + std::cerr << "warning: " << ex.what() << std::endl; + } + + s.close(); + }); + + server.join(); + client.join(); +} + TEST(ListenerMethodPoll, add) { Socket s, s2; @@ -223,7 +336,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Poll); + SocketListener listener(Poll); listener.add(s, Read); listener.add(s2, Read); @@ -244,7 +357,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Poll); + SocketListener listener(Poll); listener.add(s, Read); listener.add(s2, Read); @@ -267,7 +380,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; s2 = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Poll); + SocketListener listener(Poll); listener.add(s, Read | Write); listener.add(s2, Read | Write); @@ -305,7 +418,7 @@ try { s = { AF_INET, SOCK_STREAM, 0 }; - SocketListener listener(SocketMethod::Poll); + SocketListener listener(Poll); listener.add(s, Read); ASSERT_EQ(1UL, listener.size()); @@ -612,7 +725,7 @@ Socket master; try { - SocketListener masterListener(SocketMethod::Poll), clientListener(SocketMethod::Poll); + SocketListener masterListener(Poll), clientListener(Poll); master = { AF_INET, SOCK_STREAM, 0 }; master.set(SOL_SOCKET, SO_REUSEADDR, 1);