From 0be7bd5360322b6dc3245504c2b601806614ed1f Mon Sep 17 00:00:00 2001 From: Eduardo Bart Date: Mon, 4 Mar 2013 18:56:22 -0300 Subject: [PATCH] Implement async dispatcher #221 --- src/client/localplayer.h | 2 +- src/framework/CMakeLists.txt | 3 + src/framework/core/application.cpp | 5 + src/framework/core/asyncdispatcher.cpp | 71 ++++++++++++++ src/framework/core/asyncdispatcher.h | 59 ++++++++++++ src/framework/core/logger.cpp | 8 ++ src/framework/core/logger.h | 2 + src/framework/sound/soundfile.cpp | 9 +- src/framework/sound/soundmanager.cpp | 111 ++++++++++++++-------- src/framework/sound/soundmanager.h | 1 + src/framework/sound/streamsoundsource.cpp | 37 ++++---- src/framework/sound/streamsoundsource.h | 1 + src/framework/stdext/thread.h | 35 +++++-- 13 files changed, 274 insertions(+), 70 deletions(-) create mode 100644 src/framework/core/asyncdispatcher.cpp create mode 100644 src/framework/core/asyncdispatcher.h diff --git a/src/client/localplayer.h b/src/client/localplayer.h index eccc6bc8..d2ba0774 100644 --- a/src/client/localplayer.h +++ b/src/client/localplayer.h @@ -29,7 +29,7 @@ class LocalPlayer : public Player { enum { - PREWALK_TIMEOUT = 5000 + PREWALK_TIMEOUT = 1000 }; public: diff --git a/src/framework/CMakeLists.txt b/src/framework/CMakeLists.txt index b96b7044..657a457d 100644 --- a/src/framework/CMakeLists.txt +++ b/src/framework/CMakeLists.txt @@ -64,6 +64,8 @@ set(framework_SOURCES ${framework_SOURCES} ${CMAKE_CURRENT_LIST_DIR}/core/application.h ${CMAKE_CURRENT_LIST_DIR}/core/adaptativeframecounter.cpp ${CMAKE_CURRENT_LIST_DIR}/core/adaptativeframecounter.h + ${CMAKE_CURRENT_LIST_DIR}/core/asyncdispatcher.cpp + ${CMAKE_CURRENT_LIST_DIR}/core/asyncdispatcher.h ${CMAKE_CURRENT_LIST_DIR}/core/binarytree.cpp ${CMAKE_CURRENT_LIST_DIR}/core/binarytree.h ${CMAKE_CURRENT_LIST_DIR}/core/clock.cpp @@ -194,6 +196,7 @@ message(STATUS "Build revision: ${BUILD_REVISION}") add_definitions(-D"BUILD_REVISION=\\\"${BUILD_REVISION}\\\"") # find boost +set(framework_DEFINITIONS ${framework_DEFINITIONS} -DBOOST_THREAD_PROVIDES_FUTURE) # enable boost::future set(REQUIRED_BOOST_COMPONENTS system thread chrono) if(WIN32) set(Boost_THREADAPI win32) diff --git a/src/framework/core/application.cpp b/src/framework/core/application.cpp index 894dc6de..0a643e04 100644 --- a/src/framework/core/application.cpp +++ b/src/framework/core/application.cpp @@ -27,6 +27,7 @@ #include #include #include +#include "asyncdispatcher.h" #include #include #include @@ -76,6 +77,8 @@ void Application::init(std::vector& args) // process args encoding g_platform.processArgs(args); + g_asyncDispatcher.init(); + std::string startupOptions; for(uint i=1;i + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "asyncdispatcher.h" + +AsyncDispatcher g_asyncDispatcher; + +void AsyncDispatcher::init() +{ + spawn_thread(); +} + +void AsyncDispatcher::terminate() +{ + stop(); + m_tasks.clear(); +} + +void AsyncDispatcher::spawn_thread() +{ + m_running = true; + m_threads.push_back(std::thread(std::bind(&AsyncDispatcher::exec_loop, this))); +} + +void AsyncDispatcher::stop() +{ + m_mutex.lock(); + m_running = false; + m_condition.notify_all(); + m_mutex.unlock(); + for(std::thread& thread : m_threads) + thread.join(); + m_threads.clear(); +}; + +void AsyncDispatcher::exec_loop() { + std::unique_lock lock(m_mutex); + while(true) { + while(m_tasks.size() == 0 && m_running) + m_condition.wait(lock); + + if(!m_running) + return; + + std::function task = m_tasks.front(); + m_tasks.pop_front(); + + lock.unlock(); + task(); + lock.lock(); + } +} diff --git a/src/framework/core/asyncdispatcher.h b/src/framework/core/asyncdispatcher.h new file mode 100644 index 00000000..75290550 --- /dev/null +++ b/src/framework/core/asyncdispatcher.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2010-2013 OTClient + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef ASYNCDISPATCHER_H +#define ASYNCDISPATCHER_H + +#include "declarations.h" +#include + +class AsyncDispatcher { +public: + void init(); + void terminate(); + + void spawn_thread(); + void stop(); + + template + std::future::type> schedule(const F& task) { + std::lock_guard lock(m_mutex); + auto prom = std::make_shared::type>>(); + m_tasks.push_back([=]() { prom->set_value(task()); }); + m_condition.notify_all(); + return prom->get_future(); + } + +protected: + void exec_loop(); + +private: + std::list> m_tasks; + std::mutex m_mutex; + std::list m_threads; + std::condition_variable m_condition; + stdext::boolean m_running; +}; + +extern AsyncDispatcher g_asyncDispatcher; + +#endif diff --git a/src/framework/core/logger.cpp b/src/framework/core/logger.cpp index d015ed1d..f67a6cc4 100644 --- a/src/framework/core/logger.cpp +++ b/src/framework/core/logger.cpp @@ -35,6 +35,8 @@ Logger g_logger; void Logger::log(Fw::LogLevel level, const std::string& message) { + std::lock_guard lock(m_mutex); + #ifdef NDEBUG if(level == Fw::LogDebug) return; @@ -95,6 +97,8 @@ void Logger::log(Fw::LogLevel level, const std::string& message) void Logger::logFunc(Fw::LogLevel level, const std::string& message, std::string prettyFunction) { + std::lock_guard lock(m_mutex); + prettyFunction = prettyFunction.substr(0, prettyFunction.find_first_of('(')); if(prettyFunction.find_last_of(' ') != std::string::npos) prettyFunction = prettyFunction.substr(prettyFunction.find_last_of(' ') + 1); @@ -114,6 +118,8 @@ void Logger::logFunc(Fw::LogLevel level, const std::string& message, std::string void Logger::fireOldMessages() { + std::lock_guard lock(m_mutex); + if(m_onLog) { auto backup = m_logMessages; for(const LogMessage& logMessage : backup) { @@ -124,6 +130,8 @@ void Logger::fireOldMessages() void Logger::setLogFile(const std::string& file) { + std::lock_guard lock(m_mutex); + m_outFile.open(stdext::utf8_to_latin1(file.c_str()).c_str(), std::ios::out | std::ios::app); if(!m_outFile.is_open() || !m_outFile.good()) { g_logger.error(stdext::format("Unable to save log to '%s'", file)); diff --git a/src/framework/core/logger.h b/src/framework/core/logger.h index 665e8044..d56c4060 100644 --- a/src/framework/core/logger.h +++ b/src/framework/core/logger.h @@ -25,6 +25,7 @@ #include "../global.h" +#include #include struct LogMessage { @@ -61,6 +62,7 @@ private: std::list m_logMessages; OnLogCallback m_onLog; std::ofstream m_outFile; + std::recursive_mutex m_mutex; }; extern Logger g_logger; diff --git a/src/framework/sound/soundfile.cpp b/src/framework/sound/soundfile.cpp index 8c76620d..e703ac44 100644 --- a/src/framework/sound/soundfile.cpp +++ b/src/framework/sound/soundfile.cpp @@ -31,11 +31,10 @@ SoundFile::SoundFile(const FileStreamPtr& fileStream) SoundFilePtr SoundFile::loadSoundFile(const std::string& filename) { + stdext::timer t; FileStreamPtr file = g_resources.openFile(filename); - if(!file) { - g_logger.traceError(stdext::format("unable to open %s", filename)); - return nullptr; - } + if(!file) + stdext::throw_exception(stdext::format("unable to open %s", filename)); // cache file buffer to avoid lags from hard drive file->cache(); @@ -50,7 +49,7 @@ SoundFilePtr SoundFile::loadSoundFile(const std::string& filename) if(oggSoundFile->prepareOgg()) soundFile = oggSoundFile; } else - g_logger.error(stdext::format("unknown sound file format %s", filename)); + stdext::throw_exception(stdext::format("unknown sound file format %s", filename)); return soundFile; } diff --git a/src/framework/sound/soundmanager.cpp b/src/framework/sound/soundmanager.cpp index 6c5bff25..7120bd0a 100644 --- a/src/framework/sound/soundmanager.cpp +++ b/src/framework/sound/soundmanager.cpp @@ -30,6 +30,8 @@ #include #include #include +#include +#include SoundManager g_sounds; @@ -57,9 +59,14 @@ void SoundManager::terminate() { ensureContext(); + for(auto it = m_streamFiles.begin(); it != m_streamFiles.end();++it) + it->second.wait(); + m_streamFiles.clear(); + m_sources.clear(); m_buffers.clear(); m_channels.clear(); + m_audioEnabled = false; alcMakeContextCurrent(nullptr); @@ -86,6 +93,23 @@ void SoundManager::poll() lastUpdate = now; ensureContext(); + + for(auto it = m_streamFiles.begin(); it != m_streamFiles.end();) { + StreamSoundSourcePtr source = it->first; + std::future& future = it->second; + + if(std::is_ready(future)) { + SoundFilePtr sound = future.get(); + if(sound) + source->setSoundFile(sound); + else + source->stop(); + it = m_streamFiles.erase(it); + } else { + ++it; + } + } + for(auto it = m_sources.begin(); it != m_sources.end();) { SoundSourcePtr source = *it; @@ -161,9 +185,8 @@ SoundSourcePtr SoundManager::play(std::string filename, float fadetime, float ga soundSource->setRelative(true); soundSource->setGain(gain); - if(fadetime > 0) { + if(fadetime > 0) soundSource->setFading(StreamSoundSource::FadingOn, fadetime); - } soundSource->play(); @@ -202,45 +225,55 @@ SoundSourcePtr SoundManager::createSoundSource(const std::string& filename) source = SoundSourcePtr(new SoundSource); source->setBuffer(it->second); } else { - SoundFilePtr soundFile = SoundFile::loadSoundFile(filename); - if(!soundFile) - return nullptr; +#if defined __linux && !defined OPENGL_ES + // due to OpenAL implementation bug, stereo buffers are always downmixed to mono on linux systems + // this is hack to work around the issue + // solution taken from http://opensource.creative.com/pipermail/openal/2007-April/010355.html + CombinedSoundSourcePtr combinedSource(new CombinedSoundSource); + StreamSoundSourcePtr streamSource; - if(soundFile->getSize() <= MAX_CACHE_SIZE) { - source = SoundSourcePtr(new SoundSource); - SoundBufferPtr buffer = SoundBufferPtr(new SoundBuffer); - buffer->fillBuffer(soundFile); - source->setBuffer(buffer); - m_buffers[filename] = buffer; - g_logger.warning(stdext::format("uncached sound '%s' requested to be played", filename)); - } else { - StreamSoundSourcePtr streamSource(new StreamSoundSource); - streamSource->setSoundFile(soundFile); - source = streamSource; - - #if defined __linux && !defined OPENGL_ES - // due to OpenAL implementation bug, stereo buffers are always downmixed to mono on linux systems - // this is hack to work around the issue - // solution taken from http://opensource.creative.com/pipermail/openal/2007-April/010355.html - if(soundFile->getSampleFormat() == AL_FORMAT_STEREO16) { - CombinedSoundSourcePtr combinedSource(new CombinedSoundSource); - - streamSource->downMix(StreamSoundSource::DownMixLeft); - streamSource->setRelative(true); - streamSource->setPosition(Point(-128, 0)); - combinedSource->addSource(streamSource); - - streamSource = StreamSoundSourcePtr(new StreamSoundSource); - streamSource->setSoundFile(SoundFile::loadSoundFile(filename)); - streamSource->downMix(StreamSoundSource::DownMixRight); - streamSource->setRelative(true); - streamSource->setPosition(Point(128,0)); - combinedSource->addSource(streamSource); - - source = combinedSource; + streamSource = StreamSoundSourcePtr(new StreamSoundSource); + streamSource->downMix(StreamSoundSource::DownMixLeft); + streamSource->setRelative(true); + streamSource->setPosition(Point(-128, 0)); + combinedSource->addSource(streamSource); + m_streamFiles[streamSource] = g_asyncDispatcher.schedule([=]() -> SoundFilePtr { + stdext::timer a; + try { + return SoundFile::loadSoundFile(filename); + } catch(std::exception& e) { + g_logger.error(e.what()); + return nullptr; } - #endif - } + }); + + streamSource = StreamSoundSourcePtr(new StreamSoundSource); + streamSource->downMix(StreamSoundSource::DownMixRight); + streamSource->setRelative(true); + streamSource->setPosition(Point(128,0)); + combinedSource->addSource(streamSource); + m_streamFiles[streamSource] = g_asyncDispatcher.schedule([=]() -> SoundFilePtr { + try { + return SoundFile::loadSoundFile(filename); + } catch(std::exception& e) { + g_logger.error(e.what()); + return nullptr; + } + }); + + source = combinedSource; +#else + StreamSoundSourcePtr streamSource(new StreamSoundSource); + m_streamFiles[streamSource] = m_loadJobs [=]() -> SoundFilePtr { + try { + return SoundFile::loadSoundFile(filename); + } catch(std::exception& e) { + g_logger.error(e.what()); + return nullptr; + } + }); + source = streamSource; +#endif } } catch(std::exception& e) { g_logger.error(stdext::format("failed to load sound source: '%s'", e.what())); diff --git a/src/framework/sound/soundmanager.h b/src/framework/sound/soundmanager.h index 2c615f69..aa7eab78 100644 --- a/src/framework/sound/soundmanager.h +++ b/src/framework/sound/soundmanager.h @@ -57,6 +57,7 @@ private: ALCdevice *m_device; ALCcontext *m_context; + std::map> m_streamFiles; std::unordered_map m_buffers; std::vector m_sources; stdext::boolean m_audioEnabled; diff --git a/src/framework/sound/streamsoundsource.cpp b/src/framework/sound/streamsoundsource.cpp index 72dcd208..39f84038 100644 --- a/src/framework/sound/streamsoundsource.cpp +++ b/src/framework/sound/streamsoundsource.cpp @@ -25,6 +25,7 @@ #include "soundfile.h" #include +#include StreamSoundSource::StreamSoundSource() { @@ -41,22 +42,26 @@ StreamSoundSource::~StreamSoundSource() void StreamSoundSource::setSoundFile(const SoundFilePtr& soundFile) { m_soundFile = soundFile; + if(m_waitingFile) { + m_waitingFile = false; + play(); + } } void StreamSoundSource::play() { m_playing = true; + if(!m_soundFile) { + m_waitingFile = true; + return; + } + if(m_eof) { m_soundFile->reset(); m_eof = false; } - if(!m_soundFile) { - g_logger.error("there is not sound file to play the stream"); - return; - } - queueBuffers(); SoundSource::play(); @@ -64,9 +69,13 @@ void StreamSoundSource::play() void StreamSoundSource::stop() { + m_playing = false; + + if(m_waitingFile) + return; + SoundSource::stop(); unqueueBuffers(); - m_playing = false; } void StreamSoundSource::queueBuffers() @@ -91,6 +100,9 @@ void StreamSoundSource::unqueueBuffers() void StreamSoundSource::update() { + if(m_waitingFile) + return; + SoundSource::update(); int processed = 0; @@ -118,6 +130,9 @@ void StreamSoundSource::update() bool StreamSoundSource::fillBufferAndQueue(uint buffer) { + if(m_waitingFile) + return false; + // fill buffer static DataBuffer bufferData(2*STREAM_FRAGMENT_SIZE); ALenum format = m_soundFile->getSampleFormat(); @@ -170,15 +185,5 @@ bool StreamSoundSource::fillBufferAndQueue(uint buffer) void StreamSoundSource::downMix(StreamSoundSource::DownMix downMix) { - if(!m_soundFile) { - g_logger.error("down mix must be set after setting a sound file"); - return; - } - - if(m_soundFile->getSampleFormat() != AL_FORMAT_STEREO16) { - g_logger.error("can only downmix 16 bit stereo audio files"); - return; - } - m_downMix = downMix; } diff --git a/src/framework/sound/streamsoundsource.h b/src/framework/sound/streamsoundsource.h index 46686bfa..bfbc60e2 100644 --- a/src/framework/sound/streamsoundsource.h +++ b/src/framework/sound/streamsoundsource.h @@ -61,6 +61,7 @@ private: stdext::boolean m_looping; stdext::boolean m_playing; stdext::boolean m_eof; + stdext::boolean m_waitingFile; }; #endif diff --git a/src/framework/stdext/thread.h b/src/framework/stdext/thread.h index c4b74520..06737e18 100644 --- a/src/framework/stdext/thread.h +++ b/src/framework/stdext/thread.h @@ -29,25 +29,42 @@ #include #include #include +#include #include namespace std { - using boost::thread; + using boost::thread; + using boost::future; + using boost::future_status; + using boost::promise; - using boost::mutex; - using boost::timed_mutex; - using boost::recursive_mutex; - using boost::recursive_timed_mutex; + using boost::mutex; + using boost::timed_mutex; + using boost::recursive_mutex; + using boost::recursive_timed_mutex; - using boost::lock_guard; - using boost::unique_lock; + using boost::lock_guard; + using boost::unique_lock; - using boost::condition_variable; - using boost::condition_variable_any; + using boost::condition_variable; + using boost::condition_variable_any; + + template + bool is_ready(std::future& f) + { return f.wait_for(boost::chrono::seconds(0)) == future_status::ready; } } + #else #include #include #include +#include + +namespace std { + template + bool is_ready(std::future& f) + { return f.wait_for(chrono::seconds(0)) == future_status::ready; } +}; + #endif #endif