aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/defaultsettings.cpp1
-rw-r--r--src/network/CMakeLists.txt1
-rw-r--r--src/network/connection.cpp1555
-rw-r--r--src/network/connection.h195
-rw-r--r--src/network/connectionthreads.cpp1404
-rw-r--r--src/network/connectionthreads.h148
-rw-r--r--src/network/networkpacket.cpp4
-rw-r--r--src/network/networkpacket.h2
-rw-r--r--src/unittest/test_connection.cpp6
-rw-r--r--src/util/pointer.h22
10 files changed, 1669 insertions, 1669 deletions
diff --git a/src/defaultsettings.cpp b/src/defaultsettings.cpp
index ccf81f1e7..502223fcd 100644
--- a/src/defaultsettings.cpp
+++ b/src/defaultsettings.cpp
@@ -290,7 +290,6 @@ void set_default_settings(Settings *settings)
// Network
settings->setDefault("enable_ipv6", "true");
settings->setDefault("ipv6_server", "false");
- settings->setDefault("workaround_window_size","5");
settings->setDefault("max_packets_per_iteration","1024");
settings->setDefault("port", "30000");
settings->setDefault("strict_protocol_version_checking", "false");
diff --git a/src/network/CMakeLists.txt b/src/network/CMakeLists.txt
index 44969105b..dedca9f2b 100644
--- a/src/network/CMakeLists.txt
+++ b/src/network/CMakeLists.txt
@@ -1,6 +1,7 @@
set(common_network_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/address.cpp
${CMAKE_CURRENT_SOURCE_DIR}/connection.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/connectionthreads.cpp
${CMAKE_CURRENT_SOURCE_DIR}/networkpacket.cpp
${CMAKE_CURRENT_SOURCE_DIR}/serverpackethandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/serveropcodes.cpp
diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index 44e403611..01f1a7e63 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -19,10 +19,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <iomanip>
#include <cerrno>
+#include <algorithm>
#include "connection.h"
#include "serialization.h"
#include "log.h"
#include "porting.h"
+#include "network/connectionthreads.h"
#include "network/networkpacket.h"
#include "network/peerhandler.h"
#include "util/serialize.h"
@@ -40,7 +42,6 @@ namespace con
#ifdef NDEBUG
#define LOG(a) a
#define PROFILE(a)
-#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_message_mutex;
@@ -50,37 +51,14 @@ std::mutex log_message_mutex;
a; \
}
#define PROFILE(a) a
-//#define DEBUG_CONNECTION_KBPS
-#undef DEBUG_CONNECTION_KBPS
#endif
-
-static inline float CALC_DTIME(u64 lasttime, u64 curtime)
-{
- float value = ( curtime - lasttime) / 1000.0;
- return MYMAX(MYMIN(value,0.1),0.0);
-}
-
-#define MAX_UDP_PEERS 65535
-
#define PING_TIMEOUT 5.0
-/* maximum number of retries for reliable packets */
-#define MAX_RELIABLE_RETRY 5
-
-static u16 readPeerId(u8 *packetdata)
-{
- return readU16(&packetdata[4]);
-}
-static u8 readChannel(u8 *packetdata)
-{
- return readU8(&packetdata[6]);
-}
-
-BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, u16 sender_peer_id, u8 channel)
{
- u32 packet_size = datasize + BASE_HEADER_SIZE;
+ u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
BufferedPacket p(packet_size);
p.address = address;
@@ -88,47 +66,36 @@ BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
writeU16(&p.data[4], sender_peer_id);
writeU8(&p.data[6], channel);
- memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
+ memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
return p;
}
-BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
- u32 protocol_id, u16 sender_peer_id, u8 channel)
-{
- return makePacket(address, *data, data.getSize(),
- protocol_id, sender_peer_id, channel);
-}
-
-SharedBuffer<u8> makeOriginalPacket(
- SharedBuffer<u8> data)
+SharedBuffer<u8> makeOriginalPacket(const SharedBuffer<u8> &data)
{
u32 header_size = 1;
u32 packet_size = data.getSize() + header_size;
SharedBuffer<u8> b(packet_size);
- writeU8(&(b[0]), TYPE_ORIGINAL);
+ writeU8(&(b[0]), PACKET_TYPE_ORIGINAL);
if (data.getSize() > 0) {
memcpy(&(b[header_size]), *data, data.getSize());
}
return b;
}
-std::list<SharedBuffer<u8> > makeSplitPacket(
- SharedBuffer<u8> data,
- u32 chunksize_max,
- u16 seqnum)
+// Split data in chunks and add TYPE_SPLIT headers to them
+void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum,
+ std::list<SharedBuffer<u8>> *chunks)
{
// Chunk packets, containing the TYPE_SPLIT header
- std::list<SharedBuffer<u8> > chunks;
-
u32 chunk_header_size = 7;
u32 maximum_data_size = chunksize_max - chunk_header_size;
u32 start = 0;
u32 end = 0;
u32 chunk_num = 0;
u16 chunk_count = 0;
- do{
+ do {
end = start + maximum_data_size - 1;
if (end > data.getSize() - 1)
end = data.getSize() - 1;
@@ -138,44 +105,38 @@ std::list<SharedBuffer<u8> > makeSplitPacket(
SharedBuffer<u8> chunk(packet_size);
- writeU8(&chunk[0], TYPE_SPLIT);
+ writeU8(&chunk[0], PACKET_TYPE_SPLIT);
writeU16(&chunk[1], seqnum);
// [3] u16 chunk_count is written at next stage
writeU16(&chunk[5], chunk_num);
memcpy(&chunk[chunk_header_size], &data[start], payload_size);
- chunks.push_back(chunk);
+ chunks->push_back(chunk);
chunk_count++;
start = end + 1;
chunk_num++;
}
- while(end != data.getSize() - 1);
+ while (end != data.getSize() - 1);
- for (SharedBuffer<u8> &chunk : chunks) {
+ for (SharedBuffer<u8> &chunk : *chunks) {
// Write chunk_count
writeU16(&(chunk[3]), chunk_count);
}
-
- return chunks;
}
-std::list<SharedBuffer<u8> > makeAutoSplitPacket(
- SharedBuffer<u8> data,
- u32 chunksize_max,
- u16 &split_seqnum)
+void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
+ u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
{
u32 original_header_size = 1;
- std::list<SharedBuffer<u8> > list;
- if (data.getSize() + original_header_size > chunksize_max)
- {
- list = makeSplitPacket(data, chunksize_max, split_seqnum);
+
+ if (data.getSize() + original_header_size > chunksize_max) {
+ makeSplitPacket(data, chunksize_max, split_seqnum, list);
split_seqnum++;
- return list;
+ return;
}
- list.push_back(makeOriginalPacket(data));
- return list;
+ list->push_back(makeOriginalPacket(data));
}
SharedBuffer<u8> makeReliablePacket(
@@ -186,7 +147,7 @@ SharedBuffer<u8> makeReliablePacket(
u32 packet_size = data.getSize() + header_size;
SharedBuffer<u8> b(packet_size);
- writeU8(&b[0], TYPE_RELIABLE);
+ writeU8(&b[0], PACKET_TYPE_RELIABLE);
writeU16(&b[1], seqnum);
memcpy(&b[header_size], *data, data.getSize());
@@ -306,7 +267,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
return;
}
u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
- if (type != TYPE_RELIABLE) {
+ if (type != PACKET_TYPE_RELIABLE) {
errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
<< std::endl;
return;
@@ -439,7 +400,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
This will throw a GotSplitPacketException when a full
split packet is constructed.
*/
-SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
+SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
u32 headersize = BASE_HEADER_SIZE + 7;
@@ -452,24 +413,19 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
- if (type != TYPE_SPLIT) {
+ if (type != PACKET_TYPE_SPLIT) {
errorstream << "IncomingSplitBuffer::insert(): type is not split"
<< std::endl;
return SharedBuffer<u8>();
}
// Add if doesn't exist
- if (m_buf.find(seqnum) == m_buf.end())
- {
- IncomingSplitPacket *sp = new IncomingSplitPacket();
- sp->chunk_count = chunk_count;
- sp->reliable = reliable;
- m_buf[seqnum] = sp;
+ if (m_buf.find(seqnum) == m_buf.end()) {
+ m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
}
IncomingSplitPacket *sp = m_buf[seqnum];
- // TODO: These errors should be thrown or something? Dunno.
if (chunk_count != sp->chunk_count)
LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
<<" != sp->chunk_count="<<sp->chunk_count
@@ -507,13 +463,11 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
// Copy chunks to data buffer
u32 start = 0;
- for(u32 chunk_i=0; chunk_i<sp->chunk_count;
- chunk_i++)
- {
- SharedBuffer<u8> buf = sp->chunks[chunk_i];
- u16 chunkdatasize = buf.getSize();
- memcpy(&fulldata[start], *buf, chunkdatasize);
- start += chunkdatasize;;
+ for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
+ const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
+ u16 buf_chunkdatasize = buf.getSize();
+ memcpy(&fulldata[start], *buf, buf_chunkdatasize);
+ start += buf_chunkdatasize;
}
// Remove sp from buffer
@@ -524,7 +478,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
}
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{
- std::list<u16> remove_queue;
+ std::deque<u16> remove_queue;
{
MutexAutoLock listlock(m_map_mutex);
for (auto &i : m_buf) {
@@ -838,7 +792,8 @@ Peer* PeerHelper::operator&() const
return m_peer;
}
-bool PeerHelper::operator!() {
+bool PeerHelper::operator!()
+{
return ! m_peer;
}
@@ -1002,7 +957,7 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
if (m_ping_timer >= PING_TIMEOUT)
{
// Create and send PING packet
- writeU8(&data[0], TYPE_CONTROL);
+ writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_PING);
m_ping_timer = 0.0;
return true;
@@ -1048,15 +1003,13 @@ bool UDPPeer::processReliableSendCommand(
sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
- std::list<SharedBuffer<u8> > originals;
+ std::list<SharedBuffer<u8>> originals;
u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
- if (c.raw)
- {
+ if (c.raw) {
originals.emplace_back(c.data);
- }
- else {
- originals = makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number);
+ } else {
+ makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
}
@@ -1183,1381 +1136,11 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
channels[channel].setNextSplitSeqNum(seqnum);
}
-SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
- BufferedPacket toadd,
- bool reliable)
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
- return channels[channel].incoming_splits.insert(toadd,reliable);
-}
-
-/******************************************************************************/
-/* Connection Threads */
-/******************************************************************************/
-
-ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
- float timeout) :
- Thread("ConnectionSend"),
- m_max_packet_size(max_packet_size),
- m_timeout(timeout),
- m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
-{
-}
-
-void * ConnectionSendThread::run()
-{
- assert(m_connection);
-
- LOG(dout_con<<m_connection->getDesc()
- <<"ConnectionSend thread started"<<std::endl);
-
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
-
- PROFILE(std::stringstream ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
-
- /* if stop is requested don't stop immediately but try to send all */
- /* packets first */
- while(!stopRequested() || packetsQueued()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
-
- m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
-
- /* wait for trigger or timeout */
- m_send_sleep_semaphore.wait(50);
-
- /* remove all triggers */
- while(m_send_sleep_semaphore.wait(0)) {}
-
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime,curtime);
-
- /* first do all the reliable stuff */
- runTimeouts(dtime);
-
- /* translate commands to packets */
- ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
- while(c.type != CONNCMD_NONE)
- {
- if (c.reliable)
- processReliableCommand(c);
- else
- processNonReliableCommand(c);
-
- c = m_connection->m_command_queue.pop_frontNoEx(0);
- }
-
- /* send non reliable packets */
- sendPackets(dtime);
-
- END_DEBUG_EXCEPTION_HANDLER
- }
-
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
-}
-
-void ConnectionSendThread::Trigger()
-{
- m_send_sleep_semaphore.post();
-}
-
-bool ConnectionSendThread::packetsQueued()
-{
- std::list<u16> peerIds = m_connection->getPeerIDs();
-
- if (!m_outgoing_queue.empty() && !peerIds.empty())
- return true;
-
- for (u16 peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
-
- if (!peer)
- continue;
-
- if (dynamic_cast<UDPPeer*>(&peer) == 0)
- continue;
-
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- if (!channel.queued_commands.empty()) {
- return true;
- }
- }
- }
-
-
- return false;
-}
-
-void ConnectionSendThread::runTimeouts(float dtime)
-{
- std::list<u16> timeouted_peers;
- std::list<u16> peerIds = m_connection->getPeerIDs();
-
- for (u16 &peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
-
- if (!peer)
- continue;
-
- if (dynamic_cast<UDPPeer*>(&peer) == 0)
- continue;
-
- PROFILE(std::stringstream peerIdentifier);
- PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
- << ";" << peerId << ";RELIABLE]");
- PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
-
- SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
-
- /*
- Check peer timeout
- */
- if (peer->isTimedOut(m_timeout))
- {
- infostream<<m_connection->getDesc()
- <<"RunTimeouts(): Peer "<<peer->id
- <<" has timed out."
- <<" (source=peer->timeout_counter)"
- <<std::endl;
- // Add peer to the list
- timeouted_peers.push_back(peer->id);
- // Don't bother going through the buffers of this one
- continue;
- }
-
- float resend_timeout = dynamic_cast<UDPPeer*>(&peer)->getResendTimeout();
- bool retry_count_exceeded = false;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- std::list<BufferedPacket> timed_outs;
-
- if (dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer())
- channel.setWindowSize(g_settings->getU16("workaround_window_size"));
-
- // Remove timed out incomplete unreliable split packets
- channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
-
- // Increment reliable packet times
- channel.outgoing_reliables_sent.incrementTimeouts(dtime);
-
- unsigned int numpeers = m_connection->m_peers.size();
-
- if (numpeers == 0)
- return;
-
- // Re-send timed out outgoing reliables
- timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
- (m_max_data_packets_per_iteration/numpeers));
-
- channel.UpdatePacketLossCounter(timed_outs.size());
- g_profiler->graphAdd("packets_lost", timed_outs.size());
-
- m_iteration_packets_avaialble -= timed_outs.size();
-
- for(std::list<BufferedPacket>::iterator k = timed_outs.begin();
- k != timed_outs.end(); ++k)
- {
- u16 peer_id = readPeerId(*(k->data));
- u8 channelnum = readChannel(*(k->data));
- u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE+1]));
-
- channel.UpdateBytesLost(k->data.getSize());
- k->resend_count++;
-
- if (k-> resend_count > MAX_RELIABLE_RETRY) {
- retry_count_exceeded = true;
- timeouted_peers.push_back(peer->id);
- /* no need to check additional packets if a single one did timeout*/
- break;
- }
-
- LOG(derr_con<<m_connection->getDesc()
- <<"RE-SENDING timed-out RELIABLE to "
- << k->address.serializeString()
- << "(t/o="<<resend_timeout<<"): "
- <<"from_peer_id="<<peer_id
- <<", channel="<<((int)channelnum&0xff)
- <<", seqnum="<<seqnum
- <<std::endl);
-
- rawSend(*k);
-
- // do not handle rtt here as we can't decide if this packet was
- // lost or really takes more time to transmit
- }
-
- if (retry_count_exceeded) {
- break; /* no need to check other channels if we already did timeout */
- }
-
- channel.UpdateTimers(dtime,dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer());
- }
-
- /* skip to next peer if we did timeout */
- if (retry_count_exceeded)
- continue;
-
- /* send ping if necessary */
- if (dynamic_cast<UDPPeer*>(&peer)->Ping(dtime,data)) {
- LOG(dout_con<<m_connection->getDesc()
- <<"Sending ping for peer_id: "
- << dynamic_cast<UDPPeer*>(&peer)->id <<std::endl);
- /* this may fail if there ain't a sequence number left */
- if (!rawSendAsPacket(dynamic_cast<UDPPeer*>(&peer)->id, 0, data, true))
- {
- //retrigger with reduced ping interval
- dynamic_cast<UDPPeer*>(&peer)->Ping(4.0,data);
- }
- }
-
- dynamic_cast<UDPPeer*>(&peer)->RunCommandQueues(m_max_packet_size,
- m_max_commands_per_iteration,
- m_max_packets_requeued);
- }
-
- // Remove timed out peers
- for (u16 timeouted_peer : timeouted_peers) {
- LOG(derr_con << m_connection->getDesc()
- << "RunTimeouts(): Removing peer "<< timeouted_peer <<std::endl);
- m_connection->deletePeer(timeouted_peer, true);
- }
-}
-
-void ConnectionSendThread::rawSend(const BufferedPacket &packet)
-{
- try{
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
- LOG(dout_con <<m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
- << " bytes sent" << std::endl);
- } catch(SendFailedException &e) {
- LOG(derr_con<<m_connection->getDesc()
- <<"Connection::rawSend(): SendFailedException: "
- <<packet.address.serializeString()<<std::endl);
- }
-}
-
-void ConnectionSendThread::sendAsPacketReliable(BufferedPacket& p, Channel* channel)
-{
- try{
- p.absolute_send_time = porting::getTimeMs();
- // Buffer the packet
- channel->outgoing_reliables_sent.insert(p,
- (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
- % (MAX_RELIABLE_WINDOW_SIZE+1));
- }
- catch(AlreadyExistsException &e)
- {
- LOG(derr_con<<m_connection->getDesc()
- <<"WARNING: Going to send a reliable packet"
- <<" in outgoing buffer" <<std::endl);
- }
-
- // Send the packet
- rawSend(p);
-}
-
-bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable)
-{
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer) {
- LOG(dout_con<<m_connection->getDesc()
- <<" INFO: dropped packet for non existent peer_id: "
- << peer_id << std::endl);
- FATAL_ERROR_IF(!reliable, "Trying to send raw packet reliable but no peer found!");
- return false;
- }
- Channel *channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
-
- if (reliable)
- {
- bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
-
- if (!have_sequence_number_for_raw_packet)
- return false;
-
- SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
- Address peer_address;
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
-
- // Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, reliable,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
-
- // first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
- LOG(dout_con<<m_connection->getDesc()
- <<" INFO: sending a reliable packet to peer_id " << peer_id
- <<" channel: " << channelnum
- <<" seqnum: " << seqnum << std::endl);
- sendAsPacketReliable(p,channel);
- return true;
- }
-
- LOG(dout_con<<m_connection->getDesc()
- <<" INFO: queueing reliable packet for peer_id: " << peer_id
- <<" channel: " << channelnum
- <<" seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(p);
- return false;
- }
-
- Address peer_address;
- if (peer->getAddress(MTP_UDP, peer_address)) {
- // Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, data,
- m_connection->GetProtocolID(), m_connection->GetPeerID(),
- channelnum);
-
- // Send the packet
- rawSend(p);
- return true;
- }
-
- LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped unreliable packet for peer_id: " << peer_id
- << " because of (yet) missing udp address" << std::endl);
- return false;
-}
-
-void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
-{
- assert(c.reliable); // Pre-condition
-
- switch(c.type) {
- case CONNCMD_NONE:
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP processing reliable CONNCMD_NONE"<<std::endl);
- return;
-
- case CONNCMD_SEND:
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP processing reliable CONNCMD_SEND"<<std::endl);
- sendReliable(c);
- return;
-
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
- sendToAllReliable(c);
- return;
-
- case CONCMD_CREATE_PEER:
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP processing reliable CONCMD_CREATE_PEER"<<std::endl);
- if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
- {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
-
- case CONCMD_DISABLE_LEGACY:
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP processing reliable CONCMD_DISABLE_LEGACY"<<std::endl);
- if (!rawSendAsPacket(c.peer_id,c.channelnum,c.data,c.reliable))
- {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
-
- case CONNCMD_SERVE:
- case CONNCMD_CONNECT:
- case CONNCMD_DISCONNECT:
- case CONCMD_ACK:
- FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
- default:
- LOG(dout_con<<m_connection->getDesc()
- <<" Invalid reliable command type: " << c.type <<std::endl);
- }
-}
-
-
-void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
-{
- assert(!c.reliable); // Pre-condition
-
- switch(c.type) {
- case CONNCMD_NONE:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_NONE"<<std::endl);
- return;
- case CONNCMD_SERVE:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_SERVE port="
- <<c.address.serializeString()<<std::endl);
- serve(c.address);
- return;
- case CONNCMD_CONNECT:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_CONNECT"<<std::endl);
- connect(c.address);
- return;
- case CONNCMD_DISCONNECT:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_DISCONNECT"<<std::endl);
- disconnect();
- return;
- case CONNCMD_DISCONNECT_PEER:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_DISCONNECT_PEER"<<std::endl);
- disconnect_peer(c.peer_id);
- return;
- case CONNCMD_SEND:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_SEND"<<std::endl);
- send(c.peer_id, c.channelnum, c.data);
- return;
- case CONNCMD_SEND_TO_ALL:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONNCMD_SEND_TO_ALL"<<std::endl);
- sendToAll(c.channelnum, c.data);
- return;
- case CONCMD_ACK:
- LOG(dout_con<<m_connection->getDesc()
- <<" UDP processing CONCMD_ACK"<<std::endl);
- sendAsPacket(c.peer_id,c.channelnum,c.data,true);
- return;
- case CONCMD_CREATE_PEER:
- FATAL_ERROR("Got command that should be reliable as unreliable command");
- default:
- LOG(dout_con<<m_connection->getDesc()
- <<" Invalid command type: " << c.type <<std::endl);
- }
-}
-
-void ConnectionSendThread::serve(Address bind_address)
-{
- LOG(dout_con<<m_connection->getDesc()
- <<"UDP serving at port " << bind_address.serializeString() <<std::endl);
- try{
- m_connection->m_udpSocket.Bind(bind_address);
- m_connection->SetPeerID(PEER_ID_SERVER);
- }
- catch(SocketException &e) {
- // Create event
- ConnectionEvent ce;
- ce.bindFailed();
- m_connection->putEvent(ce);
- }
-}
-
-void ConnectionSendThread::connect(Address address)
-{
- LOG(dout_con<<m_connection->getDesc()<<" connecting to "<<address.serializeString()
- <<":"<<address.getPort()<<std::endl);
-
- UDPPeer *peer = m_connection->createServerPeer(address);
-
- // Create event
- ConnectionEvent e;
- e.peerAdded(peer->id, peer->address);
- m_connection->putEvent(e);
-
- Address bind_addr;
-
- if (address.isIPv6())
- bind_addr.setAddress((IPv6AddressBytes*) NULL);
- else
- bind_addr.setAddress(0,0,0,0);
-
- m_connection->m_udpSocket.Bind(bind_addr);
-
- // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
- m_connection->SetPeerID(PEER_ID_INEXISTENT);
- NetworkPacket pkt(0,0);
- m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
-}
-
-void ConnectionSendThread::disconnect()
-{
- LOG(dout_con<<m_connection->getDesc()<<" disconnecting"<<std::endl);
-
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
-
-
- // Send to all
- std::list<u16> peerids = m_connection->getPeerIDs();
-
- for (u16 peerid : peerids) {
- sendAsPacket(peerid, 0,data,false);
- }
-}
-
-void ConnectionSendThread::disconnect_peer(u16 peer_id)
-{
- LOG(dout_con<<m_connection->getDesc()<<" disconnecting peer"<<std::endl);
-
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
- sendAsPacket(peer_id, 0,data,false);
-
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
-
- if (!peer)
- return;
-
- if (dynamic_cast<UDPPeer*>(&peer) == 0)
- {
- return;
- }
-
- dynamic_cast<UDPPeer*>(&peer)->m_pending_disconnect = true;
-}
-
-void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data)
-{
- assert(channelnum < CHANNEL_COUNT); // Pre-condition
-
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
- if (!peer)
- {
- LOG(dout_con<<m_connection->getDesc()<<" peer: peer_id="<<peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() <<std::endl);
- return;
- }
-
- LOG(dout_con<<m_connection->getDesc()<<" sending to peer_id="<<peer_id
- << ", channel " << (channelnum % 0xFF)
- << ", size: " << data.getSize() <<std::endl);
-
- u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
-
- u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
- std::list<SharedBuffer<u8> > originals;
-
- originals = makeAutoSplitPacket(data, chunksize_max,split_sequence_number);
-
- peer->setNextSplitSequenceNumber(channelnum,split_sequence_number);
-
- for (const SharedBuffer<u8> &original : originals) {
- sendAsPacket(peer_id, channelnum, original);
- }
-}
-
-void ConnectionSendThread::sendReliable(ConnectionCommand &c)
-{
- PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
- if (!peer)
- return;
-
- peer->PutReliableSendCommand(c,m_max_packet_size);
-}
-
-void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
-{
- std::list<u16> peerids = m_connection->getPeerIDs();
-
- for (u16 peerid : peerids) {
- send(peerid, channelnum, data);
- }
-}
-
-void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
-{
- std::list<u16> peerids = m_connection->getPeerIDs();
-
- for (u16 peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
-
- if (!peer)
- continue;
-
- peer->PutReliableSendCommand(c,m_max_packet_size);
- }
-}
-
-void ConnectionSendThread::sendPackets(float dtime)
-{
- std::list<u16> peerIds = m_connection->getPeerIDs();
- std::list<u16> pendingDisconnect;
- std::map<u16,bool> pending_unreliable;
-
- for (u16 peerId : peerIds) {
- PeerHelper peer = m_connection->getPeerNoEx(peerId);
- //peer may have been removed
- if (!peer) {
- LOG(dout_con<<m_connection->getDesc()<< " Peer not found: peer_id=" << peerId
- << std::endl);
- continue;
- }
- peer->m_increment_packets_remaining = m_iteration_packets_avaialble/m_connection->m_peers.size();
-
- UDPPeer *udpPeer = dynamic_cast<UDPPeer*>(&peer);
-
- if (!udpPeer) {
- continue;
- }
-
- if (udpPeer->m_pending_disconnect) {
- pendingDisconnect.push_back(peerId);
- }
-
- PROFILE(std::stringstream peerIdentifier);
- PROFILE(peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
- << ";RELIABLE]");
- PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
-
- LOG(dout_con<<m_connection->getDesc()
- << " Handle per peer queues: peer_id=" << peerId
- << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
-
- // first send queued reliable packets for all peers (if possible)
- for (unsigned int i=0; i < CHANNEL_COUNT; i++) {
- Channel &channel = udpPeer->channels[i];
- u16 next_to_ack = 0;
-
- channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
- u16 next_to_receive = 0;
- channel.incoming_reliables.getFirstSeqnum(next_to_receive);
-
- LOG(dout_con<<m_connection->getDesc()<< "\t channel: "
- << i << ", peer quota:"
- << peer->m_increment_packets_remaining
- << std::endl
- << "\t\t\treliables on wire: "
- << channel.outgoing_reliables_sent.size()
- << ", waiting for ack for " << next_to_ack
- << std::endl
- << "\t\t\tincoming_reliables: "
- << channel.incoming_reliables.size()
- << ", next reliable packet: "
- << channel.readNextIncomingSeqNum()
- << ", next queued: " << next_to_receive
- << std::endl
- << "\t\t\treliables queued : "
- << channel.queued_reliables.size()
- << std::endl
- << "\t\t\tqueued commands : "
- << channel.queued_commands.size()
- << std::endl);
-
- while ((!channel.queued_reliables.empty()) &&
- (channel.outgoing_reliables_sent.size()
- < channel.getWindowSize())&&
- (peer->m_increment_packets_remaining > 0))
- {
- BufferedPacket p = channel.queued_reliables.front();
- channel.queued_reliables.pop();
- LOG(dout_con<<m_connection->getDesc()
- <<" INFO: sending a queued reliable packet "
- <<" channel: " << i
- <<", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
- << std::endl);
- sendAsPacketReliable(p, &channel);
- peer->m_increment_packets_remaining--;
- }
- }
- }
-
- if (!m_outgoing_queue.empty()) {
- LOG(dout_con<<m_connection->getDesc()
- << " Handle non reliable queue ("
- << m_outgoing_queue.size() << " pkts)" << std::endl);
- }
-
- unsigned int initial_queuesize = m_outgoing_queue.size();
- /* send non reliable packets*/
- for(unsigned int i=0;i < initial_queuesize;i++) {
- OutgoingPacket packet = m_outgoing_queue.front();
- m_outgoing_queue.pop();
-
- if (packet.reliable)
- continue;
-
- PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
- if (!peer) {
- LOG(dout_con<<m_connection->getDesc()
- <<" Outgoing queue: peer_id="<<packet.peer_id
- << ">>>NOT<<< found on sending packet"
- << ", channel " << (packet.channelnum % 0xFF)
- << ", size: " << packet.data.getSize() <<std::endl);
- continue;
- }
-
- /* send acks immediately */
- if (packet.ack) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- peer->m_increment_packets_remaining =
- MYMIN(0,peer->m_increment_packets_remaining--);
- }
- else if (
- ( peer->m_increment_packets_remaining > 0) ||
- (stopRequested())) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- peer->m_increment_packets_remaining--;
- }
- else {
- m_outgoing_queue.push(packet);
- pending_unreliable[packet.peer_id] = true;
- }
- }
-
- for (u16 peerId : pendingDisconnect) {
- if (!pending_unreliable[peerId])
- {
- m_connection->deletePeer(peerId,false);
- }
- }
-}
-
-void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool ack)
-{
- OutgoingPacket packet(peer_id, channelnum, data, false, ack);
- m_outgoing_queue.push(packet);
-}
-
-ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
- Thread("ConnectionReceive")
-{
-}
-
-void * ConnectionReceiveThread::run()
-{
- assert(m_connection);
-
- LOG(dout_con<<m_connection->getDesc()
- <<"ConnectionReceive thread started"<<std::endl);
-
- PROFILE(std::stringstream ThreadIdentifier);
- PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
-
-#ifdef DEBUG_CONNECTION_KBPS
- u64 curtime = porting::getTimeMs();
- u64 lasttime = curtime;
- float debug_print_timer = 0.0;
-#endif
-
- while(!stopRequested()) {
- BEGIN_DEBUG_EXCEPTION_HANDLER
- PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
-
-#ifdef DEBUG_CONNECTION_KBPS
- lasttime = curtime;
- curtime = porting::getTimeMs();
- float dtime = CALC_DTIME(lasttime,curtime);
-#endif
-
- /* receive packets */
- receive();
-
-#ifdef DEBUG_CONNECTION_KBPS
- debug_print_timer += dtime;
- if (debug_print_timer > 20.0) {
- debug_print_timer -= 20.0;
-
- std::list<u16> peerids = m_connection->getPeerIDs();
-
- for (std::list<u16>::iterator i = peerids.begin();
- i != peerids.end();
- i++)
- {
- PeerHelper peer = m_connection->getPeerNoEx(*i);
- if (!peer)
- continue;
-
- float peer_current = 0.0;
- float peer_loss = 0.0;
- float avg_rate = 0.0;
- float avg_loss = 0.0;
-
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- peer_current +=peer->channels[j].getCurrentDownloadRateKB();
- peer_loss += peer->channels[j].getCurrentLossRateKB();
- avg_rate += peer->channels[j].getAvgDownloadRateKB();
- avg_loss += peer->channels[j].getAvgLossRateKB();
- }
-
- std::stringstream output;
- output << std::fixed << std::setprecision(1);
- output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
- output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
- output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
- output << std::setfill(' ');
- for(u16 j=0; j<CHANNEL_COUNT; j++)
- {
- output << "\tcha " << j << ":"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
- << " /"
- << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
- << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
- << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
- << " / WS: " << peer->channels[j].getWindowSize()
- << std::endl;
- }
-
- fprintf(stderr,"%s\n",output.str().c_str());
- }
- }
-#endif
- END_DEBUG_EXCEPTION_HANDLER
- }
-
- PROFILE(g_profiler->remove(ThreadIdentifier.str()));
- return NULL;
-}
-
-// Receive packets from the network and buffers and create ConnectionEvents
-void ConnectionReceiveThread::receive()
-{
- // use IPv6 minimum allowed MTU as receive buffer size as this is
- // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
- // infrastructure
- unsigned int packet_maxsize = 1500;
- SharedBuffer<u8> packetdata(packet_maxsize);
-
- bool packet_queued = true;
-
- unsigned int loop_count = 0;
-
- /* first of all read packets from socket */
- /* check for incoming data available */
- while( (loop_count < 10) &&
- (m_connection->m_udpSocket.WaitData(50))) {
- loop_count++;
- try {
- if (packet_queued) {
- bool data_left = true;
- u16 peer_id;
- SharedBuffer<u8> resultdata;
- while(data_left) {
- try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- }
- catch(ProcessedSilentlyException &e) {
- /* try reading again */
- }
- }
- packet_queued = false;
- }
-
- Address sender;
- s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata, packet_maxsize);
-
- if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID()))
- {
- LOG(derr_con<<m_connection->getDesc()
- <<"Receive(): Invalid incoming packet, "
- <<"size: " << received_size
- <<", protocol: "
- << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
- << std::endl);
- continue;
- }
-
- u16 peer_id = readPeerId(*packetdata);
- u8 channelnum = readChannel(*packetdata);
-
- if (channelnum > CHANNEL_COUNT-1) {
- LOG(derr_con<<m_connection->getDesc()
- <<"Receive(): Invalid channel "<<channelnum<<std::endl);
- throw InvalidIncomingDataException("Channel doesn't exist");
- }
-
- /* Try to identify peer by sender address (may happen on join) */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->lookupPeer(sender);
- // We do not have to remind the peer of its
- // peer id as the CONTROLTYPE_SET_PEER_ID
- // command was sent reliably.
- }
-
- /* The peer was not found in our lists. Add it. */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
- }
-
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
-
- if (!peer) {
- LOG(dout_con<<m_connection->getDesc()
- <<" got packet from unknown peer_id: "
- <<peer_id<<" Ignoring."<<std::endl);
- continue;
- }
-
- // Validate peer address
-
- Address peer_address;
-
- if (peer->getAddress(MTP_UDP, peer_address)) {
- if (peer_address != sender) {
- LOG(derr_con<<m_connection->getDesc()
- <<m_connection->getDesc()
- <<" Peer "<<peer_id<<" sending from different address."
- " Ignoring."<<std::endl);
- continue;
- }
- }
- else {
-
- bool invalid_address = true;
- if (invalid_address) {
- LOG(derr_con<<m_connection->getDesc()
- <<m_connection->getDesc()
- <<" Peer "<<peer_id<<" unknown."
- " Ignoring."<<std::endl);
- continue;
- }
- }
-
- peer->ResetTimeout();
-
- Channel *channel = 0;
-
- if (dynamic_cast<UDPPeer*>(&peer) != 0)
- {
- channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
- }
-
- if (channel != 0) {
- channel->UpdateBytesReceived(received_size);
- }
-
- // Throw the received packet to channel->processPacket()
-
- // Make a new SharedBuffer from the data without the base headers
- SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
- memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
- strippeddata.getSize());
-
- try{
- // Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = processPacket
- (channel, strippeddata, peer_id, channelnum, false);
-
- LOG(dout_con<<m_connection->getDesc()
- <<" ProcessPacket from peer_id: " << peer_id
- << ",channel: " << (channelnum & 0xFF) << ", returned "
- << resultdata.getSize() << " bytes" <<std::endl);
-
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- catch(ProcessedSilentlyException &e) {
- }
- catch(ProcessedQueued &e) {
- packet_queued = true;
- }
- }
- catch(InvalidIncomingDataException &e) {
- }
- catch(ProcessedSilentlyException &e) {
- }
- }
-}
-
-bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
-{
- std::list<u16> peerids = m_connection->getPeerIDs();
-
- for (u16 peerid : peerids) {
- PeerHelper peer = m_connection->getPeerNoEx(peerid);
- if (!peer)
- continue;
-
- if (dynamic_cast<UDPPeer*>(&peer) == 0)
- continue;
-
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
- if (checkIncomingBuffers(&channel, peer_id, dst)) {
- return true;
- }
- }
- }
- return false;
-}
-
-bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
- u16 &peer_id, SharedBuffer<u8> &dst)
-{
- u16 firstseqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(firstseqnum))
- {
- if (firstseqnum == channel->readNextIncomingSeqNum())
- {
- BufferedPacket p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
-
- LOG(dout_con<<m_connection->getDesc()
- <<"UNBUFFERING TYPE_RELIABLE"
- <<" seqnum="<<seqnum
- <<" peer_id="<<peer_id
- <<" channel="<<((int)channelnum&0xff)
- <<std::endl);
-
- channel->incNextIncomingSeqNum();
-
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
-
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- }
- return false;
-}
-
-SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
- SharedBuffer<u8> packetdata, u16 peer_id, u8 channelnum, bool reliable)
-{
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
-
- if (!peer) {
- errorstream << "Peer not found (possible timeout)" << std::endl;
- throw ProcessedSilentlyException("Peer not found (possible timeout)");
- }
-
- if (packetdata.getSize() < 1)
- throw InvalidIncomingDataException("packetdata.getSize() < 1");
-
- u8 type = readU8(&(packetdata[0]));
-
- if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
- std::string errmsg = "Invalid peer_id=" + itos(peer_id);
- errorstream << errmsg << std::endl;
- throw InvalidIncomingDataException(errmsg.c_str());
- }
-
- if (type == TYPE_CONTROL)
- {
- if (packetdata.getSize() < 2)
- throw InvalidIncomingDataException("packetdata.getSize() < 2");
-
- u8 controltype = readU8(&(packetdata[1]));
-
- if (controltype == CONTROLTYPE_ACK)
- {
- assert(channel != NULL);
-
- if (packetdata.getSize() < 4) {
- throw InvalidIncomingDataException(
- "packetdata.getSize() < 4 (ACK header size)");
- }
-
- u16 seqnum = readU16(&packetdata[2]);
- LOG(dout_con<<m_connection->getDesc()
- <<" [ CONTROLTYPE_ACK: channelnum="
- <<((int)channelnum&0xff)<<", peer_id="<<peer_id
- <<", seqnum="<<seqnum<< " ]"<<std::endl);
-
- try{
- BufferedPacket p =
- channel->outgoing_reliables_sent.popSeqnum(seqnum);
-
- // only calculate rtt from straight sent packets
- if (p.resend_count == 0) {
- // Get round trip time
- u64 current_time = porting::getTimeMs();
-
- // a overflow is quite unlikely but as it'd result in major
- // rtt miscalculation we handle it here
- if (current_time > p.absolute_send_time)
- {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
-
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
- }
- else if (p.totaltime > 0)
- {
- float rtt = p.totaltime;
-
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
- }
- }
- //put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p.data.getSize(),1);
- if (channel->outgoing_reliables_sent.size() == 0)
- {
- m_connection->TriggerSend();
- }
- }
- catch(NotFoundException &e) {
- LOG(derr_con<<m_connection->getDesc()
- <<"WARNING: ACKed packet not "
- "in outgoing queue"
- <<std::endl);
- channel->UpdatePacketTooLateCounter();
- }
- throw ProcessedSilentlyException("Got an ACK");
- }
- else if (controltype == CONTROLTYPE_SET_PEER_ID) {
- // Got a packet to set our peer id
- if (packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
- u16 peer_id_new = readU16(&packetdata[2]);
- LOG(dout_con<<m_connection->getDesc()
- <<"Got new peer id: "<<peer_id_new<<"... "<<std::endl);
-
- if (m_connection->GetPeerID() != PEER_ID_INEXISTENT)
- {
- LOG(derr_con<<m_connection->getDesc()
- <<"WARNING: Not changing"
- " existing peer id."<<std::endl);
- }
- else
- {
- LOG(dout_con<<m_connection->getDesc()<<"changing own peer id"<<std::endl);
- m_connection->SetPeerID(peer_id_new);
- }
-
- ConnectionCommand cmd;
-
- SharedBuffer<u8> reply(2);
- writeU8(&reply[0], TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
- cmd.disableLegacy(PEER_ID_SERVER,reply);
- m_connection->putCommand(cmd);
-
- throw ProcessedSilentlyException("Got a SET_PEER_ID");
- }
- else if (controltype == CONTROLTYPE_PING)
- {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con<<m_connection->getDesc()<<"PING"<<std::endl);
- throw ProcessedSilentlyException("Got a PING");
- }
- else if (controltype == CONTROLTYPE_DISCO)
- {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- LOG(dout_con<<m_connection->getDesc()
- <<"DISCO: Removing peer "<<(peer_id)<<std::endl);
-
- if (!m_connection->deletePeer(peer_id, false)) {
- derr_con<<m_connection->getDesc()
- <<"DISCO: Peer not found"<<std::endl;
- }
-
- throw ProcessedSilentlyException("Got a DISCO");
- }
- else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW)
- {
- dynamic_cast<UDPPeer*>(&peer)->setNonLegacyPeer();
- throw ProcessedSilentlyException("Got non legacy control");
- }
- else{
- LOG(derr_con<<m_connection->getDesc()
- <<"INVALID TYPE_CONTROL: invalid controltype="
- <<((int)controltype&0xff)<<std::endl);
- throw InvalidIncomingDataException("Invalid control type");
- }
- }
- else if (type == TYPE_ORIGINAL)
- {
- if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
- LOG(dout_con<<m_connection->getDesc()
- <<"RETURNING TYPE_ORIGINAL to user"
- <<std::endl);
- // Get the inside packet out and return it
- SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
- memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
- return payload;
- }
- else if (type == TYPE_SPLIT)
- {
- Address peer_address;
-
- if (peer->getAddress(MTP_UDP, peer_address)) {
-
- // We have to create a packet again for buffering
- // This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(
- peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer_id,
- channelnum);
-
- // Buffer the packet
- SharedBuffer<u8> data =
- peer->addSpiltPacket(channelnum,packet,reliable);
-
- if (data.getSize() != 0)
- {
- LOG(dout_con<<m_connection->getDesc()
- <<"RETURNING TYPE_SPLIT: Constructed full data, "
- <<"size="<<data.getSize()<<std::endl);
- return data;
- }
- LOG(dout_con<<m_connection->getDesc()<<"BUFFERED TYPE_SPLIT"<<std::endl);
- throw ProcessedSilentlyException("Buffered a split packet chunk");
- }
- else {
- //TODO throw some error
- }
- }
- else if (type == TYPE_RELIABLE)
- {
- assert(channel != NULL);
-
- // Recursive reliable packets not allowed
- if (reliable)
- throw InvalidIncomingDataException("Found nested reliable packets");
-
- if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
-
- u16 seqnum = readU16(&packetdata[1]);
- bool is_future_packet = false;
- bool is_old_packet = false;
-
- /* packet is within our receive window send ack */
- if (seqnum_in_window(seqnum, channel->readNextIncomingSeqNum(),MAX_RELIABLE_WINDOW_SIZE))
- {
- m_connection->sendAck(peer_id,channelnum,seqnum);
- }
- else {
- is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
- is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
-
-
- /* packet is not within receive window, don't send ack. *
- * if this was a valid packet it's gonna be retransmitted */
- if (is_future_packet)
- {
- throw ProcessedSilentlyException("Received packet newer then expected, not sending ack");
- }
-
- /* seems like our ack was lost, send another one for a old packet */
- if (is_old_packet)
- {
- LOG(dout_con<<m_connection->getDesc()
- << "RE-SENDING ACK: peer_id: " << peer_id
- << ", channel: " << (channelnum&0xFF)
- << ", seqnum: " << seqnum << std::endl;)
- m_connection->sendAck(peer_id,channelnum,seqnum);
-
- // we already have this packet so this one was on wire at least
- // the current timeout
- // we don't know how long this packet was on wire don't do silly guessing
- // dynamic_cast<UDPPeer*>(&peer)->reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
-
- throw ProcessedSilentlyException("Retransmitting ack for old packet");
- }
- }
-
- if (seqnum != channel->readNextIncomingSeqNum())
- {
- Address peer_address;
-
- // this is a reliable packet so we have a udp address for sure
- peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
- // This one comes later, buffer it.
- // Actually we have to make a packet to buffer one.
- // Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
- peer_address,
- packetdata,
- m_connection->GetProtocolID(),
- peer_id,
- channelnum);
- try{
- channel->incoming_reliables.insert(packet,channel->readNextIncomingSeqNum());
-
- LOG(dout_con<<m_connection->getDesc()
- << "BUFFERING, TYPE_RELIABLE peer_id: " << peer_id
- << ", channel: " << (channelnum&0xFF)
- << ", seqnum: " << seqnum << std::endl;)
-
- throw ProcessedQueued("Buffered future reliable packet");
- }
- catch(AlreadyExistsException &e)
- {
- }
- catch(IncomingDataCorruption &e)
- {
- ConnectionCommand discon;
- discon.disconnect_peer(peer_id);
- m_connection->putCommand(discon);
-
- LOG(derr_con<<m_connection->getDesc()
- << "INVALID, TYPE_RELIABLE peer_id: " << peer_id
- << ", channel: " << (channelnum&0xFF)
- << ", seqnum: " << seqnum
- << "DROPPING CLIENT!" << std::endl;)
- }
- }
-
- /* we got a packet to process right now */
- LOG(dout_con<<m_connection->getDesc()
- << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer_id
- << ", channel: " << (channelnum&0xFF)
- << ", seqnum: " << seqnum << std::endl;)
-
-
- /* check for resend case */
- u16 queued_seqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum))
- {
- if (queued_seqnum == seqnum)
- {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
- /** TODO find a way to verify the new against the old packet */
- }
- }
-
- channel->incNextIncomingSeqNum();
-
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
- memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
-
- return processPacket(channel, payload, peer_id, channelnum, true);
- }
- else
- {
- derr_con<<m_connection->getDesc()
- <<"Got invalid type="<<((int)type&0xff)<<std::endl;
- throw InvalidIncomingDataException("Invalid packet type");
- }
-
- // We should never get here.
- FATAL_ERROR("Invalid execution point");
+ return channels[channel].incoming_splits.insert(toadd, reliable);
}
/*
@@ -2568,19 +1151,18 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
bool ipv6, PeerHandler *peerhandler) :
m_udpSocket(ipv6),
m_protocol_id(protocol_id),
- m_sendThread(max_packet_size, timeout),
- m_receiveThread(max_packet_size),
+ m_sendThread(new ConnectionSendThread(max_packet_size, timeout)),
+ m_receiveThread(new ConnectionReceiveThread(max_packet_size)),
m_bc_peerhandler(peerhandler)
{
m_udpSocket.setTimeoutMs(5);
- m_sendThread.setParent(this);
- m_receiveThread.setParent(this);
-
- m_sendThread.start();
- m_receiveThread.start();
+ m_sendThread->setParent(this);
+ m_receiveThread->setParent(this);
+ m_sendThread->start();
+ m_receiveThread->start();
}
@@ -2588,17 +1170,17 @@ Connection::~Connection()
{
m_shutting_down = true;
// request threads to stop
- m_sendThread.stop();
- m_receiveThread.stop();
+ m_sendThread->stop();
+ m_receiveThread->stop();
//TODO for some unkonwn reason send/receive threads do not exit as they're
// supposed to be but wait on peer timeout. To speed up shutdown we reduce
// timeout to half a second.
- m_sendThread.setPeerTimeout(0.5);
+ m_sendThread->setPeerTimeout(0.5);
// wait for threads to finish
- m_sendThread.wait();
- m_receiveThread.wait();
+ m_sendThread->wait();
+ m_receiveThread->wait();
// Delete peers
for (auto &peer : m_peers) {
@@ -2613,19 +1195,9 @@ void Connection::putEvent(ConnectionEvent &e)
m_event_queue.push_back(e);
}
-PeerHelper Connection::getPeer(u16 peer_id)
+void Connection::TriggerSend()
{
- MutexAutoLock peerlock(m_peers_mutex);
- std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
-
- if (node == m_peers.end()) {
- throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
- }
-
- // Error checking
- FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
-
- return PeerHelper(node->second);
+ m_sendThread->Trigger();
}
PeerHelper Connection::getPeerNoEx(u16 peer_id)
@@ -2721,7 +1293,7 @@ void Connection::putCommand(ConnectionCommand &c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
- m_sendThread.Trigger();
+ m_sendThread->Trigger();
}
}
@@ -2910,7 +1482,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
ConnectionCommand cmd;
SharedBuffer<u8> reply(4);
- writeU8(&reply[0], TYPE_CONTROL);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
cmd.createPeer(peer_id_new,reply);
@@ -2932,11 +1504,6 @@ void Connection::PrintInfo(std::ostream &out)
m_info_mutex.unlock();
}
-void Connection::PrintInfo()
-{
- PrintInfo(dout_con);
-}
-
const std::string Connection::getDesc()
{
return std::string("con(")+
@@ -2961,13 +1528,13 @@ void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum)
ConnectionCommand c;
SharedBuffer<u8> ack(4);
- writeU8(&ack[0], TYPE_CONTROL);
+ writeU8(&ack[0], PACKET_TYPE_CONTROL);
writeU8(&ack[1], CONTROLTYPE_ACK);
writeU16(&ack[2], seqnum);
c.ack(peer_id, channelnum, ack);
putCommand(c);
- m_sendThread.Trigger();
+ m_sendThread->Trigger();
}
UDPPeer* Connection::createServerPeer(Address& address)
diff --git a/src/network/connection.h b/src/network/connection.h
index 3d3a502aa..c54161cc9 100644
--- a/src/network/connection.h
+++ b/src/network/connection.h
@@ -37,13 +37,19 @@ class NetworkPacket;
namespace con
{
+class ConnectionReceiveThread;
+class ConnectionSendThread;
+
typedef enum MTProtocols {
MTP_PRIMARY,
MTP_UDP,
MTP_MINETEST_RELIABLE_UDP
} MTProtocols;
+#define MAX_UDP_PEERS 65535
+
#define SEQNUM_MAX 65535
+
inline bool seqnum_higher(u16 totest, u16 base)
{
if (totest > base)
@@ -73,6 +79,12 @@ inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
return ((seqnum < window_end) || (seqnum >= window_start));
}
+static inline float CALC_DTIME(u64 lasttime, u64 curtime)
+{
+ float value = ( curtime - lasttime) / 1000.0;
+ return MYMAX(MYMIN(value,0.1),0.0);
+}
+
struct BufferedPacket
{
BufferedPacket(u8 *a_data, u32 a_size):
@@ -90,44 +102,31 @@ struct BufferedPacket
};
// This adds the base headers to the data and makes a packet out of it
-BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, u16 sender_peer_id, u8 channel);
-BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
- u32 protocol_id, u16 sender_peer_id, u8 channel);
-
-// Add the TYPE_ORIGINAL header to the data
-SharedBuffer<u8> makeOriginalPacket(
- SharedBuffer<u8> data);
-
-// Split data in chunks and add TYPE_SPLIT headers to them
-std::list<SharedBuffer<u8> > makeSplitPacket(
- SharedBuffer<u8> data,
- u32 chunksize_max,
- u16 seqnum);
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
// Increments split_seqnum if a split packet is made
-std::list<SharedBuffer<u8> > makeAutoSplitPacket(
- SharedBuffer<u8> data,
- u32 chunksize_max,
- u16 &split_seqnum);
+void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
+ u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
// Add the TYPE_RELIABLE header to the data
-SharedBuffer<u8> makeReliablePacket(
- const SharedBuffer<u8> &data,
- u16 seqnum);
+SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
struct IncomingSplitPacket
{
- IncomingSplitPacket() = default;
+ IncomingSplitPacket(u32 cc, bool r):
+ chunk_count(cc), reliable(r) {}
+
+ IncomingSplitPacket() = delete;
// Key is chunk number, value is data without headers
- std::map<u16, SharedBuffer<u8> > chunks;
+ std::map<u16, SharedBuffer<u8>> chunks;
u32 chunk_count;
float time = 0.0f; // Seconds from adding
bool reliable = false; // If true, isn't deleted on timeout
- bool allReceived()
+ bool allReceived() const
{
return (chunks.size() == chunk_count);
}
@@ -171,7 +170,7 @@ controltype and data description:
packet to get a reply
CONTROLTYPE_DISCO
*/
-#define TYPE_CONTROL 0
+//#define TYPE_CONTROL 0
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
@@ -185,7 +184,7 @@ checking at all.
Header (1 byte):
[0] u8 type
*/
-#define TYPE_ORIGINAL 1
+//#define TYPE_ORIGINAL 1
#define ORIGINAL_HEADER_SIZE 1
/*
SPLIT: These are sequences of packets forming one bigger piece of
@@ -202,7 +201,7 @@ data.
[3] u16 chunk_count
[5] u16 chunk_num
*/
-#define TYPE_SPLIT 2
+//#define TYPE_SPLIT 2
/*
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
and they shall be delivered in the same order as sent. This is done
@@ -214,10 +213,17 @@ with a buffer in the receiving and transmitting end.
[1] u16 seqnum
*/
-#define TYPE_RELIABLE 3
+//#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
#define SEQNUM_INITIAL 65500
+enum PacketType: u8 {
+ PACKET_TYPE_CONTROL = 0,
+ PACKET_TYPE_ORIGINAL = 1,
+ PACKET_TYPE_SPLIT = 2,
+ PACKET_TYPE_RELIABLE = 3,
+ PACKET_TYPE_MAX
+};
/*
A buffer which stores reliable packets and sorts them internally
for fast access to the smallest one.
@@ -270,7 +276,7 @@ public:
Returns a reference counted buffer of length != 0 when a full split
packet is constructed. If not, returns one of length 0.
*/
- SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
+ SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
void removeUnreliableTimedOuts(float dtime, float timeout);
@@ -318,8 +324,8 @@ struct ConnectionCommand
enum ConnectionCommandType type = CONNCMD_NONE;
Address address;
u16 peer_id = PEER_ID_INEXISTENT;
- u8 channelnum;
- Buffer<u8> data;
+ u8 channelnum = 0;
+ SharedBuffer<u8> data;
bool reliable = false;
bool raw = false;
@@ -551,13 +557,12 @@ class Peer {
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
- virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
- BufferedPacket toadd,
- bool reliable)
- {
- fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
- return SharedBuffer<u8>(0);
- };
+ virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ bool reliable)
+ {
+ fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
+ return SharedBuffer<u8>(0);
+ };
virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
@@ -649,10 +654,8 @@ public:
u16 getNextSplitSequenceNumber(u8 channel);
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
- SharedBuffer<u8> addSpiltPacket(u8 channel,
- BufferedPacket toadd,
- bool reliable);
-
+ SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ bool reliable);
protected:
/*
@@ -750,103 +753,6 @@ struct ConnectionEvent
}
};
-class ConnectionSendThread : public Thread {
-
-public:
- friend class UDPPeer;
-
- ConnectionSendThread(unsigned int max_packet_size, float timeout);
-
- void *run();
-
- void Trigger();
-
- void setParent(Connection* parent) {
- assert(parent != NULL); // Pre-condition
- m_connection = parent;
- }
-
- void setPeerTimeout(float peer_timeout)
- { m_timeout = peer_timeout; }
-
-private:
- void runTimeouts (float dtime);
- void rawSend (const BufferedPacket &packet);
- bool rawSendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable);
-
- void processReliableCommand (ConnectionCommand &c);
- void processNonReliableCommand (ConnectionCommand &c);
- void serve (Address bind_address);
- void connect (Address address);
- void disconnect ();
- void disconnect_peer(u16 peer_id);
- void send (u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data);
- void sendReliable (ConnectionCommand &c);
- void sendToAll (u8 channelnum,
- SharedBuffer<u8> data);
- void sendToAllReliable(ConnectionCommand &c);
-
- void sendPackets (float dtime);
-
- void sendAsPacket (u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data,bool ack=false);
-
- void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
-
- bool packetsQueued();
-
- Connection *m_connection = nullptr;
- unsigned int m_max_packet_size;
- float m_timeout;
- std::queue<OutgoingPacket> m_outgoing_queue;
- Semaphore m_send_sleep_semaphore;
-
- unsigned int m_iteration_packets_avaialble;
- unsigned int m_max_commands_per_iteration = 1;
- unsigned int m_max_data_packets_per_iteration;
- unsigned int m_max_packets_requeued = 256;
-};
-
-class ConnectionReceiveThread : public Thread {
-public:
- ConnectionReceiveThread(unsigned int max_packet_size);
-
- void *run();
-
- void setParent(Connection *parent) {
- assert(parent); // Pre-condition
- m_connection = parent;
- }
-
-private:
- void receive();
-
- // Returns next data from a buffer if possible
- // If found, returns true; if not, false.
- // If found, sets peer_id and dst
- bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
-
- bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
- SharedBuffer<u8> &dst);
-
- /*
- Processes a packet with the basic header stripped out.
- Parameters:
- packetdata: Data in packet (with no base headers)
- peer_id: peer id of the sender of the packet in question
- channelnum: channel on which the packet was sent
- reliable: true if recursing into a reliable packet
- */
- SharedBuffer<u8> processPacket(Channel *channel,
- SharedBuffer<u8> packetdata, u16 peer_id,
- u8 channelnum, bool reliable);
-
-
- Connection *m_connection = nullptr;
-};
-
class PeerHandler;
class Connection
@@ -863,7 +769,7 @@ public:
ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c);
- void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
+ void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr);
void Connect(Address address);
bool Connected();
@@ -879,7 +785,6 @@ public:
void DisconnectPeer(u16 peer_id);
protected:
- PeerHelper getPeer(u16 peer_id);
PeerHelper getPeerNoEx(u16 peer_id);
u16 lookupPeer(Address& sender);
@@ -892,7 +797,6 @@ protected:
void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
void PrintInfo(std::ostream &out);
- void PrintInfo();
std::list<u16> getPeerIDs()
{
@@ -905,8 +809,7 @@ protected:
void putEvent(ConnectionEvent &e);
- void TriggerSend()
- { m_sendThread.Trigger(); }
+ void TriggerSend();
private:
std::list<Peer*> getPeers();
@@ -919,14 +822,14 @@ private:
std::list<u16> m_peer_ids;
std::mutex m_peers_mutex;
- ConnectionSendThread m_sendThread;
- ConnectionReceiveThread m_receiveThread;
+ std::unique_ptr<ConnectionSendThread> m_sendThread;
+ std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
std::mutex m_info_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;
- int m_bc_receive_timeout = 0;
+ u32 m_bc_receive_timeout = 0;
bool m_shutting_down = false;
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
new file mode 100644
index 000000000..b07acd469
--- /dev/null
+++ b/src/network/connectionthreads.cpp
@@ -0,0 +1,1404 @@
+/*
+Minetest
+Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
+Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include "connectionthreads.h"
+#include "log.h"
+#include "profiler.h"
+#include "settings.h"
+#include "network/networkpacket.h"
+#include "util/serialize.h"
+
+namespace con
+{
+
+/******************************************************************************/
+/* defines used for debugging and profiling */
+/******************************************************************************/
+#ifdef NDEBUG
+#define LOG(a) a
+#define PROFILE(a)
+#undef DEBUG_CONNECTION_KBPS
+#else
+/* this mutex is used to achieve log message consistency */
+std::mutex log_conthread_mutex;
+#define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_conthread_mutex); \
+ a; \
+ }
+#define PROFILE(a) a
+//#define DEBUG_CONNECTION_KBPS
+#undef DEBUG_CONNECTION_KBPS
+#endif
+
+/* maximum number of retries for reliable packets */
+#define MAX_RELIABLE_RETRY 5
+
+#define WINDOW_SIZE 5
+
+static u16 readPeerId(u8 *packetdata)
+{
+ return readU16(&packetdata[4]);
+}
+static u8 readChannel(u8 *packetdata)
+{
+ return readU8(&packetdata[6]);
+}
+
+/******************************************************************************/
+/* Connection Threads */
+/******************************************************************************/
+
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
+ float timeout) :
+ Thread("ConnectionSend"),
+ m_max_packet_size(max_packet_size),
+ m_timeout(timeout),
+ m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
+{
+}
+
+void *ConnectionSendThread::run()
+{
+ assert(m_connection);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "ConnectionSend thread started" << std::endl);
+
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
+
+ PROFILE(std::stringstream ThreadIdentifier);
+ PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
+
+ /* if stop is requested don't stop immediately but try to send all */
+ /* packets first */
+ while (!stopRequested() || packetsQueued()) {
+ BEGIN_DEBUG_EXCEPTION_HANDLER
+ PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
+
+ m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
+
+ /* wait for trigger or timeout */
+ m_send_sleep_semaphore.wait(50);
+
+ /* remove all triggers */
+ while (m_send_sleep_semaphore.wait(0)) {
+ }
+
+ lasttime = curtime;
+ curtime = porting::getTimeMs();
+ float dtime = CALC_DTIME(lasttime, curtime);
+
+ /* first do all the reliable stuff */
+ runTimeouts(dtime);
+
+ /* translate commands to packets */
+ ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
+ while (c.type != CONNCMD_NONE) {
+ if (c.reliable)
+ processReliableCommand(c);
+ else
+ processNonReliableCommand(c);
+
+ c = m_connection->m_command_queue.pop_frontNoEx(0);
+ }
+
+ /* send non reliable packets */
+ sendPackets(dtime);
+
+ END_DEBUG_EXCEPTION_HANDLER
+ }
+
+ PROFILE(g_profiler->remove(ThreadIdentifier.str()));
+ return NULL;
+}
+
+void ConnectionSendThread::Trigger()
+{
+ m_send_sleep_semaphore.post();
+}
+
+bool ConnectionSendThread::packetsQueued()
+{
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ if (!m_outgoing_queue.empty() && !peerIds.empty())
+ return true;
+
+ for (u16 peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+
+ if (!peer)
+ continue;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ continue;
+
+ for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ if (!channel.queued_commands.empty()) {
+ return true;
+ }
+ }
+ }
+
+
+ return false;
+}
+
+void ConnectionSendThread::runTimeouts(float dtime)
+{
+ std::list<u16> timeouted_peers;
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ for (u16 &peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+
+ if (!peer)
+ continue;
+
+ UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
+ if (!udpPeer)
+ continue;
+
+ PROFILE(std::stringstream peerIdentifier);
+ PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
+ << ";" << peerId << ";RELIABLE]");
+ PROFILE(ScopeProfiler
+ peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+
+ SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
+
+ /*
+ Check peer timeout
+ */
+ if (peer->isTimedOut(m_timeout)) {
+ infostream << m_connection->getDesc()
+ << "RunTimeouts(): Peer " << peer->id
+ << " has timed out."
+ << " (source=peer->timeout_counter)"
+ << std::endl;
+ // Add peer to the list
+ timeouted_peers.push_back(peer->id);
+ // Don't bother going through the buffers of this one
+ continue;
+ }
+
+ float resend_timeout = udpPeer->getResendTimeout();
+ bool retry_count_exceeded = false;
+ for (Channel &channel : udpPeer->channels) {
+ std::list<BufferedPacket> timed_outs;
+
+ if (udpPeer->getLegacyPeer())
+ channel.setWindowSize(WINDOW_SIZE);
+
+ // Remove timed out incomplete unreliable split packets
+ channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
+
+ // Increment reliable packet times
+ channel.outgoing_reliables_sent.incrementTimeouts(dtime);
+
+ unsigned int numpeers = m_connection->m_peers.size();
+
+ if (numpeers == 0)
+ return;
+
+ // Re-send timed out outgoing reliables
+ timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
+ (m_max_data_packets_per_iteration / numpeers));
+
+ channel.UpdatePacketLossCounter(timed_outs.size());
+ g_profiler->graphAdd("packets_lost", timed_outs.size());
+
+ m_iteration_packets_avaialble -= timed_outs.size();
+
+ for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
+ k != timed_outs.end(); ++k) {
+ u16 peer_id = readPeerId(*(k->data));
+ u8 channelnum = readChannel(*(k->data));
+ u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
+
+ channel.UpdateBytesLost(k->data.getSize());
+ k->resend_count++;
+
+ if (k->resend_count > MAX_RELIABLE_RETRY) {
+ retry_count_exceeded = true;
+ timeouted_peers.push_back(peer->id);
+ /* no need to check additional packets if a single one did timeout*/
+ break;
+ }
+
+ LOG(derr_con << m_connection->getDesc()
+ << "RE-SENDING timed-out RELIABLE to "
+ << k->address.serializeString()
+ << "(t/o=" << resend_timeout << "): "
+ << "from_peer_id=" << peer_id
+ << ", channel=" << ((int) channelnum & 0xff)
+ << ", seqnum=" << seqnum
+ << std::endl);
+
+ rawSend(*k);
+
+ // do not handle rtt here as we can't decide if this packet was
+ // lost or really takes more time to transmit
+ }
+
+ if (retry_count_exceeded) {
+ break; /* no need to check other channels if we already did timeout */
+ }
+
+ channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
+ }
+
+ /* skip to next peer if we did timeout */
+ if (retry_count_exceeded)
+ continue;
+
+ /* send ping if necessary */
+ if (udpPeer->Ping(dtime, data)) {
+ LOG(dout_con << m_connection->getDesc()
+ << "Sending ping for peer_id: " << udpPeer->id << std::endl);
+ /* this may fail if there ain't a sequence number left */
+ if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
+ //retrigger with reduced ping interval
+ udpPeer->Ping(4.0, data);
+ }
+ }
+
+ udpPeer->RunCommandQueues(m_max_packet_size,
+ m_max_commands_per_iteration,
+ m_max_packets_requeued);
+ }
+
+ // Remove timed out peers
+ for (u16 timeouted_peer : timeouted_peers) {
+ LOG(derr_con << m_connection->getDesc()
+ << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
+ m_connection->deletePeer(timeouted_peer, true);
+ }
+}
+
+void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+{
+ try {
+ m_connection->m_udpSocket.Send(packet.address, *packet.data,
+ packet.data.getSize());
+ LOG(dout_con << m_connection->getDesc()
+ << " rawSend: " << packet.data.getSize()
+ << " bytes sent" << std::endl);
+ } catch (SendFailedException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Connection::rawSend(): SendFailedException: "
+ << packet.address.serializeString() << std::endl);
+ }
+}
+
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+{
+ try {
+ p.absolute_send_time = porting::getTimeMs();
+ // Buffer the packet
+ channel->outgoing_reliables_sent.insert(p,
+ (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
+ % (MAX_RELIABLE_WINDOW_SIZE + 1));
+ }
+ catch (AlreadyExistsException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: Going to send a reliable packet"
+ << " in outgoing buffer" << std::endl);
+ }
+
+ // Send the packet
+ rawSend(p);
+}
+
+bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
+ const SharedBuffer<u8> &data, bool reliable)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: dropped packet for non existent peer_id: "
+ << peer_id << std::endl);
+ FATAL_ERROR_IF(!reliable,
+ "Trying to send raw packet reliable but no peer found!");
+ return false;
+ }
+ Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
+
+ if (reliable) {
+ bool have_sequence_number_for_raw_packet = true;
+ u16 seqnum =
+ channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+
+ if (!have_sequence_number_for_raw_packet)
+ return false;
+
+ SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+ Address peer_address;
+ peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
+
+ // Add base headers and make a packet
+ BufferedPacket p = con::makePacket(peer_address, reliable,
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
+
+ // first check if our send window is already maxed out
+ if (channel->outgoing_reliables_sent.size()
+ < channel->getWindowSize()) {
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: sending a reliable packet to peer_id " << peer_id
+ << " channel: " << channelnum
+ << " seqnum: " << seqnum << std::endl);
+ sendAsPacketReliable(p, channel);
+ return true;
+ }
+
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: queueing reliable packet for peer_id: " << peer_id
+ << " channel: " << channelnum
+ << " seqnum: " << seqnum << std::endl);
+ channel->queued_reliables.push(p);
+ return false;
+ }
+
+ Address peer_address;
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ // Add base headers and make a packet
+ BufferedPacket p = con::makePacket(peer_address, data,
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
+
+ // Send the packet
+ rawSend(p);
+ return true;
+ }
+
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: dropped unreliable packet for peer_id: " << peer_id
+ << " because of (yet) missing udp address" << std::endl);
+ return false;
+}
+
+void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+{
+ assert(c.reliable); // Pre-condition
+
+ switch (c.type) {
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_NONE" << std::endl);
+ return;
+
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_SEND" << std::endl);
+ sendReliable(c);
+ return;
+
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAllReliable(c);
+ return;
+
+ case CONCMD_CREATE_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
+ if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ /* put to queue if we couldn't send it immediately */
+ sendReliable(c);
+ }
+ return;
+
+ case CONCMD_DISABLE_LEGACY:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
+ if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ /* put to queue if we couldn't send it immediately */
+ sendReliable(c);
+ }
+ return;
+
+ case CONNCMD_SERVE:
+ case CONNCMD_CONNECT:
+ case CONNCMD_DISCONNECT:
+ case CONCMD_ACK:
+ FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid reliable command type: " << c.type << std::endl);
+ }
+}
+
+
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+{
+ assert(!c.reliable); // Pre-condition
+
+ switch (c.type) {
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_NONE" << std::endl);
+ return;
+ case CONNCMD_SERVE:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SERVE port="
+ << c.address.serializeString() << std::endl);
+ serve(c.address);
+ return;
+ case CONNCMD_CONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_CONNECT" << std::endl);
+ connect(c.address);
+ return;
+ case CONNCMD_DISCONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT" << std::endl);
+ disconnect();
+ return;
+ case CONNCMD_DISCONNECT_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
+ disconnect_peer(c.peer_id);
+ return;
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SEND" << std::endl);
+ send(c.peer_id, c.channelnum, c.data);
+ return;
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAll(c.channelnum, c.data);
+ return;
+ case CONCMD_ACK:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONCMD_ACK" << std::endl);
+ sendAsPacket(c.peer_id, c.channelnum, c.data, true);
+ return;
+ case CONCMD_CREATE_PEER:
+ FATAL_ERROR("Got command that should be reliable as unreliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid command type: " << c.type << std::endl);
+ }
+}
+
+void ConnectionSendThread::serve(Address bind_address)
+{
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP serving at port " << bind_address.serializeString() << std::endl);
+ try {
+ m_connection->m_udpSocket.Bind(bind_address);
+ m_connection->SetPeerID(PEER_ID_SERVER);
+ }
+ catch (SocketException &e) {
+ // Create event
+ ConnectionEvent ce;
+ ce.bindFailed();
+ m_connection->putEvent(ce);
+ }
+}
+
+void ConnectionSendThread::connect(Address address)
+{
+ LOG(dout_con << m_connection->getDesc() << " connecting to "
+ << address.serializeString()
+ << ":" << address.getPort() << std::endl);
+
+ UDPPeer *peer = m_connection->createServerPeer(address);
+
+ // Create event
+ ConnectionEvent e;
+ e.peerAdded(peer->id, peer->address);
+ m_connection->putEvent(e);
+
+ Address bind_addr;
+
+ if (address.isIPv6())
+ bind_addr.setAddress((IPv6AddressBytes *) NULL);
+ else
+ bind_addr.setAddress(0, 0, 0, 0);
+
+ m_connection->m_udpSocket.Bind(bind_addr);
+
+ // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
+ m_connection->SetPeerID(PEER_ID_INEXISTENT);
+ NetworkPacket pkt(0, 0);
+ m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
+}
+
+void ConnectionSendThread::disconnect()
+{
+ LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], PACKET_TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+
+
+ // Send to all
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ sendAsPacket(peerid, 0, data, false);
+ }
+}
+
+void ConnectionSendThread::disconnect_peer(u16 peer_id)
+{
+ LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], PACKET_TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+ sendAsPacket(peer_id, 0, data, false);
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer)
+ return;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0) {
+ return;
+ }
+
+ dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
+}
+
+void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data)
+{
+ assert(channelnum < CHANNEL_COUNT); // Pre-condition
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
+ return;
+ }
+
+ LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
+
+ u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
+
+ u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
+ std::list<SharedBuffer<u8>> originals;
+
+ makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
+
+ peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
+
+ for (const SharedBuffer<u8> &original : originals) {
+ sendAsPacket(peer_id, channelnum, original);
+ }
+}
+
+void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+ if (!peer)
+ return;
+
+ peer->PutReliableSendCommand(c, m_max_packet_size);
+}
+
+void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ send(peerid, channelnum, data);
+ }
+}
+
+void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerid);
+
+ if (!peer)
+ continue;
+
+ peer->PutReliableSendCommand(c, m_max_packet_size);
+ }
+}
+
+void ConnectionSendThread::sendPackets(float dtime)
+{
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+ std::list<u16> pendingDisconnect;
+ std::map<u16, bool> pending_unreliable;
+
+ for (u16 peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+ //peer may have been removed
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
+ << peerId
+ << std::endl);
+ continue;
+ }
+ peer->m_increment_packets_remaining =
+ m_iteration_packets_avaialble / m_connection->m_peers.size();
+
+ UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
+
+ if (!udpPeer) {
+ continue;
+ }
+
+ if (udpPeer->m_pending_disconnect) {
+ pendingDisconnect.push_back(peerId);
+ }
+
+ PROFILE(std::stringstream
+ peerIdentifier);
+ PROFILE(
+ peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
+ << ";RELIABLE]");
+ PROFILE(ScopeProfiler
+ peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+
+ LOG(dout_con << m_connection->getDesc()
+ << " Handle per peer queues: peer_id=" << peerId
+ << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
+
+ // first send queued reliable packets for all peers (if possible)
+ for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
+ Channel &channel = udpPeer->channels[i];
+ u16 next_to_ack = 0;
+
+ channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
+ u16 next_to_receive = 0;
+ channel.incoming_reliables.getFirstSeqnum(next_to_receive);
+
+ LOG(dout_con << m_connection->getDesc() << "\t channel: "
+ << i << ", peer quota:"
+ << peer->m_increment_packets_remaining
+ << std::endl
+ << "\t\t\treliables on wire: "
+ << channel.outgoing_reliables_sent.size()
+ << ", waiting for ack for " << next_to_ack
+ << std::endl
+ << "\t\t\tincoming_reliables: "
+ << channel.incoming_reliables.size()
+ << ", next reliable packet: "
+ << channel.readNextIncomingSeqNum()
+ << ", next queued: " << next_to_receive
+ << std::endl
+ << "\t\t\treliables queued : "
+ << channel.queued_reliables.size()
+ << std::endl
+ << "\t\t\tqueued commands : "
+ << channel.queued_commands.size()
+ << std::endl);
+
+ while (!channel.queued_reliables.empty() &&
+ channel.outgoing_reliables_sent.size()
+ < channel.getWindowSize() &&
+ peer->m_increment_packets_remaining > 0) {
+ BufferedPacket p = channel.queued_reliables.front();
+ channel.queued_reliables.pop();
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: sending a queued reliable packet "
+ << " channel: " << i
+ << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << std::endl);
+ sendAsPacketReliable(p, &channel);
+ peer->m_increment_packets_remaining--;
+ }
+ }
+ }
+
+ if (!m_outgoing_queue.empty()) {
+ LOG(dout_con << m_connection->getDesc()
+ << " Handle non reliable queue ("
+ << m_outgoing_queue.size() << " pkts)" << std::endl);
+ }
+
+ unsigned int initial_queuesize = m_outgoing_queue.size();
+ /* send non reliable packets*/
+ for (unsigned int i = 0; i < initial_queuesize; i++) {
+ OutgoingPacket packet = m_outgoing_queue.front();
+ m_outgoing_queue.pop();
+
+ if (packet.reliable)
+ continue;
+
+ PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " Outgoing queue: peer_id=" << packet.peer_id
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (packet.channelnum % 0xFF)
+ << ", size: " << packet.data.getSize() << std::endl);
+ continue;
+ }
+
+ /* send acks immediately */
+ if (packet.ack) {
+ rawSendAsPacket(packet.peer_id, packet.channelnum,
+ packet.data, packet.reliable);
+ peer->m_increment_packets_remaining =
+ MYMIN(0, peer->m_increment_packets_remaining--);
+ } else if (
+ (peer->m_increment_packets_remaining > 0) ||
+ (stopRequested())) {
+ rawSendAsPacket(packet.peer_id, packet.channelnum,
+ packet.data, packet.reliable);
+ peer->m_increment_packets_remaining--;
+ } else {
+ m_outgoing_queue.push(packet);
+ pending_unreliable[packet.peer_id] = true;
+ }
+ }
+
+ for (u16 peerId : pendingDisconnect) {
+ if (!pending_unreliable[peerId]) {
+ m_connection->deletePeer(peerId, false);
+ }
+ }
+}
+
+void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool ack)
+{
+ OutgoingPacket packet(peer_id, channelnum, data, false, ack);
+ m_outgoing_queue.push(packet);
+}
+
+ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
+ Thread("ConnectionReceive")
+{
+}
+
+void *ConnectionReceiveThread::run()
+{
+ assert(m_connection);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "ConnectionReceive thread started" << std::endl);
+
+ PROFILE(std::stringstream
+ ThreadIdentifier);
+ PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
+
+#ifdef DEBUG_CONNECTION_KBPS
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
+ float debug_print_timer = 0.0;
+#endif
+
+ while (!stopRequested()) {
+ BEGIN_DEBUG_EXCEPTION_HANDLER
+ PROFILE(ScopeProfiler
+ sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
+
+#ifdef DEBUG_CONNECTION_KBPS
+ lasttime = curtime;
+ curtime = porting::getTimeMs();
+ float dtime = CALC_DTIME(lasttime,curtime);
+#endif
+
+ /* receive packets */
+ receive();
+
+#ifdef DEBUG_CONNECTION_KBPS
+ debug_print_timer += dtime;
+ if (debug_print_timer > 20.0) {
+ debug_print_timer -= 20.0;
+
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (std::list<u16>::iterator i = peerids.begin();
+ i != peerids.end();
+ i++)
+ {
+ PeerHelper peer = m_connection->getPeerNoEx(*i);
+ if (!peer)
+ continue;
+
+ float peer_current = 0.0;
+ float peer_loss = 0.0;
+ float avg_rate = 0.0;
+ float avg_loss = 0.0;
+
+ for(u16 j=0; j<CHANNEL_COUNT; j++)
+ {
+ peer_current +=peer->channels[j].getCurrentDownloadRateKB();
+ peer_loss += peer->channels[j].getCurrentLossRateKB();
+ avg_rate += peer->channels[j].getAvgDownloadRateKB();
+ avg_loss += peer->channels[j].getAvgLossRateKB();
+ }
+
+ std::stringstream output;
+ output << std::fixed << std::setprecision(1);
+ output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
+ output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
+ output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
+ output << std::setfill(' ');
+ for(u16 j=0; j<CHANNEL_COUNT; j++)
+ {
+ output << "\tcha " << j << ":"
+ << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
+ << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
+ << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
+ << " /"
+ << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
+ << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
+ << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
+ << " / WS: " << peer->channels[j].getWindowSize()
+ << std::endl;
+ }
+
+ fprintf(stderr,"%s\n",output.str().c_str());
+ }
+ }
+#endif
+ END_DEBUG_EXCEPTION_HANDLER
+ }
+
+ PROFILE(g_profiler->remove(ThreadIdentifier.str()));
+ return NULL;
+}
+
+// Receive packets from the network and buffers and create ConnectionEvents
+void ConnectionReceiveThread::receive()
+{
+ // use IPv6 minimum allowed MTU as receive buffer size as this is
+ // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
+ // infrastructure
+ unsigned int packet_maxsize = 1500;
+ SharedBuffer<u8> packetdata(packet_maxsize);
+
+ bool packet_queued = true;
+
+ unsigned int loop_count = 0;
+
+ /* first of all read packets from socket */
+ /* check for incoming data available */
+ while ((loop_count < 10) &&
+ (m_connection->m_udpSocket.WaitData(50))) {
+ loop_count++;
+ try {
+ if (packet_queued) {
+ bool data_left = true;
+ u16 peer_id;
+ SharedBuffer<u8> resultdata;
+ while (data_left) {
+ try {
+ data_left = getFromBuffers(peer_id, resultdata);
+ if (data_left) {
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
+ }
+ }
+ catch (ProcessedSilentlyException &e) {
+ /* try reading again */
+ }
+ }
+ packet_queued = false;
+ }
+
+ Address sender;
+ s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
+ packet_maxsize);
+
+ if ((received_size < BASE_HEADER_SIZE) ||
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid incoming packet, "
+ << "size: " << received_size
+ << ", protocol: "
+ << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
+ << std::endl);
+ continue;
+ }
+
+ u16 peer_id = readPeerId(*packetdata);
+ u8 channelnum = readChannel(*packetdata);
+
+ if (channelnum > CHANNEL_COUNT - 1) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid channel " << channelnum << std::endl);
+ throw InvalidIncomingDataException("Channel doesn't exist");
+ }
+
+ /* Try to identify peer by sender address (may happen on join) */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->lookupPeer(sender);
+ // We do not have to remind the peer of its
+ // peer id as the CONTROLTYPE_SET_PEER_ID
+ // command was sent reliably.
+ }
+
+ /* The peer was not found in our lists. Add it. */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
+ }
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " got packet from unknown peer_id: "
+ << peer_id << " Ignoring." << std::endl);
+ continue;
+ }
+
+ // Validate peer address
+
+ Address peer_address;
+
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ if (peer_address != sender) {
+ LOG(derr_con << m_connection->getDesc()
+ << m_connection->getDesc()
+ << " Peer " << peer_id << " sending from different address."
+ " Ignoring." << std::endl);
+ continue;
+ }
+ } else {
+
+ bool invalid_address = true;
+ if (invalid_address) {
+ LOG(derr_con << m_connection->getDesc()
+ << m_connection->getDesc()
+ << " Peer " << peer_id << " unknown."
+ " Ignoring." << std::endl);
+ continue;
+ }
+ }
+
+ peer->ResetTimeout();
+
+ Channel *channel = 0;
+
+ if (dynamic_cast<UDPPeer *>(&peer) != 0) {
+ channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
+ }
+
+ if (channel != 0) {
+ channel->UpdateBytesReceived(received_size);
+ }
+
+ // Throw the received packet to channel->processPacket()
+
+ // Make a new SharedBuffer from the data without the base headers
+ SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+ memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+ strippeddata.getSize());
+
+ try {
+ // Process it (the result is some data with no headers made by us)
+ SharedBuffer<u8> resultdata = processPacket
+ (channel, strippeddata, peer_id, channelnum, false);
+
+ LOG(dout_con << m_connection->getDesc()
+ << " ProcessPacket from peer_id: " << peer_id
+ << ",channel: " << (channelnum & 0xFF) << ", returned "
+ << resultdata.getSize() << " bytes" << std::endl);
+
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
+ }
+ catch (ProcessedSilentlyException &e) {
+ }
+ catch (ProcessedQueued &e) {
+ packet_queued = true;
+ }
+ }
+ catch (InvalidIncomingDataException &e) {
+ }
+ catch (ProcessedSilentlyException &e) {
+ }
+ }
+}
+
+bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerid);
+ if (!peer)
+ continue;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ continue;
+
+ for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ if (checkIncomingBuffers(&channel, peer_id, dst)) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
+ u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ u16 firstseqnum = 0;
+ if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
+ if (firstseqnum == channel->readNextIncomingSeqNum()) {
+ BufferedPacket p = channel->incoming_reliables.popFirst();
+ peer_id = readPeerId(*p.data);
+ u8 channelnum = readChannel(*p.data);
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum
+ << " peer_id=" << peer_id
+ << " channel=" << ((int) channelnum & 0xff)
+ << std::endl);
+
+ channel->incNextIncomingSeqNum();
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p.data.getSize() - headers_size);
+ memcpy(*payload, &p.data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
+ }
+ }
+ return false;
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
+ SharedBuffer<u8> &packetdata, u16 peer_id, u8 channelnum, bool reliable)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer) {
+ errorstream << "Peer not found (possible timeout)" << std::endl;
+ throw ProcessedSilentlyException("Peer not found (possible timeout)");
+ }
+
+ if (packetdata.getSize() < 1)
+ throw InvalidIncomingDataException("packetdata.getSize() < 1");
+
+ u8 type = readU8(&(packetdata[0]));
+
+ if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
+ std::string errmsg = "Invalid peer_id=" + itos(peer_id);
+ errorstream << errmsg << std::endl;
+ throw InvalidIncomingDataException(errmsg.c_str());
+ }
+
+ if (type >= PACKET_TYPE_MAX) {
+ derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
+ << std::endl;
+ throw InvalidIncomingDataException("Invalid packet type");
+ }
+
+ const PacketTypeHandler &pHandle = packetTypeRouter[type];
+ return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
+}
+
+const ConnectionReceiveThread::PacketTypeHandler
+ ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
+ {&ConnectionReceiveThread::handlePacketType_Control},
+ {&ConnectionReceiveThread::handlePacketType_Original},
+ {&ConnectionReceiveThread::handlePacketType_Split},
+ {&ConnectionReceiveThread::handlePacketType_Reliable},
+};
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ if (packetdata.getSize() < 2)
+ throw InvalidIncomingDataException("packetdata.getSize() < 2");
+
+ u8 controltype = readU8(&(packetdata[1]));
+
+ if (controltype == CONTROLTYPE_ACK) {
+ assert(channel != NULL);
+
+ if (packetdata.getSize() < 4) {
+ throw InvalidIncomingDataException(
+ "packetdata.getSize() < 4 (ACK header size)");
+ }
+
+ u16 seqnum = readU16(&packetdata[2]);
+ LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
+ << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
+ << seqnum << " ]" << std::endl);
+
+ try {
+ BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+
+ // only calculate rtt from straight sent packets
+ if (p.resend_count == 0) {
+ // Get round trip time
+ u64 current_time = porting::getTimeMs();
+
+ // a overflow is quite unlikely but as it'd result in major
+ // rtt miscalculation we handle it here
+ if (current_time > p.absolute_send_time) {
+ float rtt = (current_time - p.absolute_send_time) / 1000.0;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ } else if (p.totaltime > 0) {
+ float rtt = p.totaltime;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ }
+ }
+ // put bytes for max bandwidth calculation
+ channel->UpdateBytesSent(p.data.getSize(), 1);
+ if (channel->outgoing_reliables_sent.size() == 0)
+ m_connection->TriggerSend();
+ } catch (NotFoundException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: ACKed packet not in outgoing queue" << std::endl);
+ channel->UpdatePacketTooLateCounter();
+ }
+
+ throw ProcessedSilentlyException("Got an ACK");
+ } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
+ // Got a packet to set our peer id
+ if (packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+ u16 peer_id_new = readU16(&packetdata[2]);
+ LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
+ << "... " << std::endl);
+
+ if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: Not changing existing peer id." << std::endl);
+ } else {
+ LOG(dout_con << m_connection->getDesc() << "changing own peer id"
+ << std::endl);
+ m_connection->SetPeerID(peer_id_new);
+ }
+
+ ConnectionCommand cmd;
+
+ SharedBuffer<u8> reply(2);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
+ cmd.disableLegacy(PEER_ID_SERVER, reply);
+ m_connection->putCommand(cmd);
+
+ throw ProcessedSilentlyException("Got a SET_PEER_ID");
+ } else if (controltype == CONTROLTYPE_PING) {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
+ throw ProcessedSilentlyException("Got a PING");
+ } else if (controltype == CONTROLTYPE_DISCO) {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
+ << peer->id << std::endl);
+
+ if (!m_connection->deletePeer(peer->id, false)) {
+ derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got a DISCO");
+ } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
+ dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
+ throw ProcessedSilentlyException("Got non legacy control");
+ } else {
+ LOG(derr_con << m_connection->getDesc()
+ << "INVALID TYPE_CONTROL: invalid controltype="
+ << ((int) controltype & 0xff) << std::endl);
+ throw InvalidIncomingDataException("Invalid control type");
+ }
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
+ LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
+ << std::endl);
+ // Get the inside packet out and return it
+ SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
+ memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
+ return payload;
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ Address peer_address;
+
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ // We have to create a packet again for buffering
+ // This isn't actually too bad an idea.
+ BufferedPacket packet = makePacket(peer_address,
+ packetdata,
+ m_connection->GetProtocolID(),
+ peer->id,
+ channelnum);
+
+ // Buffer the packet
+ SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
+
+ if (data.getSize() != 0) {
+ LOG(dout_con << m_connection->getDesc()
+ << "RETURNING TYPE_SPLIT: Constructed full data, "
+ << "size=" << data.getSize() << std::endl);
+ return data;
+ }
+ LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
+ throw ProcessedSilentlyException("Buffered a split packet chunk");
+ }
+
+ // We should never get here.
+ FATAL_ERROR("Invalid execution point");
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ assert(channel != NULL);
+
+ // Recursive reliable packets not allowed
+ if (reliable)
+ throw InvalidIncomingDataException("Found nested reliable packets");
+
+ if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
+ throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
+
+ u16 seqnum = readU16(&packetdata[1]);
+ bool is_future_packet = false;
+ bool is_old_packet = false;
+
+ /* packet is within our receive window send ack */
+ if (seqnum_in_window(seqnum,
+ channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
+ m_connection->sendAck(peer->id, channelnum, seqnum);
+ } else {
+ is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
+ is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
+
+ /* packet is not within receive window, don't send ack. *
+ * if this was a valid packet it's gonna be retransmitted */
+ if (is_future_packet)
+ throw ProcessedSilentlyException(
+ "Received packet newer then expected, not sending ack");
+
+ /* seems like our ack was lost, send another one for a old packet */
+ if (is_old_packet) {
+ LOG(dout_con << m_connection->getDesc()
+ << "RE-SENDING ACK: peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+ m_connection->sendAck(peer->id, channelnum, seqnum);
+
+ // we already have this packet so this one was on wire at least
+ // the current timeout
+ // we don't know how long this packet was on wire don't do silly guessing
+ // dynamic_cast<UDPPeer*>(&peer)->
+ // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
+
+ throw ProcessedSilentlyException("Retransmitting ack for old packet");
+ }
+ }
+
+ if (seqnum != channel->readNextIncomingSeqNum()) {
+ Address peer_address;
+
+ // this is a reliable packet so we have a udp address for sure
+ peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
+ // This one comes later, buffer it.
+ // Actually we have to make a packet to buffer one.
+ // Well, we have all the ingredients, so just do it.
+ BufferedPacket packet = con::makePacket(
+ peer_address,
+ packetdata,
+ m_connection->GetProtocolID(),
+ peer->id,
+ channelnum);
+ try {
+ channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
+
+ LOG(dout_con << m_connection->getDesc()
+ << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+
+ throw ProcessedQueued("Buffered future reliable packet");
+ } catch (AlreadyExistsException &e) {
+ } catch (IncomingDataCorruption &e) {
+ ConnectionCommand discon;
+ discon.disconnect_peer(peer->id);
+ m_connection->putCommand(discon);
+
+ LOG(derr_con << m_connection->getDesc()
+ << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum
+ << "DROPPING CLIENT!" << std::endl;)
+ }
+ }
+
+ /* we got a packet to process right now */
+ LOG(dout_con << m_connection->getDesc()
+ << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+
+
+ /* check for resend case */
+ u16 queued_seqnum = 0;
+ if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
+ if (queued_seqnum == seqnum) {
+ BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ /** TODO find a way to verify the new against the old packet */
+ }
+ }
+
+ channel->incNextIncomingSeqNum();
+
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
+ memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
+
+ return processPacket(channel, payload, peer->id, channelnum, true);
+}
+
+}
diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h
new file mode 100644
index 000000000..11bec0c6c
--- /dev/null
+++ b/src/network/connectionthreads.h
@@ -0,0 +1,148 @@
+/*
+Minetest
+Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
+Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#pragma once
+
+#include <cassert>
+#include "../threading/thread.h"
+#include "connection.h"
+
+namespace con
+{
+
+class Connection;
+
+class ConnectionSendThread : public Thread
+{
+
+public:
+ friend class UDPPeer;
+
+ ConnectionSendThread(unsigned int max_packet_size, float timeout);
+
+ void *run();
+
+ void Trigger();
+
+ void setParent(Connection *parent)
+ {
+ assert(parent != NULL); // Pre-condition
+ m_connection = parent;
+ }
+
+ void setPeerTimeout(float peer_timeout) { m_timeout = peer_timeout; }
+
+private:
+ void runTimeouts(float dtime);
+ void rawSend(const BufferedPacket &packet);
+ bool rawSendAsPacket(u16 peer_id, u8 channelnum, const SharedBuffer<u8> &data,
+ bool reliable);
+
+ void processReliableCommand(ConnectionCommand &c);
+ void processNonReliableCommand(ConnectionCommand &c);
+ void serve(Address bind_address);
+ void connect(Address address);
+ void disconnect();
+ void disconnect_peer(u16 peer_id);
+ void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data);
+ void sendReliable(ConnectionCommand &c);
+ void sendToAll(u8 channelnum, SharedBuffer<u8> data);
+ void sendToAllReliable(ConnectionCommand &c);
+
+ void sendPackets(float dtime);
+
+ void sendAsPacket(u16 peer_id, u8 channelnum, SharedBuffer<u8> data,
+ bool ack = false);
+
+ void sendAsPacketReliable(BufferedPacket &p, Channel *channel);
+
+ bool packetsQueued();
+
+ Connection *m_connection = nullptr;
+ unsigned int m_max_packet_size;
+ float m_timeout;
+ std::queue<OutgoingPacket> m_outgoing_queue;
+ Semaphore m_send_sleep_semaphore;
+
+ unsigned int m_iteration_packets_avaialble;
+ unsigned int m_max_commands_per_iteration = 1;
+ unsigned int m_max_data_packets_per_iteration;
+ unsigned int m_max_packets_requeued = 256;
+};
+
+class ConnectionReceiveThread : public Thread
+{
+public:
+ ConnectionReceiveThread(unsigned int max_packet_size);
+
+ void *run();
+
+ void setParent(Connection *parent)
+ {
+ assert(parent); // Pre-condition
+ m_connection = parent;
+ }
+
+private:
+ void receive();
+
+ // Returns next data from a buffer if possible
+ // If found, returns true; if not, false.
+ // If found, sets peer_id and dst
+ bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
+
+ bool checkIncomingBuffers(Channel *channel, u16 &peer_id, SharedBuffer<u8> &dst);
+
+ /*
+ Processes a packet with the basic header stripped out.
+ Parameters:
+ packetdata: Data in packet (with no base headers)
+ peer_id: peer id of the sender of the packet in question
+ channelnum: channel on which the packet was sent
+ reliable: true if recursing into a reliable packet
+ */
+ SharedBuffer<u8> processPacket(Channel *channel, SharedBuffer<u8> &packetdata,
+ u16 peer_id, u8 channelnum, bool reliable);
+
+ SharedBuffer<u8> handlePacketType_Control(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable);
+ SharedBuffer<u8> handlePacketType_Original(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable);
+ SharedBuffer<u8> handlePacketType_Split(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable);
+ SharedBuffer<u8> handlePacketType_Reliable(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum,
+ bool reliable);
+
+ struct PacketTypeHandler
+ {
+ SharedBuffer<u8> (ConnectionReceiveThread::*handler)(Channel *channel,
+ SharedBuffer<u8> &packet, Peer *peer, u8 channelnum,
+ bool reliable);
+ };
+
+ static const PacketTypeHandler packetTypeRouter[PACKET_TYPE_MAX];
+
+ Connection *m_connection = nullptr;
+};
+}
diff --git a/src/network/networkpacket.cpp b/src/network/networkpacket.cpp
index d9dc51695..c91100945 100644
--- a/src/network/networkpacket.cpp
+++ b/src/network/networkpacket.cpp
@@ -526,9 +526,9 @@ NetworkPacket& NetworkPacket::operator<<(video::SColor src)
return *this;
}
-Buffer<u8> NetworkPacket::oldForgePacket()
+SharedBuffer<u8> NetworkPacket::oldForgePacket()
{
- Buffer<u8> sb(m_datasize + 2);
+ SharedBuffer<u8> sb(m_datasize + 2);
writeU16(&sb[0], m_command);
u8* datas = getU8Ptr(0);
diff --git a/src/network/networkpacket.h b/src/network/networkpacket.h
index e3c52aab9..008271fab 100644
--- a/src/network/networkpacket.h
+++ b/src/network/networkpacket.h
@@ -118,7 +118,7 @@ public:
NetworkPacket &operator<<(video::SColor src);
// Temp, we remove SharedBuffer when migration finished
- Buffer<u8> oldForgePacket();
+ SharedBuffer<u8> oldForgePacket();
private:
void checkReadOffset(u32 from_offset, u32 field_size);
diff --git a/src/unittest/test_connection.cpp b/src/unittest/test_connection.cpp
index 5a2e32ca8..dcfc3b4e5 100644
--- a/src/unittest/test_connection.cpp
+++ b/src/unittest/test_connection.cpp
@@ -115,7 +115,7 @@ void TestConnection::testHelpers()
infostream<<"data1[0]="<<((u32)data1[0]&0xff)<<std::endl;*/
UASSERT(p2.getSize() == 3 + data1.getSize());
- UASSERT(readU8(&p2[0]) == TYPE_RELIABLE);
+ UASSERT(readU8(&p2[0]) == con::PACKET_TYPE_RELIABLE);
UASSERT(readU16(&p2[1]) == seqnum);
UASSERT(readU8(&p2[3]) == data1[0]);
}
@@ -290,13 +290,13 @@ void TestConnection::testConnectSendReceive()
infostream << "...";
infostream << std::endl;
- Buffer<u8> sentdata = pkt.oldForgePacket();
+ SharedBuffer<u8> sentdata = pkt.oldForgePacket();
server.Send(peer_id_client, 0, &pkt, true);
//sleep_ms(3000);
- Buffer<u8> recvdata;
+ SharedBuffer<u8> recvdata;
infostream << "** running client.Receive()" << std::endl;
u16 peer_id = 132;
u16 size = 0;
diff --git a/src/util/pointer.h b/src/util/pointer.h
index f018efb20..efca62d2d 100644
--- a/src/util/pointer.h
+++ b/src/util/pointer.h
@@ -104,7 +104,6 @@ private:
/************************************************
* !!! W A R N I N G !!! *
- * !!! A C H T U N G !!! *
* *
* This smart pointer class is NOT thread safe. *
* ONLY use in a single-threaded context! *
@@ -134,7 +133,6 @@ public:
}
SharedBuffer(const SharedBuffer &buffer)
{
- //std::cout<<"SharedBuffer(const SharedBuffer &buffer)"<<std::endl;
m_size = buffer.m_size;
data = buffer.data;
refcount = buffer.refcount;
@@ -142,7 +140,6 @@ public:
}
SharedBuffer & operator=(const SharedBuffer & buffer)
{
- //std::cout<<"SharedBuffer & operator=(const SharedBuffer & buffer)"<<std::endl;
if(this == &buffer)
return *this;
drop();
@@ -171,19 +168,6 @@ public:
/*
Copies whole buffer
*/
- SharedBuffer(const Buffer<T> &buffer)
- {
- m_size = buffer.getSize();
- if(m_size != 0)
- {
- data = new T[m_size];
- memcpy(data, *buffer, buffer.getSize());
- }
- else
- data = NULL;
- refcount = new unsigned int;
- (*refcount) = 1;
- }
~SharedBuffer()
{
drop();
@@ -220,9 +204,3 @@ private:
unsigned int m_size;
unsigned int *refcount;
};
-
-inline SharedBuffer<u8> SharedBufferFromString(const char *string)
-{
- SharedBuffer<u8> b((u8*)string, strlen(string)+1);
- return b;
-}