/* Minetest Copyright (C) 2013 sapier, <sapier AT gmx DOT net> This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include <stdio.h> #include <stdlib.h> extern "C" { #include "lua.h" #include "lauxlib.h" #include "lualib.h" } #include "server.h" #include "s_async.h" #include "log.h" #include "filesys.h" #include "porting.h" #include "common/c_internal.h" /******************************************************************************/ AsyncEngine::AsyncEngine() : initDone(false), jobIdCounter(0) { } /******************************************************************************/ AsyncEngine::~AsyncEngine() { // Request all threads to stop for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); it != workerThreads.end(); it++) { (*it)->Stop(); } // Wake up all threads for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); it != workerThreads.end(); it++) { jobQueueCounter.Post(); } // Wait for threads to finish for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); it != workerThreads.end(); it++) { (*it)->Wait(); } // Force kill all threads for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin(); it != workerThreads.end(); it++) { (*it)->Kill(); delete *it; } jobQueueMutex.Lock(); jobQueue.clear(); jobQueueMutex.Unlock(); workerThreads.clear(); } /******************************************************************************/ bool AsyncEngine::registerFunction(const char* name, lua_CFunction func) { if (initDone) { return false; } functionList[name] = func; return true; } /******************************************************************************/ void AsyncEngine::initialize(unsigned int numEngines) { initDone = true; for (unsigned int i = 0; i < numEngines; i++) { AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i); workerThreads.push_back(toAdd); toAdd->Start(); } } /******************************************************************************/ unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params) { jobQueueMutex.Lock(); LuaJobInfo toAdd; toAdd.id = jobIdCounter++; toAdd.serializedFunction = func; toAdd.serializedParams = params; jobQueue.push_back(toAdd); jobQueueCounter.Post(); jobQueueMutex.Unlock(); return toAdd.id; } /******************************************************************************/ LuaJobInfo AsyncEngine::getJob() { jobQueueCounter.Wait(); jobQueueMutex.Lock(); LuaJobInfo retval; retval.valid = false; if (!jobQueue.empty()) { retval = jobQueue.front(); jobQueue.pop_front(); retval.valid = true; } jobQueueMutex.Unlock(); return retval; } /******************************************************************************/ void AsyncEngine::putJobResult(LuaJobInfo result) { resultQueueMutex.Lock(); resultQueue.push_back(result); resultQueueMutex.Unlock(); } /******************************************************************************/ void AsyncEngine::step(lua_State *L, int errorhandler) { lua_getglobal(L, "core"); resultQueueMutex.Lock(); while (!resultQueue.empty()) { LuaJobInfo jobDone = resultQueue.front(); resultQueue.pop_front(); lua_getfield(L, -1, "async_event_handler"); if (lua_isnil(L, -1)) { assert("Async event handler does not exist!" == 0); } luaL_checktype(L, -1, LUA_TFUNCTION); lua_pushinteger(L, jobDone.id); lua_pushlstring(L, jobDone.serializedResult.data(), jobDone.serializedResult.size()); if (lua_pcall(L, 2, 0, errorhandler)) { script_error(L); } } resultQueueMutex.Unlock(); lua_pop(L, 1); // Pop core } /******************************************************************************/ void AsyncEngine::pushFinishedJobs(lua_State* L) { // Result Table resultQueueMutex.Lock(); unsigned int index = 1; lua_createtable(L, resultQueue.size(), 0); int top = lua_gettop(L); while (!resultQueue.empty()) { LuaJobInfo jobDone = resultQueue.front(); resultQueue.pop_front(); lua_createtable(L, 0, 2); // Pre-allocate space for two map fields int top_lvl2 = lua_gettop(L); lua_pushstring(L, "jobid"); lua_pushnumber(L, jobDone.id); lua_settable(L, top_lvl2); lua_pushstring(L, "retval"); lua_pushlstring(L, jobDone.serializedResult.data(), jobDone.serializedResult.size()); lua_settable(L, top_lvl2); lua_rawseti(L, top, index++); } resultQueueMutex.Unlock(); } /******************************************************************************/ void AsyncEngine::prepareEnvironment(lua_State* L, int top) { for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin(); it != functionList.end(); it++) { lua_pushstring(L, it->first.c_str()); lua_pushcfunction(L, it->second); lua_settable(L, top); } } /******************************************************************************/ AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, unsigned int threadNum) : ScriptApiBase(), jobDispatcher(jobDispatcher), threadnum(threadNum) { lua_State *L = getStack(); // Prepare job lua environment lua_getglobal(L, "core"); int top = lua_gettop(L); // Push builtin initialization type lua_pushstring(L, "async"); lua_setglobal(L, "INIT"); jobDispatcher->prepareEnvironment(L, top); } /******************************************************************************/ AsyncWorkerThread::~AsyncWorkerThread() { assert(IsRunning() == false); } /******************************************************************************/ void* AsyncWorkerThread::Thread() { ThreadStarted(); // Register thread for error logging char number[21]; snprintf(number, sizeof(number), "%d", threadnum); log_register_thread(std::string("AsyncWorkerThread_") + number); porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str()); lua_State *L = getStack(); std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua"; if (!loadScript(script)) { errorstream << "AsyncWorkderThread execution of async base environment failed!" << std::endl; abort(); } lua_getglobal(L, "core"); if (lua_isnil(L, -1)) { errorstream << "Unable to find core within async environment!"; abort(); } // Main loop while (!StopRequested()) { // Wait for job LuaJobInfo toProcess = jobDispatcher->getJob(); if (toProcess.valid == false || StopRequested()) { continue; } lua_getfield(L, -1, "job_processor"); if (lua_isnil(L, -1)) { errorstream << "Unable to get async job processor!" << std::endl; abort(); } luaL_checktype(L, -1, LUA_TFUNCTION); // Call it lua_pushlstring(L, toProcess.serializedFunction.data(), toProcess.serializedFunction.size()); lua_pushlstring(L, toProcess.serializedParams.data(), toProcess.serializedParams.size()); if (lua_pcall(L, 2, 1, m_errorhandler)) { scriptError(); toProcess.serializedResult = ""; } else { // Fetch result size_t length; const char *retval = lua_tolstring(L, -1, &length); toProcess.serializedResult = std::string(retval, length); } lua_pop(L, 1); // Pop retval // Put job result jobDispatcher->putJobResult(toProcess); } lua_pop(L, 1); // Pop core log_deregister_thread(); return 0; }