aboutsummaryrefslogtreecommitdiff
path: root/src/network/peerhandler.h
blob: da65483ef91867c01f54f67f6844a3994c2169f5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/*
Minetest
Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#pragma once

#include "networkprotocol.h"

namespace con
{

typedef enum
{
	MIN_RTT,
	MAX_RTT,
	AVG_RTT,
	MIN_JITTER,
	MAX_JITTER,
	AVG_JITTER
} rtt_stat_type;

class Peer;

class PeerHandler
{
public:
	PeerHandler() = default;

	virtual ~PeerHandler() = default;

	/*
		This is called after the Peer has been inserted into the
		Connection's peer container.
	*/
	virtual void peerAdded(Peer *peer) = 0;

	/*
		This is called before the Peer has been removed from the
		Connection's peer container.
	*/
	virtual void deletingPeer(Peer *peer, bool timeout) = 0;
};

enum PeerChangeType : u8
{
	PEER_ADDED,
	PEER_REMOVED
};

struct PeerChange
{
	PeerChange(PeerChangeType t, session_t _peer_id, bool _timeout) :
			type(t), peer_id(_peer_id), timeout(_timeout)
	{
	}
	PeerChange() = delete;

	PeerChangeType type;
	session_t peer_id;
	bool timeout;
};
}
ense for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #pragma once #include <vector> #include <deque> #include <unordered_set> #include <memory> #include <lua.h> #include "threading/semaphore.h" #include "threading/thread.h" #include "common/c_packer.h" #include "cpp_api/s_base.h" #include "cpp_api/s_security.h" // Forward declarations class AsyncEngine; // Declarations // Data required to queue a job struct LuaJobInfo { LuaJobInfo() = default; // Function to be called in async environment (from string.dump) std::string function; // Parameter to be passed to function (serialized) std::string params; // Alternative parameters std::unique_ptr<PackedValue> params_ext; // Result of function call (serialized) std::string result; // Alternative result std::unique_ptr<PackedValue> result_ext; // Name of the mod who invoked this call std::string mod_origin; // JobID used to identify a job and match it to callback u32 id; }; // Asynchronous working environment class AsyncWorkerThread : public Thread, virtual public ScriptApiBase, public ScriptApiSecurity { friend class AsyncEngine; public: virtual ~AsyncWorkerThread(); void *run(); protected: AsyncWorkerThread(AsyncEngine* jobDispatcher, const std::string &name); private: AsyncEngine *jobDispatcher = nullptr; bool isErrored = false; }; // Asynchornous thread and job management class AsyncEngine { friend class AsyncWorkerThread; typedef void (*StateInitializer)(lua_State *L, int top); public: AsyncEngine() = default; AsyncEngine(Server *server) : server(server) {}; ~AsyncEngine(); /** * Register function to be called on new states * @param func C function to be called */ void registerStateInitializer(StateInitializer func); /** * Create async engine tasks and lock function registration * @param numEngines Number of worker threads, 0 for automatic scaling */ void initialize(unsigned int numEngines); /** * Queue an async job * @param func Serialized lua function * @param params Serialized parameters * @return jobid The job is queued */ u32 queueAsyncJob(std::string &&func, std::string &&params, const std::string &mod_origin = ""); /** * Queue an async job * @param func Serialized lua function * @param params Serialized parameters (takes ownership!) * @return ID of queued job */ u32 queueAsyncJob(std::string &&func, PackedValue *params, const std::string &mod_origin = ""); /** * Engine step to process finished jobs * @param L The Lua stack */ void step(lua_State *L); protected: /** * Get a Job from queue to be processed * this function blocks until a job is ready * @param job a job to be processed * @return whether a job was available */ bool getJob(LuaJobInfo *job); /** * Put a Job result back to result queue * @param result result of completed job */ void putJobResult(LuaJobInfo &&result); /** * Start an additional worker thread */ void addWorkerThread(); /** * Process finished jobs callbacks */ void stepJobResults(lua_State *L); /** * Handle automatic scaling of worker threads */ void stepAutoscale(); /** * Initialize environment with current registred functions * this function adds all functions registred by registerFunction to the * passed lua stack * @param L Lua stack to initialize * @param top Stack position * @return false if a mod error ocurred */ bool prepareEnvironment(lua_State* L, int top); private: // Variable locking the engine against further modification bool initDone = false; // Maximum number of worker threads for automatic scaling // 0 if disabled unsigned int autoscaleMaxWorkers = 0; u64 autoscaleTimer = 0; std::unordered_set<u32> autoscaleSeenJobs; // Only set for the server async environment (duh) Server *server = nullptr; // Internal store for registred state initializers std::vector<StateInitializer> stateInitializers; // Internal counter to create job IDs u32 jobIdCounter = 0; // Mutex to protect job queue std::mutex jobQueueMutex; // Job queue std::deque<LuaJobInfo> jobQueue; // Mutex to protect result queue std::mutex resultQueueMutex; // Result queue std::deque<LuaJobInfo> resultQueue; // List of current worker threads std::vector<AsyncWorkerThread*> workerThreads; // Counter semaphore for job dispatching Semaphore jobQueueCounter; };