/*
Minetest
Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "connectionthreads.h"
#include "log.h"
#include "profiler.h"
#include "settings.h"
#include "network/networkpacket.h"
#include "util/serialize.h"
namespace con
{
/******************************************************************************/
/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
#define LOG(a) a
#define PROFILE(a)
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_conthread_mutex;
#define LOG(a) \
{ \
MutexAutoLock loglock(log_conthread_mutex); \
a; \
}
#define PROFILE(a) a
//#define DEBUG_CONNECTION_KBPS
#undef DEBUG_CONNECTION_KBPS
#endif
/* maximum number of retries for reliable packets */
#define MAX_RELIABLE_RETRY 5
#define WINDOW_SIZE 5
static session_t readPeerId(u8 *packetdata)
{
return readU16(&packetdata[4]);
}
static u8 readChannel(u8 *packetdata)
{
return readU8(&packetdata[6]);
}
/******************************************************************************/
/* Connection Threads */
/******************************************************************************/
ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
float timeout) :
Thread("ConnectionSend"),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
{
}
void *ConnectionSendThread::run()
{
assert(m_connection);
LOG(dout_con << m_connection->getDesc()
<< "ConnectionSend thread started" << std::endl);
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
while (!stopRequested() || packetsQueued()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */
m_send_sleep_semaphore.wait(50);
/* remove all triggers */
while (m_send_sleep_semaphore.wait(0)) {
}
lasttime = curtime;
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime);
/* first do all the reliable stuff */
runTimeouts(dtime);
/* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
while (c.type != CONNCMD_NONE) {
if (c.reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
c = m_connection->m_command_queue.pop_frontNoEx(0);
}
/* send non reliable packets */
sendPackets(dtime);
END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
void ConnectionSendThread::Trigger()
{
m_send_sleep_semaphore.post();
}
bool ConnectionSendThread::packetsQueued()
{
std::list<session_t> peerIds = m_connection->getPeerIDs();
if (!m_outgoing_queue.empty() && !peerIds.empty())
return true;
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
if (!channel.queued_commands.empty()) {
return true;
}
}
}
return false;
}
void ConnectionSendThread::runTimeouts(float dtime)
{
std::list<session_t> timeouted_peers;
std::list<session_t> peerIds = m_connection->getPeerIDs();
for (session_t &peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (!udpPeer)
continue;
PROFILE(std::stringstream peerIdentifier);
PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
<< ";" << peerId << ";RELIABLE]");
PROFILE(ScopeProfiler
peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
/*
Check peer timeout
*/
if (peer->isTimedOut(m_timeout)) {
infostream << m_connection->getDesc()
<< "RunTimeouts(): Peer " << peer->id
<< " has timed out."
<< " (source=peer->timeout_counter)"
<< std::endl;
// Add peer to the list
timeouted_peers.push_back(peer->id);
// Don't bother going through the buffers of this one
continue;
}
float resend_timeout = udpPeer->getResendTimeout();
bool retry_count_exceeded = false;
for (Channel &channel : udpPeer->channels) {
std::list<BufferedPacket> timed_outs;
if (udpPeer->getLegacyPeer())
channel.setWindowSize(WINDOW_SIZE);
// Remove timed out incomplete unreliable split packets
channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
// Increment reliable packet times
channel.outgoing_reliables_sent.incrementTimeouts(dtime);
unsigned int numpeers = m_connection->m_peers.size();
if (numpeers == 0)
return;
|