diff options
Diffstat (limited to 'src/script/cpp_api/s_async.cpp')
-rw-r--r-- | src/script/cpp_api/s_async.cpp | 149 |
1 files changed, 85 insertions, 64 deletions
diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp index 603b6fe9c..4feed3e56 100644 --- a/src/script/cpp_api/s_async.cpp +++ b/src/script/cpp_api/s_async.cpp @@ -34,82 +34,102 @@ extern "C" { /******************************************************************************/ AsyncEngine::AsyncEngine() : - m_initDone(false), - m_JobIdCounter(0) + initDone(false), + jobIdCounter(0) { } /******************************************************************************/ AsyncEngine::~AsyncEngine() { + + // Request all threads to stop + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Stop(); + } + + + // Wake up all threads + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + jobQueueCounter.Post(); + } + + // Wait for threads to finish + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Wait(); + } + // Force kill all threads - for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin(); - i != m_WorkerThreads.end(); i++) { - (*i)->Kill(); - delete *i; + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Kill(); + delete *it; } - m_JobQueueMutex.Lock(); - m_JobQueue.clear(); - m_JobQueueMutex.Unlock(); - m_WorkerThreads.clear(); + jobQueueMutex.Lock(); + jobQueue.clear(); + jobQueueMutex.Unlock(); + workerThreads.clear(); } /******************************************************************************/ bool AsyncEngine::registerFunction(const char* name, lua_CFunction func) { - if (m_initDone) { + if (initDone) { return false; } - m_FunctionList[name] = func; + functionList[name] = func; return true; } /******************************************************************************/ -void AsyncEngine::Initialize(unsigned int numEngines) +void AsyncEngine::initialize(unsigned int numEngines) { - m_initDone = true; + initDone = true; for (unsigned int i = 0; i < numEngines; i++) { - AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i); - m_WorkerThreads.push_back(toAdd); + AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i); + workerThreads.push_back(toAdd); toAdd->Start(); } } /******************************************************************************/ -unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params) +unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params) { - m_JobQueueMutex.Lock(); - LuaJobInfo toadd; - toadd.JobId = m_JobIdCounter++; - toadd.serializedFunction = func; - toadd.serializedParams = params; + jobQueueMutex.Lock(); + LuaJobInfo toAdd; + toAdd.id = jobIdCounter++; + toAdd.serializedFunction = func; + toAdd.serializedParams = params; - m_JobQueue.push_back(toadd); + jobQueue.push_back(toAdd); - m_JobQueueCounter.Post(); + jobQueueCounter.Post(); - m_JobQueueMutex.Unlock(); + jobQueueMutex.Unlock(); - return toadd.JobId; + return toAdd.id; } /******************************************************************************/ LuaJobInfo AsyncEngine::getJob() { - m_JobQueueCounter.Wait(); - m_JobQueueMutex.Lock(); + jobQueueCounter.Wait(); + jobQueueMutex.Lock(); LuaJobInfo retval; retval.valid = false; - if (m_JobQueue.size() != 0) { - retval = m_JobQueue.front(); + if (!jobQueue.empty()) { + retval = jobQueue.front(); + jobQueue.pop_front(); retval.valid = true; - m_JobQueue.erase(m_JobQueue.begin()); } - m_JobQueueMutex.Unlock(); + jobQueueMutex.Unlock(); return retval; } @@ -117,19 +137,19 @@ LuaJobInfo AsyncEngine::getJob() /******************************************************************************/ void AsyncEngine::putJobResult(LuaJobInfo result) { - m_ResultQueueMutex.Lock(); - m_ResultQueue.push_back(result); - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Lock(); + resultQueue.push_back(result); + resultQueueMutex.Unlock(); } /******************************************************************************/ -void AsyncEngine::Step(lua_State *L, int errorhandler) +void AsyncEngine::step(lua_State *L, int errorhandler) { lua_getglobal(L, "engine"); - m_ResultQueueMutex.Lock(); - while (!m_ResultQueue.empty()) { - LuaJobInfo jobdone = m_ResultQueue.front(); - m_ResultQueue.erase(m_ResultQueue.begin()); + resultQueueMutex.Lock(); + while (!resultQueue.empty()) { + LuaJobInfo jobDone = resultQueue.front(); + resultQueue.pop_front(); lua_getfield(L, -1, "async_event_handler"); @@ -139,53 +159,54 @@ void AsyncEngine::Step(lua_State *L, int errorhandler) luaL_checktype(L, -1, LUA_TFUNCTION); - lua_pushinteger(L, jobdone.JobId); - lua_pushlstring(L, jobdone.serializedResult.c_str(), - jobdone.serializedResult.length()); + lua_pushinteger(L, jobDone.id); + lua_pushlstring(L, jobDone.serializedResult.data(), + jobDone.serializedResult.size()); if (lua_pcall(L, 2, 0, errorhandler)) { script_error(L); } } - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Unlock(); lua_pop(L, 1); // Pop engine } /******************************************************************************/ -void AsyncEngine::PushFinishedJobs(lua_State* L) { +void AsyncEngine::pushFinishedJobs(lua_State* L) { // Result Table - m_ResultQueueMutex.Lock(); + resultQueueMutex.Lock(); unsigned int index = 1; - lua_createtable(L, m_ResultQueue.size(), 0); + lua_createtable(L, resultQueue.size(), 0); int top = lua_gettop(L); - while (!m_ResultQueue.empty()) { - LuaJobInfo jobdone = m_ResultQueue.front(); - m_ResultQueue.erase(m_ResultQueue.begin()); + while (!resultQueue.empty()) { + LuaJobInfo jobDone = resultQueue.front(); + resultQueue.pop_front(); - lua_createtable(L, 0, 2); // Pre-alocate space for two map fields + lua_createtable(L, 0, 2); // Pre-allocate space for two map fields int top_lvl2 = lua_gettop(L); lua_pushstring(L, "jobid"); - lua_pushnumber(L, jobdone.JobId); + lua_pushnumber(L, jobDone.id); lua_settable(L, top_lvl2); lua_pushstring(L, "retval"); - lua_pushlstring(L, jobdone.serializedResult.data(), - jobdone.serializedResult.size()); + lua_pushlstring(L, jobDone.serializedResult.data(), + jobDone.serializedResult.size()); lua_settable(L, top_lvl2); lua_rawseti(L, top, index++); } - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Unlock(); } /******************************************************************************/ -void AsyncEngine::PrepareEnvironment(lua_State* L, int top) { - for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin(); - it != m_FunctionList.end(); it++) { +void AsyncEngine::prepareEnvironment(lua_State* L, int top) +{ + for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin(); + it != functionList.end(); it++) { lua_pushstring(L, it->first.c_str()); lua_pushcfunction(L, it->second); lua_settable(L, top); @@ -196,8 +217,8 @@ void AsyncEngine::PrepareEnvironment(lua_State* L, int top) { AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, unsigned int threadNum) : ScriptApiBase(), - m_JobDispatcher(jobDispatcher), - m_threadnum(threadNum) + jobDispatcher(jobDispatcher), + threadnum(threadNum) { lua_State *L = getStack(); @@ -216,7 +237,7 @@ AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, (porting::path_share + DIR_DELIM + "builtin").c_str()); lua_setglobal(L, "SCRIPTDIR"); - m_JobDispatcher->PrepareEnvironment(L, top); + jobDispatcher->prepareEnvironment(L, top); } /******************************************************************************/ @@ -232,7 +253,7 @@ void* AsyncWorkerThread::Thread() // Register thread for error logging char number[21]; - snprintf(number, sizeof(number), "%d", m_threadnum); + snprintf(number, sizeof(number), "%d", threadnum); log_register_thread(std::string("AsyncWorkerThread_") + number); porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str()); @@ -251,7 +272,7 @@ void* AsyncWorkerThread::Thread() // Main loop while (!StopRequested()) { // Wait for job - LuaJobInfo toProcess = m_JobDispatcher->getJob(); + LuaJobInfo toProcess = jobDispatcher->getJob(); if (toProcess.valid == false || StopRequested()) { continue; @@ -293,7 +314,7 @@ void* AsyncWorkerThread::Thread() lua_pop(L, 3); // Put job result - m_JobDispatcher->putJobResult(toProcess); + jobDispatcher->putJobResult(toProcess); } log_deregister_thread(); return 0; |