From e4bff8be94c0db4f94e63ad448d0eeb869ccdbbd Mon Sep 17 00:00:00 2001 From: ShadowNinja Date: Tue, 7 Apr 2015 06:13:12 -0400 Subject: Clean up threading * Rename everything. * Strip J prefix. * Change UpperCamelCase functions to lowerCamelCase. * Remove global (!) semaphore count mutex on OSX. * Remove semaphore count getter (unused, unsafe, depended on internal API functions on Windows, and used a hack on OSX). * Add `Atomic`. * Make `Thread` handle thread names. * Add support for C++11 multi-threading. * Combine pthread and win32 sources. * Remove `ThreadStarted` (unused, unneeded). * Move some includes from the headers to the sources. * Move all of `Event` into its header (allows inlining with no new includes). * Make `Event` use `Semaphore` (except on Windows). * Move some porting functions into `Thread`. * Integrate logging with `Thread`. * Add threading test. --- src/network/connection.cpp | 128 +++++++++++++++++++++------------------------ 1 file changed, 61 insertions(+), 67 deletions(-) (limited to 'src/network/connection.cpp') diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 7794ce10f..10f0d5543 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -42,10 +42,10 @@ namespace con #undef DEBUG_CONNECTION_KBPS #else /* this mutex is used to achieve log message consistency */ -JMutex log_message_mutex; +Mutex log_message_mutex; #define LOG(a) \ { \ - JMutexAutoLock loglock(log_message_mutex); \ + MutexAutoLock loglock(log_message_mutex); \ a; \ } #define PROFILE(a) a @@ -209,7 +209,7 @@ ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {} void ReliablePacketBuffer::print() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for(std::list::iterator i = m_list.begin(); @@ -223,7 +223,7 @@ void ReliablePacketBuffer::print() } bool ReliablePacketBuffer::empty() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); return m_list.empty(); } @@ -256,7 +256,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() } bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; BufferedPacket p = *m_list.begin(); @@ -266,7 +266,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result) BufferedPacket ReliablePacketBuffer::popFirst() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); BufferedPacket p = *m_list.begin(); @@ -283,7 +283,7 @@ BufferedPacket ReliablePacketBuffer::popFirst() } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { LOG(dout_con<<"Sequence number: " << seqnum @@ -311,7 +311,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) } void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " "reliable packet" << std::endl; @@ -411,7 +411,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) void ReliablePacketBuffer::incrementTimeouts(float dtime) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { @@ -423,7 +423,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) std::list ReliablePacketBuffer::getTimedOuts(float timeout, unsigned int max_packets) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); std::list timed_outs; for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) @@ -446,7 +446,7 @@ std::list ReliablePacketBuffer::getTimedOuts(float timeout, IncomingSplitBuffer::~IncomingSplitBuffer() { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -459,7 +459,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer() */ SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); u32 headersize = BASE_HEADER_SIZE + 7; if (p.data.getSize() < headersize) { errorstream << "Invalid data size for split packet" << std::endl; @@ -546,7 +546,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { std::list remove_queue; { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -562,7 +562,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) for(std::list::iterator j = remove_queue.begin(); j != remove_queue.end(); ++j) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"< 10.0) { { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); cur_kbps = (((float) current_bytes_transfered)/bpm_counter)/1024.0; current_bytes_transfered = 0; @@ -903,7 +903,7 @@ bool PeerHelper::operator!=(void* ptr) bool Peer::IncUseCount() { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); if (!m_pending_deletion) { @@ -917,7 +917,7 @@ bool Peer::IncUseCount() void Peer::DecUseCount() { { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); sanity_check(m_usage > 0); m_usage--; @@ -978,7 +978,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id, bool Peer::isTimedOut(float timeout) { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); u32 current_time = porting::getTimeMs(); float dtime = CALC_DTIME(m_last_timeout_check,current_time); @@ -992,7 +992,7 @@ bool Peer::isTimedOut(float timeout) void Peer::Drop() { { - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); m_pending_deletion = true; if (m_usage != 0) return; @@ -1051,7 +1051,7 @@ void UDPPeer::reportRTT(float rtt) if (timeout > RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); resend_timeout = timeout; } @@ -1255,8 +1255,9 @@ SharedBuffer UDPPeer::addSpiltPacket(u8 channel, /* Connection Threads */ /******************************************************************************/ -ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, - float timeout) : +ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, + float timeout) : + Thread("ConnectionSend"), m_connection(NULL), m_max_packet_size(max_packet_size), m_timeout(timeout), @@ -1266,11 +1267,9 @@ ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, { } -void * ConnectionSendThread::Thread() +void * ConnectionSendThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionSend"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionSend thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionSend"); - /* if stop is requested don't stop immediately but try to send all */ /* packets first */ - while(!StopRequested() || packetsQueued()) { + while(!stopRequested() || packetsQueued()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); m_iteration_packets_avaialble = m_max_data_packets_per_iteration; /* wait for trigger or timeout */ - m_send_sleep_semaphore.Wait(50); + m_send_sleep_semaphore.wait(50); /* remove all triggers */ - while(m_send_sleep_semaphore.Wait(0)) {} + while(m_send_sleep_semaphore.wait(0)) {} lasttime = curtime; curtime = porting::getTimeMs(); @@ -1328,7 +1325,7 @@ void * ConnectionSendThread::Thread() void ConnectionSendThread::Trigger() { - m_send_sleep_semaphore.Post(); + m_send_sleep_semaphore.post(); } bool ConnectionSendThread::packetsQueued() @@ -1984,7 +1981,7 @@ void ConnectionSendThread::sendPackets(float dtime) } else if ( ( peer->m_increment_packets_remaining > 0) || - (StopRequested())) { + (stopRequested())) { rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_increment_packets_remaining--; @@ -2014,15 +2011,14 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum, } ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) : + Thread("ConnectionReceive"), m_connection(NULL) { } -void * ConnectionReceiveThread::Thread() +void * ConnectionReceiveThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionReceive"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionReceive thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionReceive"); - #ifdef DEBUG_CONNECTION_KBPS u32 curtime = porting::getTimeMs(); u32 lasttime = curtime; float debug_print_timer = 0.0; #endif - while(!StopRequested()) { + while(!stopRequested()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); @@ -2684,8 +2678,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_sendThread.setParent(this); m_receiveThread.setParent(this); - m_sendThread.Start(); - m_receiveThread.Start(); + m_sendThread.start(); + m_receiveThread.start(); } @@ -2694,8 +2688,8 @@ Connection::~Connection() { m_shutting_down = true; // request threads to stop - m_sendThread.Stop(); - m_receiveThread.Stop(); + m_sendThread.stop(); + m_receiveThread.stop(); //TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce @@ -2703,8 +2697,8 @@ Connection::~Connection() m_sendThread.setPeerTimeout(0.5); // wait for threads to finish - m_sendThread.Wait(); - m_receiveThread.Wait(); + m_sendThread.wait(); + m_receiveThread.wait(); // Delete peers for(std::map::iterator @@ -2724,7 +2718,7 @@ void Connection::putEvent(ConnectionEvent &e) PeerHelper Connection::getPeer(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2739,7 +2733,7 @@ PeerHelper Connection::getPeer(u16 peer_id) PeerHelper Connection::getPeerNoEx(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2755,7 +2749,7 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id) /* find peer_id for address */ u16 Connection::lookupPeer(Address& sender) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator j; j = m_peers.begin(); for(; j != m_peers.end(); ++j) @@ -2794,7 +2788,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout) /* lock list as short as possible */ { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.find(peer_id) == m_peers.end()) return false; peer = m_peers[peer_id]; @@ -2852,7 +2846,7 @@ void Connection::Connect(Address address) bool Connection::Connected() { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.size() != 1) return false; @@ -2987,7 +2981,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) /* Find an unused peer id */ - JMutexAutoLock lock(m_peers_mutex); + MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; for(;;) { // Check if exists @@ -3038,9 +3032,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { - m_info_mutex.Lock(); + m_info_mutex.lock(); out<id] = peer; m_peer_ids.push_back(peer->id); } -- cgit v1.2.3