diff options
Diffstat (limited to 'src/network/connection.h')
-rw-r--r-- | src/network/connection.h | 1096 |
1 files changed, 1096 insertions, 0 deletions
diff --git a/src/network/connection.h b/src/network/connection.h new file mode 100644 index 000000000..15ea7e20f --- /dev/null +++ b/src/network/connection.h @@ -0,0 +1,1096 @@ +/* +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com> + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#ifndef CONNECTION_HEADER +#define CONNECTION_HEADER + +#include "irrlichttypes_bloated.h" +#include "socket.h" +#include "exceptions.h" +#include "constants.h" +#include "network/networkpacket.h" +#include "util/pointer.h" +#include "util/container.h" +#include "util/thread.h" +#include "util/numeric.h" +#include <iostream> +#include <fstream> +#include <list> +#include <map> + +class NetworkPacket; + +namespace con +{ + +/* + Exceptions +*/ +class NotFoundException : public BaseException +{ +public: + NotFoundException(const char *s): + BaseException(s) + {} +}; + +class PeerNotFoundException : public BaseException +{ +public: + PeerNotFoundException(const char *s): + BaseException(s) + {} +}; + +class ConnectionException : public BaseException +{ +public: + ConnectionException(const char *s): + BaseException(s) + {} +}; + +class ConnectionBindFailed : public BaseException +{ +public: + ConnectionBindFailed(const char *s): + BaseException(s) + {} +}; + +class InvalidIncomingDataException : public BaseException +{ +public: + InvalidIncomingDataException(const char *s): + BaseException(s) + {} +}; + +class InvalidOutgoingDataException : public BaseException +{ +public: + InvalidOutgoingDataException(const char *s): + BaseException(s) + {} +}; + +class NoIncomingDataException : public BaseException +{ +public: + NoIncomingDataException(const char *s): + BaseException(s) + {} +}; + +class ProcessedSilentlyException : public BaseException +{ +public: + ProcessedSilentlyException(const char *s): + BaseException(s) + {} +}; + +class ProcessedQueued : public BaseException +{ +public: + ProcessedQueued(const char *s): + BaseException(s) + {} +}; + +class IncomingDataCorruption : public BaseException +{ +public: + IncomingDataCorruption(const char *s): + BaseException(s) + {} +}; + +typedef enum MTProtocols { + MTP_PRIMARY, + MTP_UDP, + MTP_MINETEST_RELIABLE_UDP +} MTProtocols; + +#define SEQNUM_MAX 65535 +inline bool seqnum_higher(u16 totest, u16 base) +{ + if (totest > base) + { + if ((totest - base) > (SEQNUM_MAX/2)) + return false; + else + return true; + } + else + { + if ((base - totest) > (SEQNUM_MAX/2)) + return true; + else + return false; + } +} + +inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) +{ + u16 window_start = next; + u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); + + if (window_start < window_end) + { + return ((seqnum >= window_start) && (seqnum < window_end)); + } + else + { + return ((seqnum < window_end) || (seqnum >= window_start)); + } +} + +struct BufferedPacket +{ + BufferedPacket(u8 *a_data, u32 a_size): + data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1), + resend_count(0) + {} + BufferedPacket(u32 a_size): + data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1), + resend_count(0) + {} + Buffer<u8> data; // Data of the packet, including headers + float time; // Seconds from buffering the packet or re-sending + float totaltime; // Seconds from buffering the packet + unsigned int absolute_send_time; + Address address; // Sender or destination + unsigned int resend_count; +}; + +// This adds the base headers to the data and makes a packet out of it +BufferedPacket makePacket(Address &address, u8 *data, u32 datasize, + u32 protocol_id, u16 sender_peer_id, u8 channel); +BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data, + u32 protocol_id, u16 sender_peer_id, u8 channel); + +// Add the TYPE_ORIGINAL header to the data +SharedBuffer<u8> makeOriginalPacket( + SharedBuffer<u8> data); + +// Split data in chunks and add TYPE_SPLIT headers to them +std::list<SharedBuffer<u8> > makeSplitPacket( + SharedBuffer<u8> data, + u32 chunksize_max, + u16 seqnum); + +// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet +// Increments split_seqnum if a split packet is made +std::list<SharedBuffer<u8> > makeAutoSplitPacket( + SharedBuffer<u8> data, + u32 chunksize_max, + u16 &split_seqnum); + +// Add the TYPE_RELIABLE header to the data +SharedBuffer<u8> makeReliablePacket( + SharedBuffer<u8> data, + u16 seqnum); + +struct IncomingSplitPacket +{ + IncomingSplitPacket() + { + time = 0.0; + reliable = false; + } + // Key is chunk number, value is data without headers + std::map<u16, SharedBuffer<u8> > chunks; + u32 chunk_count; + float time; // Seconds from adding + bool reliable; // If true, isn't deleted on timeout + + bool allReceived() + { + return (chunks.size() == chunk_count); + } +}; + +/* +=== NOTES === + +A packet is sent through a channel to a peer with a basic header: +TODO: Should we have a receiver_peer_id also? + Header (7 bytes): + [0] u32 protocol_id + [4] u16 sender_peer_id + [6] u8 channel +sender_peer_id: + Unique to each peer. + value 0 (PEER_ID_INEXISTENT) is reserved for making new connections + value 1 (PEER_ID_SERVER) is reserved for server + these constants are defined in constants.h +channel: + The lower the number, the higher the priority is. + Only channels 0, 1 and 2 exist. +*/ +#define BASE_HEADER_SIZE 7 +#define CHANNEL_COUNT 3 +/* +Packet types: + +CONTROL: This is a packet used by the protocol. +- When this is processed, nothing is handed to the user. + Header (2 byte): + [0] u8 type + [1] u8 controltype +controltype and data description: + CONTROLTYPE_ACK + [2] u16 seqnum + CONTROLTYPE_SET_PEER_ID + [2] u16 peer_id_new + CONTROLTYPE_PING + - There is no actual reply, but this can be sent in a reliable + packet to get a reply + CONTROLTYPE_DISCO +*/ +#define TYPE_CONTROL 0 +#define CONTROLTYPE_ACK 0 +#define CONTROLTYPE_SET_PEER_ID 1 +#define CONTROLTYPE_PING 2 +#define CONTROLTYPE_DISCO 3 +#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4 + +/* +ORIGINAL: This is a plain packet with no control and no error +checking at all. +- When this is processed, it is directly handed to the user. + Header (1 byte): + [0] u8 type +*/ +#define TYPE_ORIGINAL 1 +#define ORIGINAL_HEADER_SIZE 1 +/* +SPLIT: These are sequences of packets forming one bigger piece of +data. +- When processed and all the packet_nums 0...packet_count-1 are + present (this should be buffered), the resulting data shall be + directly handed to the user. +- If the data fails to come up in a reasonable time, the buffer shall + be silently discarded. +- These can be sent as-is or atop of a RELIABLE packet stream. + Header (7 bytes): + [0] u8 type + [1] u16 seqnum + [3] u16 chunk_count + [5] u16 chunk_num +*/ +#define TYPE_SPLIT 2 +/* +RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs, +and they shall be delivered in the same order as sent. This is done +with a buffer in the receiving and transmitting end. +- When this is processed, the contents of each packet is recursively + processed as packets. + Header (3 bytes): + [0] u8 type + [1] u16 seqnum + +*/ +#define TYPE_RELIABLE 3 +#define RELIABLE_HEADER_SIZE 3 +#define SEQNUM_INITIAL 65500 + +/* + A buffer which stores reliable packets and sorts them internally + for fast access to the smallest one. +*/ + +typedef std::list<BufferedPacket>::iterator RPBSearchResult; + +class ReliablePacketBuffer +{ +public: + ReliablePacketBuffer(); + + bool getFirstSeqnum(u16& result); + + BufferedPacket popFirst(); + BufferedPacket popSeqnum(u16 seqnum); + void insert(BufferedPacket &p,u16 next_expected); + + void incrementTimeouts(float dtime); + std::list<BufferedPacket> getTimedOuts(float timeout, + unsigned int max_packets); + + void print(); + bool empty(); + bool containsPacket(u16 seqnum); + RPBSearchResult notFound(); + u32 size(); + + +private: + RPBSearchResult findPacket(u16 seqnum); + + std::list<BufferedPacket> m_list; + u32 m_list_size; + + u16 m_oldest_non_answered_ack; + + JMutex m_list_mutex; +}; + +/* + A buffer for reconstructing split packets +*/ + +class IncomingSplitBuffer +{ +public: + ~IncomingSplitBuffer(); + /* + Returns a reference counted buffer of length != 0 when a full split + packet is constructed. If not, returns one of length 0. + */ + SharedBuffer<u8> insert(BufferedPacket &p, bool reliable); + + void removeUnreliableTimedOuts(float dtime, float timeout); + +private: + // Key is seqnum + std::map<u16, IncomingSplitPacket*> m_buf; + + JMutex m_map_mutex; +}; + +struct OutgoingPacket +{ + u16 peer_id; + u8 channelnum; + SharedBuffer<u8> data; + bool reliable; + bool ack; + + OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_, + bool reliable_,bool ack_=false): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_), + ack(ack_) + { + } +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_DISCONNECT_PEER, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONCMD_ACK, + CONCMD_CREATE_PEER, + CONCMD_DISABLE_LEGACY +}; + +struct ConnectionCommand +{ + enum ConnectionCommandType type; + Address address; + u16 peer_id; + u8 channelnum; + Buffer<u8> data; + bool reliable; + bool raw; + + ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {} + + void serve(Address address_) + { + type = CONNCMD_SERVE; + address = address_; + } + void connect(Address address_) + { + type = CONNCMD_CONNECT; + address = address_; + } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } + void disconnect_peer(u16 peer_id_) + { + type = CONNCMD_DISCONNECT_PEER; + peer_id = peer_id_; + } + void send(u16 peer_id_, u8 channelnum_, + NetworkPacket* pkt, bool reliable_) + { + type = CONNCMD_SEND; + peer_id = peer_id_; + channelnum = channelnum_; + data = pkt->oldForgePacket(); + reliable = reliable_; + } + + void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_) + { + type = CONCMD_ACK; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = false; + } + + void createPeer(u16 peer_id_, SharedBuffer<u8> data_) + { + type = CONCMD_CREATE_PEER; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } + + void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_) + { + type = CONCMD_DISABLE_LEGACY; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } +}; + +class Channel +{ + +public: + u16 readNextIncomingSeqNum(); + u16 incNextIncomingSeqNum(); + + u16 getOutgoingSequenceNumber(bool& successfull); + u16 readOutgoingSequenceNumber(); + bool putBackSequenceNumber(u16); + + u16 readNextSplitSeqNum(); + void setNextSplitSeqNum(u16 seqnum); + + // This is for buffering the incoming packets that are coming in + // the wrong order + ReliablePacketBuffer incoming_reliables; + // This is for buffering the sent packets so that the sender can + // re-send them if no ACK is received + ReliablePacketBuffer outgoing_reliables_sent; + + //queued reliable packets + std::queue<BufferedPacket> queued_reliables; + + //queue commands prior splitting to packets + std::deque<ConnectionCommand> queued_commands; + + IncomingSplitBuffer incoming_splits; + + Channel(); + ~Channel(); + + void UpdatePacketLossCounter(unsigned int count); + void UpdatePacketTooLateCounter(); + void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); + void UpdateBytesLost(unsigned int bytes); + void UpdateBytesReceived(unsigned int bytes); + + void UpdateTimers(float dtime, bool legacy_peer); + + const float getCurrentDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; }; + const float getMaxDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps; }; + + const float getCurrentLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; + const float getMaxLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; + + const float getCurrentIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; + const float getMaxIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; + + const float getAvgDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; }; + const float getAvgLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; + const float getAvgIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; + + const unsigned int getWindowSize() const { return window_size; }; + + void setWindowSize(unsigned int size) { window_size = size; }; +private: + JMutex m_internal_mutex; + int window_size; + + u16 next_incoming_seqnum; + + u16 next_outgoing_seqnum; + u16 next_outgoing_split_seqnum; + + unsigned int current_packet_loss; + unsigned int current_packet_too_late; + unsigned int current_packet_successfull; + float packet_loss_counter; + + unsigned int current_bytes_transfered; + unsigned int current_bytes_received; + unsigned int current_bytes_lost; + float max_kbps; + float cur_kbps; + float avg_kbps; + float max_incoming_kbps; + float cur_incoming_kbps; + float avg_incoming_kbps; + float max_kbps_lost; + float cur_kbps_lost; + float avg_kbps_lost; + float bpm_counter; + + unsigned int rate_samples; +}; + +class Peer; + +enum PeerChangeType +{ + PEER_ADDED, + PEER_REMOVED +}; +struct PeerChange +{ + PeerChangeType type; + u16 peer_id; + bool timeout; +}; + +class PeerHandler +{ +public: + + PeerHandler() + { + } + virtual ~PeerHandler() + { + } + + /* + This is called after the Peer has been inserted into the + Connection's peer container. + */ + virtual void peerAdded(Peer *peer) = 0; + /* + This is called before the Peer has been removed from the + Connection's peer container. + */ + virtual void deletingPeer(Peer *peer, bool timeout) = 0; +}; + +class PeerHelper +{ +public: + PeerHelper(); + PeerHelper(Peer* peer); + ~PeerHelper(); + + PeerHelper& operator=(Peer* peer); + Peer* operator->() const; + bool operator!(); + Peer* operator&() const; + bool operator!=(void* ptr); + +private: + Peer* m_peer; +}; + +class Connection; + +typedef enum { + MIN_RTT, + MAX_RTT, + AVG_RTT, + MIN_JITTER, + MAX_JITTER, + AVG_JITTER +} rtt_stat_type; + +typedef enum { + CUR_DL_RATE, + AVG_DL_RATE, + CUR_INC_RATE, + AVG_INC_RATE, + CUR_LOSS_RATE, + AVG_LOSS_RATE, +} rate_stat_type; + +class Peer { + public: + friend class PeerHelper; + + Peer(Address address_,u16 id_,Connection* connection) : + id(id_), + m_increment_packets_remaining(9), + m_increment_bytes_remaining(0), + m_pending_deletion(false), + m_connection(connection), + address(address_), + m_ping_timer(0.0), + m_last_rtt(-1.0), + m_usage(0), + m_timeout_counter(0.0), + m_last_timeout_check(porting::getTimeMs()), + m_has_sent_with_id(false) + { + m_rtt.avg_rtt = -1.0; + m_rtt.jitter_avg = -1.0; + m_rtt.jitter_max = 0.0; + m_rtt.max_rtt = 0.0; + m_rtt.jitter_min = FLT_MAX; + m_rtt.min_rtt = FLT_MAX; + }; + + virtual ~Peer() { + JMutexAutoLock usage_lock(m_exclusive_access_mutex); + FATAL_ERROR_IF(m_usage != 0, "Reference counting failure"); + }; + + // Unique id of the peer + u16 id; + + void Drop(); + + virtual void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size) {}; + + virtual bool isActive() { return false; }; + + virtual bool getAddress(MTProtocols type, Address& toset) = 0; + + void ResetTimeout() + {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; }; + + bool isTimedOut(float timeout); + + void setSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; }; + + bool hasSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; }; + + unsigned int m_increment_packets_remaining; + unsigned int m_increment_bytes_remaining; + + virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; + virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {}; + virtual SharedBuffer<u8> addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable) + { + fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n"); + return SharedBuffer<u8>(0); + }; + + virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; }; + + virtual float getStat(rtt_stat_type type) const { + switch (type) { + case MIN_RTT: + return m_rtt.min_rtt; + case MAX_RTT: + return m_rtt.max_rtt; + case AVG_RTT: + return m_rtt.avg_rtt; + case MIN_JITTER: + return m_rtt.jitter_min; + case MAX_JITTER: + return m_rtt.jitter_max; + case AVG_JITTER: + return m_rtt.jitter_avg; + } + return -1; + } + protected: + virtual void reportRTT(float rtt) {}; + + void RTTStatistics(float rtt, + std::string profiler_id="", + unsigned int num_samples=1000); + + bool IncUseCount(); + void DecUseCount(); + + JMutex m_exclusive_access_mutex; + + bool m_pending_deletion; + + Connection* m_connection; + + // Address of the peer + Address address; + + // Ping timer + float m_ping_timer; + private: + + struct rttstats { + float jitter_min; + float jitter_max; + float jitter_avg; + float min_rtt; + float max_rtt; + float avg_rtt; + }; + + rttstats m_rtt; + float m_last_rtt; + + // current usage count + unsigned int m_usage; + + // Seconds from last receive + float m_timeout_counter; + + u32 m_last_timeout_check; + + bool m_has_sent_with_id; +}; + +class UDPPeer : public Peer +{ +public: + + friend class PeerHelper; + friend class ConnectionReceiveThread; + friend class ConnectionSendThread; + friend class Connection; + + UDPPeer(u16 a_id, Address a_address, Connection* connection); + virtual ~UDPPeer() {}; + + void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size); + + bool isActive() + { return ((hasSentWithID()) && (!m_pending_deletion)); }; + + bool getAddress(MTProtocols type, Address& toset); + + void setNonLegacyPeer(); + + bool getLegacyPeer() + { return m_legacy_peer; } + + u16 getNextSplitSequenceNumber(u8 channel); + void setNextSplitSequenceNumber(u8 channel, u16 seqnum); + + SharedBuffer<u8> addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable); + + +protected: + /* + Calculates avg_rtt and resend_timeout. + rtt=-1 only recalculates resend_timeout + */ + void reportRTT(float rtt); + + void RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxcommands, + unsigned int maxtransfer); + + float getResendTimeout() + { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } + + void setResendTimeout(float timeout) + { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + bool Ping(float dtime,SharedBuffer<u8>& data); + + Channel channels[CHANNEL_COUNT]; + bool m_pending_disconnect; +private: + // This is changed dynamically + float resend_timeout; + + bool processReliableSendCommand( + ConnectionCommand &c, + unsigned int max_packet_size); + + bool m_legacy_peer; +}; + +/* + Connection +*/ + +enum ConnectionEventType{ + CONNEVENT_NONE, + CONNEVENT_DATA_RECEIVED, + CONNEVENT_PEER_ADDED, + CONNEVENT_PEER_REMOVED, + CONNEVENT_BIND_FAILED, +}; + +struct ConnectionEvent +{ + enum ConnectionEventType type; + u16 peer_id; + Buffer<u8> data; + bool timeout; + Address address; + + ConnectionEvent(): type(CONNEVENT_NONE), peer_id(0), + timeout(false) {} + + std::string describe() + { + switch(type) { + case CONNEVENT_NONE: + return "CONNEVENT_NONE"; + case CONNEVENT_DATA_RECEIVED: + return "CONNEVENT_DATA_RECEIVED"; + case CONNEVENT_PEER_ADDED: + return "CONNEVENT_PEER_ADDED"; + case CONNEVENT_PEER_REMOVED: + return "CONNEVENT_PEER_REMOVED"; + case CONNEVENT_BIND_FAILED: + return "CONNEVENT_BIND_FAILED"; + } + return "Invalid ConnectionEvent"; + } + + void dataReceived(u16 peer_id_, SharedBuffer<u8> data_) + { + type = CONNEVENT_DATA_RECEIVED; + peer_id = peer_id_; + data = data_; + } + void peerAdded(u16 peer_id_, Address address_) + { + type = CONNEVENT_PEER_ADDED; + peer_id = peer_id_; + address = address_; + } + void peerRemoved(u16 peer_id_, bool timeout_, Address address_) + { + type = CONNEVENT_PEER_REMOVED; + peer_id = peer_id_; + timeout = timeout_; + address = address_; + } + void bindFailed() + { + type = CONNEVENT_BIND_FAILED; + } +}; + +class ConnectionSendThread : public JThread { + +public: + friend class UDPPeer; + + ConnectionSendThread(unsigned int max_packet_size, float timeout); + + void * Thread (); + + void Trigger(); + + void setParent(Connection* parent) { + assert(parent != NULL); // Pre-condition + m_connection = parent; + } + + void setPeerTimeout(float peer_timeout) + { m_timeout = peer_timeout; } + +private: + void runTimeouts (float dtime); + void rawSend (const BufferedPacket &packet); + bool rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable); + + void processReliableCommand (ConnectionCommand &c); + void processNonReliableCommand (ConnectionCommand &c); + void serve (Address bind_address); + void connect (Address address); + void disconnect (); + void disconnect_peer(u16 peer_id); + void send (u16 peer_id, u8 channelnum, + SharedBuffer<u8> data); + void sendReliable (ConnectionCommand &c); + void sendToAll (u8 channelnum, + SharedBuffer<u8> data); + void sendToAllReliable(ConnectionCommand &c); + + void sendPackets (float dtime); + + void sendAsPacket (u16 peer_id, u8 channelnum, + SharedBuffer<u8> data,bool ack=false); + + void sendAsPacketReliable(BufferedPacket& p, Channel* channel); + + bool packetsQueued(); + + Connection* m_connection; + unsigned int m_max_packet_size; + float m_timeout; + std::queue<OutgoingPacket> m_outgoing_queue; + JSemaphore m_send_sleep_semaphore; + + unsigned int m_iteration_packets_avaialble; + unsigned int m_max_commands_per_iteration; + unsigned int m_max_data_packets_per_iteration; + unsigned int m_max_packets_requeued; +}; + +class ConnectionReceiveThread : public JThread { +public: + ConnectionReceiveThread(unsigned int max_packet_size); + + void * Thread (); + + void setParent(Connection* parent) { + assert(parent != NULL); // Pre-condition + m_connection = parent; + } + +private: + void receive (); + + // Returns next data from a buffer if possible + // If found, returns true; if not, false. + // If found, sets peer_id and dst + bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst); + + bool checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer<u8> &dst); + + /* + Processes a packet with the basic header stripped out. + Parameters: + packetdata: Data in packet (with no base headers) + peer_id: peer id of the sender of the packet in question + channelnum: channel on which the packet was sent + reliable: true if recursing into a reliable packet + */ + SharedBuffer<u8> processPacket(Channel *channel, + SharedBuffer<u8> packetdata, u16 peer_id, + u8 channelnum, bool reliable); + + + Connection* m_connection; +}; + +class Connection +{ +public: + friend class ConnectionSendThread; + friend class ConnectionReceiveThread; + + Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, + PeerHandler *peerhandler); + ~Connection(); + + /* Interface */ + ConnectionEvent waitEvent(u32 timeout_ms); + void putCommand(ConnectionCommand &c); + + void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; } + void Serve(Address bind_addr); + void Connect(Address address); + bool Connected(); + void Disconnect(); + void Receive(NetworkPacket* pkt); + void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable); + u16 GetPeerID() { return m_peer_id; } + Address GetPeerAddress(u16 peer_id); + float getPeerStat(u16 peer_id, rtt_stat_type type); + float getLocalStat(rate_stat_type type); + const u32 GetProtocolID() const { return m_protocol_id; }; + const std::string getDesc(); + void DisconnectPeer(u16 peer_id); + +protected: + PeerHelper getPeer(u16 peer_id); + PeerHelper getPeerNoEx(u16 peer_id); + u16 lookupPeer(Address& sender); + + u16 createPeer(Address& sender, MTProtocols protocol, int fd); + UDPPeer* createServerPeer(Address& sender); + bool deletePeer(u16 peer_id, bool timeout); + + void SetPeerID(u16 id) { m_peer_id = id; } + + void sendAck(u16 peer_id, u8 channelnum, u16 seqnum); + + void PrintInfo(std::ostream &out); + void PrintInfo(); + + std::list<u16> getPeerIDs() + { + JMutexAutoLock peerlock(m_peers_mutex); + return m_peer_ids; + } + + UDPSocket m_udpSocket; + MutexedQueue<ConnectionCommand> m_command_queue; + + void putEvent(ConnectionEvent &e); + + void TriggerSend() + { m_sendThread.Trigger(); } +private: + std::list<Peer*> getPeers(); + + MutexedQueue<ConnectionEvent> m_event_queue; + + u16 m_peer_id; + u32 m_protocol_id; + + std::map<u16, Peer*> m_peers; + std::list<u16> m_peer_ids; + JMutex m_peers_mutex; + + ConnectionSendThread m_sendThread; + ConnectionReceiveThread m_receiveThread; + + JMutex m_info_mutex; + + // Backwards compatibility + PeerHandler *m_bc_peerhandler; + int m_bc_receive_timeout; + + bool m_shutting_down; + + u16 m_next_remote_peer_id; +}; + +} // namespace + +#endif |