diff options
Diffstat (limited to 'src/script/cpp_api')
-rw-r--r-- | src/script/cpp_api/s_async.cpp | 149 | ||||
-rw-r--r-- | src/script/cpp_api/s_async.h | 51 |
2 files changed, 109 insertions, 91 deletions
diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp index 603b6fe9c..4feed3e56 100644 --- a/src/script/cpp_api/s_async.cpp +++ b/src/script/cpp_api/s_async.cpp @@ -34,82 +34,102 @@ extern "C" { /******************************************************************************/ AsyncEngine::AsyncEngine() : - m_initDone(false), - m_JobIdCounter(0) + initDone(false), + jobIdCounter(0) { } /******************************************************************************/ AsyncEngine::~AsyncEngine() { + + // Request all threads to stop + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Stop(); + } + + + // Wake up all threads + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + jobQueueCounter.Post(); + } + + // Wait for threads to finish + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Wait(); + } + // Force kill all threads - for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin(); - i != m_WorkerThreads.end(); i++) { - (*i)->Kill(); - delete *i; + for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); + it != workerThreads.end(); it++) { + (*it)->Kill(); + delete *it; } - m_JobQueueMutex.Lock(); - m_JobQueue.clear(); - m_JobQueueMutex.Unlock(); - m_WorkerThreads.clear(); + jobQueueMutex.Lock(); + jobQueue.clear(); + jobQueueMutex.Unlock(); + workerThreads.clear(); } /******************************************************************************/ bool AsyncEngine::registerFunction(const char* name, lua_CFunction func) { - if (m_initDone) { + if (initDone) { return false; } - m_FunctionList[name] = func; + functionList[name] = func; return true; } /******************************************************************************/ -void AsyncEngine::Initialize(unsigned int numEngines) +void AsyncEngine::initialize(unsigned int numEngines) { - m_initDone = true; + initDone = true; for (unsigned int i = 0; i < numEngines; i++) { - AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i); - m_WorkerThreads.push_back(toAdd); + AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i); + workerThreads.push_back(toAdd); toAdd->Start(); } } /******************************************************************************/ -unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params) +unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params) { - m_JobQueueMutex.Lock(); - LuaJobInfo toadd; - toadd.JobId = m_JobIdCounter++; - toadd.serializedFunction = func; - toadd.serializedParams = params; + jobQueueMutex.Lock(); + LuaJobInfo toAdd; + toAdd.id = jobIdCounter++; + toAdd.serializedFunction = func; + toAdd.serializedParams = params; - m_JobQueue.push_back(toadd); + jobQueue.push_back(toAdd); - m_JobQueueCounter.Post(); + jobQueueCounter.Post(); - m_JobQueueMutex.Unlock(); + jobQueueMutex.Unlock(); - return toadd.JobId; + return toAdd.id; } /******************************************************************************/ LuaJobInfo AsyncEngine::getJob() { - m_JobQueueCounter.Wait(); - m_JobQueueMutex.Lock(); + jobQueueCounter.Wait(); + jobQueueMutex.Lock(); LuaJobInfo retval; retval.valid = false; - if (m_JobQueue.size() != 0) { - retval = m_JobQueue.front(); + if (!jobQueue.empty()) { + retval = jobQueue.front(); + jobQueue.pop_front(); retval.valid = true; - m_JobQueue.erase(m_JobQueue.begin()); } - m_JobQueueMutex.Unlock(); + jobQueueMutex.Unlock(); return retval; } @@ -117,19 +137,19 @@ LuaJobInfo AsyncEngine::getJob() /******************************************************************************/ void AsyncEngine::putJobResult(LuaJobInfo result) { - m_ResultQueueMutex.Lock(); - m_ResultQueue.push_back(result); - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Lock(); + resultQueue.push_back(result); + resultQueueMutex.Unlock(); } /******************************************************************************/ -void AsyncEngine::Step(lua_State *L, int errorhandler) +void AsyncEngine::step(lua_State *L, int errorhandler) { lua_getglobal(L, "engine"); - m_ResultQueueMutex.Lock(); - while (!m_ResultQueue.empty()) { - LuaJobInfo jobdone = m_ResultQueue.front(); - m_ResultQueue.erase(m_ResultQueue.begin()); + resultQueueMutex.Lock(); + while (!resultQueue.empty()) { + LuaJobInfo jobDone = resultQueue.front(); + resultQueue.pop_front(); lua_getfield(L, -1, "async_event_handler"); @@ -139,53 +159,54 @@ void AsyncEngine::Step(lua_State *L, int errorhandler) luaL_checktype(L, -1, LUA_TFUNCTION); - lua_pushinteger(L, jobdone.JobId); - lua_pushlstring(L, jobdone.serializedResult.c_str(), - jobdone.serializedResult.length()); + lua_pushinteger(L, jobDone.id); + lua_pushlstring(L, jobDone.serializedResult.data(), + jobDone.serializedResult.size()); if (lua_pcall(L, 2, 0, errorhandler)) { script_error(L); } } - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Unlock(); lua_pop(L, 1); // Pop engine } /******************************************************************************/ -void AsyncEngine::PushFinishedJobs(lua_State* L) { +void AsyncEngine::pushFinishedJobs(lua_State* L) { // Result Table - m_ResultQueueMutex.Lock(); + resultQueueMutex.Lock(); unsigned int index = 1; - lua_createtable(L, m_ResultQueue.size(), 0); + lua_createtable(L, resultQueue.size(), 0); int top = lua_gettop(L); - while (!m_ResultQueue.empty()) { - LuaJobInfo jobdone = m_ResultQueue.front(); - m_ResultQueue.erase(m_ResultQueue.begin()); + while (!resultQueue.empty()) { + LuaJobInfo jobDone = resultQueue.front(); + resultQueue.pop_front(); - lua_createtable(L, 0, 2); // Pre-alocate space for two map fields + lua_createtable(L, 0, 2); // Pre-allocate space for two map fields int top_lvl2 = lua_gettop(L); lua_pushstring(L, "jobid"); - lua_pushnumber(L, jobdone.JobId); + lua_pushnumber(L, jobDone.id); lua_settable(L, top_lvl2); lua_pushstring(L, "retval"); - lua_pushlstring(L, jobdone.serializedResult.data(), - jobdone.serializedResult.size()); + lua_pushlstring(L, jobDone.serializedResult.data(), + jobDone.serializedResult.size()); lua_settable(L, top_lvl2); lua_rawseti(L, top, index++); } - m_ResultQueueMutex.Unlock(); + resultQueueMutex.Unlock(); } /******************************************************************************/ -void AsyncEngine::PrepareEnvironment(lua_State* L, int top) { - for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin(); - it != m_FunctionList.end(); it++) { +void AsyncEngine::prepareEnvironment(lua_State* L, int top) +{ + for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin(); + it != functionList.end(); it++) { lua_pushstring(L, it->first.c_str()); lua_pushcfunction(L, it->second); lua_settable(L, top); @@ -196,8 +217,8 @@ void AsyncEngine::PrepareEnvironment(lua_State* L, int top) { AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, unsigned int threadNum) : ScriptApiBase(), - m_JobDispatcher(jobDispatcher), - m_threadnum(threadNum) + jobDispatcher(jobDispatcher), + threadnum(threadNum) { lua_State *L = getStack(); @@ -216,7 +237,7 @@ AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, (porting::path_share + DIR_DELIM + "builtin").c_str()); lua_setglobal(L, "SCRIPTDIR"); - m_JobDispatcher->PrepareEnvironment(L, top); + jobDispatcher->prepareEnvironment(L, top); } /******************************************************************************/ @@ -232,7 +253,7 @@ void* AsyncWorkerThread::Thread() // Register thread for error logging char number[21]; - snprintf(number, sizeof(number), "%d", m_threadnum); + snprintf(number, sizeof(number), "%d", threadnum); log_register_thread(std::string("AsyncWorkerThread_") + number); porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str()); @@ -251,7 +272,7 @@ void* AsyncWorkerThread::Thread() // Main loop while (!StopRequested()) { // Wait for job - LuaJobInfo toProcess = m_JobDispatcher->getJob(); + LuaJobInfo toProcess = jobDispatcher->getJob(); if (toProcess.valid == false || StopRequested()) { continue; @@ -293,7 +314,7 @@ void* AsyncWorkerThread::Thread() lua_pop(L, 3); // Put job result - m_JobDispatcher->putJobResult(toProcess); + jobDispatcher->putJobResult(toProcess); } log_deregister_thread(); return 0; diff --git a/src/script/cpp_api/s_async.h b/src/script/cpp_api/s_async.h index c5c0e091d..a6459c18d 100644 --- a/src/script/cpp_api/s_async.h +++ b/src/script/cpp_api/s_async.h @@ -17,10 +17,11 @@ with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifndef L_ASYNC_EVENTS_H_ -#define L_ASYNC_EVENTS_H_ +#ifndef CPP_API_ASYNC_EVENTS_HEADER +#define CPP_API_ASYNC_EVENTS_HEADER #include <vector> +#include <deque> #include <map> #include "jthread/jthread.h" @@ -45,7 +46,7 @@ struct LuaJobInfo { // Result of function call std::string serializedResult; // JobID used to identify a job and match it to callback - unsigned int JobId; + unsigned int id; bool valid; }; @@ -61,13 +62,13 @@ public: virtual ~AsyncWorkerThread(); - void* Thread(); + void *Thread(); private: - AsyncEngine* m_JobDispatcher; + AsyncEngine *jobDispatcher; // Thread number. Used for debug output - unsigned int m_threadnum; + unsigned int threadnum; }; @@ -89,15 +90,15 @@ public: * Create async engine tasks and lock function registration * @param numEngines Number of async threads to be started */ - void Initialize(unsigned int numEngines); + void initialize(unsigned int numEngines); /** - * queue/run a async job + * Queue an async job * @param func Serialized lua function * @param params Serialized parameters * @return jobid The job is queued */ - unsigned int doAsyncJob(std::string func, std::string params); + unsigned int queueAsyncJob(std::string func, std::string params); /** * Engine step to process finished jobs @@ -105,13 +106,13 @@ public: * @param L The Lua stack * @param errorhandler Stack index of the Lua error handler */ - void Step(lua_State *L, int errorhandler); + void step(lua_State *L, int errorhandler); /** * Push a list of finished jobs onto the stack * @param L The Lua stack */ - void PushFinishedJobs(lua_State *L); + void pushFinishedJobs(lua_State *L); protected: /** @@ -134,38 +135,34 @@ protected: * @param L Lua stack to initialize * @param top Stack position */ - void PrepareEnvironment(lua_State* L, int top); + void prepareEnvironment(lua_State* L, int top); private: - - // Stack index of error handler - int m_errorhandler; - - // variable locking the engine against further modification - bool m_initDone; + // Variable locking the engine against further modification + bool initDone; // Internal store for registred functions - std::map<std::string, lua_CFunction> m_FunctionList; + std::map<std::string, lua_CFunction> functionList; // Internal counter to create job IDs - unsigned int m_JobIdCounter; + unsigned int jobIdCounter; // Mutex to protect job queue - JMutex m_JobQueueMutex; + JMutex jobQueueMutex; // Job queue - std::vector<LuaJobInfo> m_JobQueue; + std::deque<LuaJobInfo> jobQueue; // Mutex to protect result queue - JMutex m_ResultQueueMutex; + JMutex resultQueueMutex; // Result queue - std::vector<LuaJobInfo> m_ResultQueue; + std::deque<LuaJobInfo> resultQueue; // List of current worker threads - std::vector<AsyncWorkerThread*> m_WorkerThreads; + std::vector<AsyncWorkerThread*> workerThreads; // Counter semaphore for job dispatching - JSemaphore m_JobQueueCounter; + JSemaphore jobQueueCounter; }; -#endif // L_ASYNC_EVENTS_H_ +#endif // CPP_API_ASYNC_EVENTS_HEADER |