aboutsummaryrefslogtreecommitdiff
path: root/src/network/connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/connection.cpp')
-rw-r--r--src/network/connection.cpp145
1 files changed, 70 insertions, 75 deletions
diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index 7794ce10f..f7452d8e4 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -42,10 +42,10 @@ namespace con
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
-JMutex log_message_mutex;
+Mutex log_message_mutex;
#define LOG(a) \
{ \
- JMutexAutoLock loglock(log_message_mutex); \
+ MutexAutoLock loglock(log_message_mutex); \
a; \
}
#define PROFILE(a) a
@@ -209,7 +209,7 @@ ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
void ReliablePacketBuffer::print()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
for(std::list<BufferedPacket>::iterator i = m_list.begin();
@@ -223,7 +223,7 @@ void ReliablePacketBuffer::print()
}
bool ReliablePacketBuffer::empty()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
return m_list.empty();
}
@@ -256,7 +256,7 @@ RPBSearchResult ReliablePacketBuffer::notFound()
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
BufferedPacket p = *m_list.begin();
@@ -266,7 +266,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
BufferedPacket ReliablePacketBuffer::popFirst()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
BufferedPacket p = *m_list.begin();
@@ -283,7 +283,7 @@ BufferedPacket ReliablePacketBuffer::popFirst()
}
BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
RPBSearchResult r = findPacket(seqnum);
if (r == notFound()) {
LOG(dout_con<<"Sequence number: " << seqnum
@@ -294,7 +294,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
RPBSearchResult next = r;
- next++;
+ ++next;
if (next != notFound()) {
u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
m_oldest_non_answered_ack = s;
@@ -311,7 +311,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
}
void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
"reliable packet" << std::endl;
@@ -358,7 +358,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
- i++;
+ ++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
}
@@ -367,7 +367,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
else
{
while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
- i++;
+ ++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
}
@@ -411,7 +411,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
for(std::list<BufferedPacket>::iterator i = m_list.begin();
i != m_list.end(); ++i)
{
@@ -423,7 +423,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
unsigned int max_packets)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs;
for(std::list<BufferedPacket>::iterator i = m_list.begin();
i != m_list.end(); ++i)
@@ -446,7 +446,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
IncomingSplitBuffer::~IncomingSplitBuffer()
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
i != m_buf.end(); ++i)
{
@@ -459,7 +459,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
*/
SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
u32 headersize = BASE_HEADER_SIZE + 7;
if (p.data.getSize() < headersize) {
errorstream << "Invalid data size for split packet" << std::endl;
@@ -546,7 +546,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{
std::list<u16> remove_queue;
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
i != m_buf.end(); ++i)
{
@@ -562,7 +562,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
for(std::list<u16>::iterator j = remove_queue.begin();
j != remove_queue.end(); ++j)
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
delete m_buf[*j];
m_buf.erase(*j);
@@ -605,13 +605,13 @@ Channel::~Channel()
u16 Channel::readNextIncomingSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_incoming_seqnum;
}
u16 Channel::incNextIncomingSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
u16 retval = next_incoming_seqnum;
next_incoming_seqnum++;
return retval;
@@ -619,18 +619,18 @@ u16 Channel::incNextIncomingSeqNum()
u16 Channel::readNextSplitSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_outgoing_split_seqnum;
}
void Channel::setNextSplitSeqNum(u16 seqnum)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
next_outgoing_split_seqnum = seqnum;
}
u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
u16 retval = next_outgoing_seqnum;
u16 lowest_unacked_seqnumber;
@@ -670,7 +670,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successful)
u16 Channel::readOutgoingSequenceNumber()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_outgoing_seqnum;
}
@@ -686,32 +686,32 @@ bool Channel::putBackSequenceNumber(u16 seqnum)
void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_transfered += bytes;
current_packet_successfull += packets;
}
void Channel::UpdateBytesReceived(unsigned int bytes) {
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_received += bytes;
}
void Channel::UpdateBytesLost(unsigned int bytes)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_lost += bytes;
}
void Channel::UpdatePacketLossCounter(unsigned int count)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_packet_loss += count;
}
void Channel::UpdatePacketTooLateCounter()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_packet_too_late++;
}
@@ -731,7 +731,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
bool reasonable_amount_of_data_transmitted = false;
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
packet_loss = current_packet_loss;
//packet_too_late = current_packet_too_late;
packets_successfull = current_packet_successfull;
@@ -802,7 +802,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
if (bpm_counter > 10.0)
{
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
cur_kbps =
(((float) current_bytes_transfered)/bpm_counter)/1024.0;
current_bytes_transfered = 0;
@@ -903,7 +903,7 @@ bool PeerHelper::operator!=(void* ptr)
bool Peer::IncUseCount()
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
+ MutexAutoLock lock(m_exclusive_access_mutex);
if (!m_pending_deletion)
{
@@ -917,7 +917,7 @@ bool Peer::IncUseCount()
void Peer::DecUseCount()
{
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
+ MutexAutoLock lock(m_exclusive_access_mutex);
sanity_check(m_usage > 0);
m_usage--;
@@ -978,7 +978,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id,
bool Peer::isTimedOut(float timeout)
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
+ MutexAutoLock lock(m_exclusive_access_mutex);
u32 current_time = porting::getTimeMs();
float dtime = CALC_DTIME(m_last_timeout_check,current_time);
@@ -992,7 +992,7 @@ bool Peer::isTimedOut(float timeout)
void Peer::Drop()
{
{
- JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ MutexAutoLock usage_lock(m_exclusive_access_mutex);
m_pending_deletion = true;
if (m_usage != 0)
return;
@@ -1051,7 +1051,7 @@ void UDPPeer::reportRTT(float rtt)
if (timeout > RESEND_TIMEOUT_MAX)
timeout = RESEND_TIMEOUT_MAX;
- JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ MutexAutoLock usage_lock(m_exclusive_access_mutex);
resend_timeout = timeout;
}
@@ -1255,8 +1255,9 @@ SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
/* Connection Threads */
/******************************************************************************/
-ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
- float timeout) :
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
+ float timeout) :
+ Thread("ConnectionSend"),
m_connection(NULL),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
@@ -1266,11 +1267,9 @@ ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
{
}
-void * ConnectionSendThread::Thread()
+void * ConnectionSendThread::run()
{
- assert(m_connection != NULL);
- ThreadStarted();
- log_register_thread("ConnectionSend");
+ assert(m_connection);
LOG(dout_con<<m_connection->getDesc()
<<"ConnectionSend thread started"<<std::endl);
@@ -1281,21 +1280,19 @@ void * ConnectionSendThread::Thread()
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
- porting::setThreadName("ConnectionSend");
-
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
- while(!StopRequested() || packetsQueued()) {
+ while(!stopRequested() || packetsQueued()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */
- m_send_sleep_semaphore.Wait(50);
+ m_send_sleep_semaphore.wait(50);
/* remove all triggers */
- while(m_send_sleep_semaphore.Wait(0)) {}
+ while(m_send_sleep_semaphore.wait(0)) {}
lasttime = curtime;
curtime = porting::getTimeMs();
@@ -1319,7 +1316,7 @@ void * ConnectionSendThread::Thread()
/* send non reliable packets */
sendPackets(dtime);
- END_DEBUG_EXCEPTION_HANDLER(errorstream);
+ END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
@@ -1328,7 +1325,7 @@ void * ConnectionSendThread::Thread()
void ConnectionSendThread::Trigger()
{
- m_send_sleep_semaphore.Post();
+ m_send_sleep_semaphore.post();
}
bool ConnectionSendThread::packetsQueued()
@@ -1763,7 +1760,7 @@ void ConnectionSendThread::disconnect()
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
sendAsPacket(*i, 0,data,false);
}
@@ -1843,7 +1840,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
send(*i, channelnum, data);
}
@@ -1855,7 +1852,7 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
PeerHelper peer = m_connection->getPeerNoEx(*i);
@@ -1984,7 +1981,7 @@ void ConnectionSendThread::sendPackets(float dtime)
}
else if (
( peer->m_increment_packets_remaining > 0) ||
- (StopRequested())) {
+ (stopRequested())) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining--;
@@ -2014,15 +2011,14 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
}
ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
+ Thread("ConnectionReceive"),
m_connection(NULL)
{
}
-void * ConnectionReceiveThread::Thread()
+void * ConnectionReceiveThread::run()
{
- assert(m_connection != NULL);
- ThreadStarted();
- log_register_thread("ConnectionReceive");
+ assert(m_connection);
LOG(dout_con<<m_connection->getDesc()
<<"ConnectionReceive thread started"<<std::endl);
@@ -2030,15 +2026,13 @@ void * ConnectionReceiveThread::Thread()
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
- porting::setThreadName("ConnectionReceive");
-
#ifdef DEBUG_CONNECTION_KBPS
u32 curtime = porting::getTimeMs();
u32 lasttime = curtime;
float debug_print_timer = 0.0;
#endif
- while(!StopRequested()) {
+ while(!stopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
@@ -2103,8 +2097,9 @@ void * ConnectionReceiveThread::Thread()
}
}
#endif
- END_DEBUG_EXCEPTION_HANDLER(errorstream);
+ END_DEBUG_EXCEPTION_HANDLER
}
+
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
@@ -2684,8 +2679,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_sendThread.setParent(this);
m_receiveThread.setParent(this);
- m_sendThread.Start();
- m_receiveThread.Start();
+ m_sendThread.start();
+ m_receiveThread.start();
}
@@ -2694,8 +2689,8 @@ Connection::~Connection()
{
m_shutting_down = true;
// request threads to stop
- m_sendThread.Stop();
- m_receiveThread.Stop();
+ m_sendThread.stop();
+ m_receiveThread.stop();
//TODO for some unkonwn reason send/receive threads do not exit as they're
// supposed to be but wait on peer timeout. To speed up shutdown we reduce
@@ -2703,8 +2698,8 @@ Connection::~Connection()
m_sendThread.setPeerTimeout(0.5);
// wait for threads to finish
- m_sendThread.Wait();
- m_receiveThread.Wait();
+ m_sendThread.wait();
+ m_receiveThread.wait();
// Delete peers
for(std::map<u16, Peer*>::iterator
@@ -2724,7 +2719,7 @@ void Connection::putEvent(ConnectionEvent &e)
PeerHelper Connection::getPeer(u16 peer_id)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
if (node == m_peers.end()) {
@@ -2739,7 +2734,7 @@ PeerHelper Connection::getPeer(u16 peer_id)
PeerHelper Connection::getPeerNoEx(u16 peer_id)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
if (node == m_peers.end()) {
@@ -2755,7 +2750,7 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id)
/* find peer_id for address */
u16 Connection::lookupPeer(Address& sender)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator j;
j = m_peers.begin();
for(; j != m_peers.end(); ++j)
@@ -2794,7 +2789,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout)
/* lock list as short as possible */
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
if (m_peers.find(peer_id) == m_peers.end())
return false;
peer = m_peers[peer_id];
@@ -2852,7 +2847,7 @@ void Connection::Connect(Address address)
bool Connection::Connected()
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
if (m_peers.size() != 1)
return false;
@@ -2987,7 +2982,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
/*
Find an unused peer id
*/
- JMutexAutoLock lock(m_peers_mutex);
+ MutexAutoLock lock(m_peers_mutex);
bool out_of_ids = false;
for(;;) {
// Check if exists
@@ -3038,9 +3033,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
void Connection::PrintInfo(std::ostream &out)
{
- m_info_mutex.Lock();
+ m_info_mutex.lock();
out<<getDesc()<<": ";
- m_info_mutex.Unlock();
+ m_info_mutex.unlock();
}
void Connection::PrintInfo()
@@ -3091,7 +3086,7 @@ UDPPeer* Connection::createServerPeer(Address& address)
UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
{
- JMutexAutoLock lock(m_peers_mutex);
+ MutexAutoLock lock(m_peers_mutex);
m_peers[peer->id] = peer;
m_peer_ids.push_back(peer->id);
}