aboutsummaryrefslogtreecommitdiff
path: root/src/script/cpp_api
diff options
context:
space:
mode:
authorShadowNinja <shadowninja@minetest.net>2014-04-15 13:41:07 -0400
committerShadowNinja <shadowninja@minetest.net>2014-04-27 16:15:53 -0400
commit6ab3b4c83856b5c8a1a526c0e4dc55babe79d50d (patch)
tree90e64f78dd3ceac12dbb21ac6b045500de0ca603 /src/script/cpp_api
parentdb4ea4658c58772ee447ff0eff8bb39b692081ec (diff)
downloadminetest-6ab3b4c83856b5c8a1a526c0e4dc55babe79d50d.tar.gz
minetest-6ab3b4c83856b5c8a1a526c0e4dc55babe79d50d.tar.bz2
minetest-6ab3b4c83856b5c8a1a526c0e4dc55babe79d50d.zip
Remove dependency on marshal and many other async changes
This makes a number of changes: * Remove the dependency on marshal by using string.dump and loadstring. * Use lua_tolstring rather than having Lua functions pass string lengths to C++. * Move lua_api/l_async_events.* to cpp_api/s_async.*, where it belongs. * Make AsyncWorkerThread a child of ScriptApiBase, this removes some duplicate functionality. * Don't wait for async threads to shut down. (Is this safe? Might result in corruption if the thread is writing to a file.) * Pop more unused items from the stack * Code style fixes * Other misc changes
Diffstat (limited to 'src/script/cpp_api')
-rw-r--r--src/script/cpp_api/CMakeLists.txt1
-rw-r--r--src/script/cpp_api/s_async.cpp301
-rw-r--r--src/script/cpp_api/s_async.h171
3 files changed, 473 insertions, 0 deletions
diff --git a/src/script/cpp_api/CMakeLists.txt b/src/script/cpp_api/CMakeLists.txt
index b753eda17..c45020055 100644
--- a/src/script/cpp_api/CMakeLists.txt
+++ b/src/script/cpp_api/CMakeLists.txt
@@ -9,6 +9,7 @@ set(common_SCRIPT_CPP_API_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/s_nodemeta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/s_player.cpp
${CMAKE_CURRENT_SOURCE_DIR}/s_server.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/s_async.cpp
PARENT_SCOPE)
# Used by client only
diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp
new file mode 100644
index 000000000..603b6fe9c
--- /dev/null
+++ b/src/script/cpp_api/s_async.cpp
@@ -0,0 +1,301 @@
+/*
+Minetest
+Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+
+extern "C" {
+#include "lua.h"
+#include "lauxlib.h"
+#include "lualib.h"
+}
+
+#include "s_async.h"
+#include "log.h"
+#include "filesys.h"
+#include "porting.h"
+#include "common/c_internal.h"
+
+/******************************************************************************/
+AsyncEngine::AsyncEngine() :
+ m_initDone(false),
+ m_JobIdCounter(0)
+{
+}
+
+/******************************************************************************/
+AsyncEngine::~AsyncEngine()
+{
+ // Force kill all threads
+ for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin();
+ i != m_WorkerThreads.end(); i++) {
+ (*i)->Kill();
+ delete *i;
+ }
+
+ m_JobQueueMutex.Lock();
+ m_JobQueue.clear();
+ m_JobQueueMutex.Unlock();
+ m_WorkerThreads.clear();
+}
+
+/******************************************************************************/
+bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
+{
+ if (m_initDone) {
+ return false;
+ }
+ m_FunctionList[name] = func;
+ return true;
+}
+
+/******************************************************************************/
+void AsyncEngine::Initialize(unsigned int numEngines)
+{
+ m_initDone = true;
+
+ for (unsigned int i = 0; i < numEngines; i++) {
+ AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i);
+ m_WorkerThreads.push_back(toAdd);
+ toAdd->Start();
+ }
+}
+
+/******************************************************************************/
+unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params)
+{
+ m_JobQueueMutex.Lock();
+ LuaJobInfo toadd;
+ toadd.JobId = m_JobIdCounter++;
+ toadd.serializedFunction = func;
+ toadd.serializedParams = params;
+
+ m_JobQueue.push_back(toadd);
+
+ m_JobQueueCounter.Post();
+
+ m_JobQueueMutex.Unlock();
+
+ return toadd.JobId;
+}
+
+/******************************************************************************/
+LuaJobInfo AsyncEngine::getJob()
+{
+ m_JobQueueCounter.Wait();
+ m_JobQueueMutex.Lock();
+
+ LuaJobInfo retval;
+ retval.valid = false;
+
+ if (m_JobQueue.size() != 0) {
+ retval = m_JobQueue.front();
+ retval.valid = true;
+ m_JobQueue.erase(m_JobQueue.begin());
+ }
+ m_JobQueueMutex.Unlock();
+
+ return retval;
+}
+
+/******************************************************************************/
+void AsyncEngine::putJobResult(LuaJobInfo result)
+{
+ m_ResultQueueMutex.Lock();
+ m_ResultQueue.push_back(result);
+ m_ResultQueueMutex.Unlock();
+}
+
+/******************************************************************************/
+void AsyncEngine::Step(lua_State *L, int errorhandler)
+{
+ lua_getglobal(L, "engine");
+ m_ResultQueueMutex.Lock();
+ while (!m_ResultQueue.empty()) {
+ LuaJobInfo jobdone = m_ResultQueue.front();
+ m_ResultQueue.erase(m_ResultQueue.begin());
+
+ lua_getfield(L, -1, "async_event_handler");
+
+ if (lua_isnil(L, -1)) {
+ assert("Async event handler does not exist!" == 0);
+ }
+
+ luaL_checktype(L, -1, LUA_TFUNCTION);
+
+ lua_pushinteger(L, jobdone.JobId);
+ lua_pushlstring(L, jobdone.serializedResult.c_str(),
+ jobdone.serializedResult.length());
+
+ if (lua_pcall(L, 2, 0, errorhandler)) {
+ script_error(L);
+ }
+ }
+ m_ResultQueueMutex.Unlock();
+ lua_pop(L, 1); // Pop engine
+}
+
+/******************************************************************************/
+void AsyncEngine::PushFinishedJobs(lua_State* L) {
+ // Result Table
+ m_ResultQueueMutex.Lock();
+
+ unsigned int index = 1;
+ lua_createtable(L, m_ResultQueue.size(), 0);
+ int top = lua_gettop(L);
+
+ while (!m_ResultQueue.empty()) {
+ LuaJobInfo jobdone = m_ResultQueue.front();
+ m_ResultQueue.erase(m_ResultQueue.begin());
+
+ lua_createtable(L, 0, 2); // Pre-alocate space for two map fields
+ int top_lvl2 = lua_gettop(L);
+
+ lua_pushstring(L, "jobid");
+ lua_pushnumber(L, jobdone.JobId);
+ lua_settable(L, top_lvl2);
+
+ lua_pushstring(L, "retval");
+ lua_pushlstring(L, jobdone.serializedResult.data(),
+ jobdone.serializedResult.size());
+ lua_settable(L, top_lvl2);
+
+ lua_rawseti(L, top, index++);
+ }
+
+ m_ResultQueueMutex.Unlock();
+}
+
+/******************************************************************************/
+void AsyncEngine::PrepareEnvironment(lua_State* L, int top) {
+ for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin();
+ it != m_FunctionList.end(); it++) {
+ lua_pushstring(L, it->first.c_str());
+ lua_pushcfunction(L, it->second);
+ lua_settable(L, top);
+ }
+}
+
+/******************************************************************************/
+AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
+ unsigned int threadNum) :
+ ScriptApiBase(),
+ m_JobDispatcher(jobDispatcher),
+ m_threadnum(threadNum)
+{
+ lua_State *L = getStack();
+
+ luaL_openlibs(L);
+
+ // Prepare job lua environment
+ lua_newtable(L);
+ lua_setglobal(L, "engine");
+ lua_getglobal(L, "engine");
+ int top = lua_gettop(L);
+
+ lua_pushstring(L, DIR_DELIM);
+ lua_setglobal(L, "DIR_DELIM");
+
+ lua_pushstring(L,
+ (porting::path_share + DIR_DELIM + "builtin").c_str());
+ lua_setglobal(L, "SCRIPTDIR");
+
+ m_JobDispatcher->PrepareEnvironment(L, top);
+}
+
+/******************************************************************************/
+AsyncWorkerThread::~AsyncWorkerThread()
+{
+ assert(IsRunning() == false);
+}
+
+/******************************************************************************/
+void* AsyncWorkerThread::Thread()
+{
+ ThreadStarted();
+
+ // Register thread for error logging
+ char number[21];
+ snprintf(number, sizeof(number), "%d", m_threadnum);
+ log_register_thread(std::string("AsyncWorkerThread_") + number);
+
+ porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
+
+ std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
+ + DIR_DELIM + "async_env.lua";
+
+ if (!loadScript(asyncscript)) {
+ errorstream
+ << "AsyncWorkderThread execution of async base environment failed!"
+ << std::endl;
+ abort();
+ }
+
+ lua_State *L = getStack();
+ // Main loop
+ while (!StopRequested()) {
+ // Wait for job
+ LuaJobInfo toProcess = m_JobDispatcher->getJob();
+
+ if (toProcess.valid == false || StopRequested()) {
+ continue;
+ }
+
+ lua_getglobal(L, "engine");
+ if (lua_isnil(L, -1)) {
+ errorstream << "Unable to find engine within async environment!";
+ abort();
+ }
+
+ lua_getfield(L, -1, "job_processor");
+ if (lua_isnil(L, -1)) {
+ errorstream << "Unable to get async job processor!" << std::endl;
+ abort();
+ }
+
+ luaL_checktype(L, -1, LUA_TFUNCTION);
+
+ // Call it
+ lua_pushlstring(L,
+ toProcess.serializedFunction.data(),
+ toProcess.serializedFunction.size());
+ lua_pushlstring(L,
+ toProcess.serializedParams.data(),
+ toProcess.serializedParams.size());
+
+ if (lua_pcall(L, 2, 1, m_errorhandler)) {
+ scriptError();
+ toProcess.serializedResult = "";
+ } else {
+ // Fetch result
+ size_t length;
+ const char *retval = lua_tolstring(L, -1, &length);
+ toProcess.serializedResult = std::string(retval, length);
+ }
+
+ // Pop engine, job_processor, and retval
+ lua_pop(L, 3);
+
+ // Put job result
+ m_JobDispatcher->putJobResult(toProcess);
+ }
+ log_deregister_thread();
+ return 0;
+}
+
diff --git a/src/script/cpp_api/s_async.h b/src/script/cpp_api/s_async.h
new file mode 100644
index 000000000..c5c0e091d
--- /dev/null
+++ b/src/script/cpp_api/s_async.h
@@ -0,0 +1,171 @@
+/*
+Minetest
+Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#ifndef L_ASYNC_EVENTS_H_
+#define L_ASYNC_EVENTS_H_
+
+#include <vector>
+#include <map>
+
+#include "jthread/jthread.h"
+#include "jthread/jmutex.h"
+#include "jthread/jsemaphore.h"
+#include "debug.h"
+#include "lua.h"
+#include "cpp_api/s_base.h"
+
+// Forward declarations
+class AsyncEngine;
+
+
+// Declarations
+
+// Data required to queue a job
+struct LuaJobInfo {
+ // Function to be called in async environment
+ std::string serializedFunction;
+ // Parameter to be passed to function
+ std::string serializedParams;
+ // Result of function call
+ std::string serializedResult;
+ // JobID used to identify a job and match it to callback
+ unsigned int JobId;
+
+ bool valid;
+};
+
+// Asynchronous working environment
+class AsyncWorkerThread : public JThread, public ScriptApiBase {
+public:
+ /**
+ * default constructor
+ * @param pointer to job dispatcher
+ */
+ AsyncWorkerThread(AsyncEngine* jobDispatcher, unsigned int threadNum);
+
+ virtual ~AsyncWorkerThread();
+
+ void* Thread();
+
+private:
+ AsyncEngine* m_JobDispatcher;
+
+ // Thread number. Used for debug output
+ unsigned int m_threadnum;
+
+};
+
+// Asynchornous thread and job management
+class AsyncEngine {
+ friend class AsyncWorkerThread;
+public:
+ AsyncEngine();
+ ~AsyncEngine();
+
+ /**
+ * Register function to be used within engine
+ * @param name Function name to be used within Lua environment
+ * @param func C function to be called
+ */
+ bool registerFunction(const char* name, lua_CFunction func);
+
+ /**
+ * Create async engine tasks and lock function registration
+ * @param numEngines Number of async threads to be started
+ */
+ void Initialize(unsigned int numEngines);
+
+ /**
+ * queue/run a async job
+ * @param func Serialized lua function
+ * @param params Serialized parameters
+ * @return jobid The job is queued
+ */
+ unsigned int doAsyncJob(std::string func, std::string params);
+
+ /**
+ * Engine step to process finished jobs
+ * the engine step is one way to pass events back, PushFinishedJobs another
+ * @param L The Lua stack
+ * @param errorhandler Stack index of the Lua error handler
+ */
+ void Step(lua_State *L, int errorhandler);
+
+ /**
+ * Push a list of finished jobs onto the stack
+ * @param L The Lua stack
+ */
+ void PushFinishedJobs(lua_State *L);
+
+protected:
+ /**
+ * Get a Job from queue to be processed
+ * this function blocks until a job is ready
+ * @return a job to be processed
+ */
+ LuaJobInfo getJob();
+
+ /**
+ * Put a Job result back to result queue
+ * @param result result of completed job
+ */
+ void putJobResult(LuaJobInfo result);
+
+ /**
+ * Initialize environment with current registred functions
+ * this function adds all functions registred by registerFunction to the
+ * passed lua stack
+ * @param L Lua stack to initialize
+ * @param top Stack position
+ */
+ void PrepareEnvironment(lua_State* L, int top);
+
+private:
+
+ // Stack index of error handler
+ int m_errorhandler;
+
+ // variable locking the engine against further modification
+ bool m_initDone;
+
+ // Internal store for registred functions
+ std::map<std::string, lua_CFunction> m_FunctionList;
+
+ // Internal counter to create job IDs
+ unsigned int m_JobIdCounter;
+
+ // Mutex to protect job queue
+ JMutex m_JobQueueMutex;
+
+ // Job queue
+ std::vector<LuaJobInfo> m_JobQueue;
+
+ // Mutex to protect result queue
+ JMutex m_ResultQueueMutex;
+ // Result queue
+ std::vector<LuaJobInfo> m_ResultQueue;
+
+ // List of current worker threads
+ std::vector<AsyncWorkerThread*> m_WorkerThreads;
+
+ // Counter semaphore for job dispatching
+ JSemaphore m_JobQueueCounter;
+};
+
+#endif // L_ASYNC_EVENTS_H_