From 4b6138e69b65271b0e568f821a4d1bd285affedd Mon Sep 17 00:00:00 2001 From: Perttu Ahola Date: Thu, 20 Oct 2011 23:04:09 +0300 Subject: Improve Connection with threading and some kind of congestion control --- src/connection.h | 281 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 209 insertions(+), 72 deletions(-) (limited to 'src/connection.h') diff --git a/src/connection.h b/src/connection.h index 6eb2f2824..570bc92ab 100644 --- a/src/connection.h +++ b/src/connection.h @@ -319,28 +319,6 @@ struct Channel { Channel(); ~Channel(); - /* - Processes a packet with the basic header stripped out. - Parameters: - packetdata: Data in packet (with no base headers) - con: The connection to which the channel is associated - (used for sending back stuff (ACKs)) - peer_id: peer id of the sender of the packet in question - channelnum: channel on which the packet was sent - reliable: true if recursing into a reliable packet - */ - SharedBuffer ProcessPacket( - SharedBuffer packetdata, - Connection *con, - u16 peer_id, - u8 channelnum, - bool reliable=false); - - // Returns next data from a buffer if possible - // throws a NoIncomingDataException if no data is available - // If found, sets peer_id - SharedBuffer CheckIncomingBuffers(Connection *con, - u16 &peer_id); u16 next_outgoing_seqnum; u16 next_incoming_seqnum; @@ -412,78 +390,237 @@ public: // with the id we have given to it bool has_sent_with_id; + float m_sendtime_accu; + float m_max_packets_per_second; + int m_num_sent; + int m_max_num_sent; + private: }; -class Connection +/* + Connection +*/ + +struct OutgoingPacket +{ + u16 peer_id; + u8 channelnum; + SharedBuffer data; + bool reliable; + + OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer data_, + bool reliable_): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_) + { + } +}; + +enum ConnectionEventType{ + CONNEVENT_NONE, + CONNEVENT_DATA_RECEIVED, + CONNEVENT_PEER_ADDED, + CONNEVENT_PEER_REMOVED, +}; + +struct ConnectionEvent +{ + enum ConnectionEventType type; + u16 peer_id; + SharedBuffer data; + bool timeout; + Address address; + + ConnectionEvent(): type(CONNEVENT_NONE) {} + + std::string describe() + { + switch(type){ + case CONNEVENT_NONE: + return "CONNEVENT_NONE"; + case CONNEVENT_DATA_RECEIVED: + return "CONNEVENT_DATA_RECEIVED"; + case CONNEVENT_PEER_ADDED: + return "CONNEVENT_PEER_ADDED"; + case CONNEVENT_PEER_REMOVED: + return "CONNEVENT_PEER_REMOVED"; + } + return "Invalid ConnectionEvent"; + } + + void dataReceived(u16 peer_id_, SharedBuffer data_) + { + type = CONNEVENT_DATA_RECEIVED; + peer_id = peer_id_; + data = data_; + } + void peerAdded(u16 peer_id_, Address address_) + { + type = CONNEVENT_PEER_ADDED; + peer_id = peer_id_; + address = address_; + } + void peerRemoved(u16 peer_id_, bool timeout_, Address address_) + { + type = CONNEVENT_PEER_REMOVED; + peer_id = peer_id_; + timeout = timeout_; + address = address_; + } +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONNCMD_DELETE_PEER, +}; + +struct ConnectionCommand +{ + enum ConnectionCommandType type; + u16 port; + Address address; + u16 peer_id; + u8 channelnum; + SharedBuffer data; + bool reliable; + + ConnectionCommand(): type(CONNCMD_NONE) {} + + void serve(u16 port_) + { + type = CONNCMD_SERVE; + port = port_; + } + void connect(Address address_) + { + type = CONNCMD_CONNECT; + address = address_; + } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } + void send(u16 peer_id_, u8 channelnum_, + SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void sendToAll(u8 channelnum_, SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND_TO_ALL; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void deletePeer(u16 peer_id_) + { + type = CONNCMD_DELETE_PEER; + peer_id = peer_id_; + } +}; + +class Connection: public SimpleThread { public: - Connection( - u32 protocol_id, - u32 max_packet_size, - float timeout, - PeerHandler *peerhandler - ); + Connection(u32 protocol_id, u32 max_packet_size, float timeout); + Connection(u32 protocol_id, u32 max_packet_size, float timeout, + PeerHandler *peerhandler); ~Connection(); - void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); } - // Start being a server + void * Thread(); + + /* Interface */ + + ConnectionEvent getEvent(); + ConnectionEvent waitEvent(u32 timeout_ms); + void putCommand(ConnectionCommand &c); + + void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; } void Serve(unsigned short port); - // Connect to a server void Connect(Address address); bool Connected(); - void Disconnect(); - - // Sets peer_id - SharedBuffer GetFromBuffers(u16 &peer_id); - - // The peer_id of sender is stored in peer_id - // Return value: I guess this always throws an exception or - // actually gets data - // May call PeerHandler methods u32 Receive(u16 &peer_id, u8 *data, u32 datasize); - - // These will automatically package the data as an original or split void SendToAll(u8 channelnum, SharedBuffer data, bool reliable); void Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); - // Send data as a packet; it will be wrapped in base header and - // optionally to a reliable packet. - void SendAsPacket(u16 peer_id, u8 channelnum, + void RunTimeouts(float dtime); // dummy + u16 GetPeerID(){ return m_peer_id; } + Address GetPeerAddress(u16 peer_id); + float GetPeerAvgRTT(u16 peer_id); + void DeletePeer(u16 peer_id); + +private: + void putEvent(ConnectionEvent &e); + void processCommand(ConnectionCommand &c); + void send(float dtime); + void receive(); + void runTimeouts(float dtime); + void serve(u16 port); + void connect(Address address); + void disconnect(); + void sendToAll(u8 channelnum, SharedBuffer data, bool reliable); + void send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); + void sendAsPacket(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); - // Sends a raw packet - void RawSend(const BufferedPacket &packet); + void rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer data, bool reliable); + void rawSend(const BufferedPacket &packet); + Peer* getPeer(u16 peer_id); + Peer* getPeerNoEx(u16 peer_id); + core::list getPeers(); + bool getFromBuffers(u16 &peer_id, SharedBuffer &dst); + // Returns next data from a buffer if possible + // If found, returns true; if not, false. + // If found, sets peer_id and dst + bool checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer &dst); + /* + Processes a packet with the basic header stripped out. + Parameters: + packetdata: Data in packet (with no base headers) + peer_id: peer id of the sender of the packet in question + channelnum: channel on which the packet was sent + reliable: true if recursing into a reliable packet + */ + SharedBuffer processPacket(Channel *channel, + SharedBuffer packetdata, u16 peer_id, + u8 channelnum, bool reliable); + bool deletePeer(u16 peer_id, bool timeout); - // May call PeerHandler methods - void RunTimeouts(float dtime); - - // Can throw a PeerNotFoundException - Peer* GetPeer(u16 peer_id); - // returns NULL if failed - Peer* GetPeerNoEx(u16 peer_id); - core::list GetPeers(); + Queue m_outgoing_queue; + MutexedQueue m_event_queue; + MutexedQueue m_command_queue; - // Calls PeerHandler::deletingPeer - // Returns false if peer was not found - bool deletePeer(u16 peer_id, bool timeout); + u32 m_protocol_id; + u32 m_max_packet_size; + float m_timeout; + UDPSocket m_socket; + u16 m_peer_id; + + core::map m_peers; + JMutex m_peers_mutex; + // Backwards compatibility + PeerHandler *m_bc_peerhandler; + int m_bc_receive_timeout; + void SetPeerID(u16 id){ m_peer_id = id; } - u16 GetPeerID(){ return m_peer_id; } u32 GetProtocolID(){ return m_protocol_id; } - - // For debug printing void PrintInfo(std::ostream &out); void PrintInfo(); + std::string getDesc(); u16 m_indentation; - -private: - u32 m_protocol_id; - float m_timeout; - PeerHandler *m_peerhandler; - core::map m_peers; - u16 m_peer_id; - //bool m_waiting_new_peer_id; - u32 m_max_packet_size; - UDPSocket m_socket; }; } // namespace -- cgit v1.2.3