From 6ab3b4c83856b5c8a1a526c0e4dc55babe79d50d Mon Sep 17 00:00:00 2001 From: ShadowNinja Date: Tue, 15 Apr 2014 13:41:07 -0400 Subject: Remove dependency on marshal and many other async changes This makes a number of changes: * Remove the dependency on marshal by using string.dump and loadstring. * Use lua_tolstring rather than having Lua functions pass string lengths to C++. * Move lua_api/l_async_events.* to cpp_api/s_async.*, where it belongs. * Make AsyncWorkerThread a child of ScriptApiBase, this removes some duplicate functionality. * Don't wait for async threads to shut down. (Is this safe? Might result in corruption if the thread is writing to a file.) * Pop more unused items from the stack * Code style fixes * Other misc changes --- src/script/cpp_api/CMakeLists.txt | 1 + src/script/cpp_api/s_async.cpp | 301 ++++++++++++++++++++++++++++++++++++++ src/script/cpp_api/s_async.h | 171 ++++++++++++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 src/script/cpp_api/s_async.cpp create mode 100644 src/script/cpp_api/s_async.h (limited to 'src/script/cpp_api') diff --git a/src/script/cpp_api/CMakeLists.txt b/src/script/cpp_api/CMakeLists.txt index b753eda17..c45020055 100644 --- a/src/script/cpp_api/CMakeLists.txt +++ b/src/script/cpp_api/CMakeLists.txt @@ -9,6 +9,7 @@ set(common_SCRIPT_CPP_API_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/s_nodemeta.cpp ${CMAKE_CURRENT_SOURCE_DIR}/s_player.cpp ${CMAKE_CURRENT_SOURCE_DIR}/s_server.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/s_async.cpp PARENT_SCOPE) # Used by client only diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp new file mode 100644 index 000000000..603b6fe9c --- /dev/null +++ b/src/script/cpp_api/s_async.cpp @@ -0,0 +1,301 @@ +/* +Minetest +Copyright (C) 2013 sapier, + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#include +#include + +extern "C" { +#include "lua.h" +#include "lauxlib.h" +#include "lualib.h" +} + +#include "s_async.h" +#include "log.h" +#include "filesys.h" +#include "porting.h" +#include "common/c_internal.h" + +/******************************************************************************/ +AsyncEngine::AsyncEngine() : + m_initDone(false), + m_JobIdCounter(0) +{ +} + +/******************************************************************************/ +AsyncEngine::~AsyncEngine() +{ + // Force kill all threads + for (std::vector::iterator i = m_WorkerThreads.begin(); + i != m_WorkerThreads.end(); i++) { + (*i)->Kill(); + delete *i; + } + + m_JobQueueMutex.Lock(); + m_JobQueue.clear(); + m_JobQueueMutex.Unlock(); + m_WorkerThreads.clear(); +} + +/******************************************************************************/ +bool AsyncEngine::registerFunction(const char* name, lua_CFunction func) +{ + if (m_initDone) { + return false; + } + m_FunctionList[name] = func; + return true; +} + +/******************************************************************************/ +void AsyncEngine::Initialize(unsigned int numEngines) +{ + m_initDone = true; + + for (unsigned int i = 0; i < numEngines; i++) { + AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i); + m_WorkerThreads.push_back(toAdd); + toAdd->Start(); + } +} + +/******************************************************************************/ +unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params) +{ + m_JobQueueMutex.Lock(); + LuaJobInfo toadd; + toadd.JobId = m_JobIdCounter++; + toadd.serializedFunction = func; + toadd.serializedParams = params; + + m_JobQueue.push_back(toadd); + + m_JobQueueCounter.Post(); + + m_JobQueueMutex.Unlock(); + + return toadd.JobId; +} + +/******************************************************************************/ +LuaJobInfo AsyncEngine::getJob() +{ + m_JobQueueCounter.Wait(); + m_JobQueueMutex.Lock(); + + LuaJobInfo retval; + retval.valid = false; + + if (m_JobQueue.size() != 0) { + retval = m_JobQueue.front(); + retval.valid = true; + m_JobQueue.erase(m_JobQueue.begin()); + } + m_JobQueueMutex.Unlock(); + + return retval; +} + +/******************************************************************************/ +void AsyncEngine::putJobResult(LuaJobInfo result) +{ + m_ResultQueueMutex.Lock(); + m_ResultQueue.push_back(result); + m_ResultQueueMutex.Unlock(); +} + +/******************************************************************************/ +void AsyncEngine::Step(lua_State *L, int errorhandler) +{ + lua_getglobal(L, "engine"); + m_ResultQueueMutex.Lock(); + while (!m_ResultQueue.empty()) { + LuaJobInfo jobdone = m_ResultQueue.front(); + m_ResultQueue.erase(m_ResultQueue.begin()); + + lua_getfield(L, -1, "async_event_handler"); + + if (lua_isnil(L, -1)) { + assert("Async event handler does not exist!" == 0); + } + + luaL_checktype(L, -1, LUA_TFUNCTION); + + lua_pushinteger(L, jobdone.JobId); + lua_pushlstring(L, jobdone.serializedResult.c_str(), + jobdone.serializedResult.length()); + + if (lua_pcall(L, 2, 0, errorhandler)) { + script_error(L); + } + } + m_ResultQueueMutex.Unlock(); + lua_pop(L, 1); // Pop engine +} + +/******************************************************************************/ +void AsyncEngine::PushFinishedJobs(lua_State* L) { + // Result Table + m_ResultQueueMutex.Lock(); + + unsigned int index = 1; + lua_createtable(L, m_ResultQueue.size(), 0); + int top = lua_gettop(L); + + while (!m_ResultQueue.empty()) { + LuaJobInfo jobdone = m_ResultQueue.front(); + m_ResultQueue.erase(m_ResultQueue.begin()); + + lua_createtable(L, 0, 2); // Pre-alocate space for two map fields + int top_lvl2 = lua_gettop(L); + + lua_pushstring(L, "jobid"); + lua_pushnumber(L, jobdone.JobId); + lua_settable(L, top_lvl2); + + lua_pushstring(L, "retval"); + lua_pushlstring(L, jobdone.serializedResult.data(), + jobdone.serializedResult.size()); + lua_settable(L, top_lvl2); + + lua_rawseti(L, top, index++); + } + + m_ResultQueueMutex.Unlock(); +} + +/******************************************************************************/ +void AsyncEngine::PrepareEnvironment(lua_State* L, int top) { + for (std::map::iterator it = m_FunctionList.begin(); + it != m_FunctionList.end(); it++) { + lua_pushstring(L, it->first.c_str()); + lua_pushcfunction(L, it->second); + lua_settable(L, top); + } +} + +/******************************************************************************/ +AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher, + unsigned int threadNum) : + ScriptApiBase(), + m_JobDispatcher(jobDispatcher), + m_threadnum(threadNum) +{ + lua_State *L = getStack(); + + luaL_openlibs(L); + + // Prepare job lua environment + lua_newtable(L); + lua_setglobal(L, "engine"); + lua_getglobal(L, "engine"); + int top = lua_gettop(L); + + lua_pushstring(L, DIR_DELIM); + lua_setglobal(L, "DIR_DELIM"); + + lua_pushstring(L, + (porting::path_share + DIR_DELIM + "builtin").c_str()); + lua_setglobal(L, "SCRIPTDIR"); + + m_JobDispatcher->PrepareEnvironment(L, top); +} + +/******************************************************************************/ +AsyncWorkerThread::~AsyncWorkerThread() +{ + assert(IsRunning() == false); +} + +/******************************************************************************/ +void* AsyncWorkerThread::Thread() +{ + ThreadStarted(); + + // Register thread for error logging + char number[21]; + snprintf(number, sizeof(number), "%d", m_threadnum); + log_register_thread(std::string("AsyncWorkerThread_") + number); + + porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str()); + + std::string asyncscript = porting::path_share + DIR_DELIM + "builtin" + + DIR_DELIM + "async_env.lua"; + + if (!loadScript(asyncscript)) { + errorstream + << "AsyncWorkderThread execution of async base environment failed!" + << std::endl; + abort(); + } + + lua_State *L = getStack(); + // Main loop + while (!StopRequested()) { + // Wait for job + LuaJobInfo toProcess = m_JobDispatcher->getJob(); + + if (toProcess.valid == false || StopRequested()) { + continue; + } + + lua_getglobal(L, "engine"); + if (lua_isnil(L, -1)) { + errorstream << "Unable to find engine within async environment!"; + abort(); + } + + lua_getfield(L, -1, "job_processor"); + if (lua_isnil(L, -1)) { + errorstream << "Unable to get async job processor!" << std::endl; + abort(); + } + + luaL_checktype(L, -1, LUA_TFUNCTION); + + // Call it + lua_pushlstring(L, + toProcess.serializedFunction.data(), + toProcess.serializedFunction.size()); + lua_pushlstring(L, + toProcess.serializedParams.data(), + toProcess.serializedParams.size()); + + if (lua_pcall(L, 2, 1, m_errorhandler)) { + scriptError(); + toProcess.serializedResult = ""; + } else { + // Fetch result + size_t length; + const char *retval = lua_tolstring(L, -1, &length); + toProcess.serializedResult = std::string(retval, length); + } + + // Pop engine, job_processor, and retval + lua_pop(L, 3); + + // Put job result + m_JobDispatcher->putJobResult(toProcess); + } + log_deregister_thread(); + return 0; +} + diff --git a/src/script/cpp_api/s_async.h b/src/script/cpp_api/s_async.h new file mode 100644 index 000000000..c5c0e091d --- /dev/null +++ b/src/script/cpp_api/s_async.h @@ -0,0 +1,171 @@ +/* +Minetest +Copyright (C) 2013 sapier, + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef L_ASYNC_EVENTS_H_ +#define L_ASYNC_EVENTS_H_ + +#include +#include + +#include "jthread/jthread.h" +#include "jthread/jmutex.h" +#include "jthread/jsemaphore.h" +#include "debug.h" +#include "lua.h" +#include "cpp_api/s_base.h" + +// Forward declarations +class AsyncEngine; + + +// Declarations + +// Data required to queue a job +struct LuaJobInfo { + // Function to be called in async environment + std::string serializedFunction; + // Parameter to be passed to function + std::string serializedParams; + // Result of function call + std::string serializedResult; + // JobID used to identify a job and match it to callback + unsigned int JobId; + + bool valid; +}; + +// Asynchronous working environment +class AsyncWorkerThread : public JThread, public ScriptApiBase { +public: + /** + * default constructor + * @param pointer to job dispatcher + */ + AsyncWorkerThread(AsyncEngine* jobDispatcher, unsigned int threadNum); + + virtual ~AsyncWorkerThread(); + + void* Thread(); + +private: + AsyncEngine* m_JobDispatcher; + + // Thread number. Used for debug output + unsigned int m_threadnum; + +}; + +// Asynchornous thread and job management +class AsyncEngine { + friend class AsyncWorkerThread; +public: + AsyncEngine(); + ~AsyncEngine(); + + /** + * Register function to be used within engine + * @param name Function name to be used within Lua environment + * @param func C function to be called + */ + bool registerFunction(const char* name, lua_CFunction func); + + /** + * Create async engine tasks and lock function registration + * @param numEngines Number of async threads to be started + */ + void Initialize(unsigned int numEngines); + + /** + * queue/run a async job + * @param func Serialized lua function + * @param params Serialized parameters + * @return jobid The job is queued + */ + unsigned int doAsyncJob(std::string func, std::string params); + + /** + * Engine step to process finished jobs + * the engine step is one way to pass events back, PushFinishedJobs another + * @param L The Lua stack + * @param errorhandler Stack index of the Lua error handler + */ + void Step(lua_State *L, int errorhandler); + + /** + * Push a list of finished jobs onto the stack + * @param L The Lua stack + */ + void PushFinishedJobs(lua_State *L); + +protected: + /** + * Get a Job from queue to be processed + * this function blocks until a job is ready + * @return a job to be processed + */ + LuaJobInfo getJob(); + + /** + * Put a Job result back to result queue + * @param result result of completed job + */ + void putJobResult(LuaJobInfo result); + + /** + * Initialize environment with current registred functions + * this function adds all functions registred by registerFunction to the + * passed lua stack + * @param L Lua stack to initialize + * @param top Stack position + */ + void PrepareEnvironment(lua_State* L, int top); + +private: + + // Stack index of error handler + int m_errorhandler; + + // variable locking the engine against further modification + bool m_initDone; + + // Internal store for registred functions + std::map m_FunctionList; + + // Internal counter to create job IDs + unsigned int m_JobIdCounter; + + // Mutex to protect job queue + JMutex m_JobQueueMutex; + + // Job queue + std::vector m_JobQueue; + + // Mutex to protect result queue + JMutex m_ResultQueueMutex; + // Result queue + std::vector m_ResultQueue; + + // List of current worker threads + std::vector m_WorkerThreads; + + // Counter semaphore for job dispatching + JSemaphore m_JobQueueCounter; +}; + +#endif // L_ASYNC_EVENTS_H_ -- cgit v1.2.3