diff options
Diffstat (limited to 'src/connection.cpp')
-rw-r--r-- | src/connection.cpp | 170 |
1 files changed, 106 insertions, 64 deletions
diff --git a/src/connection.cpp b/src/connection.cpp index 2e126a770..92f9f8ec2 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -34,6 +34,14 @@ with this program; if not, write to the Free Software Foundation, Inc., namespace con { +/******************************************************************************/ +/* defines used for debugging and profiling */ +/******************************************************************************/ +#ifdef NDEBUG +#define LOG(a) a +#define PROFILE(a) +#undef DEBUG_CONNECTION_KBPS +#else /* this mutex is used to achieve log message consistency */ JMutex log_message_mutex; #define LOG(a) \ @@ -41,15 +49,10 @@ JMutex log_message_mutex; JMutexAutoLock loglock(log_message_mutex); \ a; \ } - -/******************************************************************************/ -/* defines used for debugging and profiling */ -/******************************************************************************/ #define PROFILE(a) a -//#define PROFILE(a) - //#define DEBUG_CONNECTION_KBPS #undef DEBUG_CONNECTION_KBPS +#endif static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) { @@ -960,15 +963,12 @@ void Peer::Drop() UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : Peer(a_address,a_id,connection), + m_pending_disconnect(false), resend_timeout(0.5), m_legacy_peer(true) { } -UDPPeer::~UDPPeer() -{ -} - bool UDPPeer::getAddress(MTProtocols type,Address& toset) { if ((type == UDP) || (type == MINETEST_RELIABLE_UDP) || (type == PRIMARY)) @@ -980,6 +980,15 @@ bool UDPPeer::getAddress(MTProtocols type,Address& toset) return false; } +void UDPPeer::setNonLegacyPeer() +{ + m_legacy_peer = false; + for(unsigned int i=0; i< CHANNEL_COUNT; i++) + { + channels->setWindowSize(g_settings->getU16("max_packets_per_iteration")); + } +} + void UDPPeer::reportRTT(float rtt) { if (rtt < 0.0) { @@ -1014,6 +1023,9 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data) void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size) { + if (m_pending_disconnect) + return; + if ( channels[c.channelnum].queued_commands.empty() && /* don't queue more packets then window size */ (channels[c.channelnum].queued_reliables.size() @@ -1040,6 +1052,9 @@ bool UDPPeer::processReliableSendCommand( ConnectionCommand &c, unsigned int max_packet_size) { + if (m_pending_disconnect) + return true; + u32 chunksize_max = max_packet_size - BASE_HEADER_SIZE - RELIABLE_HEADER_SIZE; @@ -1564,7 +1579,6 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) case CONNCMD_SERVE: case CONNCMD_CONNECT: case CONNCMD_DISCONNECT: - case CONNCMD_DELETE_PEER: case CONCMD_ACK: assert("Got command that shouldn't be reliable as reliable command" == 0); default: @@ -1606,10 +1620,6 @@ void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c) LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONNCMD_SEND_TO_ALL"<<std::endl); sendToAll(c.channelnum, c.data); return; - case CONNCMD_DELETE_PEER: - LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONNCMD_DELETE_PEER"<<std::endl); - m_connection->deletePeer(c.peer_id, false); - return; case CONCMD_ACK: LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONCMD_ACK"<<std::endl); sendAsPacket(c.peer_id,c.channelnum,c.data,true); @@ -1686,6 +1696,18 @@ void ConnectionSendThread::disconnect_peer(u16 peer_id) writeU8(&data[0], TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_DISCO); sendAsPacket(peer_id, 0,data,false); + + PeerHelper peer = m_connection->getPeerNoEx(peer_id); + + if (!peer) + return; + + if (dynamic_cast<UDPPeer*>(&peer) == 0) + { + return; + } + + dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect = true; } void ConnectionSendThread::send(u16 peer_id, u8 channelnum, @@ -1764,6 +1786,8 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c) void ConnectionSendThread::sendPackets(float dtime) { std::list<u16> peerIds = m_connection->getPeerIDs(); + std::list<u16> pendingDisconnect; + std::map<u16,bool> pending_unreliable; for(std::list<u16>::iterator j = peerIds.begin(); @@ -1782,6 +1806,11 @@ void ConnectionSendThread::sendPackets(float dtime) continue; } + if (dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect) + { + pendingDisconnect.push_back(*j); + } + PROFILE(std::stringstream peerIdentifier); PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << *j << ";RELIABLE]"); PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG)); @@ -1877,6 +1906,17 @@ void ConnectionSendThread::sendPackets(float dtime) } else { m_outgoing_queue.push_back(packet); + pending_unreliable[packet.peer_id] = true; + } + } + + for(std::list<u16>::iterator + k = pendingDisconnect.begin(); + k != pendingDisconnect.end(); ++k) + { + if (!pending_unreliable[*k]) + { + m_connection->deletePeer(*k,false); } } } @@ -1986,11 +2026,10 @@ void * ConnectionReceiveThread::Thread() // Receive packets from the network and buffers and create ConnectionEvents void ConnectionReceiveThread::receive() { - /* now reorder reliables */ - u32 datasize = m_max_packet_size * 2; // Double it just to be safe - // TODO: We can not know how many layers of header there are. - // For now, just assume there are no other than the base headers. - u32 packet_maxsize = datasize + BASE_HEADER_SIZE; + // use IPv6 minimum allowed MTU as receive buffer size as this is + // theoretical reliable upper boundary of a udp packet for all IPv6 enabled + // infrastructure + unsigned int packet_maxsize = 1500; SharedBuffer<u8> packetdata(packet_maxsize); bool packet_queued = true; @@ -2126,7 +2165,7 @@ void ConnectionReceiveThread::receive() LOG(dout_con<<m_connection->getDesc() <<" ProcessPacket from peer_id: " << peer_id - << ",channel: " << channelnum << ", returned " + << ",channel: " << (channelnum & 0xFF) << ", returned " << resultdata.getSize() << " bytes" <<std::endl); ConnectionEvent e; @@ -2262,6 +2301,10 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, } //put bytes for max bandwidth calculation channel->UpdateBytesSent(p.data.getSize(),1); + if (channel->outgoing_reliables_sent.size() == 0) + { + m_connection->TriggerSend(); + } } catch(NotFoundException &e){ LOG(derr_con<<m_connection->getDesc() @@ -2534,7 +2577,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_info_mutex(), m_bc_peerhandler(0), m_bc_receive_timeout(0), - m_shutting_down(false) + m_shutting_down(false), + m_next_remote_peer_id(2) { m_udpSocket.setTimeoutMs(5); @@ -2554,7 +2598,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_info_mutex(), m_bc_peerhandler(peerhandler), m_bc_receive_timeout(0), - m_shutting_down(false) + m_shutting_down(false), + m_next_remote_peer_id(2) { m_udpSocket.setTimeoutMs(5); @@ -2810,11 +2855,6 @@ void Connection::Send(u16 peer_id, u8 channelnum, putCommand(c); } -void Connection::RunTimeouts(float dtime) -{ - // No-op -} - Address Connection::GetPeerAddress(u16 peer_id) { PeerHelper peer = getPeerNoEx(peer_id); @@ -2838,46 +2878,43 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) // Somebody wants to make a new connection // Get a unique peer id (2 or higher) - u16 peer_id_new = 2; + u16 peer_id_new = m_next_remote_peer_id; u16 overflow = MAX_UDP_PEERS; /* Find an unused peer id */ - bool out_of_ids = false; - for(;;) { - // Check if exists - if(m_peers.find(peer_id_new) == m_peers.end()) - break; - // Check for overflow - if(peer_id_new == overflow){ - out_of_ids = true; - break; + JMutexAutoLock lock(m_peers_mutex); + bool out_of_ids = false; + for(;;) + { + // Check if exists + if(m_peers.find(peer_id_new) == m_peers.end()) + break; + // Check for overflow + if(peer_id_new == overflow){ + out_of_ids = true; + break; + } + peer_id_new++; + } + if(out_of_ids){ + errorstream<<getDesc()<<" ran out of peer ids"<<std::endl; + return PEER_ID_INEXISTENT; } - peer_id_new++; - } - if(out_of_ids){ - errorstream<<getDesc()<<" ran out of peer ids"<<std::endl; - return PEER_ID_INEXISTENT; - } - - LOG(dout_con<<getDesc() - <<"createPeer(): giving peer_id="<<peer_id_new<<std::endl); - // Create a peer - Peer *peer = 0; + // Create a peer + Peer *peer = 0; + peer = new UDPPeer(peer_id_new, sender, this); - peer = new UDPPeer(peer_id_new, sender, this); + m_peers[peer->id] = peer; + } - m_peers_mutex.Lock(); - m_peers[peer->id] = peer; - m_peers_mutex.Unlock(); + m_next_remote_peer_id = (peer_id_new +1) % MAX_UDP_PEERS; - // Create peer addition event - ConnectionEvent e; - e.peerAdded(peer_id_new, sender); - putEvent(e); + LOG(dout_con<<getDesc() + <<"createPeer(): giving peer_id="<<peer_id_new<<std::endl); ConnectionCommand cmd; SharedBuffer<u8> reply(4); @@ -2887,17 +2924,15 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) cmd.createPeer(peer_id_new,reply); this->putCommand(cmd); + // Create peer addition event + ConnectionEvent e; + e.peerAdded(peer_id_new, sender); + putEvent(e); + // We're now talking to a valid peer_id return peer_id_new; } -void Connection::DeletePeer(u16 peer_id) -{ - ConnectionCommand c; - c.deletePeer(peer_id); - putCommand(c); -} - void Connection::PrintInfo(std::ostream &out) { m_info_mutex.Lock(); @@ -2915,6 +2950,13 @@ const std::string Connection::getDesc() return std::string("con(")+itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")"; } +void Connection::DisconnectPeer(u16 peer_id) +{ + ConnectionCommand discon; + discon.disconnect_peer(peer_id); + putCommand(discon); +} + void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum) { assert(channelnum < CHANNEL_COUNT); |