From 24664714bdfa927cc69c07e5236ca7dcf0f9a1cf Mon Sep 17 00:00:00 2001 From: Eduardo Bart Date: Tue, 12 Mar 2013 00:18:47 -0300 Subject: [PATCH] Fix rare but serious bug in Connection * Implement output pooling for writing data in connection, this should fix rare cases where sending would fail --- modules/game_npctrade/npctrade.lua | 3 +- modules/game_npctrade/npctrade.otui | 1 - src/framework/net/connection.cpp | 206 ++++++++++++++++------------ src/framework/net/connection.h | 19 +-- 4 files changed, 134 insertions(+), 95 deletions(-) diff --git a/modules/game_npctrade/npctrade.lua b/modules/game_npctrade/npctrade.lua index 905dad2a..d04e22a3 100644 --- a/modules/game_npctrade/npctrade.lua +++ b/modules/game_npctrade/npctrade.lua @@ -151,8 +151,7 @@ function onTradeTypeChange(radioTabs, selected, deselected) ignoreCapacity:setVisible(currentTradeType == BUY) ignoreEquipped:setVisible(currentTradeType == SELL) showAllItems:setVisible(currentTradeType == SELL) - sellAllButton:setVisible(false) - --sellAllButton:setVisible(currentTradeType == SELL) + sellAllButton:setVisible(currentTradeType == SELL) refreshTradeItems() refreshPlayerGoods() diff --git a/modules/game_npctrade/npctrade.otui b/modules/game_npctrade/npctrade.otui index 5dc30b46..7c766a10 100644 --- a/modules/game_npctrade/npctrade.otui +++ b/modules/game_npctrade/npctrade.otui @@ -252,7 +252,6 @@ MainWindow margin-right: 10 visible: false @onClick: modules.game_npctrade.sellAll() - visible: false Button id: tradeButton diff --git a/src/framework/net/connection.cpp b/src/framework/net/connection.cpp index bcfb61f5..07ee9ce1 100644 --- a/src/framework/net/connection.cpp +++ b/src/framework/net/connection.cpp @@ -27,16 +27,17 @@ #include asio::io_service g_ioService; +std::list> Connection::m_outputStreams; Connection::Connection() : m_readTimer(g_ioService), m_writeTimer(g_ioService), + m_delayedWriteTimer(g_ioService), m_resolver(g_ioService), m_socket(g_ioService) { m_connected = false; m_connecting = false; - m_sendBufferSize = 0; } Connection::~Connection() @@ -56,34 +57,7 @@ void Connection::poll() void Connection::terminate() { g_ioService.stop(); -} - -void Connection::connect(const std::string& host, uint16 port, const std::function& connectCallback) -{ - m_connected = false; - m_connecting = true; - m_error.clear(); - m_connectCallback = connectCallback; - - asio::ip::tcp::resolver::query query(host, stdext::unsafe_cast(port)); - - auto self = asConnection(); - m_resolver.async_resolve(query, [=](const boost::system::error_code& error, asio::ip::tcp::resolver::iterator endpointIterator) { - if(self.is_unique()) - return; - m_readTimer.cancel(); - - if(!error) { - m_socket.async_connect(*endpointIterator, std::bind(&Connection::onConnect, asConnection(), std::placeholders::_1)); - - m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); - m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); - } else - handleError(error); - }); - - m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); - m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); + m_outputStreams.clear(); } void Connection::close() @@ -92,8 +66,8 @@ void Connection::close() return; // flush send data before disconnecting on clean connections - if(m_connected && !m_error && m_sendBufferSize > 0 && m_sendEvent) - m_sendEvent->execute(); + if(m_connected && !m_error && m_outputStream) + internal_write(); m_connecting = false; m_connected = false; @@ -104,6 +78,7 @@ void Connection::close() m_resolver.cancel(); m_readTimer.cancel(); m_writeTimer.cancel(); + m_delayedWriteTimer.cancel(); if(m_socket.is_open()) { boost::system::error_code ec; @@ -112,95 +87,138 @@ void Connection::close() } } -void Connection::write(uint8* buffer, uint16 size) +void Connection::connect(const std::string& host, uint16 port, const std::function& connectCallback) +{ + m_connected = false; + m_connecting = true; + m_error.clear(); + m_connectCallback = connectCallback; + + asio::ip::tcp::resolver::query query(host, stdext::unsafe_cast(port)); + m_resolver.async_resolve(query, std::bind(&Connection::onResolve, asConnection(), std::placeholders::_1, std::placeholders::_2)); + + m_readTimer.cancel(); + m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); + m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); +} + +void Connection::internal_connect(asio::ip::basic_resolver::iterator endpointIterator) +{ + m_socket.async_connect(*endpointIterator, std::bind(&Connection::onConnect, asConnection(), std::placeholders::_1)); + + m_readTimer.cancel(); + m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); + m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); +} + +void Connection::write(uint8* buffer, size_t size) { if(!m_connected) return; - // send old buffer if we can't add more data - if(m_sendBufferSize + size >= SEND_BUFFER_SIZE && m_sendEvent) - m_sendEvent->execute(); - // we can't send the data right away, otherwise we could create tcp congestion - memcpy(m_sendBuffer + m_sendBufferSize, buffer, size); - m_sendBufferSize += size; + if(!m_outputStream) { + if(!m_outputStreams.empty()) { + m_outputStream = m_outputStreams.front(); + m_outputStreams.pop_front(); + } else + m_outputStream = std::shared_ptr(new asio::streambuf); - if(!m_sendEvent || m_sendEvent->isExecuted() || m_sendEvent->isCanceled()) { - auto self = asConnection(); + m_delayedWriteTimer.cancel(); + m_delayedWriteTimer.expires_from_now(boost::posix_time::milliseconds(1)); + m_delayedWriteTimer.async_wait(std::bind(&Connection::onCanWrite, asConnection(), std::placeholders::_1)); + } - // wait 1 ms to do the real send - m_sendEvent = g_dispatcher.scheduleEvent([=] { - if(self.is_unique()) - return; - //m_writeTimer.cancel(); + std::ostream os(m_outputStream.get()); + os.write((const char*)buffer, size); + os.flush(); +} + +void Connection::internal_write() +{ + if(!m_connected) + return; - asio::async_write(m_socket, - asio::buffer(m_sendBuffer, m_sendBufferSize), - std::bind(&Connection::onWrite, asConnection(), std::placeholders::_1, std::placeholders::_2)); + std::shared_ptr outputStream = m_outputStream; + m_outputStream = nullptr; - m_writeTimer.expires_from_now(boost::posix_time::seconds(WRITE_TIMEOUT)); - m_writeTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); + asio::async_write(m_socket, + *outputStream, + std::bind(&Connection::onWrite, asConnection(), std::placeholders::_1, std::placeholders::_2, outputStream)); - m_sendBufferSize = 0; - }, SEND_INTERVAL); - } + m_writeTimer.cancel(); + m_writeTimer.expires_from_now(boost::posix_time::seconds(WRITE_TIMEOUT)); + m_writeTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); } void Connection::read(uint16 bytes, const RecvCallback& callback) { - m_readTimer.cancel(); - if(!m_connected) return; m_recvCallback = callback; asio::async_read(m_socket, - asio::buffer(m_streamBuffer.prepare(bytes)), - std::bind(&Connection::onRecv, asConnection(), std::placeholders::_1, bytes)); + asio::buffer(m_inputStream.prepare(bytes)), + std::bind(&Connection::onRecv, asConnection(), std::placeholders::_1, std::placeholders::_2)); + m_readTimer.cancel(); m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); } void Connection::read_until(const std::string& what, const RecvCallback& callback) { - m_readTimer.cancel(); - if(!m_connected) return; m_recvCallback = callback; asio::async_read_until(m_socket, - m_streamBuffer, + m_inputStream, what.c_str(), std::bind(&Connection::onRecv, asConnection(), std::placeholders::_1, std::placeholders::_2)); + m_readTimer.cancel(); m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); } void Connection::read_some(const RecvCallback& callback) { - m_readTimer.cancel(); - if(!m_connected) return; m_recvCallback = callback; - m_socket.async_read_some(asio::buffer(m_streamBuffer.prepare(RECV_BUFFER_SIZE)), + m_socket.async_read_some(asio::buffer(m_inputStream.prepare(RECV_BUFFER_SIZE)), std::bind(&Connection::onRecv, asConnection(), std::placeholders::_1, std::placeholders::_2)); + m_readTimer.cancel(); m_readTimer.expires_from_now(boost::posix_time::seconds(READ_TIMEOUT)); m_readTimer.async_wait(std::bind(&Connection::onTimeout, asConnection(), std::placeholders::_1)); } +void Connection::onResolve(const boost::system::error_code& error, asio::ip::basic_resolver::iterator endpointIterator) +{ + m_readTimer.cancel(); + + if(error == asio::error::operation_aborted) + return; + + if(!error) + internal_connect(endpointIterator); + else + handleError(error); +} + void Connection::onConnect(const boost::system::error_code& error) { m_readTimer.cancel(); + if(error == asio::error::operation_aborted) + return; + if(!error) { m_connected = true; @@ -216,14 +234,29 @@ void Connection::onConnect(const boost::system::error_code& error) m_connecting = false; } -void Connection::onWrite(const boost::system::error_code& error, size_t) +void Connection::onCanWrite(const boost::system::error_code& error) +{ + m_delayedWriteTimer.cancel(); + + if(error == asio::error::operation_aborted) + return; + + if(m_connected) + internal_write(); +} + +void Connection::onWrite(const boost::system::error_code& error, size_t writeSize, std::shared_ptr outputStream) { m_writeTimer.cancel(); - if(!m_connected) + if(error == asio::error::operation_aborted) return; - if(error) + // free output stream and store for using it again later + outputStream->consume(outputStream->size()); + m_outputStreams.push_back(outputStream); + + if(m_connected && error) handleError(error); } @@ -231,36 +264,41 @@ void Connection::onRecv(const boost::system::error_code& error, size_t recvSize) { m_readTimer.cancel(); - if(!m_connected) + if(error == asio::error::operation_aborted) return; - if(!error) { - if(m_recvCallback) { - const char* header = boost::asio::buffer_cast(m_streamBuffer.data()); - m_recvCallback((uint8*)header, recvSize); - } + if(m_connected) { + if(!error) { + if(m_recvCallback) { + const char* header = boost::asio::buffer_cast(m_inputStream.data()); + m_recvCallback((uint8*)header, recvSize); + } + } else + handleError(error); } - else - handleError(error); - m_streamBuffer.consume(recvSize); + if(!error) + m_inputStream.consume(recvSize); } void Connection::onTimeout(const boost::system::error_code& error) { - if(error != asio::error::operation_aborted) - handleError(asio::error::timed_out); + if(error == asio::error::operation_aborted) + return; + + handleError(asio::error::timed_out); } void Connection::handleError(const boost::system::error_code& error) { - if(error != asio::error::operation_aborted) { - m_error = error; - if(m_errorCallback) - m_errorCallback(error); - if(m_connected || m_connecting) - close(); - } + if(error == asio::error::operation_aborted) + return; + + m_error = error; + if(m_errorCallback) + m_errorCallback(error); + if(m_connected || m_connecting) + close(); } int Connection::getIp() diff --git a/src/framework/net/connection.h b/src/framework/net/connection.h index ebfddc63..dbeacae1 100644 --- a/src/framework/net/connection.h +++ b/src/framework/net/connection.h @@ -36,7 +36,6 @@ class Connection : public LuaObject enum { READ_TIMEOUT = 30, WRITE_TIMEOUT = 30, - SEND_INTERVAL = 1, SEND_BUFFER_SIZE = 65536, RECV_BUFFER_SIZE = 65536 }; @@ -51,7 +50,7 @@ public: void connect(const std::string& host, uint16 port, const std::function& connectCallback); void close(); - void write(uint8* buffer, uint16 size); + void write(uint8* buffer, size_t size); void read(uint16 bytes, const RecvCallback& callback); void read_until(const std::string& what, const RecvCallback& callback); void read_some(const RecvCallback& callback); @@ -64,9 +63,14 @@ public: bool isConnected() { return m_connected; } ConnectionPtr asConnection() { return static_self_cast(); } + protected: + void internal_connect(asio::ip::basic_resolver::iterator endpointIterator); + void internal_write(); + void onResolve(const boost::system::error_code& error, asio::ip::tcp::resolver::iterator endpointIterator); void onConnect(const boost::system::error_code& error); - void onWrite(const boost::system::error_code& error, size_t); + void onCanWrite(const boost::system::error_code& error); + void onWrite(const boost::system::error_code& error, size_t writeSize, std::shared_ptr outputStream); void onRecv(const boost::system::error_code& error, size_t recvSize); void onTimeout(const boost::system::error_code& error); void handleError(const boost::system::error_code& error); @@ -77,17 +81,16 @@ protected: asio::deadline_timer m_readTimer; asio::deadline_timer m_writeTimer; + asio::deadline_timer m_delayedWriteTimer; asio::ip::tcp::resolver m_resolver; asio::ip::tcp::socket m_socket; - uint8 m_sendBuffer[SEND_BUFFER_SIZE]; - asio::streambuf m_streamBuffer; + static std::list> m_outputStreams; + std::shared_ptr m_outputStream; + asio::streambuf m_inputStream; bool m_connected; bool m_connecting; boost::system::error_code m_error; - int m_sendBufferSize; - Timer m_sendTimer; - ScheduledEventPtr m_sendEvent; friend class Server; };