aboutsummaryrefslogtreecommitdiff
path: root/src/connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/connection.cpp')
-rw-r--r--src/connection.cpp170
1 files changed, 106 insertions, 64 deletions
diff --git a/src/connection.cpp b/src/connection.cpp
index 2e126a770..92f9f8ec2 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -34,6 +34,14 @@ with this program; if not, write to the Free Software Foundation, Inc.,
namespace con
{
+/******************************************************************************/
+/* defines used for debugging and profiling */
+/******************************************************************************/
+#ifdef NDEBUG
+#define LOG(a) a
+#define PROFILE(a)
+#undef DEBUG_CONNECTION_KBPS
+#else
/* this mutex is used to achieve log message consistency */
JMutex log_message_mutex;
#define LOG(a) \
@@ -41,15 +49,10 @@ JMutex log_message_mutex;
JMutexAutoLock loglock(log_message_mutex); \
a; \
}
-
-/******************************************************************************/
-/* defines used for debugging and profiling */
-/******************************************************************************/
#define PROFILE(a) a
-//#define PROFILE(a)
-
//#define DEBUG_CONNECTION_KBPS
#undef DEBUG_CONNECTION_KBPS
+#endif
static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) {
@@ -960,15 +963,12 @@ void Peer::Drop()
UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
Peer(a_address,a_id,connection),
+ m_pending_disconnect(false),
resend_timeout(0.5),
m_legacy_peer(true)
{
}
-UDPPeer::~UDPPeer()
-{
-}
-
bool UDPPeer::getAddress(MTProtocols type,Address& toset)
{
if ((type == UDP) || (type == MINETEST_RELIABLE_UDP) || (type == PRIMARY))
@@ -980,6 +980,15 @@ bool UDPPeer::getAddress(MTProtocols type,Address& toset)
return false;
}
+void UDPPeer::setNonLegacyPeer()
+{
+ m_legacy_peer = false;
+ for(unsigned int i=0; i< CHANNEL_COUNT; i++)
+ {
+ channels->setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+ }
+}
+
void UDPPeer::reportRTT(float rtt)
{
if (rtt < 0.0) {
@@ -1014,6 +1023,9 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
unsigned int max_packet_size)
{
+ if (m_pending_disconnect)
+ return;
+
if ( channels[c.channelnum].queued_commands.empty() &&
/* don't queue more packets then window size */
(channels[c.channelnum].queued_reliables.size()
@@ -1040,6 +1052,9 @@ bool UDPPeer::processReliableSendCommand(
ConnectionCommand &c,
unsigned int max_packet_size)
{
+ if (m_pending_disconnect)
+ return true;
+
u32 chunksize_max = max_packet_size
- BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE;
@@ -1564,7 +1579,6 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
case CONNCMD_SERVE:
case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT:
- case CONNCMD_DELETE_PEER:
case CONCMD_ACK:
assert("Got command that shouldn't be reliable as reliable command" == 0);
default:
@@ -1606,10 +1620,6 @@ void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
sendToAll(c.channelnum, c.data);
return;
- case CONNCMD_DELETE_PEER:
- LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONNCMD_DELETE_PEER"<<std::endl);
- m_connection->deletePeer(c.peer_id, false);
- return;
case CONCMD_ACK:
LOG(dout_con<<m_connection->getDesc()<<" UDP processing CONCMD_ACK"<<std::endl);
sendAsPacket(c.peer_id,c.channelnum,c.data,true);
@@ -1686,6 +1696,18 @@ void ConnectionSendThread::disconnect_peer(u16 peer_id)
writeU8(&data[0], TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
sendAsPacket(peer_id, 0,data,false);
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer)
+ return;
+
+ if (dynamic_cast<UDPPeer*>(&peer) == 0)
+ {
+ return;
+ }
+
+ dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect = true;
}
void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
@@ -1764,6 +1786,8 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
void ConnectionSendThread::sendPackets(float dtime)
{
std::list<u16> peerIds = m_connection->getPeerIDs();
+ std::list<u16> pendingDisconnect;
+ std::map<u16,bool> pending_unreliable;
for(std::list<u16>::iterator
j = peerIds.begin();
@@ -1782,6 +1806,11 @@ void ConnectionSendThread::sendPackets(float dtime)
continue;
}
+ if (dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect)
+ {
+ pendingDisconnect.push_back(*j);
+ }
+
PROFILE(std::stringstream peerIdentifier);
PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << *j << ";RELIABLE]");
PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
@@ -1877,6 +1906,17 @@ void ConnectionSendThread::sendPackets(float dtime)
}
else {
m_outgoing_queue.push_back(packet);
+ pending_unreliable[packet.peer_id] = true;
+ }
+ }
+
+ for(std::list<u16>::iterator
+ k = pendingDisconnect.begin();
+ k != pendingDisconnect.end(); ++k)
+ {
+ if (!pending_unreliable[*k])
+ {
+ m_connection->deletePeer(*k,false);
}
}
}
@@ -1986,11 +2026,10 @@ void * ConnectionReceiveThread::Thread()
// Receive packets from the network and buffers and create ConnectionEvents
void ConnectionReceiveThread::receive()
{
- /* now reorder reliables */
- u32 datasize = m_max_packet_size * 2; // Double it just to be safe
- // TODO: We can not know how many layers of header there are.
- // For now, just assume there are no other than the base headers.
- u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
+ // use IPv6 minimum allowed MTU as receive buffer size as this is
+ // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
+ // infrastructure
+ unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);
bool packet_queued = true;
@@ -2126,7 +2165,7 @@ void ConnectionReceiveThread::receive()
LOG(dout_con<<m_connection->getDesc()
<<" ProcessPacket from peer_id: " << peer_id
- << ",channel: " << channelnum << ", returned "
+ << ",channel: " << (channelnum & 0xFF) << ", returned "
<< resultdata.getSize() << " bytes" <<std::endl);
ConnectionEvent e;
@@ -2262,6 +2301,10 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
}
//put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(),1);
+ if (channel->outgoing_reliables_sent.size() == 0)
+ {
+ m_connection->TriggerSend();
+ }
}
catch(NotFoundException &e){
LOG(derr_con<<m_connection->getDesc()
@@ -2534,7 +2577,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_info_mutex(),
m_bc_peerhandler(0),
m_bc_receive_timeout(0),
- m_shutting_down(false)
+ m_shutting_down(false),
+ m_next_remote_peer_id(2)
{
m_udpSocket.setTimeoutMs(5);
@@ -2554,7 +2598,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_info_mutex(),
m_bc_peerhandler(peerhandler),
m_bc_receive_timeout(0),
- m_shutting_down(false)
+ m_shutting_down(false),
+ m_next_remote_peer_id(2)
{
m_udpSocket.setTimeoutMs(5);
@@ -2810,11 +2855,6 @@ void Connection::Send(u16 peer_id, u8 channelnum,
putCommand(c);
}
-void Connection::RunTimeouts(float dtime)
-{
- // No-op
-}
-
Address Connection::GetPeerAddress(u16 peer_id)
{
PeerHelper peer = getPeerNoEx(peer_id);
@@ -2838,46 +2878,43 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
// Somebody wants to make a new connection
// Get a unique peer id (2 or higher)
- u16 peer_id_new = 2;
+ u16 peer_id_new = m_next_remote_peer_id;
u16 overflow = MAX_UDP_PEERS;
/*
Find an unused peer id
*/
- bool out_of_ids = false;
- for(;;)
{
- // Check if exists
- if(m_peers.find(peer_id_new) == m_peers.end())
- break;
- // Check for overflow
- if(peer_id_new == overflow){
- out_of_ids = true;
- break;
+ JMutexAutoLock lock(m_peers_mutex);
+ bool out_of_ids = false;
+ for(;;)
+ {
+ // Check if exists
+ if(m_peers.find(peer_id_new) == m_peers.end())
+ break;
+ // Check for overflow
+ if(peer_id_new == overflow){
+ out_of_ids = true;
+ break;
+ }
+ peer_id_new++;
+ }
+ if(out_of_ids){
+ errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
+ return PEER_ID_INEXISTENT;
}
- peer_id_new++;
- }
- if(out_of_ids){
- errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
- return PEER_ID_INEXISTENT;
- }
-
- LOG(dout_con<<getDesc()
- <<"createPeer(): giving peer_id="<<peer_id_new<<std::endl);
- // Create a peer
- Peer *peer = 0;
+ // Create a peer
+ Peer *peer = 0;
+ peer = new UDPPeer(peer_id_new, sender, this);
- peer = new UDPPeer(peer_id_new, sender, this);
+ m_peers[peer->id] = peer;
+ }
- m_peers_mutex.Lock();
- m_peers[peer->id] = peer;
- m_peers_mutex.Unlock();
+ m_next_remote_peer_id = (peer_id_new +1) % MAX_UDP_PEERS;
- // Create peer addition event
- ConnectionEvent e;
- e.peerAdded(peer_id_new, sender);
- putEvent(e);
+ LOG(dout_con<<getDesc()
+ <<"createPeer(): giving peer_id="<<peer_id_new<<std::endl);
ConnectionCommand cmd;
SharedBuffer<u8> reply(4);
@@ -2887,17 +2924,15 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
cmd.createPeer(peer_id_new,reply);
this->putCommand(cmd);
+ // Create peer addition event
+ ConnectionEvent e;
+ e.peerAdded(peer_id_new, sender);
+ putEvent(e);
+
// We're now talking to a valid peer_id
return peer_id_new;
}
-void Connection::DeletePeer(u16 peer_id)
-{
- ConnectionCommand c;
- c.deletePeer(peer_id);
- putCommand(c);
-}
-
void Connection::PrintInfo(std::ostream &out)
{
m_info_mutex.Lock();
@@ -2915,6 +2950,13 @@ const std::string Connection::getDesc()
return std::string("con(")+itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
}
+void Connection::DisconnectPeer(u16 peer_id)
+{
+ ConnectionCommand discon;
+ discon.disconnect_peer(peer_id);
+ putCommand(discon);
+}
+
void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum) {
assert(channelnum < CHANNEL_COUNT);