summaryrefslogtreecommitdiff
path: root/src/connection.h
diff options
context:
space:
mode:
authorsapier <Sapier at GMX dot net>2014-01-06 20:05:28 +0100
committersapier <Sapier at GMX dot net>2014-01-10 10:10:45 +0100
commit9edb91da5754cf194637d1d7faa513719b61f9b4 (patch)
tree66adfd87cb43e9b24adc70bcaabfae205fc10f8f /src/connection.h
parent8b0b857eaaa50c6ec217a46c0577395c78ec04c7 (diff)
downloadminetest-9edb91da5754cf194637d1d7faa513719b61f9b4.tar.gz
minetest-9edb91da5754cf194637d1d7faa513719b61f9b4.tar.bz2
minetest-9edb91da5754cf194637d1d7faa513719b61f9b4.zip
Fixed minetest reliable udp implementation (compatible to old clients)
Diffstat (limited to 'src/connection.h')
-rw-r--r--src/connection.h780
1 files changed, 600 insertions, 180 deletions
diff --git a/src/connection.h b/src/connection.h
index 56badc904..e43d93fb3 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -27,6 +27,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "util/pointer.h"
#include "util/container.h"
#include "util/thread.h"
+#include "util/numeric.h"
#include <iostream>
#include <fstream>
#include <list>
@@ -110,26 +111,74 @@ public:
{}
};
+class ProcessedQueued : public BaseException
+{
+public:
+ ProcessedQueued(const char *s):
+ BaseException(s)
+ {}
+};
+
+class IncomingDataCorruption : public BaseException
+{
+public:
+ IncomingDataCorruption(const char *s):
+ BaseException(s)
+ {}
+};
+
+typedef enum MTProtocols {
+ PRIMARY,
+ UDP,
+ MINETEST_RELIABLE_UDP
+} MTProtocols;
+
#define SEQNUM_MAX 65535
-inline bool seqnum_higher(u16 higher, u16 lower)
+inline bool seqnum_higher(u16 totest, u16 base)
+{
+ if (totest > base)
+ {
+ if((totest - base) > (SEQNUM_MAX/2))
+ return false;
+ else
+ return true;
+ }
+ else
+ {
+ if((base - totest) > (SEQNUM_MAX/2))
+ return true;
+ else
+ return false;
+ }
+}
+
+inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
{
- if(lower > higher && lower - higher > SEQNUM_MAX/2){
- return true;
+ u16 window_start = next;
+ u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
+
+ if (window_start < window_end)
+ {
+ return ((seqnum >= window_start) && (seqnum < window_end));
+ }
+ else
+ {
+ return ((seqnum < window_end) || (seqnum >= window_start));
}
- return (higher > lower);
}
struct BufferedPacket
{
BufferedPacket(u8 *a_data, u32 a_size):
- data(a_data, a_size), time(0.0), totaltime(0.0)
+ data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
{}
BufferedPacket(u32 a_size):
- data(a_size), time(0.0), totaltime(0.0)
+ data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
{}
SharedBuffer<u8> data; // Data of the packet, including headers
float time; // Seconds from buffering the packet or re-sending
float totaltime; // Seconds from buffering the packet
+ unsigned int absolute_send_time;
Address address; // Sender or destination
};
@@ -223,6 +272,8 @@ controltype and data description:
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
+#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
+
/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
@@ -261,7 +312,6 @@ with a buffer in the receiving and transmitting end.
*/
#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
-//#define SEQNUM_INITIAL 0x10
#define SEQNUM_INITIAL 65500
/*
@@ -275,23 +325,35 @@ class ReliablePacketBuffer
{
public:
ReliablePacketBuffer();
- void print();
- bool empty();
- u32 size();
- RPBSearchResult findPacket(u16 seqnum);
- RPBSearchResult notFound();
- bool getFirstSeqnum(u16 *result);
+
+ bool getFirstSeqnum(u16& result);
+
BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum);
- void insert(BufferedPacket &p);
+ void insert(BufferedPacket &p,u16 next_expected);
+
void incrementTimeouts(float dtime);
- void resetTimedOuts(float timeout);
- bool anyTotaltimeReached(float timeout);
- std::list<BufferedPacket> getTimedOuts(float timeout);
+ std::list<BufferedPacket> getTimedOuts(float timeout,
+ unsigned int max_packets);
+
+ void print();
+ bool empty();
+ bool containsPacket(u16 seqnum);
+ RPBSearchResult notFound();
+ u32 size();
+
private:
+ RPBSearchResult findPacket(u16 seqnum);
+
std::list<BufferedPacket> m_list;
u16 m_list_size;
+
+ u16 m_oldest_non_answered_ack;
+
+ JMutex m_list_mutex;
+
+ unsigned int writeptr;
};
/*
@@ -313,27 +375,207 @@ public:
private:
// Key is seqnum
std::map<u16, IncomingSplitPacket*> m_buf;
+
+ JMutex m_map_mutex;
};
-class Connection;
+struct OutgoingPacket
+{
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+ bool ack;
+
+ OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+ bool reliable_,bool ack_=false):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_),
+ ack(ack_)
+ {
+ }
+};
-struct Channel
+enum ConnectionCommandType{
+ CONNCMD_NONE,
+ CONNCMD_SERVE,
+ CONNCMD_CONNECT,
+ CONNCMD_DISCONNECT,
+ CONNCMD_DISCONNECT_PEER,
+ CONNCMD_SEND,
+ CONNCMD_SEND_TO_ALL,
+ CONNCMD_DELETE_PEER,
+ CONCMD_ACK,
+ CONCMD_CREATE_PEER,
+ CONCMD_DISABLE_LEGACY
+};
+
+struct ConnectionCommand
{
- Channel();
- ~Channel();
+ enum ConnectionCommandType type;
+ u16 port;
+ Address address;
+ u16 peer_id;
+ u8 channelnum;
+ Buffer<u8> data;
+ bool reliable;
+ bool raw;
- u16 next_outgoing_seqnum;
- u16 next_incoming_seqnum;
- u16 next_outgoing_split_seqnum;
+ ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
+
+ void serve(u16 port_)
+ {
+ type = CONNCMD_SERVE;
+ port = port_;
+ }
+ void connect(Address address_)
+ {
+ type = CONNCMD_CONNECT;
+ address = address_;
+ }
+ void disconnect()
+ {
+ type = CONNCMD_DISCONNECT;
+ }
+ void disconnect_peer(u16 peer_id_)
+ {
+ type = CONNCMD_DISCONNECT_PEER;
+ peer_id = peer_id_;
+ }
+ void send(u16 peer_id_, u8 channelnum_,
+ SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND_TO_ALL;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void deletePeer(u16 peer_id_)
+ {
+ type = CONNCMD_DELETE_PEER;
+ peer_id = peer_id_;
+ }
+
+ void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_ACK;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = false;
+ }
+
+ void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_CREATE_PEER;
+ peer_id = peer_id_;
+ data = data_;
+ channelnum = 0;
+ reliable = true;
+ raw = true;
+ }
+
+ void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_DISABLE_LEGACY;
+ peer_id = peer_id_;
+ data = data_;
+ channelnum = 0;
+ reliable = true;
+ raw = true;
+ }
+};
+
+class Channel
+{
+
+public:
+ u16 readNextIncomingSeqNum();
+ u16 incNextIncomingSeqNum();
+
+ u16 getOutgoingSequenceNumber(bool& successfull);
+ u16 readOutgoingSequenceNumber();
+ bool putBackSequenceNumber(u16);
+
+ u16 readNextSplitSeqNum();
+ void setNextSplitSeqNum(u16 seqnum);
// This is for buffering the incoming packets that are coming in
// the wrong order
ReliablePacketBuffer incoming_reliables;
// This is for buffering the sent packets so that the sender can
// re-send them if no ACK is received
- ReliablePacketBuffer outgoing_reliables;
+ ReliablePacketBuffer outgoing_reliables_sent;
+
+ //queued reliable packets
+ Queue<BufferedPacket> queued_reliables;
+
+ //queue commands prior splitting to packets
+ Queue<ConnectionCommand> queued_commands;
IncomingSplitBuffer incoming_splits;
+
+ Channel();
+ ~Channel();
+
+ void UpdatePacketLossCounter(unsigned int count);
+ void UpdatePacketTooLateCounter();
+ void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
+ void UpdateBytesLost(unsigned int bytes);
+
+ void UpdateTimers(float dtime);
+
+ const float getCurrentDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
+ const float getMaxDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
+
+ const float getCurrentLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
+ const float getMaxLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
+
+ const float getAvgDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
+ const float getAvgLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
+
+ const unsigned int getWindowSize() const { return window_size; };
+
+ void setWindowSize(unsigned int size) { window_size = size; };
+private:
+ JMutex m_internal_mutex;
+ unsigned int window_size;
+
+ u16 next_incoming_seqnum;
+
+ u16 next_outgoing_seqnum;
+ u16 next_outgoing_split_seqnum;
+
+ unsigned int current_packet_loss;
+ unsigned int current_packet_too_late;
+ unsigned int current_packet_successfull;
+ float packet_loss_counter;
+
+ unsigned int current_bytes_transfered;
+ unsigned int current_bytes_lost;
+ float max_kbps;
+ float cur_kbps;
+ float avg_kbps;
+ float max_kbps_lost;
+ float cur_kbps_lost;
+ float avg_kbps_lost;
+ float bpm_counter;
};
class Peer;
@@ -360,71 +602,232 @@ public:
virtual void deletingPeer(Peer *peer, bool timeout) = 0;
};
-class Peer
+class PeerHelper
{
public:
+ PeerHelper();
+ PeerHelper(Peer* peer);
+ ~PeerHelper();
- Peer(u16 a_id, Address a_address);
- virtual ~Peer();
-
+ PeerHelper& operator=(Peer* peer);
+ Peer* operator->() const;
+ bool operator!();
+ Peer* operator&() const;
+ bool operator!=(void* ptr);
+
+private:
+ Peer* m_peer;
+};
+
+class Connection;
+
+typedef enum rtt_stat_type {
+ MIN_RTT,
+ MAX_RTT,
+ AVG_RTT,
+ MIN_JITTER,
+ MAX_JITTER,
+ AVG_JITTER
+} rtt_stat_type;
+
+class Peer {
+ public:
+ friend class PeerHelper;
+
+ Peer(Address address_,u16 id_,Connection* connection) :
+ id(id_),
+ m_increment_packets_remaining(9),
+ m_increment_bytes_remaining(0),
+ m_pending_deletion(false),
+ m_connection(connection),
+ address(address_),
+ m_ping_timer(0.0),
+ m_last_rtt(-1.0),
+ m_usage(0),
+ m_timeout_counter(0.0),
+ m_last_timeout_check(porting::getTimeMs()),
+ m_has_sent_with_id(false)
+ {
+ m_rtt.avg_rtt = -1.0;
+ m_rtt.jitter_avg = -1.0;
+ m_rtt.jitter_max = 0.0;
+ m_rtt.max_rtt = 0.0;
+ m_rtt.jitter_min = FLT_MAX;
+ m_rtt.min_rtt = FLT_MAX;
+ };
+
+ virtual ~Peer() {
+ JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ assert(m_usage == 0);
+ };
+
+ // Unique id of the peer
+ u16 id;
+
+ void Drop();
+
+ virtual void PutReliableSendCommand(ConnectionCommand &c,
+ unsigned int max_packet_size) {};
+
+ virtual bool isActive() { return false; };
+
+ virtual bool getAddress(MTProtocols type, Address& toset) = 0;
+
+ void ResetTimeout()
+ {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
+
+ bool isTimedOut(float timeout);
+
+ void setSentWithID()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
+
+ bool hasSentWithID()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
+
+ unsigned int m_increment_packets_remaining;
+ unsigned int m_increment_bytes_remaining;
+
+ virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
+ virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
+ virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
+ BufferedPacket toadd,
+ bool reliable)
+ {
+ fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
+ return SharedBuffer<u8>(0);
+ };
+
+ virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
+
+ virtual float getStat(rtt_stat_type type) const {
+ switch (type) {
+ case MIN_RTT:
+ return m_rtt.min_rtt;
+ case MAX_RTT:
+ return m_rtt.max_rtt;
+ case AVG_RTT:
+ return m_rtt.avg_rtt;
+ case MIN_JITTER:
+ return m_rtt.jitter_min;
+ case MAX_JITTER:
+ return m_rtt.jitter_max;
+ case AVG_JITTER:
+ return m_rtt.jitter_avg;
+ }
+ return -1;
+ }
+ protected:
+ virtual void reportRTT(float rtt) {};
+
+ void RTTStatistics(float rtt,
+ std::string profiler_id="",
+ unsigned int num_samples=1000);
+
+ bool IncUseCount();
+ void DecUseCount();
+
+ JMutex m_exclusive_access_mutex;
+
+ bool m_pending_deletion;
+
+ Connection* m_connection;
+
+ // Address of the peer
+ Address address;
+
+ // Ping timer
+ float m_ping_timer;
+ private:
+
+ struct rttstats {
+ float jitter_min;
+ float jitter_max;
+ float jitter_avg;
+ float min_rtt;
+ float max_rtt;
+ float avg_rtt;
+ };
+
+ rttstats m_rtt;
+ float m_last_rtt;
+
+ // current usage count
+ unsigned int m_usage;
+
+ // Seconds from last receive
+ float m_timeout_counter;
+
+ u32 m_last_timeout_check;
+
+ bool m_has_sent_with_id;
+};
+
+class UDPPeer : public Peer
+{
+public:
+
+ friend class PeerHelper;
+ friend class ConnectionReceiveThread;
+ friend class ConnectionSendThread;
+
+ UDPPeer(u16 a_id, Address a_address, Connection* connection);
+ virtual ~UDPPeer();
+
+ void PutReliableSendCommand(ConnectionCommand &c,
+ unsigned int max_packet_size);
+
+ bool isActive()
+ { return ((hasSentWithID()) && (!m_pending_deletion)); };
+
+ bool getAddress(MTProtocols type, Address& toset);
+
+ void setNonLegacyPeer()
+ { m_legacy_peer = false; }
+
+ bool getLegacyPeer()
+ { return m_legacy_peer; }
+
+ u16 getNextSplitSequenceNumber(u8 channel);
+ void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
+
+ SharedBuffer<u8> addSpiltPacket(u8 channel,
+ BufferedPacket toadd,
+ bool reliable);
+protected:
/*
Calculates avg_rtt and resend_timeout.
-
rtt=-1 only recalculates resend_timeout
*/
void reportRTT(float rtt);
- Channel channels[CHANNEL_COUNT];
+ void RunCommandQueues(
+ unsigned int max_packet_size,
+ unsigned int maxcommands,
+ unsigned int maxtransfer);
- // Address of the peer
- Address address;
- // Unique id of the peer
- u16 id;
- // Seconds from last receive
- float timeout_counter;
- // Ping timer
- float ping_timer;
+ float getResendTimeout()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
+
+ void setResendTimeout(float timeout)
+ { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
+ bool Ping(float dtime,SharedBuffer<u8>& data);
+
+ Channel channels[CHANNEL_COUNT];
+private:
// This is changed dynamically
float resend_timeout;
- // Updated when an ACK is received
- float avg_rtt;
- // This is set to true when the peer has actually sent something
- // with the id we have given to it
- bool has_sent_with_id;
-
- float m_sendtime_accu;
- float m_max_packets_per_second;
- int m_num_sent;
- int m_max_num_sent;
-
- // Updated from configuration by Connection
- float congestion_control_aim_rtt;
- float congestion_control_max_rate;
- float congestion_control_min_rate;
-private:
+
+ bool processReliableSendCommand(
+ ConnectionCommand &c,
+ unsigned int max_packet_size);
+
+ bool m_legacy_peer;
};
/*
Connection
*/
-struct OutgoingPacket
-{
- u16 peer_id;
- u8 channelnum;
- SharedBuffer<u8> data;
- bool reliable;
-
- OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
- bool reliable_):
- peer_id(peer_id_),
- channelnum(channelnum_),
- data(data_),
- reliable(reliable_)
- {
- }
-};
-
enum ConnectionEventType{
CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED,
@@ -485,76 +888,108 @@ struct ConnectionEvent
}
};
-enum ConnectionCommandType{
- CONNCMD_NONE,
- CONNCMD_SERVE,
- CONNCMD_CONNECT,
- CONNCMD_DISCONNECT,
- CONNCMD_SEND,
- CONNCMD_SEND_TO_ALL,
- CONNCMD_DELETE_PEER,
+class ConnectionSendThread : public JThread {
+
+public:
+ friend class UDPPeer;
+
+ ConnectionSendThread(Connection* parent,
+ unsigned int max_packet_size, float timeout);
+
+ void * Thread ();
+
+ void Trigger();
+
+ void setPeerTimeout(float peer_timeout)
+ { m_timeout = peer_timeout; }
+
+private:
+ void runTimeouts (float dtime);
+ void rawSend (const BufferedPacket &packet);
+ bool rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable);
+
+ void processReliableCommand (ConnectionCommand &c);
+ void processNonReliableCommand (ConnectionCommand &c);
+ void serve (u16 port);
+ void connect (Address address);
+ void disconnect ();
+ void disconnect_peer(u16 peer_id);
+ void send (u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data);
+ void sendReliable (ConnectionCommand &c);
+ void sendToAll (u8 channelnum,
+ SharedBuffer<u8> data);
+ void sendToAllReliable(ConnectionCommand &c);
+
+ void sendPackets (float dtime);
+
+ void sendAsPacket (u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data,bool ack=false);
+
+ void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
+
+ bool packetsQueued();
+
+ Connection* m_connection;
+ unsigned int m_max_packet_size;
+ float m_timeout;
+ Queue<OutgoingPacket> m_outgoing_queue;
+ JSemaphore m_send_sleep_semaphore;
+
+ unsigned int m_iteration_packets_avaialble;
+ unsigned int m_max_commands_per_iteration;
+ unsigned int m_max_data_packets_per_iteration;
+ unsigned int m_max_packets_requeued;
};
-struct ConnectionCommand
-{
- enum ConnectionCommandType type;
- u16 port;
- Address address;
- u16 peer_id;
- u8 channelnum;
- Buffer<u8> data;
- bool reliable;
-
- ConnectionCommand(): type(CONNCMD_NONE) {}
+class ConnectionReceiveThread : public JThread {
+public:
+ ConnectionReceiveThread(Connection* parent,
+ unsigned int max_packet_size);
- void serve(u16 port_)
- {
- type = CONNCMD_SERVE;
- port = port_;
- }
- void connect(Address address_)
- {
- type = CONNCMD_CONNECT;
- address = address_;
- }
- void disconnect()
- {
- type = CONNCMD_DISCONNECT;
- }
- void send(u16 peer_id_, u8 channelnum_,
- SharedBuffer<u8> data_, bool reliable_)
- {
- type = CONNCMD_SEND;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = data_;
- reliable = reliable_;
- }
- void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
- {
- type = CONNCMD_SEND_TO_ALL;
- channelnum = channelnum_;
- data = data_;
- reliable = reliable_;
- }
- void deletePeer(u16 peer_id_)
- {
- type = CONNCMD_DELETE_PEER;
- peer_id = peer_id_;
- }
+ void * Thread ();
+
+private:
+ void receive ();
+
+ // Returns next data from a buffer if possible
+ // If found, returns true; if not, false.
+ // If found, sets peer_id and dst
+ bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
+
+ bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst);
+
+ /*
+ Processes a packet with the basic header stripped out.
+ Parameters:
+ packetdata: Data in packet (with no base headers)
+ peer_id: peer id of the sender of the packet in question
+ channelnum: channel on which the packet was sent
+ reliable: true if recursing into a reliable packet
+ */
+ SharedBuffer<u8> processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable);
+
+
+ Connection* m_connection;
+ unsigned int m_max_packet_size;
};
-class Connection: public JThread
+class Connection
{
public:
+ friend class ConnectionSendThread;
+ friend class ConnectionReceiveThread;
+
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
PeerHandler *peerhandler);
~Connection();
- void * Thread();
/* Interface */
-
ConnectionEvent getEvent();
ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c);
@@ -572,68 +1007,53 @@ public:
Address GetPeerAddress(u16 peer_id);
float GetPeerAvgRTT(u16 peer_id);
void DeletePeer(u16 peer_id);
-
-private:
+ const u32 GetProtocolID() const { return m_protocol_id; };
+ const std::string getDesc();
+
+protected:
+ PeerHelper getPeer(u16 peer_id);
+ PeerHelper getPeerNoEx(u16 peer_id);
+ u16 lookupPeer(Address& sender);
+
+ u16 createPeer(Address& sender, MTProtocols protocol, int fd);
+ UDPPeer* createServerPeer(Address& sender);
+ bool deletePeer(u16 peer_id, bool timeout);
+
+ void SetPeerID(u16 id){ m_peer_id = id; }
+
+ void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
+
+ void PrintInfo(std::ostream &out);
+ void PrintInfo();
+
+ std::list<u16> getPeerIDs();
+
+ UDPSocket m_udpSocket;
+ MutexedQueue<ConnectionCommand> m_command_queue;
+
void putEvent(ConnectionEvent &e);
- void processCommand(ConnectionCommand &c);
- void send(float dtime);
- void receive();
- void runTimeouts(float dtime);
- void serve(u16 port);
- void connect(Address address);
- void disconnect();
- void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
- void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
- void sendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable);
- void rawSendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable);
- void rawSend(const BufferedPacket &packet);
- Peer* getPeer(u16 peer_id);
- Peer* getPeerNoEx(u16 peer_id);
+
+private:
std::list<Peer*> getPeers();
- bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
- // Returns next data from a buffer if possible
- // If found, returns true; if not, false.
- // If found, sets peer_id and dst
- bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
- SharedBuffer<u8> &dst);
- /*
- Processes a packet with the basic header stripped out.
- Parameters:
- packetdata: Data in packet (with no base headers)
- peer_id: peer id of the sender of the packet in question
- channelnum: channel on which the packet was sent
- reliable: true if recursing into a reliable packet
- */
- SharedBuffer<u8> processPacket(Channel *channel,
- SharedBuffer<u8> packetdata, u16 peer_id,
- u8 channelnum, bool reliable);
- bool deletePeer(u16 peer_id, bool timeout);
-
- Queue<OutgoingPacket> m_outgoing_queue;
+
MutexedQueue<ConnectionEvent> m_event_queue;
- MutexedQueue<ConnectionCommand> m_command_queue;
-
- u32 m_protocol_id;
- u32 m_max_packet_size;
- float m_timeout;
- UDPSocket m_socket;
+
u16 m_peer_id;
+ u32 m_protocol_id;
std::map<u16, Peer*> m_peers;
JMutex m_peers_mutex;
+ ConnectionSendThread m_sendThread;
+ ConnectionReceiveThread m_receiveThread;
+
+ JMutex m_info_mutex;
+
// Backwards compatibility
PeerHandler *m_bc_peerhandler;
int m_bc_receive_timeout;
-
- void SetPeerID(u16 id){ m_peer_id = id; }
- u32 GetProtocolID(){ return m_protocol_id; }
- void PrintInfo(std::ostream &out);
- void PrintInfo();
- std::string getDesc();
- u16 m_indentation;
+
+ bool m_shutting_down;
};
} // namespace