diff options
Diffstat (limited to 'src/network/connection.cpp')
-rw-r--r-- | src/network/connection.cpp | 145 |
1 files changed, 70 insertions, 75 deletions
diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 7794ce10f..f7452d8e4 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -42,10 +42,10 @@ namespace con #undef DEBUG_CONNECTION_KBPS #else /* this mutex is used to achieve log message consistency */ -JMutex log_message_mutex; +Mutex log_message_mutex; #define LOG(a) \ { \ - JMutexAutoLock loglock(log_message_mutex); \ + MutexAutoLock loglock(log_message_mutex); \ a; \ } #define PROFILE(a) a @@ -209,7 +209,7 @@ ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {} void ReliablePacketBuffer::print() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for(std::list<BufferedPacket>::iterator i = m_list.begin(); @@ -223,7 +223,7 @@ void ReliablePacketBuffer::print() } bool ReliablePacketBuffer::empty() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); return m_list.empty(); } @@ -256,7 +256,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() } bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; BufferedPacket p = *m_list.begin(); @@ -266,7 +266,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result) BufferedPacket ReliablePacketBuffer::popFirst() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); BufferedPacket p = *m_list.begin(); @@ -283,7 +283,7 @@ BufferedPacket ReliablePacketBuffer::popFirst() } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { LOG(dout_con<<"Sequence number: " << seqnum @@ -294,7 +294,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) RPBSearchResult next = r; - next++; + ++next; if (next != notFound()) { u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1])); m_oldest_non_answered_ack = s; @@ -311,7 +311,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) } void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " "reliable packet" << std::endl; @@ -358,7 +358,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) /* this is true e.g. on wrap around */ if (seqnum < next_expected) { while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { - i++; + ++i; if (i != m_list.end()) s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } @@ -367,7 +367,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) else { while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { - i++; + ++i; if (i != m_list.end()) s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } @@ -411,7 +411,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) void ReliablePacketBuffer::incrementTimeouts(float dtime) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); for(std::list<BufferedPacket>::iterator i = m_list.begin(); i != m_list.end(); ++i) { @@ -423,7 +423,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, unsigned int max_packets) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); std::list<BufferedPacket> timed_outs; for(std::list<BufferedPacket>::iterator i = m_list.begin(); i != m_list.end(); ++i) @@ -446,7 +446,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, IncomingSplitBuffer::~IncomingSplitBuffer() { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -459,7 +459,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer() */ SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); u32 headersize = BASE_HEADER_SIZE + 7; if (p.data.getSize() < headersize) { errorstream << "Invalid data size for split packet" << std::endl; @@ -546,7 +546,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { std::list<u16> remove_queue; { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -562,7 +562,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) for(std::list<u16>::iterator j = remove_queue.begin(); j != remove_queue.end(); ++j) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl); delete m_buf[*j]; m_buf.erase(*j); @@ -605,13 +605,13 @@ Channel::~Channel() u16 Channel::readNextIncomingSeqNum() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); return next_incoming_seqnum; } u16 Channel::incNextIncomingSeqNum() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); u16 retval = next_incoming_seqnum; next_incoming_seqnum++; return retval; @@ -619,18 +619,18 @@ u16 Channel::incNextIncomingSeqNum() u16 Channel::readNextSplitSeqNum() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); return next_outgoing_split_seqnum; } void Channel::setNextSplitSeqNum(u16 seqnum) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); next_outgoing_split_seqnum = seqnum; } u16 Channel::getOutgoingSequenceNumber(bool& successful) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); u16 retval = next_outgoing_seqnum; u16 lowest_unacked_seqnumber; @@ -670,7 +670,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successful) u16 Channel::readOutgoingSequenceNumber() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); return next_outgoing_seqnum; } @@ -686,32 +686,32 @@ bool Channel::putBackSequenceNumber(u16 seqnum) void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_transfered += bytes; current_packet_successfull += packets; } void Channel::UpdateBytesReceived(unsigned int bytes) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_received += bytes; } void Channel::UpdateBytesLost(unsigned int bytes) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_lost += bytes; } void Channel::UpdatePacketLossCounter(unsigned int count) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_packet_loss += count; } void Channel::UpdatePacketTooLateCounter() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_packet_too_late++; } @@ -731,7 +731,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer) bool reasonable_amount_of_data_transmitted = false; { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); packet_loss = current_packet_loss; //packet_too_late = current_packet_too_late; packets_successfull = current_packet_successfull; @@ -802,7 +802,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer) if (bpm_counter > 10.0) { { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); cur_kbps = (((float) current_bytes_transfered)/bpm_counter)/1024.0; current_bytes_transfered = 0; @@ -903,7 +903,7 @@ bool PeerHelper::operator!=(void* ptr) bool Peer::IncUseCount() { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); if (!m_pending_deletion) { @@ -917,7 +917,7 @@ bool Peer::IncUseCount() void Peer::DecUseCount() { { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); sanity_check(m_usage > 0); m_usage--; @@ -978,7 +978,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id, bool Peer::isTimedOut(float timeout) { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); u32 current_time = porting::getTimeMs(); float dtime = CALC_DTIME(m_last_timeout_check,current_time); @@ -992,7 +992,7 @@ bool Peer::isTimedOut(float timeout) void Peer::Drop() { { - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); m_pending_deletion = true; if (m_usage != 0) return; @@ -1051,7 +1051,7 @@ void UDPPeer::reportRTT(float rtt) if (timeout > RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); resend_timeout = timeout; } @@ -1255,8 +1255,9 @@ SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel, /* Connection Threads */ /******************************************************************************/ -ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, - float timeout) : +ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, + float timeout) : + Thread("ConnectionSend"), m_connection(NULL), m_max_packet_size(max_packet_size), m_timeout(timeout), @@ -1266,11 +1267,9 @@ ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, { } -void * ConnectionSendThread::Thread() +void * ConnectionSendThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionSend"); + assert(m_connection); LOG(dout_con<<m_connection->getDesc() <<"ConnectionSend thread started"<<std::endl); @@ -1281,21 +1280,19 @@ void * ConnectionSendThread::Thread() PROFILE(std::stringstream ThreadIdentifier); PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]"); - porting::setThreadName("ConnectionSend"); - /* if stop is requested don't stop immediately but try to send all */ /* packets first */ - while(!StopRequested() || packetsQueued()) { + while(!stopRequested() || packetsQueued()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); m_iteration_packets_avaialble = m_max_data_packets_per_iteration; /* wait for trigger or timeout */ - m_send_sleep_semaphore.Wait(50); + m_send_sleep_semaphore.wait(50); /* remove all triggers */ - while(m_send_sleep_semaphore.Wait(0)) {} + while(m_send_sleep_semaphore.wait(0)) {} lasttime = curtime; curtime = porting::getTimeMs(); @@ -1319,7 +1316,7 @@ void * ConnectionSendThread::Thread() /* send non reliable packets */ sendPackets(dtime); - END_DEBUG_EXCEPTION_HANDLER(errorstream); + END_DEBUG_EXCEPTION_HANDLER } PROFILE(g_profiler->remove(ThreadIdentifier.str())); @@ -1328,7 +1325,7 @@ void * ConnectionSendThread::Thread() void ConnectionSendThread::Trigger() { - m_send_sleep_semaphore.Post(); + m_send_sleep_semaphore.post(); } bool ConnectionSendThread::packetsQueued() @@ -1763,7 +1760,7 @@ void ConnectionSendThread::disconnect() for (std::list<u16>::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { sendAsPacket(*i, 0,data,false); } @@ -1843,7 +1840,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data) for (std::list<u16>::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { send(*i, channelnum, data); } @@ -1855,7 +1852,7 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c) for (std::list<u16>::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { PeerHelper peer = m_connection->getPeerNoEx(*i); @@ -1984,7 +1981,7 @@ void ConnectionSendThread::sendPackets(float dtime) } else if ( ( peer->m_increment_packets_remaining > 0) || - (StopRequested())) { + (stopRequested())) { rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_increment_packets_remaining--; @@ -2014,15 +2011,14 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum, } ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) : + Thread("ConnectionReceive"), m_connection(NULL) { } -void * ConnectionReceiveThread::Thread() +void * ConnectionReceiveThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionReceive"); + assert(m_connection); LOG(dout_con<<m_connection->getDesc() <<"ConnectionReceive thread started"<<std::endl); @@ -2030,15 +2026,13 @@ void * ConnectionReceiveThread::Thread() PROFILE(std::stringstream ThreadIdentifier); PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]"); - porting::setThreadName("ConnectionReceive"); - #ifdef DEBUG_CONNECTION_KBPS u32 curtime = porting::getTimeMs(); u32 lasttime = curtime; float debug_print_timer = 0.0; #endif - while(!StopRequested()) { + while(!stopRequested()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); @@ -2103,8 +2097,9 @@ void * ConnectionReceiveThread::Thread() } } #endif - END_DEBUG_EXCEPTION_HANDLER(errorstream); + END_DEBUG_EXCEPTION_HANDLER } + PROFILE(g_profiler->remove(ThreadIdentifier.str())); return NULL; } @@ -2684,8 +2679,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_sendThread.setParent(this); m_receiveThread.setParent(this); - m_sendThread.Start(); - m_receiveThread.Start(); + m_sendThread.start(); + m_receiveThread.start(); } @@ -2694,8 +2689,8 @@ Connection::~Connection() { m_shutting_down = true; // request threads to stop - m_sendThread.Stop(); - m_receiveThread.Stop(); + m_sendThread.stop(); + m_receiveThread.stop(); //TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce @@ -2703,8 +2698,8 @@ Connection::~Connection() m_sendThread.setPeerTimeout(0.5); // wait for threads to finish - m_sendThread.Wait(); - m_receiveThread.Wait(); + m_sendThread.wait(); + m_receiveThread.wait(); // Delete peers for(std::map<u16, Peer*>::iterator @@ -2724,7 +2719,7 @@ void Connection::putEvent(ConnectionEvent &e) PeerHelper Connection::getPeer(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map<u16, Peer*>::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2739,7 +2734,7 @@ PeerHelper Connection::getPeer(u16 peer_id) PeerHelper Connection::getPeerNoEx(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map<u16, Peer*>::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2755,7 +2750,7 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id) /* find peer_id for address */ u16 Connection::lookupPeer(Address& sender) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map<u16, Peer*>::iterator j; j = m_peers.begin(); for(; j != m_peers.end(); ++j) @@ -2794,7 +2789,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout) /* lock list as short as possible */ { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.find(peer_id) == m_peers.end()) return false; peer = m_peers[peer_id]; @@ -2852,7 +2847,7 @@ void Connection::Connect(Address address) bool Connection::Connected() { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.size() != 1) return false; @@ -2987,7 +2982,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) /* Find an unused peer id */ - JMutexAutoLock lock(m_peers_mutex); + MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; for(;;) { // Check if exists @@ -3038,9 +3033,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { - m_info_mutex.Lock(); + m_info_mutex.lock(); out<<getDesc()<<": "; - m_info_mutex.Unlock(); + m_info_mutex.unlock(); } void Connection::PrintInfo() @@ -3091,7 +3086,7 @@ UDPPeer* Connection::createServerPeer(Address& address) UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this); { - JMutexAutoLock lock(m_peers_mutex); + MutexAutoLock lock(m_peers_mutex); m_peers[peer->id] = peer; m_peer_ids.push_back(peer->id); } |