diff options
author | Perttu Ahola <celeron55@gmail.com> | 2011-10-20 23:04:09 +0300 |
---|---|---|
committer | Perttu Ahola <celeron55@gmail.com> | 2011-10-20 23:04:09 +0300 |
commit | 4b6138e69b65271b0e568f821a4d1bd285affedd (patch) | |
tree | 003fd33f969e5a9bf0bc720bda7f869d1f9c1f45 /src | |
parent | b6fcbc5fbaba4a7faa65f792b16e47a405fa4ebf (diff) | |
download | minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.gz minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.bz2 minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.zip |
Improve Connection with threading and some kind of congestion control
Diffstat (limited to 'src')
-rw-r--r-- | src/client.cpp | 23 | ||||
-rw-r--r-- | src/client.h | 10 | ||||
-rw-r--r-- | src/connection.cpp | 1368 | ||||
-rw-r--r-- | src/connection.h | 281 | ||||
-rw-r--r-- | src/defaultsettings.cpp | 2 | ||||
-rw-r--r-- | src/main.cpp | 2 | ||||
-rw-r--r-- | src/map.cpp | 2 | ||||
-rw-r--r-- | src/server.cpp | 60 | ||||
-rw-r--r-- | src/server.h | 4 | ||||
-rw-r--r-- | src/servercommand.cpp | 23 | ||||
-rw-r--r-- | src/test.cpp | 48 |
11 files changed, 1135 insertions, 688 deletions
diff --git a/src/client.cpp b/src/client.cpp index 5ec53a524..a777293a3 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -246,7 +246,7 @@ void Client::connect(Address address) { DSTACK(__FUNCTION_NAME); //JMutexAutoLock lock(m_con_mutex); //bulk comment-out - m_con.setTimeoutMs(0); + m_con.SetTimeoutMs(0); m_con.Connect(address); } @@ -563,8 +563,8 @@ void Client::step(float dtime) counter = 0.0; //JMutexAutoLock lock(m_con_mutex); //bulk comment-out // connectedAndInitialized() is true, peer exists. - con::Peer *peer = m_con.GetPeer(PEER_ID_SERVER); - infostream<<"Client: avg_rtt="<<peer->avg_rtt<<std::endl; + float avg_rtt = m_con.GetPeerAvgRTT(PEER_ID_SERVER); + infostream<<"Client: avg_rtt="<<avg_rtt<<std::endl; } } @@ -709,14 +709,6 @@ void Client::ProcessData(u8 *data, u32 datasize, u16 sender_peer_id) return; } - con::Peer *peer; - { - //JMutexAutoLock lock(m_con_mutex); //bulk comment-out - // All data is coming from the server - // PeerNotFoundException is handled by caller. - peer = m_con.GetPeer(PEER_ID_SERVER); - } - u8 ser_version = m_server_ser_ver; //infostream<<"Client received command="<<(int)command<<std::endl; @@ -2168,9 +2160,10 @@ ClientEvent Client::getClientEvent() float Client::getRTT(void) { - con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER); - if(!peer) - return 0.0; - return peer->avg_rtt; + try{ + return m_con.GetPeerAvgRTT(PEER_ID_SERVER); + } catch(con::PeerNotFoundException &e){ + return 1337; + } } diff --git a/src/client.h b/src/client.h index 52dd66ca2..8585f6d4a 100644 --- a/src/client.h +++ b/src/client.h @@ -251,11 +251,11 @@ public: float getAvgRtt() { - //JMutexAutoLock lock(m_con_mutex); //bulk comment-out - con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER); - if(peer == NULL) - return 0.0; - return peer->avg_rtt; + try{ + return m_con.GetPeerAvgRTT(PEER_ID_SERVER); + } catch(con::PeerNotFoundException){ + return 1337; + } } bool getChatMessage(std::wstring &message) diff --git a/src/connection.cpp b/src/connection.cpp index 623994c4c..1c424839f 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -20,6 +20,8 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "connection.h" #include "main.h" #include "serialization.h" +#include "log.h" +#include "porting.h" namespace con { @@ -439,16 +441,19 @@ Channel::~Channel() Peer */ -Peer::Peer(u16 a_id, Address a_address) +Peer::Peer(u16 a_id, Address a_address): + address(a_address), + id(a_id), + timeout_counter(0.0), + ping_timer(0.0), + resend_timeout(0.5), + avg_rtt(-1.0), + has_sent_with_id(false), + m_sendtime_accu(0), + m_max_packets_per_second(10), + m_num_sent(0), + m_max_num_sent(0) { - id = a_id; - address = a_address; - timeout_counter = 0.0; - //resend_timeout = RESEND_TIMEOUT_MINIMUM; - ping_timer = 0.0; - resend_timeout = 0.5; - avg_rtt = -1.0; - has_sent_with_id = false; } Peer::~Peer() { @@ -456,6 +461,19 @@ Peer::~Peer() void Peer::reportRTT(float rtt) { + if(rtt >= 0.0){ + if(rtt < 0.01){ + if(m_max_packets_per_second < 100) + m_max_packets_per_second += 10; + } else if(rtt < 0.2){ + if(m_max_packets_per_second < 100) + m_max_packets_per_second += 2; + } else { + if(m_max_packets_per_second > 5) + m_max_packets_per_second *= 0.5; + } + } + if(rtt < -0.999) {} else if(avg_rtt < 0.0) @@ -485,461 +503,210 @@ void Peer::reportRTT(float rtt) Connection */ -Connection::Connection( - u32 protocol_id, - u32 max_packet_size, - float timeout, - PeerHandler *peerhandler -) +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout): + m_protocol_id(protocol_id), + m_max_packet_size(max_packet_size), + m_timeout(timeout), + m_peer_id(0), + m_bc_peerhandler(NULL), + m_bc_receive_timeout(0), + m_indentation(0) { - assert(peerhandler != NULL); + m_socket.setTimeoutMs(5); - m_protocol_id = protocol_id; - m_max_packet_size = max_packet_size; - m_timeout = timeout; - m_peer_id = PEER_ID_INEXISTENT; - //m_waiting_new_peer_id = false; - m_indentation = 0; - m_peerhandler = peerhandler; + Start(); } -Connection::~Connection() +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, + PeerHandler *peerhandler): + m_protocol_id(protocol_id), + m_max_packet_size(max_packet_size), + m_timeout(timeout), + m_peer_id(0), + m_bc_peerhandler(peerhandler), + m_bc_receive_timeout(0), + m_indentation(0) { - // Clear peers - core::map<u16, Peer*>::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - delete peer; - } -} + m_socket.setTimeoutMs(5); -void Connection::Serve(unsigned short port) -{ - m_socket.Bind(port); - m_peer_id = PEER_ID_SERVER; + Start(); } -void Connection::Connect(Address address) -{ - core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER); - if(node != NULL){ - throw ConnectionException("Already connected to a server"); - } - Peer *peer = new Peer(PEER_ID_SERVER, address); - m_peers.insert(peer->id, peer); - m_peerhandler->peerAdded(peer); - - m_socket.Bind(0); - - // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT - m_peer_id = PEER_ID_INEXISTENT; - SharedBuffer<u8> data(0); - Send(PEER_ID_SERVER, 0, data, true); - - //m_waiting_new_peer_id = true; -} - -void Connection::Disconnect() +Connection::~Connection() { - // Create and send DISCO packet - SharedBuffer<u8> data(2); - writeU8(&data[0], TYPE_CONTROL); - writeU8(&data[1], CONTROLTYPE_DISCO); - - // Send to all - core::map<u16, Peer*>::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - SendAsPacket(peer->id, 0, data, false); - } + stop(); } -bool Connection::Connected() -{ - if(m_peers.size() != 1) - return false; - - core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER); - if(node == NULL) - return false; - - if(m_peer_id == PEER_ID_INEXISTENT) - return false; - - return true; -} +/* Internal stuff */ -SharedBuffer<u8> Channel::ProcessPacket( - SharedBuffer<u8> packetdata, - Connection *con, - u16 peer_id, - u8 channelnum, - bool reliable) +void * Connection::Thread() { - IndentationRaiser iraiser(&(con->m_indentation)); - - if(packetdata.getSize() < 1) - throw InvalidIncomingDataException("packetdata.getSize() < 1"); + ThreadStarted(); + log_register_thread("Connection"); - u8 type = readU8(&packetdata[0]); + dout_con<<"Connection thread started"<<std::endl; - if(type == TYPE_CONTROL) - { - if(packetdata.getSize() < 2) - throw InvalidIncomingDataException("packetdata.getSize() < 2"); - - u8 controltype = readU8(&packetdata[1]); + u32 curtime = porting::getTimeMs(); + u32 lasttime = curtime; - if(controltype == CONTROLTYPE_ACK) - { - if(packetdata.getSize() < 4) - throw InvalidIncomingDataException - ("packetdata.getSize() < 4 (ACK header size)"); - - u16 seqnum = readU16(&packetdata[2]); - con->PrintInfo(); - dout_con<<"Got CONTROLTYPE_ACK: channelnum=" - <<((int)channelnum&0xff)<<", peer_id="<<peer_id - <<", seqnum="<<seqnum<<std::endl; - - try{ - BufferedPacket p = outgoing_reliables.popSeqnum(seqnum); - // Get round trip time - float rtt = p.totaltime; - - // Let peer calculate stuff according to it - // (avg_rtt and resend_timeout) - Peer *peer = con->GetPeer(peer_id); - peer->reportRTT(rtt); - - //con->PrintInfo(dout_con); - //dout_con<<"RTT = "<<rtt<<std::endl; - - /*dout_con<<"OUTGOING: "; - con->PrintInfo(); - outgoing_reliables.print(); - dout_con<<std::endl;*/ - } - catch(NotFoundException &e){ - con->PrintInfo(derr_con); - derr_con<<"WARNING: ACKed packet not " - "in outgoing queue" - <<std::endl; - } - - throw ProcessedSilentlyException("Got an ACK"); - } - else if(controltype == CONTROLTYPE_SET_PEER_ID) - { - if(packetdata.getSize() < 4) - throw InvalidIncomingDataException - ("packetdata.getSize() < 4 (SET_PEER_ID header size)"); - u16 peer_id_new = readU16(&packetdata[2]); - con->PrintInfo(); - dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl; - - if(con->GetPeerID() != PEER_ID_INEXISTENT) - { - con->PrintInfo(derr_con); - derr_con<<"WARNING: Not changing" - " existing peer id."<<std::endl; - } - else - { - dout_con<<"changing."<<std::endl; - con->SetPeerID(peer_id_new); - } - throw ProcessedSilentlyException("Got a SET_PEER_ID"); - } - else if(controltype == CONTROLTYPE_PING) - { - // Just ignore it, the incoming data already reset - // the timeout counter - con->PrintInfo(); - dout_con<<"PING"<<std::endl; - throw ProcessedSilentlyException("Got a PING"); - } - else if(controltype == CONTROLTYPE_DISCO) - { - // Just ignore it, the incoming data already reset - // the timeout counter - con->PrintInfo(); - dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl; - - if(con->deletePeer(peer_id, false) == false) - { - con->PrintInfo(derr_con); - derr_con<<"DISCO: Peer not found"<<std::endl; - } - - throw ProcessedSilentlyException("Got a DISCO"); - } - else{ - con->PrintInfo(derr_con); - derr_con<<"INVALID TYPE_CONTROL: invalid controltype=" - <<((int)controltype&0xff)<<std::endl; - throw InvalidIncomingDataException("Invalid control type"); - } - } - else if(type == TYPE_ORIGINAL) - { - if(packetdata.getSize() < ORIGINAL_HEADER_SIZE) - throw InvalidIncomingDataException - ("packetdata.getSize() < ORIGINAL_HEADER_SIZE"); - con->PrintInfo(); - dout_con<<"RETURNING TYPE_ORIGINAL to user" - <<std::endl; - // Get the inside packet out and return it - SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); - memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); - return payload; - } - else if(type == TYPE_SPLIT) + while(getRun()) { - // We have to create a packet again for buffering - // This isn't actually too bad an idea. - BufferedPacket packet = makePacket( - con->GetPeer(peer_id)->address, - packetdata, - con->GetProtocolID(), - peer_id, - channelnum); - // Buffer the packet - SharedBuffer<u8> data = incoming_splits.insert(packet, reliable); - if(data.getSize() != 0) - { - con->PrintInfo(); - dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " - <<"size="<<data.getSize()<<std::endl; - return data; - } - con->PrintInfo(); - dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl; - throw ProcessedSilentlyException("Buffered a split packet chunk"); - } - else if(type == TYPE_RELIABLE) - { - // Recursive reliable packets not allowed - assert(reliable == false); - - if(packetdata.getSize() < RELIABLE_HEADER_SIZE) - throw InvalidIncomingDataException - ("packetdata.getSize() < RELIABLE_HEADER_SIZE"); - - u16 seqnum = readU16(&packetdata[1]); - - bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum); - bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum); + BEGIN_DEBUG_EXCEPTION_HANDLER - con->PrintInfo(); - if(is_future_packet) - dout_con<<"BUFFERING"; - else if(is_old_packet) - dout_con<<"OLD"; - else - dout_con<<"RECUR"; - dout_con<<" TYPE_RELIABLE seqnum="<<seqnum - <<" next="<<next_incoming_seqnum; - dout_con<<" [sending CONTROLTYPE_ACK" - " to peer_id="<<peer_id<<"]"; - dout_con<<std::endl; + lasttime = curtime; + curtime = porting::getTimeMs(); + float dtime = (float)(curtime - lasttime) / 1000.; + if(dtime > 0.1) + dtime = 0.1; + if(dtime < 0.0) + dtime = 0.0; - //DEBUG - //assert(incoming_reliables.size() < 100); - - // Send a CONTROLTYPE_ACK - SharedBuffer<u8> reply(4); - writeU8(&reply[0], TYPE_CONTROL); - writeU8(&reply[1], CONTROLTYPE_ACK); - writeU16(&reply[2], seqnum); - con->SendAsPacket(peer_id, channelnum, reply, false); - - //if(seqnum_higher(seqnum, next_incoming_seqnum)) - if(is_future_packet) - { - /*con->PrintInfo(); - dout_con<<"Buffering reliable packet (seqnum=" - <<seqnum<<")"<<std::endl;*/ - - // This one comes later, buffer it. - // Actually we have to make a packet to buffer one. - // Well, we have all the ingredients, so just do it. - BufferedPacket packet = makePacket( - con->GetPeer(peer_id)->address, - packetdata, - con->GetProtocolID(), - peer_id, - channelnum); - try{ - incoming_reliables.insert(packet); - - /*con->PrintInfo(); - dout_con<<"INCOMING: "; - incoming_reliables.print(); - dout_con<<std::endl;*/ - } - catch(AlreadyExistsException &e) - { - } + runTimeouts(dtime); - throw ProcessedSilentlyException("Buffered future reliable packet"); - } - //else if(seqnum_higher(next_incoming_seqnum, seqnum)) - else if(is_old_packet) - { - // An old packet, dump it - throw InvalidIncomingDataException("Got an old reliable packet"); + while(m_command_queue.size() != 0){ + ConnectionCommand c = m_command_queue.pop_front(); + processCommand(c); } - next_incoming_seqnum++; - - // Get out the inside packet and re-process it - SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); - memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); + send(dtime); - return ProcessPacket(payload, con, peer_id, channelnum, true); - } - else - { - con->PrintInfo(derr_con); - derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl; - throw InvalidIncomingDataException("Invalid packet type"); + receive(); + + END_DEBUG_EXCEPTION_HANDLER(derr_con); } - - // We should never get here. - // If you get here, add an exception or a return to some of the - // above conditionals. - assert(0); - throw BaseException("Error in Channel::ProcessPacket()"); + + return NULL; } -SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con, - u16 &peer_id) +void Connection::putEvent(ConnectionEvent &e) { - u16 firstseqnum = 0; - // Clear old packets from start of buffer - try{ - for(;;){ - firstseqnum = incoming_reliables.getFirstSeqnum(); - if(seqnum_higher(next_incoming_seqnum, firstseqnum)) - incoming_reliables.popFirst(); - else - break; - } - // This happens if all packets are old - }catch(con::NotFoundException) - {} - - if(incoming_reliables.empty() == false) - { - if(firstseqnum == next_incoming_seqnum) - { - BufferedPacket p = incoming_reliables.popFirst(); - - peer_id = readPeerId(*p.data); - u8 channelnum = readChannel(*p.data); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); - - con->PrintInfo(); - dout_con<<"UNBUFFERING TYPE_RELIABLE" - <<" seqnum="<<seqnum - <<" peer_id="<<peer_id - <<" channel="<<((int)channelnum&0xff) - <<std::endl; - - next_incoming_seqnum++; - - u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; - // Get out the inside packet and re-process it - SharedBuffer<u8> payload(p.data.getSize() - headers_size); - memcpy(*payload, &p.data[headers_size], payload.getSize()); + assert(e.type != CONNEVENT_NONE); + m_event_queue.push_back(e); +} - return ProcessPacket(payload, con, peer_id, channelnum, true); - } +void Connection::processCommand(ConnectionCommand &c) +{ + switch(c.type){ + case CONNCMD_NONE: + dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl; + return; + case CONNCMD_SERVE: + dout_con<<getDesc()<<" processing CONNCMD_SERVE port=" + <<c.port<<std::endl; + serve(c.port); + return; + case CONNCMD_CONNECT: + dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl; + connect(c.address); + return; + case CONNCMD_DISCONNECT: + dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl; + disconnect(); + return; + case CONNCMD_SEND: + dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl; + send(c.peer_id, c.channelnum, c.data, c.reliable); + return; + case CONNCMD_SEND_TO_ALL: + dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl; + sendToAll(c.channelnum, c.data, c.reliable); + return; + case CONNCMD_DELETE_PEER: + dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl; + deletePeer(c.peer_id, false); + return; } - - throw NoIncomingDataException("No relevant data in buffers"); } -SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id) +void Connection::send(float dtime) { - core::map<u16, Peer*>::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) + for(core::map<u16, Peer*>::Iterator + j = m_peers.getIterator(); + j.atEnd() == false; j++) { Peer *peer = j.getNode()->getValue(); - for(u16 i=0; i<CHANNEL_COUNT; i++) - { - Channel *channel = &peer->channels[i]; - try{ - SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers - (this, peer_id); - - return resultdata; - } - catch(NoIncomingDataException &e) - { - } - catch(InvalidIncomingDataException &e) - { - } - catch(ProcessedSilentlyException &e) - { - } + peer->m_sendtime_accu += dtime; + peer->m_num_sent = 0; + peer->m_max_num_sent = peer->m_sendtime_accu * + peer->m_max_packets_per_second; + } + Queue<OutgoingPacket> postponed_packets; + while(m_outgoing_queue.size() != 0){ + OutgoingPacket packet = m_outgoing_queue.pop_front(); + Peer *peer = getPeerNoEx(packet.peer_id); + if(!peer) + continue; + if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){ + postponed_packets.push_back(packet); + } else if(peer->m_num_sent < peer->m_max_num_sent){ + rawSendAsPacket(packet.peer_id, packet.channelnum, + packet.data, packet.reliable); + peer->m_num_sent++; + } else { + postponed_packets.push_back(packet); } } - throw NoIncomingDataException("No relevant data in buffers"); + while(postponed_packets.size() != 0){ + m_outgoing_queue.push_back(postponed_packets.pop_front()); + } + for(core::map<u16, Peer*>::Iterator + j = m_peers.getIterator(); + j.atEnd() == false; j++) + { + Peer *peer = j.getNode()->getValue(); + peer->m_sendtime_accu -= (float)peer->m_num_sent / + peer->m_max_packets_per_second; + if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second) + peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second; + } } -u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) +// Receive packets from the network and buffers and create ConnectionEvents +void Connection::receive() { - /* - Receive a packet from the network - */ - + u32 datasize = 100000; // TODO: We can not know how many layers of header there are. // For now, just assume there are no other than the base headers. u32 packet_maxsize = datasize + BASE_HEADER_SIZE; Buffer<u8> packetdata(packet_maxsize); + + bool single_wait_done = false; for(;;) { - try - { - /* - Check if some buffer has relevant data - */ - try{ - SharedBuffer<u8> resultdata = GetFromBuffers(peer_id); - - if(datasize < resultdata.getSize()) - throw InvalidIncomingDataException - ("Buffer too small for received data"); - - memcpy(data, *resultdata, resultdata.getSize()); - return resultdata.getSize(); - } - catch(NoIncomingDataException &e) + try{ + /* Check if some buffer has relevant data */ { + u16 peer_id; + SharedBuffer<u8> resultdata; + bool got = getFromBuffers(peer_id, resultdata); + if(got){ + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + putEvent(e); + continue; + } } - - Address sender; + + if(single_wait_done){ + if(m_socket.WaitData(0) == false) + break; + } + + single_wait_done = true; + Address sender; s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); if(received_size < 0) - throw NoIncomingDataException("No incoming data"); + break; if(received_size < BASE_HEADER_SIZE) - throw InvalidIncomingDataException("No full header received"); + continue; if(readU32(&packetdata[0]) != m_protocol_id) - throw InvalidIncomingDataException("Invalid protocol id"); + continue; - peer_id = readPeerId(*packetdata); + u16 peer_id = readPeerId(*packetdata); u8 channelnum = readChannel(*packetdata); if(channelnum > CHANNEL_COUNT-1){ PrintInfo(derr_con); @@ -999,17 +766,23 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) /* Find an unused peer id */ + bool out_of_ids = false; for(;;) { // Check if exists if(m_peers.find(peer_id_new) == NULL) break; // Check for overflow - if(peer_id_new == 65535) - throw ConnectionException - ("Connection ran out of peer ids"); + if(peer_id_new == 65535){ + out_of_ids = true; + break; + } peer_id_new++; } + if(out_of_ids){ + errorstream<<getDesc()<<" ran out of peer ids"<<std::endl; + continue; + } PrintInfo(); dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT," @@ -1018,14 +791,18 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) // Create a peer Peer *peer = new Peer(peer_id_new, sender); m_peers.insert(peer->id, peer); - m_peerhandler->peerAdded(peer); + + // Create peer addition event + ConnectionEvent e; + e.peerAdded(peer_id_new, sender); + putEvent(e); // Create CONTROL packet to tell the peer id to the new peer. SharedBuffer<u8> reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU16(&reply[2], peer_id_new); - SendAsPacket(peer_id_new, 0, reply, true); + sendAsPacket(peer_id_new, 0, reply, true); // We're now talking to a valid peer_id peer_id = peer_id_new; @@ -1053,12 +830,7 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) PrintInfo(derr_con); derr_con<<"Peer "<<peer_id<<" sending from different address." " Ignoring."<<std::endl; - throw InvalidIncomingDataException - ("Peer sending from different address"); - /*// If there is more data, receive again - if(m_socket.WaitData(0) == true) - continue; - throw NoIncomingDataException("No incoming data (2)");*/ + continue; } peer->timeout_counter = 0.0; @@ -1074,8 +846,8 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) try{ // Process it (the result is some data with no headers made by us) - SharedBuffer<u8> resultdata = channel->ProcessPacket - (strippeddata, this, peer_id, channelnum); + SharedBuffer<u8> resultdata = processPacket + (channel, strippeddata, peer_id, channelnum, false); PrintInfo(); dout_con<<"ProcessPacket returned data of size " @@ -1085,123 +857,20 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) throw InvalidIncomingDataException ("Buffer too small for received data"); - memcpy(data, *resultdata, resultdata.getSize()); - return resultdata.getSize(); - } - catch(ProcessedSilentlyException &e) - { - // If there is more data, receive again - if(m_socket.WaitData(0) == true) - continue; - } - throw NoIncomingDataException("No incoming data (2)"); - } // try - catch(InvalidIncomingDataException &e) - { - // If there is more data, receive again - if(m_socket.WaitData(0) == true) + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + putEvent(e); continue; - } - catch(SendFailedException &e) - { - } - } // for -} - -void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable) -{ - core::map<u16, Peer*>::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - Send(peer->id, channelnum, data, reliable); - } -} - -void Connection::Send(u16 peer_id, u8 channelnum, - SharedBuffer<u8> data, bool reliable) -{ - assert(channelnum < CHANNEL_COUNT); - - Peer *peer = GetPeerNoEx(peer_id); - if(peer == NULL) - return; - Channel *channel = &(peer->channels[channelnum]); - - u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE; - if(reliable) - chunksize_max -= RELIABLE_HEADER_SIZE; - - core::list<SharedBuffer<u8> > originals; - originals = makeAutoSplitPacket(data, chunksize_max, - channel->next_outgoing_split_seqnum); - - core::list<SharedBuffer<u8> >::Iterator i; - i = originals.begin(); - for(; i != originals.end(); i++) - { - SharedBuffer<u8> original = *i; - - SendAsPacket(peer_id, channelnum, original, reliable); - } -} - -void Connection::SendAsPacket(u16 peer_id, u8 channelnum, - SharedBuffer<u8> data, bool reliable) -{ - Peer *peer = GetPeer(peer_id); - Channel *channel = &(peer->channels[channelnum]); - - if(reliable) - { - u16 seqnum = channel->next_outgoing_seqnum; - channel->next_outgoing_seqnum++; - - SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum); - - // Add base headers and make a packet - BufferedPacket p = makePacket(peer->address, reliable, - m_protocol_id, m_peer_id, channelnum); - - try{ - // Buffer the packet - channel->outgoing_reliables.insert(p); - } - catch(AlreadyExistsException &e) - { - PrintInfo(derr_con); - derr_con<<"WARNING: Going to send a reliable packet " - "seqnum="<<seqnum<<" that is already " - "in outgoing buffer"<<std::endl; - //assert(0); + }catch(ProcessedSilentlyException &e){ } - - // Send the packet - RawSend(p); - } - else - { - // Add base headers and make a packet - BufferedPacket p = makePacket(peer->address, data, - m_protocol_id, m_peer_id, channelnum); - - // Send the packet - RawSend(p); + }catch(InvalidIncomingDataException &e){ } -} - -void Connection::RawSend(const BufferedPacket &packet) -{ - try{ - m_socket.Send(packet.address, *packet.data, packet.data.getSize()); - } catch(SendFailedException &e){ - derr_con<<"Connection::RawSend(): SendFailedException: " - <<packet.address.serializeString()<<std::endl; + catch(ProcessedSilentlyException &e){ } + } // for } -void Connection::RunTimeouts(float dtime) +void Connection::runTimeouts(float dtime) { core::list<u16> timeouted_peers; core::map<u16, Peer*>::Iterator j; @@ -1278,7 +947,7 @@ void Connection::RunTimeouts(float dtime) <<", seqnum="<<seqnum <<std::endl; - RawSend(*j); + rawSend(*j); // Enlarge avg_rtt and resend_timeout: // The rtt will be at least the timeout. @@ -1298,7 +967,7 @@ void Connection::RunTimeouts(float dtime) SharedBuffer<u8> data(2); writeU8(&data[0], TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_PING); - SendAsPacket(peer->id, 0, data, true); + rawSendAsPacket(peer->id, 0, data, true); peer->ping_timer = 0.0; } @@ -1317,12 +986,167 @@ nextpeer: } } -Peer* Connection::GetPeer(u16 peer_id) +void Connection::serve(u16 port) +{ + dout_con<<getDesc()<<" serving at port "<<port<<std::endl; + m_socket.Bind(port); + m_peer_id = PEER_ID_SERVER; +} + +void Connection::connect(Address address) +{ + dout_con<<getDesc()<<" connecting to "<<address.serializeString() + <<":"<<address.getPort()<<std::endl; + + core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER); + if(node != NULL){ + throw ConnectionException("Already connected to a server"); + } + + Peer *peer = new Peer(PEER_ID_SERVER, address); + m_peers.insert(peer->id, peer); + + // Create event + ConnectionEvent e; + e.peerAdded(peer->id, peer->address); + putEvent(e); + + m_socket.Bind(0); + + // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT + m_peer_id = PEER_ID_INEXISTENT; + SharedBuffer<u8> data(0); + Send(PEER_ID_SERVER, 0, data, true); +} + +void Connection::disconnect() +{ + dout_con<<getDesc()<<" disconnecting"<<std::endl; + + // Create and send DISCO packet + SharedBuffer<u8> data(2); + writeU8(&data[0], TYPE_CONTROL); + writeU8(&data[1], CONTROLTYPE_DISCO); + + // Send to all + core::map<u16, Peer*>::Iterator j; + j = m_peers.getIterator(); + for(; j.atEnd() == false; j++) + { + Peer *peer = j.getNode()->getValue(); + rawSendAsPacket(peer->id, 0, data, false); + } +} + +void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable) +{ + core::map<u16, Peer*>::Iterator j; + j = m_peers.getIterator(); + for(; j.atEnd() == false; j++) + { + Peer *peer = j.getNode()->getValue(); + send(peer->id, channelnum, data, reliable); + } +} + +void Connection::send(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable) +{ + dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl; + + assert(channelnum < CHANNEL_COUNT); + + Peer *peer = getPeerNoEx(peer_id); + if(peer == NULL) + return; + Channel *channel = &(peer->channels[channelnum]); + + u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE; + if(reliable) + chunksize_max -= RELIABLE_HEADER_SIZE; + + core::list<SharedBuffer<u8> > originals; + originals = makeAutoSplitPacket(data, chunksize_max, + channel->next_outgoing_split_seqnum); + + core::list<SharedBuffer<u8> >::Iterator i; + i = originals.begin(); + for(; i != originals.end(); i++) + { + SharedBuffer<u8> original = *i; + + sendAsPacket(peer_id, channelnum, original, reliable); + } +} + +void Connection::sendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable) +{ + OutgoingPacket packet(peer_id, channelnum, data, reliable); + m_outgoing_queue.push_back(packet); +} + +void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable) +{ + Peer *peer = getPeerNoEx(peer_id); + if(!peer) + return; + Channel *channel = &(peer->channels[channelnum]); + + if(reliable) + { + u16 seqnum = channel->next_outgoing_seqnum; + channel->next_outgoing_seqnum++; + + SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum); + + // Add base headers and make a packet + BufferedPacket p = makePacket(peer->address, reliable, + m_protocol_id, m_peer_id, channelnum); + + try{ + // Buffer the packet + channel->outgoing_reliables.insert(p); + } + catch(AlreadyExistsException &e) + { + PrintInfo(derr_con); + derr_con<<"WARNING: Going to send a reliable packet " + "seqnum="<<seqnum<<" that is already " + "in outgoing buffer"<<std::endl; + //assert(0); + } + + // Send the packet + rawSend(p); + } + else + { + // Add base headers and make a packet + BufferedPacket p = makePacket(peer->address, data, + m_protocol_id, m_peer_id, channelnum); + + // Send the packet + rawSend(p); + } +} + +void Connection::rawSend(const BufferedPacket &packet) +{ + try{ + m_socket.Send(packet.address, *packet.data, packet.data.getSize()); + } catch(SendFailedException &e){ + derr_con<<"Connection::rawSend(): SendFailedException: " + <<packet.address.serializeString()<<std::endl; + } +} + +Peer* Connection::getPeer(u16 peer_id) { core::map<u16, Peer*>::Node *node = m_peers.find(peer_id); if(node == NULL){ - // Peer not found throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)"); } @@ -1332,7 +1156,7 @@ Peer* Connection::GetPeer(u16 peer_id) return node->getValue(); } -Peer* Connection::GetPeerNoEx(u16 peer_id) +Peer* Connection::getPeerNoEx(u16 peer_id) { core::map<u16, Peer*>::Node *node = m_peers.find(peer_id); @@ -1346,7 +1170,7 @@ Peer* Connection::GetPeerNoEx(u16 peer_id) return node->getValue(); } -core::list<Peer*> Connection::GetPeers() +core::list<Peer*> Connection::getPeers() { core::list<Peer*> list; core::map<u16, Peer*>::Iterator j; @@ -1359,23 +1183,474 @@ core::list<Peer*> Connection::GetPeers() return list; } +bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst) +{ + core::map<u16, Peer*>::Iterator j; + j = m_peers.getIterator(); + for(; j.atEnd() == false; j++) + { + Peer *peer = j.getNode()->getValue(); + for(u16 i=0; i<CHANNEL_COUNT; i++) + { + Channel *channel = &peer->channels[i]; + SharedBuffer<u8> resultdata; + bool got = checkIncomingBuffers(channel, peer_id, resultdata); + if(got){ + dst = resultdata; + return true; + } + } + } + return false; +} + +bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer<u8> &dst) +{ + u16 firstseqnum = 0; + // Clear old packets from start of buffer + try{ + for(;;){ + firstseqnum = channel->incoming_reliables.getFirstSeqnum(); + if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum)) + channel->incoming_reliables.popFirst(); + else + break; + } + // This happens if all packets are old + }catch(con::NotFoundException) + {} + + if(channel->incoming_reliables.empty() == false) + { + if(firstseqnum == channel->next_incoming_seqnum) + { + BufferedPacket p = channel->incoming_reliables.popFirst(); + + peer_id = readPeerId(*p.data); + u8 channelnum = readChannel(*p.data); + u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); + + PrintInfo(); + dout_con<<"UNBUFFERING TYPE_RELIABLE" + <<" seqnum="<<seqnum + <<" peer_id="<<peer_id + <<" channel="<<((int)channelnum&0xff) + <<std::endl; + + channel->next_incoming_seqnum++; + + u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; + // Get out the inside packet and re-process it + SharedBuffer<u8> payload(p.data.getSize() - headers_size); + memcpy(*payload, &p.data[headers_size], payload.getSize()); + + dst = processPacket(channel, payload, peer_id, channelnum, true); + return true; + } + } + return false; +} + +SharedBuffer<u8> Connection::processPacket(Channel *channel, + SharedBuffer<u8> packetdata, u16 peer_id, + u8 channelnum, bool reliable) +{ + IndentationRaiser iraiser(&(m_indentation)); + + if(packetdata.getSize() < 1) + throw InvalidIncomingDataException("packetdata.getSize() < 1"); + + u8 type = readU8(&packetdata[0]); + + if(type == TYPE_CONTROL) + { + if(packetdata.getSize() < 2) + throw InvalidIncomingDataException("packetdata.getSize() < 2"); + + u8 controltype = readU8(&packetdata[1]); + + if(controltype == CONTROLTYPE_ACK) + { + if(packetdata.getSize() < 4) + throw InvalidIncomingDataException + ("packetdata.getSize() < 4 (ACK header size)"); + + u16 seqnum = readU16(&packetdata[2]); + PrintInfo(); + dout_con<<"Got CONTROLTYPE_ACK: channelnum=" + <<((int)channelnum&0xff)<<", peer_id="<<peer_id + <<", seqnum="<<seqnum<<std::endl; + + try{ + BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum); + // Get round trip time + float rtt = p.totaltime; + + // Let peer calculate stuff according to it + // (avg_rtt and resend_timeout) + Peer *peer = getPeer(peer_id); + peer->reportRTT(rtt); + + //PrintInfo(dout_con); + //dout_con<<"RTT = "<<rtt<<std::endl; + + /*dout_con<<"OUTGOING: "; + PrintInfo(); + channel->outgoing_reliables.print(); + dout_con<<std::endl;*/ + } + catch(NotFoundException &e){ + PrintInfo(derr_con); + derr_con<<"WARNING: ACKed packet not " + "in outgoing queue" + <<std::endl; + } + + throw ProcessedSilentlyException("Got an ACK"); + } + else if(controltype == CONTROLTYPE_SET_PEER_ID) + { + if(packetdata.getSize() < 4) + throw InvalidIncomingDataException + ("packetdata.getSize() < 4 (SET_PEER_ID header size)"); + u16 peer_id_new = readU16(&packetdata[2]); + PrintInfo(); + dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl; + + if(GetPeerID() != PEER_ID_INEXISTENT) + { + PrintInfo(derr_con); + derr_con<<"WARNING: Not changing" + " existing peer id."<<std::endl; + } + else + { + dout_con<<"changing."<<std::endl; + SetPeerID(peer_id_new); + } + throw ProcessedSilentlyException("Got a SET_PEER_ID"); + } + else if(controltype == CONTROLTYPE_PING) + { + // Just ignore it, the incoming data already reset + // the timeout counter + PrintInfo(); + dout_con<<"PING"<<std::endl; + throw ProcessedSilentlyException("Got a PING"); + } + else if(controltype == CONTROLTYPE_DISCO) + { + // Just ignore it, the incoming data already reset + // the timeout counter + PrintInfo(); + dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl; + + if(deletePeer(peer_id, false) == false) + { + PrintInfo(derr_con); + derr_con<<"DISCO: Peer not found"<<std::endl; + } + + throw ProcessedSilentlyException("Got a DISCO"); + } + else{ + PrintInfo(derr_con); + derr_con<<"INVALID TYPE_CONTROL: invalid controltype=" + <<((int)controltype&0xff)<<std::endl; + throw InvalidIncomingDataException("Invalid control type"); + } + } + else if(type == TYPE_ORIGINAL) + { + if(packetdata.getSize() < ORIGINAL_HEADER_SIZE) + throw InvalidIncomingDataException + ("packetdata.getSize() < ORIGINAL_HEADER_SIZE"); + PrintInfo(); + dout_con<<"RETURNING TYPE_ORIGINAL to user" + <<std::endl; + // Get the inside packet out and return it + SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); + memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); + return payload; + } + else if(type == TYPE_SPLIT) + { + // We have to create a packet again for buffering + // This isn't actually too bad an idea. + BufferedPacket packet = makePacket( + getPeer(peer_id)->address, + packetdata, + GetProtocolID(), + peer_id, + channelnum); + // Buffer the packet + SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable); + if(data.getSize() != 0) + { + PrintInfo(); + dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " + <<"size="<<data.getSize()<<std::endl; + return data; + } + PrintInfo(); + dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl; + throw ProcessedSilentlyException("Buffered a split packet chunk"); + } + else if(type == TYPE_RELIABLE) + { + // Recursive reliable packets not allowed + assert(reliable == false); + + if(packetdata.getSize() < RELIABLE_HEADER_SIZE) + throw InvalidIncomingDataException + ("packetdata.getSize() < RELIABLE_HEADER_SIZE"); + + u16 seqnum = readU16(&packetdata[1]); + + bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum); + bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum); + + PrintInfo(); + if(is_future_packet) + dout_con<<"BUFFERING"; + else if(is_old_packet) + dout_con<<"OLD"; + else + dout_con<<"RECUR"; + dout_con<<" TYPE_RELIABLE seqnum="<<seqnum + <<" next="<<channel->next_incoming_seqnum; + dout_con<<" [sending CONTROLTYPE_ACK" + " to peer_id="<<peer_id<<"]"; + dout_con<<std::endl; + + //DEBUG + //assert(channel->incoming_reliables.size() < 100); + + // Send a CONTROLTYPE_ACK + SharedBuffer<u8> reply(4); + writeU8(&reply[0], TYPE_CONTROL); + writeU8(&reply[1], CONTROLTYPE_ACK); + writeU16(&reply[2], seqnum); + rawSendAsPacket(peer_id, channelnum, reply, false); + + //if(seqnum_higher(seqnum, channel->next_incoming_seqnum)) + if(is_future_packet) + { + /*PrintInfo(); + dout_con<<"Buffering reliable packet (seqnum=" + <<seqnum<<")"<<std::endl;*/ + + // This one comes later, buffer it. + // Actually we have to make a packet to buffer one. + // Well, we have all the ingredients, so just do it. + BufferedPacket packet = makePacket( + getPeer(peer_id)->address, + packetdata, + GetProtocolID(), + peer_id, + channelnum); + try{ + channel->incoming_reliables.insert(packet); + + /*PrintInfo(); + dout_con<<"INCOMING: "; + channel->incoming_reliables.print(); + dout_con<<std::endl;*/ + } + catch(AlreadyExistsException &e) + { + } + + throw ProcessedSilentlyException("Buffered future reliable packet"); + } + //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum)) + else if(is_old_packet) + { + // An old packet, dump it + throw InvalidIncomingDataException("Got an old reliable packet"); + } + + channel->next_incoming_seqnum++; + + // Get out the inside packet and re-process it + SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); + memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); + + return processPacket(channel, payload, peer_id, channelnum, true); + } + else + { + PrintInfo(derr_con); + derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl; + throw InvalidIncomingDataException("Invalid packet type"); + } + + // We should never get here. + // If you get here, add an exception or a return to some of the + // above conditionals. + assert(0); + throw BaseException("Error in Channel::ProcessPacket()"); +} + bool Connection::deletePeer(u16 peer_id, bool timeout) { if(m_peers.find(peer_id) == NULL) return false; - m_peerhandler->deletingPeer(m_peers[peer_id], timeout); + + Peer *peer = m_peers[peer_id]; + + // Create event + ConnectionEvent e; + e.peerRemoved(peer_id, timeout, peer->address); + putEvent(e); + delete m_peers[peer_id]; m_peers.remove(peer_id); return true; } +/* Interface */ + +ConnectionEvent Connection::getEvent() +{ + if(m_event_queue.size() == 0){ + ConnectionEvent e; + e.type = CONNEVENT_NONE; + return e; + } + return m_event_queue.pop_front(); +} + +ConnectionEvent Connection::waitEvent(u32 timeout_ms) +{ + try{ + return m_event_queue.pop_front(timeout_ms); + } catch(ItemNotFoundException &e){ + ConnectionEvent e; + e.type = CONNEVENT_NONE; + return e; + } +} + +void Connection::putCommand(ConnectionCommand &c) +{ + m_command_queue.push_back(c); +} + +void Connection::Serve(unsigned short port) +{ + ConnectionCommand c; + c.serve(port); + putCommand(c); +} + +void Connection::Connect(Address address) +{ + ConnectionCommand c; + c.connect(address); + putCommand(c); +} + +bool Connection::Connected() +{ + JMutexAutoLock peerlock(m_peers_mutex); + + if(m_peers.size() != 1) + return false; + + core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER); + if(node == NULL) + return false; + + if(m_peer_id == PEER_ID_INEXISTENT) + return false; + + return true; +} + +void Connection::Disconnect() +{ + ConnectionCommand c; + c.disconnect(); + putCommand(c); +} + +u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) +{ + for(;;){ + ConnectionEvent e = waitEvent(m_bc_receive_timeout); + if(e.type != CONNEVENT_NONE) + dout_con<<getDesc()<<": Receive: got event: " + <<e.describe()<<std::endl; + switch(e.type){ + case CONNEVENT_NONE: + throw NoIncomingDataException("No incoming data"); + case CONNEVENT_DATA_RECEIVED: + peer_id = e.peer_id; + memcpy(data, *e.data, e.data.getSize()); + return e.data.getSize(); + case CONNEVENT_PEER_ADDED: { + Peer tmp(e.peer_id, e.address); + if(m_bc_peerhandler) + m_bc_peerhandler->peerAdded(&tmp); + continue; } + case CONNEVENT_PEER_REMOVED: { + Peer tmp(e.peer_id, e.address); + if(m_bc_peerhandler) + m_bc_peerhandler->deletingPeer(&tmp, e.timeout); + continue; } + } + } + throw NoIncomingDataException("No incoming data"); +} + +void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable) +{ + assert(channelnum < CHANNEL_COUNT); + + ConnectionCommand c; + c.sendToAll(channelnum, data, reliable); + putCommand(c); +} + +void Connection::Send(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable) +{ + assert(channelnum < CHANNEL_COUNT); + + ConnectionCommand c; + c.send(peer_id, channelnum, data, reliable); + putCommand(c); +} + +void Connection::RunTimeouts(float dtime) +{ + // No-op +} + +Address Connection::GetPeerAddress(u16 peer_id) +{ + JMutexAutoLock peerlock(m_peers_mutex); + return getPeer(peer_id)->address; +} + +float Connection::GetPeerAvgRTT(u16 peer_id) +{ + JMutexAutoLock peerlock(m_peers_mutex); + return getPeer(peer_id)->avg_rtt; +} + +void Connection::DeletePeer(u16 peer_id) +{ + ConnectionCommand c; + c.deletePeer(peer_id); + putCommand(c); +} + void Connection::PrintInfo(std::ostream &out) { - out<<m_socket.GetHandle(); - out<<" "; - out<<"con "<<m_peer_id<<": "; - for(s16 i=0; i<(s16)m_indentation-1; i++) - out<<" "; + out<<getDesc()<<": "; } void Connection::PrintInfo() @@ -1383,5 +1658,10 @@ void Connection::PrintInfo() PrintInfo(dout_con); } +std::string Connection::getDesc() +{ + return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")"; +} + } // namespace diff --git a/src/connection.h b/src/connection.h index 6eb2f2824..570bc92ab 100644 --- a/src/connection.h +++ b/src/connection.h @@ -319,28 +319,6 @@ struct Channel { Channel(); ~Channel(); - /* - Processes a packet with the basic header stripped out. - Parameters: - packetdata: Data in packet (with no base headers) - con: The connection to which the channel is associated - (used for sending back stuff (ACKs)) - peer_id: peer id of the sender of the packet in question - channelnum: channel on which the packet was sent - reliable: true if recursing into a reliable packet - */ - SharedBuffer<u8> ProcessPacket( - SharedBuffer<u8> packetdata, - Connection *con, - u16 peer_id, - u8 channelnum, - bool reliable=false); - - // Returns next data from a buffer if possible - // throws a NoIncomingDataException if no data is available - // If found, sets peer_id - SharedBuffer<u8> CheckIncomingBuffers(Connection *con, - u16 &peer_id); u16 next_outgoing_seqnum; u16 next_incoming_seqnum; @@ -412,78 +390,237 @@ public: // with the id we have given to it bool has_sent_with_id; + float m_sendtime_accu; + float m_max_packets_per_second; + int m_num_sent; + int m_max_num_sent; + private: }; -class Connection +/* + Connection +*/ + +struct OutgoingPacket +{ + u16 peer_id; + u8 channelnum; + SharedBuffer<u8> data; + bool reliable; + + OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_, + bool reliable_): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_) + { + } +}; + +enum ConnectionEventType{ + CONNEVENT_NONE, + CONNEVENT_DATA_RECEIVED, + CONNEVENT_PEER_ADDED, + CONNEVENT_PEER_REMOVED, +}; + +struct ConnectionEvent +{ + enum ConnectionEventType type; + u16 peer_id; + SharedBuffer<u8> data; + bool timeout; + Address address; + + ConnectionEvent(): type(CONNEVENT_NONE) {} + + std::string describe() + { + switch(type){ + case CONNEVENT_NONE: + return "CONNEVENT_NONE"; + case CONNEVENT_DATA_RECEIVED: + return "CONNEVENT_DATA_RECEIVED"; + case CONNEVENT_PEER_ADDED: + return "CONNEVENT_PEER_ADDED"; + case CONNEVENT_PEER_REMOVED: + return "CONNEVENT_PEER_REMOVED"; + } + return "Invalid ConnectionEvent"; + } + + void dataReceived(u16 peer_id_, SharedBuffer<u8> data_) + { + type = CONNEVENT_DATA_RECEIVED; + peer_id = peer_id_; + data = data_; + } + void peerAdded(u16 peer_id_, Address address_) + { + type = CONNEVENT_PEER_ADDED; + peer_id = peer_id_; + address = address_; + } + void peerRemoved(u16 peer_id_, bool timeout_, Address address_) + { + type = CONNEVENT_PEER_REMOVED; + peer_id = peer_id_; + timeout = timeout_; + address = address_; + } +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONNCMD_DELETE_PEER, +}; + +struct ConnectionCommand +{ + enum ConnectionCommandType type; + u16 port; + Address address; + u16 peer_id; + u8 channelnum; + SharedBuffer<u8> data; + bool reliable; + + ConnectionCommand(): type(CONNCMD_NONE) {} + + void serve(u16 port_) + { + type = CONNCMD_SERVE; + port = port_; + } + void connect(Address address_) + { + type = CONNCMD_CONNECT; + address = address_; + } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } + void send(u16 peer_id_, u8 channelnum_, + SharedBuffer<u8> data_, bool reliable_) + { + type = CONNCMD_SEND; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_) + { + type = CONNCMD_SEND_TO_ALL; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void deletePeer(u16 peer_id_) + { + type = CONNCMD_DELETE_PEER; + peer_id = peer_id_; + } +}; + +class Connection: public SimpleThread { public: - Connection( - u32 protocol_id, - u32 max_packet_size, - float timeout, - PeerHandler *peerhandler - ); + Connection(u32 protocol_id, u32 max_packet_size, float timeout); + Connection(u32 protocol_id, u32 max_packet_size, float timeout, + PeerHandler *peerhandler); ~Connection(); - void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); } - // Start being a server + void * Thread(); + + /* Interface */ + + ConnectionEvent getEvent(); + ConnectionEvent waitEvent(u32 timeout_ms); + void putCommand(ConnectionCommand &c); + + void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; } void Serve(unsigned short port); - // Connect to a server void Connect(Address address); bool Connected(); - void Disconnect(); - - // Sets peer_id - SharedBuffer<u8> GetFromBuffers(u16 &peer_id); - - // The peer_id of sender is stored in peer_id - // Return value: I guess this always throws an exception or - // actually gets data - // May call PeerHandler methods u32 Receive(u16 &peer_id, u8 *data, u32 datasize); - - // These will automatically package the data as an original or split void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable); void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable); - // Send data as a packet; it will be wrapped in base header and - // optionally to a reliable packet. - void SendAsPacket(u16 peer_id, u8 channelnum, + void RunTimeouts(float dtime); // dummy + u16 GetPeerID(){ return m_peer_id; } + Address GetPeerAddress(u16 peer_id); + float GetPeerAvgRTT(u16 peer_id); + void DeletePeer(u16 peer_id); + +private: + void putEvent(ConnectionEvent &e); + void processCommand(ConnectionCommand &c); + void send(float dtime); + void receive(); + void runTimeouts(float dtime); + void serve(u16 port); + void connect(Address address); + void disconnect(); + void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable); + void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable); + void sendAsPacket(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable); - // Sends a raw packet - void RawSend(const BufferedPacket &packet); + void rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer<u8> data, bool reliable); + void rawSend(const BufferedPacket &packet); + Peer* getPeer(u16 peer_id); + Peer* getPeerNoEx(u16 peer_id); + core::list<Peer*> getPeers(); + bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst); + // Returns next data from a buffer if possible + // If found, returns true; if not, false. + // If found, sets peer_id and dst + bool checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer<u8> &dst); + /* + Processes a packet with the basic header stripped out. + Parameters: + packetdata: Data in packet (with no base headers) + peer_id: peer id of the sender of the packet in question + channelnum: channel on which the packet was sent + reliable: true if recursing into a reliable packet + */ + SharedBuffer<u8> processPacket(Channel *channel, + SharedBuffer<u8> packetdata, u16 peer_id, + u8 channelnum, bool reliable); + bool deletePeer(u16 peer_id, bool timeout); - // May call PeerHandler methods - void RunTimeouts(float dtime); - - // Can throw a PeerNotFoundException - Peer* GetPeer(u16 peer_id); - // returns NULL if failed - Peer* GetPeerNoEx(u16 peer_id); - core::list<Peer*> GetPeers(); + Queue<OutgoingPacket> m_outgoing_queue; + MutexedQueue<ConnectionEvent> m_event_queue; + MutexedQueue<ConnectionCommand> m_command_queue; - // Calls PeerHandler::deletingPeer - // Returns false if peer was not found - bool deletePeer(u16 peer_id, bool timeout); + u32 m_protocol_id; + u32 m_max_packet_size; + float m_timeout; + UDPSocket m_socket; + u16 m_peer_id; + + core::map<u16, Peer*> m_peers; + JMutex m_peers_mutex; + // Backwards compatibility + PeerHandler *m_bc_peerhandler; + int m_bc_receive_timeout; + void SetPeerID(u16 id){ m_peer_id = id; } - u16 GetPeerID(){ return m_peer_id; } u32 GetProtocolID(){ return m_protocol_id; } - - // For debug printing void PrintInfo(std::ostream &out); void PrintInfo(); + std::string getDesc(); u16 m_indentation; - -private: - u32 m_protocol_id; - float m_timeout; - PeerHandler *m_peerhandler; - core::map<u16, Peer*> m_peers; - u16 m_peer_id; - //bool m_waiting_new_peer_id; - u32 m_max_packet_size; - UDPSocket m_socket; }; } // namespace diff --git a/src/defaultsettings.cpp b/src/defaultsettings.cpp index 586eaa282..7f0d46a10 100644 --- a/src/defaultsettings.cpp +++ b/src/defaultsettings.cpp @@ -100,7 +100,7 @@ void set_default_settings(Settings *settings) //settings->setDefault("max_simultaneous_block_sends_per_client", "1"); // This causes frametime jitter on client side, or does it? settings->setDefault("max_simultaneous_block_sends_per_client", "2"); - settings->setDefault("max_simultaneous_block_sends_server_total", "2"); + settings->setDefault("max_simultaneous_block_sends_server_total", "8"); settings->setDefault("max_block_send_distance", "7"); settings->setDefault("max_block_generate_distance", "5"); settings->setDefault("time_send_interval", "20"); diff --git a/src/main.cpp b/src/main.cpp index df1347f12..bc44775bd 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -481,6 +481,8 @@ MainGameCallback *g_gamecallback = NULL; // Connection std::ostream *dout_con_ptr = &dummyout; std::ostream *derr_con_ptr = &verbosestream; +//std::ostream *dout_con_ptr = &infostream; +//std::ostream *derr_con_ptr = &errorstream; // Server std::ostream *dout_server_ptr = &infostream; diff --git a/src/map.cpp b/src/map.cpp index ba4130ca2..8aad4e539 100644 --- a/src/map.cpp +++ b/src/map.cpp @@ -21,7 +21,9 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "mapsector.h" #include "mapblock.h" #include "main.h" +#ifndef SERVER #include "client.h" +#endif #include "filesys.h" #include "utility.h" #include "voxel.h" diff --git a/src/server.cpp b/src/server.cpp index 1a441e819..37ba65a95 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1074,7 +1074,7 @@ void Server::start(unsigned short port) m_thread.stop(); // Initialize connection - m_con.setTimeoutMs(30); + m_con.SetTimeoutMs(30); m_con.Serve(port); // Start thread @@ -1823,9 +1823,18 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) JMutexAutoLock envlock(m_env_mutex); JMutexAutoLock conlock(m_con_mutex); - con::Peer *peer; try{ - peer = m_con.GetPeer(peer_id); + Address address = m_con.GetPeerAddress(peer_id); + + // drop player if is ip is banned + if(m_banmanager.isIpBanned(address.serializeString())){ + SendAccessDenied(m_con, peer_id, + L"Your ip is banned. Banned name was " + +narrow_to_wide(m_banmanager.getBanName( + address.serializeString()))); + m_con.DeletePeer(peer_id); + return; + } } catch(con::PeerNotFoundException &e) { @@ -1834,17 +1843,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) return; } - // drop player if is ip is banned - if(m_banmanager.isIpBanned(peer->address.serializeString())){ - SendAccessDenied(m_con, peer_id, - L"Your ip is banned. Banned name was " - +narrow_to_wide(m_banmanager.getBanName( - peer->address.serializeString()))); - m_con.deletePeer(peer_id, false); - return; - } - - u8 peer_ser_ver = getClient(peer->id)->serialization_version; + u8 peer_ser_ver = getClient(peer_id)->serialization_version; try { @@ -1865,7 +1864,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) return; infostream<<"Server: Got TOSERVER_INIT from " - <<peer->id<<std::endl; + <<peer_id<<std::endl; // First byte after command is maximum supported // serialization version @@ -1878,7 +1877,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) deployed = SER_FMT_VER_INVALID; //peer->serialization_version = deployed; - getClient(peer->id)->pending_serialization_version = deployed; + getClient(peer_id)->pending_serialization_version = deployed; if(deployed == SER_FMT_VER_INVALID) { @@ -1900,7 +1899,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) net_proto_version = readU16(&data[2+1+PLAYERNAME_SIZE+PASSWORD_SIZE]); } - getClient(peer->id)->net_proto_version = net_proto_version; + getClient(peer_id)->net_proto_version = net_proto_version; if(net_proto_version == 0) { @@ -2045,11 +2044,11 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) if(command == TOSERVER_INIT2) { infostream<<"Server: Got TOSERVER_INIT2 from " - <<peer->id<<std::endl; + <<peer_id<<std::endl; - getClient(peer->id)->serialization_version - = getClient(peer->id)->pending_serialization_version; + getClient(peer_id)->serialization_version + = getClient(peer_id)->pending_serialization_version; /* Send some initialization data @@ -2059,8 +2058,8 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) SendPlayerInfos(); // Send inventory to player - UpdateCrafting(peer->id); - SendInventory(peer->id); + UpdateCrafting(peer_id); + SendInventory(peer_id); // Send player items to all players SendPlayerItems(); @@ -2074,7 +2073,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) { SharedBuffer<u8> data = makePacket_TOCLIENT_TIME_OF_DAY( m_env.getTimeOfDay()); - m_con.Send(peer->id, 0, data, true); + m_con.Send(peer_id, 0, data, true); } // Send information about server to player in chat @@ -2095,7 +2094,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) } // Warnings about protocol version can be issued here - if(getClient(peer->id)->net_proto_version < PROTOCOL_VERSION) + if(getClient(peer_id)->net_proto_version < PROTOCOL_VERSION) { SendChatMessage(peer_id, L"# Server: WARNING: YOUR CLIENT IS OLD AND MAY WORK PROPERLY WITH THIS SERVER"); } @@ -2402,7 +2401,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) else if(action == 2) { #if 0 - RemoteClient *client = getClient(peer->id); + RemoteClient *client = getClient(peer_id); JMutexAutoLock digmutex(client->m_dig_mutex); client->m_dig_tool_item = -1; #endif @@ -2685,7 +2684,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id) } // Reset build time counter - getClient(peer->id)->m_time_from_building = 0.0; + getClient(peer_id)->m_time_from_building = 0.0; // Create node data MaterialItem *mitem = (MaterialItem*)item; @@ -3428,11 +3427,10 @@ core::list<PlayerInfo> Server::getPlayerInfo() Player *player = *i; try{ - con::Peer *peer = m_con.GetPeer(player->peer_id); - // Copy info from peer to info struct - info.id = peer->id; - info.address = peer->address; - info.avg_rtt = peer->avg_rtt; + // Copy info from connection to info struct + info.id = player->peer_id; + info.address = m_con.GetPeerAddress(player->peer_id); + info.avg_rtt = m_con.GetPeerAvgRTT(player->peer_id); } catch(con::PeerNotFoundException &e) { diff --git a/src/server.h b/src/server.h index dac7e2826..b238bec26 100644 --- a/src/server.h +++ b/src/server.h @@ -468,9 +468,9 @@ public: return m_banmanager.getBanDescription(ip_or_name); } - con::Peer* getPeerNoEx(u16 peer_id) + Address getPeerAddress(u16 peer_id) { - return m_con.GetPeerNoEx(peer_id); + return m_con.GetPeerAddress(peer_id); } // Envlock and conlock should be locked when calling this diff --git a/src/servercommand.cpp b/src/servercommand.cpp index a09003960..156598940 100644 --- a/src/servercommand.cpp +++ b/src/servercommand.cpp @@ -250,20 +250,19 @@ void cmd_banunban(std::wostringstream &os, ServerCommandContext *ctx) os<<L"-!- No such player"; return; } - - con::Peer *peer = ctx->server->getPeerNoEx(player->peer_id); - if(peer == NULL) - { + + try{ + Address address = ctx->server->getPeerAddress(player->peer_id); + std::string ip_string = address.serializeString(); + ctx->server->setIpBanned(ip_string, player->getName()); + os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|" + <<narrow_to_wide(player->getName()); + + actionstream<<ctx->player->getName()<<" bans " + <<player->getName()<<" / "<<ip_string<<std::endl; + } catch(con::PeerNotFoundException){ dstream<<__FUNCTION_NAME<<": peer was not found"<<std::endl; - return; } - std::string ip_string = peer->address.serializeString(); - ctx->server->setIpBanned(ip_string, player->getName()); - os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|" - <<narrow_to_wide(player->getName()); - - actionstream<<ctx->player->getName()<<" bans " - <<player->getName()<<" / "<<ip_string<<std::endl; } else { diff --git a/src/test.cpp b/src/test.cpp index 6b9ef4b6f..c1f04b2ef 100644 --- a/src/test.cpp +++ b/src/test.cpp @@ -819,7 +819,10 @@ struct TestConnection /* Test some real connections + + NOTE: This mostly tests the legacy interface. */ + u32 proto_id = 0xad26846a; Handler hand_server("server"); @@ -843,11 +846,30 @@ struct TestConnection sleep_ms(50); + // Client should not have added client yet + assert(hand_client.count == 0); + + try + { + u16 peer_id; + u8 data[100]; + infostream<<"** running client.Receive()"<<std::endl; + u32 size = client.Receive(peer_id, data, 100); + infostream<<"** Client received: peer_id="<<peer_id + <<", size="<<size + <<std::endl; + } + catch(con::NoIncomingDataException &e) + { + } + // Client should have added server now assert(hand_client.count == 1); assert(hand_client.last_id == 1); - // But server should not have added client + // Server should not have added client yet assert(hand_server.count == 0); + + sleep_ms(50); try { @@ -930,7 +952,7 @@ struct TestConnection } u16 peer_id_client = 2; - +#if 0 { /* Send consequent packets in different order @@ -941,13 +963,13 @@ struct TestConnection SharedBuffer<u8> data2 = SharedBufferFromString("Hello2"); Address client_address = - server.GetPeer(peer_id_client)->address; + server.GetPeerAddress(peer_id_client); infostream<<"*** Sending packets in wrong order (2,1,2)" <<std::endl; u8 chn = 0; - con::Channel *ch = &server.GetPeer(peer_id_client)->channels[chn]; + con::Channel *ch = &server.getPeer(peer_id_client)->channels[chn]; u16 sn = ch->next_outgoing_seqnum; ch->next_outgoing_seqnum = sn+1; server.Send(peer_id_client, chn, data2, true); @@ -1004,6 +1026,7 @@ struct TestConnection } assert(got_exception); } +#endif { const int datasize = 30000; SharedBuffer<u8> data1(datasize); @@ -1022,12 +1045,25 @@ struct TestConnection server.Send(peer_id_client, 0, data1, true); - sleep_ms(50); + sleep_ms(3000); u8 recvdata[datasize + 1000]; infostream<<"** running client.Receive()"<<std::endl; u16 peer_id = 132; - u16 size = client.Receive(peer_id, recvdata, datasize + 1000); + u16 size = 0; + bool received = false; + u32 timems0 = porting::getTimeMs(); + for(;;){ + if(porting::getTimeMs() - timems0 > 5000) + break; + try{ + size = client.Receive(peer_id, recvdata, datasize + 1000); + received = true; + }catch(con::NoIncomingDataException &e){ + } + sleep_ms(10); + } + assert(received); infostream<<"** Client received: peer_id="<<peer_id <<", size="<<size <<std::endl; |