summaryrefslogtreecommitdiff
path: root/src/network/connectionthreads.cpp
diff options
context:
space:
mode:
authorLoïc Blot <nerzhul@users.noreply.github.com>2017-08-25 15:53:56 +0200
committerGitHub <noreply@github.com>2017-08-25 15:53:56 +0200
commit3cea7a349ac55df93b3eac1bf40365e00e472480 (patch)
treef1b45da38389e406c242d83f581f016f542b9300 /src/network/connectionthreads.cpp
parentf6a33a1a7a298cb7d3fb18818bae97bd1b89d633 (diff)
downloadminetest-3cea7a349ac55df93b3eac1bf40365e00e472480.tar.gz
minetest-3cea7a349ac55df93b3eac1bf40365e00e472480.tar.bz2
minetest-3cea7a349ac55df93b3eac1bf40365e00e472480.zip
Network cleanup (#6310)
* Move Connection threads to dedicated files + various cleanups * ConnectionReceiveThread::processPacket now uses function pointer table to route MT packet types * Various code style fixes * Code style with clang-format * Various SharedBuffer copy removal * SharedBuffer cannot be copied anymore using Buffer * Fix many SharedBuffer copy (thanks to delete operator)
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r--src/network/connectionthreads.cpp1404
1 files changed, 1404 insertions, 0 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
new file mode 100644
index 000000000..b07acd469
--- /dev/null
+++ b/src/network/connectionthreads.cpp
@@ -0,0 +1,1404 @@
+/*
+Minetest
+Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
+Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include "connectionthreads.h"
+#include "log.h"
+#include "profiler.h"
+#include "settings.h"
+#include "network/networkpacket.h"
+#include "util/serialize.h"
+
+namespace con
+{
+
+/******************************************************************************/
+/* defines used for debugging and profiling */
+/******************************************************************************/
+#ifdef NDEBUG
+#define LOG(a) a
+#define PROFILE(a)
+#undef DEBUG_CONNECTION_KBPS
+#else
+/* this mutex is used to achieve log message consistency */
+std::mutex log_conthread_mutex;
+#define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_conthread_mutex); \
+ a; \
+ }
+#define PROFILE(a) a
+//#define DEBUG_CONNECTION_KBPS
+#undef DEBUG_CONNECTION_KBPS
+#endif
+
+/* maximum number of retries for reliable packets */
+#define MAX_RELIABLE_RETRY 5
+
+#define WINDOW_SIZE 5
+
+static u16 readPeerId(u8 *packetdata)
+{
+ return readU16(&packetdata[4]);
+}
+static u8 readChannel(u8 *packetdata)
+{
+ return readU8(&packetdata[6]);
+}
+
+/******************************************************************************/
+/* Connection Threads */
+/******************************************************************************/
+
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
+ float timeout) :
+ Thread("ConnectionSend"),
+ m_max_packet_size(max_packet_size),
+ m_timeout(timeout),
+ m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
+{
+}
+
+void *ConnectionSendThread::run()
+{
+ assert(m_connection);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "ConnectionSend thread started" << std::endl);
+
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
+
+ PROFILE(std::stringstream ThreadIdentifier);
+ PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
+
+ /* if stop is requested don't stop immediately but try to send all */
+ /* packets first */
+ while (!stopRequested() || packetsQueued()) {
+ BEGIN_DEBUG_EXCEPTION_HANDLER
+ PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
+
+ m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
+
+ /* wait for trigger or timeout */
+ m_send_sleep_semaphore.wait(50);
+
+ /* remove all triggers */
+ while (m_send_sleep_semaphore.wait(0)) {
+ }
+
+ lasttime = curtime;
+ curtime = porting::getTimeMs();
+ float dtime = CALC_DTIME(lasttime, curtime);
+
+ /* first do all the reliable stuff */
+ runTimeouts(dtime);
+
+ /* translate commands to packets */
+ ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
+ while (c.type != CONNCMD_NONE) {
+ if (c.reliable)
+ processReliableCommand(c);
+ else
+ processNonReliableCommand(c);
+
+ c = m_connection->m_command_queue.pop_frontNoEx(0);
+ }
+
+ /* send non reliable packets */
+ sendPackets(dtime);
+
+ END_DEBUG_EXCEPTION_HANDLER
+ }
+
+ PROFILE(g_profiler->remove(ThreadIdentifier.str()));
+ return NULL;
+}
+
+void ConnectionSendThread::Trigger()
+{
+ m_send_sleep_semaphore.post();
+}
+
+bool ConnectionSendThread::packetsQueued()
+{
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ if (!m_outgoing_queue.empty() && !peerIds.empty())
+ return true;
+
+ for (u16 peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+
+ if (!peer)
+ continue;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ continue;
+
+ for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ if (!channel.queued_commands.empty()) {
+ return true;
+ }
+ }
+ }
+
+
+ return false;
+}
+
+void ConnectionSendThread::runTimeouts(float dtime)
+{
+ std::list<u16> timeouted_peers;
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ for (u16 &peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+
+ if (!peer)
+ continue;
+
+ UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
+ if (!udpPeer)
+ continue;
+
+ PROFILE(std::stringstream peerIdentifier);
+ PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
+ << ";" << peerId << ";RELIABLE]");
+ PROFILE(ScopeProfiler
+ peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+
+ SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
+
+ /*
+ Check peer timeout
+ */
+ if (peer->isTimedOut(m_timeout)) {
+ infostream << m_connection->getDesc()
+ << "RunTimeouts(): Peer " << peer->id
+ << " has timed out."
+ << " (source=peer->timeout_counter)"
+ << std::endl;
+ // Add peer to the list
+ timeouted_peers.push_back(peer->id);
+ // Don't bother going through the buffers of this one
+ continue;
+ }
+
+ float resend_timeout = udpPeer->getResendTimeout();
+ bool retry_count_exceeded = false;
+ for (Channel &channel : udpPeer->channels) {
+ std::list<BufferedPacket> timed_outs;
+
+ if (udpPeer->getLegacyPeer())
+ channel.setWindowSize(WINDOW_SIZE);
+
+ // Remove timed out incomplete unreliable split packets
+ channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
+
+ // Increment reliable packet times
+ channel.outgoing_reliables_sent.incrementTimeouts(dtime);
+
+ unsigned int numpeers = m_connection->m_peers.size();
+
+ if (numpeers == 0)
+ return;
+
+ // Re-send timed out outgoing reliables
+ timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
+ (m_max_data_packets_per_iteration / numpeers));
+
+ channel.UpdatePacketLossCounter(timed_outs.size());
+ g_profiler->graphAdd("packets_lost", timed_outs.size());
+
+ m_iteration_packets_avaialble -= timed_outs.size();
+
+ for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
+ k != timed_outs.end(); ++k) {
+ u16 peer_id = readPeerId(*(k->data));
+ u8 channelnum = readChannel(*(k->data));
+ u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
+
+ channel.UpdateBytesLost(k->data.getSize());
+ k->resend_count++;
+
+ if (k->resend_count > MAX_RELIABLE_RETRY) {
+ retry_count_exceeded = true;
+ timeouted_peers.push_back(peer->id);
+ /* no need to check additional packets if a single one did timeout*/
+ break;
+ }
+
+ LOG(derr_con << m_connection->getDesc()
+ << "RE-SENDING timed-out RELIABLE to "
+ << k->address.serializeString()
+ << "(t/o=" << resend_timeout << "): "
+ << "from_peer_id=" << peer_id
+ << ", channel=" << ((int) channelnum & 0xff)
+ << ", seqnum=" << seqnum
+ << std::endl);
+
+ rawSend(*k);
+
+ // do not handle rtt here as we can't decide if this packet was
+ // lost or really takes more time to transmit
+ }
+
+ if (retry_count_exceeded) {
+ break; /* no need to check other channels if we already did timeout */
+ }
+
+ channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
+ }
+
+ /* skip to next peer if we did timeout */
+ if (retry_count_exceeded)
+ continue;
+
+ /* send ping if necessary */
+ if (udpPeer->Ping(dtime, data)) {
+ LOG(dout_con << m_connection->getDesc()
+ << "Sending ping for peer_id: " << udpPeer->id << std::endl);
+ /* this may fail if there ain't a sequence number left */
+ if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
+ //retrigger with reduced ping interval
+ udpPeer->Ping(4.0, data);
+ }
+ }
+
+ udpPeer->RunCommandQueues(m_max_packet_size,
+ m_max_commands_per_iteration,
+ m_max_packets_requeued);
+ }
+
+ // Remove timed out peers
+ for (u16 timeouted_peer : timeouted_peers) {
+ LOG(derr_con << m_connection->getDesc()
+ << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
+ m_connection->deletePeer(timeouted_peer, true);
+ }
+}
+
+void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+{
+ try {
+ m_connection->m_udpSocket.Send(packet.address, *packet.data,
+ packet.data.getSize());
+ LOG(dout_con << m_connection->getDesc()
+ << " rawSend: " << packet.data.getSize()
+ << " bytes sent" << std::endl);
+ } catch (SendFailedException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Connection::rawSend(): SendFailedException: "
+ << packet.address.serializeString() << std::endl);
+ }
+}
+
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+{
+ try {
+ p.absolute_send_time = porting::getTimeMs();
+ // Buffer the packet
+ channel->outgoing_reliables_sent.insert(p,
+ (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
+ % (MAX_RELIABLE_WINDOW_SIZE + 1));
+ }
+ catch (AlreadyExistsException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: Going to send a reliable packet"
+ << " in outgoing buffer" << std::endl);
+ }
+
+ // Send the packet
+ rawSend(p);
+}
+
+bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
+ const SharedBuffer<u8> &data, bool reliable)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: dropped packet for non existent peer_id: "
+ << peer_id << std::endl);
+ FATAL_ERROR_IF(!reliable,
+ "Trying to send raw packet reliable but no peer found!");
+ return false;
+ }
+ Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
+
+ if (reliable) {
+ bool have_sequence_number_for_raw_packet = true;
+ u16 seqnum =
+ channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+
+ if (!have_sequence_number_for_raw_packet)
+ return false;
+
+ SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+ Address peer_address;
+ peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
+
+ // Add base headers and make a packet
+ BufferedPacket p = con::makePacket(peer_address, reliable,
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
+
+ // first check if our send window is already maxed out
+ if (channel->outgoing_reliables_sent.size()
+ < channel->getWindowSize()) {
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: sending a reliable packet to peer_id " << peer_id
+ << " channel: " << channelnum
+ << " seqnum: " << seqnum << std::endl);
+ sendAsPacketReliable(p, channel);
+ return true;
+ }
+
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: queueing reliable packet for peer_id: " << peer_id
+ << " channel: " << channelnum
+ << " seqnum: " << seqnum << std::endl);
+ channel->queued_reliables.push(p);
+ return false;
+ }
+
+ Address peer_address;
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ // Add base headers and make a packet
+ BufferedPacket p = con::makePacket(peer_address, data,
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ channelnum);
+
+ // Send the packet
+ rawSend(p);
+ return true;
+ }
+
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: dropped unreliable packet for peer_id: " << peer_id
+ << " because of (yet) missing udp address" << std::endl);
+ return false;
+}
+
+void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+{
+ assert(c.reliable); // Pre-condition
+
+ switch (c.type) {
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_NONE" << std::endl);
+ return;
+
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONNCMD_SEND" << std::endl);
+ sendReliable(c);
+ return;
+
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAllReliable(c);
+ return;
+
+ case CONCMD_CREATE_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
+ if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ /* put to queue if we couldn't send it immediately */
+ sendReliable(c);
+ }
+ return;
+
+ case CONCMD_DISABLE_LEGACY:
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
+ if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ /* put to queue if we couldn't send it immediately */
+ sendReliable(c);
+ }
+ return;
+
+ case CONNCMD_SERVE:
+ case CONNCMD_CONNECT:
+ case CONNCMD_DISCONNECT:
+ case CONCMD_ACK:
+ FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid reliable command type: " << c.type << std::endl);
+ }
+}
+
+
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+{
+ assert(!c.reliable); // Pre-condition
+
+ switch (c.type) {
+ case CONNCMD_NONE:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_NONE" << std::endl);
+ return;
+ case CONNCMD_SERVE:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SERVE port="
+ << c.address.serializeString() << std::endl);
+ serve(c.address);
+ return;
+ case CONNCMD_CONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_CONNECT" << std::endl);
+ connect(c.address);
+ return;
+ case CONNCMD_DISCONNECT:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT" << std::endl);
+ disconnect();
+ return;
+ case CONNCMD_DISCONNECT_PEER:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
+ disconnect_peer(c.peer_id);
+ return;
+ case CONNCMD_SEND:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SEND" << std::endl);
+ send(c.peer_id, c.channelnum, c.data);
+ return;
+ case CONNCMD_SEND_TO_ALL:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
+ sendToAll(c.channelnum, c.data);
+ return;
+ case CONCMD_ACK:
+ LOG(dout_con << m_connection->getDesc()
+ << " UDP processing CONCMD_ACK" << std::endl);
+ sendAsPacket(c.peer_id, c.channelnum, c.data, true);
+ return;
+ case CONCMD_CREATE_PEER:
+ FATAL_ERROR("Got command that should be reliable as unreliable command");
+ default:
+ LOG(dout_con << m_connection->getDesc()
+ << " Invalid command type: " << c.type << std::endl);
+ }
+}
+
+void ConnectionSendThread::serve(Address bind_address)
+{
+ LOG(dout_con << m_connection->getDesc()
+ << "UDP serving at port " << bind_address.serializeString() << std::endl);
+ try {
+ m_connection->m_udpSocket.Bind(bind_address);
+ m_connection->SetPeerID(PEER_ID_SERVER);
+ }
+ catch (SocketException &e) {
+ // Create event
+ ConnectionEvent ce;
+ ce.bindFailed();
+ m_connection->putEvent(ce);
+ }
+}
+
+void ConnectionSendThread::connect(Address address)
+{
+ LOG(dout_con << m_connection->getDesc() << " connecting to "
+ << address.serializeString()
+ << ":" << address.getPort() << std::endl);
+
+ UDPPeer *peer = m_connection->createServerPeer(address);
+
+ // Create event
+ ConnectionEvent e;
+ e.peerAdded(peer->id, peer->address);
+ m_connection->putEvent(e);
+
+ Address bind_addr;
+
+ if (address.isIPv6())
+ bind_addr.setAddress((IPv6AddressBytes *) NULL);
+ else
+ bind_addr.setAddress(0, 0, 0, 0);
+
+ m_connection->m_udpSocket.Bind(bind_addr);
+
+ // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
+ m_connection->SetPeerID(PEER_ID_INEXISTENT);
+ NetworkPacket pkt(0, 0);
+ m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
+}
+
+void ConnectionSendThread::disconnect()
+{
+ LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], PACKET_TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+
+
+ // Send to all
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ sendAsPacket(peerid, 0, data, false);
+ }
+}
+
+void ConnectionSendThread::disconnect_peer(u16 peer_id)
+{
+ LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], PACKET_TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+ sendAsPacket(peer_id, 0, data, false);
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer)
+ return;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0) {
+ return;
+ }
+
+ dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
+}
+
+void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data)
+{
+ assert(channelnum < CHANNEL_COUNT); // Pre-condition
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
+ return;
+ }
+
+ LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
+ << ", channel " << (channelnum % 0xFF)
+ << ", size: " << data.getSize() << std::endl);
+
+ u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
+
+ u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
+ std::list<SharedBuffer<u8>> originals;
+
+ makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
+
+ peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
+
+ for (const SharedBuffer<u8> &original : originals) {
+ sendAsPacket(peer_id, channelnum, original);
+ }
+}
+
+void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+ if (!peer)
+ return;
+
+ peer->PutReliableSendCommand(c, m_max_packet_size);
+}
+
+void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ send(peerid, channelnum, data);
+ }
+}
+
+void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerid);
+
+ if (!peer)
+ continue;
+
+ peer->PutReliableSendCommand(c, m_max_packet_size);
+ }
+}
+
+void ConnectionSendThread::sendPackets(float dtime)
+{
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+ std::list<u16> pendingDisconnect;
+ std::map<u16, bool> pending_unreliable;
+
+ for (u16 peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+ //peer may have been removed
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
+ << peerId
+ << std::endl);
+ continue;
+ }
+ peer->m_increment_packets_remaining =
+ m_iteration_packets_avaialble / m_connection->m_peers.size();
+
+ UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
+
+ if (!udpPeer) {
+ continue;
+ }
+
+ if (udpPeer->m_pending_disconnect) {
+ pendingDisconnect.push_back(peerId);
+ }
+
+ PROFILE(std::stringstream
+ peerIdentifier);
+ PROFILE(
+ peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
+ << ";RELIABLE]");
+ PROFILE(ScopeProfiler
+ peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+
+ LOG(dout_con << m_connection->getDesc()
+ << " Handle per peer queues: peer_id=" << peerId
+ << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
+
+ // first send queued reliable packets for all peers (if possible)
+ for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
+ Channel &channel = udpPeer->channels[i];
+ u16 next_to_ack = 0;
+
+ channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
+ u16 next_to_receive = 0;
+ channel.incoming_reliables.getFirstSeqnum(next_to_receive);
+
+ LOG(dout_con << m_connection->getDesc() << "\t channel: "
+ << i << ", peer quota:"
+ << peer->m_increment_packets_remaining
+ << std::endl
+ << "\t\t\treliables on wire: "
+ << channel.outgoing_reliables_sent.size()
+ << ", waiting for ack for " << next_to_ack
+ << std::endl
+ << "\t\t\tincoming_reliables: "
+ << channel.incoming_reliables.size()
+ << ", next reliable packet: "
+ << channel.readNextIncomingSeqNum()
+ << ", next queued: " << next_to_receive
+ << std::endl
+ << "\t\t\treliables queued : "
+ << channel.queued_reliables.size()
+ << std::endl
+ << "\t\t\tqueued commands : "
+ << channel.queued_commands.size()
+ << std::endl);
+
+ while (!channel.queued_reliables.empty() &&
+ channel.outgoing_reliables_sent.size()
+ < channel.getWindowSize() &&
+ peer->m_increment_packets_remaining > 0) {
+ BufferedPacket p = channel.queued_reliables.front();
+ channel.queued_reliables.pop();
+ LOG(dout_con << m_connection->getDesc()
+ << " INFO: sending a queued reliable packet "
+ << " channel: " << i
+ << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << std::endl);
+ sendAsPacketReliable(p, &channel);
+ peer->m_increment_packets_remaining--;
+ }
+ }
+ }
+
+ if (!m_outgoing_queue.empty()) {
+ LOG(dout_con << m_connection->getDesc()
+ << " Handle non reliable queue ("
+ << m_outgoing_queue.size() << " pkts)" << std::endl);
+ }
+
+ unsigned int initial_queuesize = m_outgoing_queue.size();
+ /* send non reliable packets*/
+ for (unsigned int i = 0; i < initial_queuesize; i++) {
+ OutgoingPacket packet = m_outgoing_queue.front();
+ m_outgoing_queue.pop();
+
+ if (packet.reliable)
+ continue;
+
+ PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " Outgoing queue: peer_id=" << packet.peer_id
+ << ">>>NOT<<< found on sending packet"
+ << ", channel " << (packet.channelnum % 0xFF)
+ << ", size: " << packet.data.getSize() << std::endl);
+ continue;
+ }
+
+ /* send acks immediately */
+ if (packet.ack) {
+ rawSendAsPacket(packet.peer_id, packet.channelnum,
+ packet.data, packet.reliable);
+ peer->m_increment_packets_remaining =
+ MYMIN(0, peer->m_increment_packets_remaining--);
+ } else if (
+ (peer->m_increment_packets_remaining > 0) ||
+ (stopRequested())) {
+ rawSendAsPacket(packet.peer_id, packet.channelnum,
+ packet.data, packet.reliable);
+ peer->m_increment_packets_remaining--;
+ } else {
+ m_outgoing_queue.push(packet);
+ pending_unreliable[packet.peer_id] = true;
+ }
+ }
+
+ for (u16 peerId : pendingDisconnect) {
+ if (!pending_unreliable[peerId]) {
+ m_connection->deletePeer(peerId, false);
+ }
+ }
+}
+
+void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool ack)
+{
+ OutgoingPacket packet(peer_id, channelnum, data, false, ack);
+ m_outgoing_queue.push(packet);
+}
+
+ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
+ Thread("ConnectionReceive")
+{
+}
+
+void *ConnectionReceiveThread::run()
+{
+ assert(m_connection);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "ConnectionReceive thread started" << std::endl);
+
+ PROFILE(std::stringstream
+ ThreadIdentifier);
+ PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
+
+#ifdef DEBUG_CONNECTION_KBPS
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
+ float debug_print_timer = 0.0;
+#endif
+
+ while (!stopRequested()) {
+ BEGIN_DEBUG_EXCEPTION_HANDLER
+ PROFILE(ScopeProfiler
+ sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
+
+#ifdef DEBUG_CONNECTION_KBPS
+ lasttime = curtime;
+ curtime = porting::getTimeMs();
+ float dtime = CALC_DTIME(lasttime,curtime);
+#endif
+
+ /* receive packets */
+ receive();
+
+#ifdef DEBUG_CONNECTION_KBPS
+ debug_print_timer += dtime;
+ if (debug_print_timer > 20.0) {
+ debug_print_timer -= 20.0;
+
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (std::list<u16>::iterator i = peerids.begin();
+ i != peerids.end();
+ i++)
+ {
+ PeerHelper peer = m_connection->getPeerNoEx(*i);
+ if (!peer)
+ continue;
+
+ float peer_current = 0.0;
+ float peer_loss = 0.0;
+ float avg_rate = 0.0;
+ float avg_loss = 0.0;
+
+ for(u16 j=0; j<CHANNEL_COUNT; j++)
+ {
+ peer_current +=peer->channels[j].getCurrentDownloadRateKB();
+ peer_loss += peer->channels[j].getCurrentLossRateKB();
+ avg_rate += peer->channels[j].getAvgDownloadRateKB();
+ avg_loss += peer->channels[j].getAvgLossRateKB();
+ }
+
+ std::stringstream output;
+ output << std::fixed << std::setprecision(1);
+ output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
+ output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
+ output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
+ output << std::setfill(' ');
+ for(u16 j=0; j<CHANNEL_COUNT; j++)
+ {
+ output << "\tcha " << j << ":"
+ << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
+ << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
+ << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
+ << " /"
+ << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
+ << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
+ << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
+ << " / WS: " << peer->channels[j].getWindowSize()
+ << std::endl;
+ }
+
+ fprintf(stderr,"%s\n",output.str().c_str());
+ }
+ }
+#endif
+ END_DEBUG_EXCEPTION_HANDLER
+ }
+
+ PROFILE(g_profiler->remove(ThreadIdentifier.str()));
+ return NULL;
+}
+
+// Receive packets from the network and buffers and create ConnectionEvents
+void ConnectionReceiveThread::receive()
+{
+ // use IPv6 minimum allowed MTU as receive buffer size as this is
+ // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
+ // infrastructure
+ unsigned int packet_maxsize = 1500;
+ SharedBuffer<u8> packetdata(packet_maxsize);
+
+ bool packet_queued = true;
+
+ unsigned int loop_count = 0;
+
+ /* first of all read packets from socket */
+ /* check for incoming data available */
+ while ((loop_count < 10) &&
+ (m_connection->m_udpSocket.WaitData(50))) {
+ loop_count++;
+ try {
+ if (packet_queued) {
+ bool data_left = true;
+ u16 peer_id;
+ SharedBuffer<u8> resultdata;
+ while (data_left) {
+ try {
+ data_left = getFromBuffers(peer_id, resultdata);
+ if (data_left) {
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
+ }
+ }
+ catch (ProcessedSilentlyException &e) {
+ /* try reading again */
+ }
+ }
+ packet_queued = false;
+ }
+
+ Address sender;
+ s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
+ packet_maxsize);
+
+ if ((received_size < BASE_HEADER_SIZE) ||
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid incoming packet, "
+ << "size: " << received_size
+ << ", protocol: "
+ << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
+ << std::endl);
+ continue;
+ }
+
+ u16 peer_id = readPeerId(*packetdata);
+ u8 channelnum = readChannel(*packetdata);
+
+ if (channelnum > CHANNEL_COUNT - 1) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid channel " << channelnum << std::endl);
+ throw InvalidIncomingDataException("Channel doesn't exist");
+ }
+
+ /* Try to identify peer by sender address (may happen on join) */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->lookupPeer(sender);
+ // We do not have to remind the peer of its
+ // peer id as the CONTROLTYPE_SET_PEER_ID
+ // command was sent reliably.
+ }
+
+ /* The peer was not found in our lists. Add it. */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
+ }
+
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " got packet from unknown peer_id: "
+ << peer_id << " Ignoring." << std::endl);
+ continue;
+ }
+
+ // Validate peer address
+
+ Address peer_address;
+
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ if (peer_address != sender) {
+ LOG(derr_con << m_connection->getDesc()
+ << m_connection->getDesc()
+ << " Peer " << peer_id << " sending from different address."
+ " Ignoring." << std::endl);
+ continue;
+ }
+ } else {
+
+ bool invalid_address = true;
+ if (invalid_address) {
+ LOG(derr_con << m_connection->getDesc()
+ << m_connection->getDesc()
+ << " Peer " << peer_id << " unknown."
+ " Ignoring." << std::endl);
+ continue;
+ }
+ }
+
+ peer->ResetTimeout();
+
+ Channel *channel = 0;
+
+ if (dynamic_cast<UDPPeer *>(&peer) != 0) {
+ channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
+ }
+
+ if (channel != 0) {
+ channel->UpdateBytesReceived(received_size);
+ }
+
+ // Throw the received packet to channel->processPacket()
+
+ // Make a new SharedBuffer from the data without the base headers
+ SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+ memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+ strippeddata.getSize());
+
+ try {
+ // Process it (the result is some data with no headers made by us)
+ SharedBuffer<u8> resultdata = processPacket
+ (channel, strippeddata, peer_id, channelnum, false);
+
+ LOG(dout_con << m_connection->getDesc()
+ << " ProcessPacket from peer_id: " << peer_id
+ << ",channel: " << (channelnum & 0xFF) << ", returned "
+ << resultdata.getSize() << " bytes" << std::endl);
+
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
+ }
+ catch (ProcessedSilentlyException &e) {
+ }
+ catch (ProcessedQueued &e) {
+ packet_queued = true;
+ }
+ }
+ catch (InvalidIncomingDataException &e) {
+ }
+ catch (ProcessedSilentlyException &e) {
+ }
+ }
+}
+
+bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ std::list<u16> peerids = m_connection->getPeerIDs();
+
+ for (u16 peerid : peerids) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerid);
+ if (!peer)
+ continue;
+
+ if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ continue;
+
+ for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ if (checkIncomingBuffers(&channel, peer_id, dst)) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
+ u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ u16 firstseqnum = 0;
+ if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
+ if (firstseqnum == channel->readNextIncomingSeqNum()) {
+ BufferedPacket p = channel->incoming_reliables.popFirst();
+ peer_id = readPeerId(*p.data);
+ u8 channelnum = readChannel(*p.data);
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+
+ LOG(dout_con << m_connection->getDesc()
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum
+ << " peer_id=" << peer_id
+ << " channel=" << ((int) channelnum & 0xff)
+ << std::endl);
+
+ channel->incNextIncomingSeqNum();
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p.data.getSize() - headers_size);
+ memcpy(*payload, &p.data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
+ }
+ }
+ return false;
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
+ SharedBuffer<u8> &packetdata, u16 peer_id, u8 channelnum, bool reliable)
+{
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer) {
+ errorstream << "Peer not found (possible timeout)" << std::endl;
+ throw ProcessedSilentlyException("Peer not found (possible timeout)");
+ }
+
+ if (packetdata.getSize() < 1)
+ throw InvalidIncomingDataException("packetdata.getSize() < 1");
+
+ u8 type = readU8(&(packetdata[0]));
+
+ if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
+ std::string errmsg = "Invalid peer_id=" + itos(peer_id);
+ errorstream << errmsg << std::endl;
+ throw InvalidIncomingDataException(errmsg.c_str());
+ }
+
+ if (type >= PACKET_TYPE_MAX) {
+ derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
+ << std::endl;
+ throw InvalidIncomingDataException("Invalid packet type");
+ }
+
+ const PacketTypeHandler &pHandle = packetTypeRouter[type];
+ return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
+}
+
+const ConnectionReceiveThread::PacketTypeHandler
+ ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
+ {&ConnectionReceiveThread::handlePacketType_Control},
+ {&ConnectionReceiveThread::handlePacketType_Original},
+ {&ConnectionReceiveThread::handlePacketType_Split},
+ {&ConnectionReceiveThread::handlePacketType_Reliable},
+};
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ if (packetdata.getSize() < 2)
+ throw InvalidIncomingDataException("packetdata.getSize() < 2");
+
+ u8 controltype = readU8(&(packetdata[1]));
+
+ if (controltype == CONTROLTYPE_ACK) {
+ assert(channel != NULL);
+
+ if (packetdata.getSize() < 4) {
+ throw InvalidIncomingDataException(
+ "packetdata.getSize() < 4 (ACK header size)");
+ }
+
+ u16 seqnum = readU16(&packetdata[2]);
+ LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
+ << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
+ << seqnum << " ]" << std::endl);
+
+ try {
+ BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+
+ // only calculate rtt from straight sent packets
+ if (p.resend_count == 0) {
+ // Get round trip time
+ u64 current_time = porting::getTimeMs();
+
+ // a overflow is quite unlikely but as it'd result in major
+ // rtt miscalculation we handle it here
+ if (current_time > p.absolute_send_time) {
+ float rtt = (current_time - p.absolute_send_time) / 1000.0;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ } else if (p.totaltime > 0) {
+ float rtt = p.totaltime;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ }
+ }
+ // put bytes for max bandwidth calculation
+ channel->UpdateBytesSent(p.data.getSize(), 1);
+ if (channel->outgoing_reliables_sent.size() == 0)
+ m_connection->TriggerSend();
+ } catch (NotFoundException &e) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: ACKed packet not in outgoing queue" << std::endl);
+ channel->UpdatePacketTooLateCounter();
+ }
+
+ throw ProcessedSilentlyException("Got an ACK");
+ } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
+ // Got a packet to set our peer id
+ if (packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+ u16 peer_id_new = readU16(&packetdata[2]);
+ LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
+ << "... " << std::endl);
+
+ if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
+ LOG(derr_con << m_connection->getDesc()
+ << "WARNING: Not changing existing peer id." << std::endl);
+ } else {
+ LOG(dout_con << m_connection->getDesc() << "changing own peer id"
+ << std::endl);
+ m_connection->SetPeerID(peer_id_new);
+ }
+
+ ConnectionCommand cmd;
+
+ SharedBuffer<u8> reply(2);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
+ cmd.disableLegacy(PEER_ID_SERVER, reply);
+ m_connection->putCommand(cmd);
+
+ throw ProcessedSilentlyException("Got a SET_PEER_ID");
+ } else if (controltype == CONTROLTYPE_PING) {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
+ throw ProcessedSilentlyException("Got a PING");
+ } else if (controltype == CONTROLTYPE_DISCO) {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
+ << peer->id << std::endl);
+
+ if (!m_connection->deletePeer(peer->id, false)) {
+ derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got a DISCO");
+ } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
+ dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
+ throw ProcessedSilentlyException("Got non legacy control");
+ } else {
+ LOG(derr_con << m_connection->getDesc()
+ << "INVALID TYPE_CONTROL: invalid controltype="
+ << ((int) controltype & 0xff) << std::endl);
+ throw InvalidIncomingDataException("Invalid control type");
+ }
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
+ LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
+ << std::endl);
+ // Get the inside packet out and return it
+ SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
+ memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
+ return payload;
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ Address peer_address;
+
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ // We have to create a packet again for buffering
+ // This isn't actually too bad an idea.
+ BufferedPacket packet = makePacket(peer_address,
+ packetdata,
+ m_connection->GetProtocolID(),
+ peer->id,
+ channelnum);
+
+ // Buffer the packet
+ SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
+
+ if (data.getSize() != 0) {
+ LOG(dout_con << m_connection->getDesc()
+ << "RETURNING TYPE_SPLIT: Constructed full data, "
+ << "size=" << data.getSize() << std::endl);
+ return data;
+ }
+ LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
+ throw ProcessedSilentlyException("Buffered a split packet chunk");
+ }
+
+ // We should never get here.
+ FATAL_ERROR("Invalid execution point");
+}
+
+SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
+ SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
+{
+ assert(channel != NULL);
+
+ // Recursive reliable packets not allowed
+ if (reliable)
+ throw InvalidIncomingDataException("Found nested reliable packets");
+
+ if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
+ throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
+
+ u16 seqnum = readU16(&packetdata[1]);
+ bool is_future_packet = false;
+ bool is_old_packet = false;
+
+ /* packet is within our receive window send ack */
+ if (seqnum_in_window(seqnum,
+ channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
+ m_connection->sendAck(peer->id, channelnum, seqnum);
+ } else {
+ is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
+ is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
+
+ /* packet is not within receive window, don't send ack. *
+ * if this was a valid packet it's gonna be retransmitted */
+ if (is_future_packet)
+ throw ProcessedSilentlyException(
+ "Received packet newer then expected, not sending ack");
+
+ /* seems like our ack was lost, send another one for a old packet */
+ if (is_old_packet) {
+ LOG(dout_con << m_connection->getDesc()
+ << "RE-SENDING ACK: peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+ m_connection->sendAck(peer->id, channelnum, seqnum);
+
+ // we already have this packet so this one was on wire at least
+ // the current timeout
+ // we don't know how long this packet was on wire don't do silly guessing
+ // dynamic_cast<UDPPeer*>(&peer)->
+ // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
+
+ throw ProcessedSilentlyException("Retransmitting ack for old packet");
+ }
+ }
+
+ if (seqnum != channel->readNextIncomingSeqNum()) {
+ Address peer_address;
+
+ // this is a reliable packet so we have a udp address for sure
+ peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
+ // This one comes later, buffer it.
+ // Actually we have to make a packet to buffer one.
+ // Well, we have all the ingredients, so just do it.
+ BufferedPacket packet = con::makePacket(
+ peer_address,
+ packetdata,
+ m_connection->GetProtocolID(),
+ peer->id,
+ channelnum);
+ try {
+ channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
+
+ LOG(dout_con << m_connection->getDesc()
+ << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+
+ throw ProcessedQueued("Buffered future reliable packet");
+ } catch (AlreadyExistsException &e) {
+ } catch (IncomingDataCorruption &e) {
+ ConnectionCommand discon;
+ discon.disconnect_peer(peer->id);
+ m_connection->putCommand(discon);
+
+ LOG(derr_con << m_connection->getDesc()
+ << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum
+ << "DROPPING CLIENT!" << std::endl;)
+ }
+ }
+
+ /* we got a packet to process right now */
+ LOG(dout_con << m_connection->getDesc()
+ << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
+ << ", channel: " << (channelnum & 0xFF)
+ << ", seqnum: " << seqnum << std::endl;)
+
+
+ /* check for resend case */
+ u16 queued_seqnum = 0;
+ if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
+ if (queued_seqnum == seqnum) {
+ BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ /** TODO find a way to verify the new against the old packet */
+ }
+ }
+
+ channel->incNextIncomingSeqNum();
+
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
+ memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
+
+ return processPacket(channel, payload, peer->id, channelnum, true);
+}
+
+}