summaryrefslogtreecommitdiff
path: root/src/connection.h
diff options
context:
space:
mode:
authorPerttu Ahola <celeron55@gmail.com>2011-10-20 23:04:09 +0300
committerPerttu Ahola <celeron55@gmail.com>2011-10-20 23:04:09 +0300
commit4b6138e69b65271b0e568f821a4d1bd285affedd (patch)
tree003fd33f969e5a9bf0bc720bda7f869d1f9c1f45 /src/connection.h
parentb6fcbc5fbaba4a7faa65f792b16e47a405fa4ebf (diff)
downloadminetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.gz
minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.bz2
minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.zip
Improve Connection with threading and some kind of congestion control
Diffstat (limited to 'src/connection.h')
-rw-r--r--src/connection.h281
1 files changed, 209 insertions, 72 deletions
diff --git a/src/connection.h b/src/connection.h
index 6eb2f2824..570bc92ab 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -319,28 +319,6 @@ struct Channel
{
Channel();
~Channel();
- /*
- Processes a packet with the basic header stripped out.
- Parameters:
- packetdata: Data in packet (with no base headers)
- con: The connection to which the channel is associated
- (used for sending back stuff (ACKs))
- peer_id: peer id of the sender of the packet in question
- channelnum: channel on which the packet was sent
- reliable: true if recursing into a reliable packet
- */
- SharedBuffer<u8> ProcessPacket(
- SharedBuffer<u8> packetdata,
- Connection *con,
- u16 peer_id,
- u8 channelnum,
- bool reliable=false);
-
- // Returns next data from a buffer if possible
- // throws a NoIncomingDataException if no data is available
- // If found, sets peer_id
- SharedBuffer<u8> CheckIncomingBuffers(Connection *con,
- u16 &peer_id);
u16 next_outgoing_seqnum;
u16 next_incoming_seqnum;
@@ -412,78 +390,237 @@ public:
// with the id we have given to it
bool has_sent_with_id;
+ float m_sendtime_accu;
+ float m_max_packets_per_second;
+ int m_num_sent;
+ int m_max_num_sent;
+
private:
};
-class Connection
+/*
+ Connection
+*/
+
+struct OutgoingPacket
+{
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+
+ OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+ bool reliable_):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_)
+ {
+ }
+};
+
+enum ConnectionEventType{
+ CONNEVENT_NONE,
+ CONNEVENT_DATA_RECEIVED,
+ CONNEVENT_PEER_ADDED,
+ CONNEVENT_PEER_REMOVED,
+};
+
+struct ConnectionEvent
+{
+ enum ConnectionEventType type;
+ u16 peer_id;
+ SharedBuffer<u8> data;
+ bool timeout;
+ Address address;
+
+ ConnectionEvent(): type(CONNEVENT_NONE) {}
+
+ std::string describe()
+ {
+ switch(type){
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ }
+ return "Invalid ConnectionEvent";
+ }
+
+ void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONNEVENT_DATA_RECEIVED;
+ peer_id = peer_id_;
+ data = data_;
+ }
+ void peerAdded(u16 peer_id_, Address address_)
+ {
+ type = CONNEVENT_PEER_ADDED;
+ peer_id = peer_id_;
+ address = address_;
+ }
+ void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
+ {
+ type = CONNEVENT_PEER_REMOVED;
+ peer_id = peer_id_;
+ timeout = timeout_;
+ address = address_;
+ }
+};
+
+enum ConnectionCommandType{
+ CONNCMD_NONE,
+ CONNCMD_SERVE,
+ CONNCMD_CONNECT,
+ CONNCMD_DISCONNECT,
+ CONNCMD_SEND,
+ CONNCMD_SEND_TO_ALL,
+ CONNCMD_DELETE_PEER,
+};
+
+struct ConnectionCommand
+{
+ enum ConnectionCommandType type;
+ u16 port;
+ Address address;
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+
+ ConnectionCommand(): type(CONNCMD_NONE) {}
+
+ void serve(u16 port_)
+ {
+ type = CONNCMD_SERVE;
+ port = port_;
+ }
+ void connect(Address address_)
+ {
+ type = CONNCMD_CONNECT;
+ address = address_;
+ }
+ void disconnect()
+ {
+ type = CONNCMD_DISCONNECT;
+ }
+ void send(u16 peer_id_, u8 channelnum_,
+ SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND_TO_ALL;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void deletePeer(u16 peer_id_)
+ {
+ type = CONNCMD_DELETE_PEER;
+ peer_id = peer_id_;
+ }
+};
+
+class Connection: public SimpleThread
{
public:
- Connection(
- u32 protocol_id,
- u32 max_packet_size,
- float timeout,
- PeerHandler *peerhandler
- );
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout);
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+ PeerHandler *peerhandler);
~Connection();
- void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); }
- // Start being a server
+ void * Thread();
+
+ /* Interface */
+
+ ConnectionEvent getEvent();
+ ConnectionEvent waitEvent(u32 timeout_ms);
+ void putCommand(ConnectionCommand &c);
+
+ void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
void Serve(unsigned short port);
- // Connect to a server
void Connect(Address address);
bool Connected();
-
void Disconnect();
-
- // Sets peer_id
- SharedBuffer<u8> GetFromBuffers(u16 &peer_id);
-
- // The peer_id of sender is stored in peer_id
- // Return value: I guess this always throws an exception or
- // actually gets data
- // May call PeerHandler methods
u32 Receive(u16 &peer_id, u8 *data, u32 datasize);
-
- // These will automatically package the data as an original or split
void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
- // Send data as a packet; it will be wrapped in base header and
- // optionally to a reliable packet.
- void SendAsPacket(u16 peer_id, u8 channelnum,
+ void RunTimeouts(float dtime); // dummy
+ u16 GetPeerID(){ return m_peer_id; }
+ Address GetPeerAddress(u16 peer_id);
+ float GetPeerAvgRTT(u16 peer_id);
+ void DeletePeer(u16 peer_id);
+
+private:
+ void putEvent(ConnectionEvent &e);
+ void processCommand(ConnectionCommand &c);
+ void send(float dtime);
+ void receive();
+ void runTimeouts(float dtime);
+ void serve(u16 port);
+ void connect(Address address);
+ void disconnect();
+ void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
+ void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
+ void sendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
- // Sends a raw packet
- void RawSend(const BufferedPacket &packet);
+ void rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable);
+ void rawSend(const BufferedPacket &packet);
+ Peer* getPeer(u16 peer_id);
+ Peer* getPeerNoEx(u16 peer_id);
+ core::list<Peer*> getPeers();
+ bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
+ // Returns next data from a buffer if possible
+ // If found, returns true; if not, false.
+ // If found, sets peer_id and dst
+ bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst);
+ /*
+ Processes a packet with the basic header stripped out.
+ Parameters:
+ packetdata: Data in packet (with no base headers)
+ peer_id: peer id of the sender of the packet in question
+ channelnum: channel on which the packet was sent
+ reliable: true if recursing into a reliable packet
+ */
+ SharedBuffer<u8> processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable);
+ bool deletePeer(u16 peer_id, bool timeout);
- // May call PeerHandler methods
- void RunTimeouts(float dtime);
-
- // Can throw a PeerNotFoundException
- Peer* GetPeer(u16 peer_id);
- // returns NULL if failed
- Peer* GetPeerNoEx(u16 peer_id);
- core::list<Peer*> GetPeers();
+ Queue<OutgoingPacket> m_outgoing_queue;
+ MutexedQueue<ConnectionEvent> m_event_queue;
+ MutexedQueue<ConnectionCommand> m_command_queue;
- // Calls PeerHandler::deletingPeer
- // Returns false if peer was not found
- bool deletePeer(u16 peer_id, bool timeout);
+ u32 m_protocol_id;
+ u32 m_max_packet_size;
+ float m_timeout;
+ UDPSocket m_socket;
+ u16 m_peer_id;
+
+ core::map<u16, Peer*> m_peers;
+ JMutex m_peers_mutex;
+ // Backwards compatibility
+ PeerHandler *m_bc_peerhandler;
+ int m_bc_receive_timeout;
+
void SetPeerID(u16 id){ m_peer_id = id; }
- u16 GetPeerID(){ return m_peer_id; }
u32 GetProtocolID(){ return m_protocol_id; }
-
- // For debug printing
void PrintInfo(std::ostream &out);
void PrintInfo();
+ std::string getDesc();
u16 m_indentation;
-
-private:
- u32 m_protocol_id;
- float m_timeout;
- PeerHandler *m_peerhandler;
- core::map<u16, Peer*> m_peers;
- u16 m_peer_id;
- //bool m_waiting_new_peer_id;
- u32 m_max_packet_size;
- UDPSocket m_socket;
};
} // namespace