From e4bff8be94c0db4f94e63ad448d0eeb869ccdbbd Mon Sep 17 00:00:00 2001 From: ShadowNinja Date: Tue, 7 Apr 2015 06:13:12 -0400 Subject: Clean up threading * Rename everything. * Strip J prefix. * Change UpperCamelCase functions to lowerCamelCase. * Remove global (!) semaphore count mutex on OSX. * Remove semaphore count getter (unused, unsafe, depended on internal API functions on Windows, and used a hack on OSX). * Add `Atomic`. * Make `Thread` handle thread names. * Add support for C++11 multi-threading. * Combine pthread and win32 sources. * Remove `ThreadStarted` (unused, unneeded). * Move some includes from the headers to the sources. * Move all of `Event` into its header (allows inlining with no new includes). * Make `Event` use `Semaphore` (except on Windows). * Move some porting functions into `Thread`. * Integrate logging with `Thread`. * Add threading test. --- src/network/clientpackethandler.cpp | 8 +-- src/network/connection.cpp | 128 +++++++++++++++++------------------- src/network/connection.h | 62 ++++++++--------- 3 files changed, 96 insertions(+), 102 deletions(-) (limited to 'src/network') diff --git a/src/network/clientpackethandler.cpp b/src/network/clientpackethandler.cpp index 86091bc88..04f94c58b 100644 --- a/src/network/clientpackethandler.cpp +++ b/src/network/clientpackethandler.cpp @@ -621,7 +621,7 @@ void Client::handleCommand_AnnounceMedia(NetworkPacket* pkt) // Mesh update thread must be stopped while // updating content definitions - sanity_check(!m_mesh_update_thread.IsRunning()); + sanity_check(!m_mesh_update_thread.isRunning()); for (u16 i = 0; i < num_files; i++) { std::string name, sha1_base64; @@ -694,7 +694,7 @@ void Client::handleCommand_Media(NetworkPacket* pkt) // Mesh update thread must be stopped while // updating content definitions - sanity_check(!m_mesh_update_thread.IsRunning()); + sanity_check(!m_mesh_update_thread.isRunning()); for (u32 i=0; i < num_files; i++) { std::string name; @@ -720,7 +720,7 @@ void Client::handleCommand_NodeDef(NetworkPacket* pkt) // Mesh update thread must be stopped while // updating content definitions - sanity_check(!m_mesh_update_thread.IsRunning()); + sanity_check(!m_mesh_update_thread.isRunning()); // Decompress node definitions std::string datastring(pkt->getString(0), pkt->getSize()); @@ -747,7 +747,7 @@ void Client::handleCommand_ItemDef(NetworkPacket* pkt) // Mesh update thread must be stopped while // updating content definitions - sanity_check(!m_mesh_update_thread.IsRunning()); + sanity_check(!m_mesh_update_thread.isRunning()); // Decompress item definitions std::string datastring(pkt->getString(0), pkt->getSize()); diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 7794ce10f..10f0d5543 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -42,10 +42,10 @@ namespace con #undef DEBUG_CONNECTION_KBPS #else /* this mutex is used to achieve log message consistency */ -JMutex log_message_mutex; +Mutex log_message_mutex; #define LOG(a) \ { \ - JMutexAutoLock loglock(log_message_mutex); \ + MutexAutoLock loglock(log_message_mutex); \ a; \ } #define PROFILE(a) a @@ -209,7 +209,7 @@ ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {} void ReliablePacketBuffer::print() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for(std::list::iterator i = m_list.begin(); @@ -223,7 +223,7 @@ void ReliablePacketBuffer::print() } bool ReliablePacketBuffer::empty() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); return m_list.empty(); } @@ -256,7 +256,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() } bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; BufferedPacket p = *m_list.begin(); @@ -266,7 +266,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result) BufferedPacket ReliablePacketBuffer::popFirst() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); BufferedPacket p = *m_list.begin(); @@ -283,7 +283,7 @@ BufferedPacket ReliablePacketBuffer::popFirst() } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { LOG(dout_con<<"Sequence number: " << seqnum @@ -311,7 +311,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) } void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " "reliable packet" << std::endl; @@ -411,7 +411,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) void ReliablePacketBuffer::incrementTimeouts(float dtime) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { @@ -423,7 +423,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) std::list ReliablePacketBuffer::getTimedOuts(float timeout, unsigned int max_packets) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); std::list timed_outs; for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) @@ -446,7 +446,7 @@ std::list ReliablePacketBuffer::getTimedOuts(float timeout, IncomingSplitBuffer::~IncomingSplitBuffer() { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -459,7 +459,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer() */ SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); u32 headersize = BASE_HEADER_SIZE + 7; if (p.data.getSize() < headersize) { errorstream << "Invalid data size for split packet" << std::endl; @@ -546,7 +546,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { std::list remove_queue; { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -562,7 +562,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) for(std::list::iterator j = remove_queue.begin(); j != remove_queue.end(); ++j) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"< 10.0) { { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); cur_kbps = (((float) current_bytes_transfered)/bpm_counter)/1024.0; current_bytes_transfered = 0; @@ -903,7 +903,7 @@ bool PeerHelper::operator!=(void* ptr) bool Peer::IncUseCount() { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); if (!m_pending_deletion) { @@ -917,7 +917,7 @@ bool Peer::IncUseCount() void Peer::DecUseCount() { { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); sanity_check(m_usage > 0); m_usage--; @@ -978,7 +978,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id, bool Peer::isTimedOut(float timeout) { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); u32 current_time = porting::getTimeMs(); float dtime = CALC_DTIME(m_last_timeout_check,current_time); @@ -992,7 +992,7 @@ bool Peer::isTimedOut(float timeout) void Peer::Drop() { { - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); m_pending_deletion = true; if (m_usage != 0) return; @@ -1051,7 +1051,7 @@ void UDPPeer::reportRTT(float rtt) if (timeout > RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); resend_timeout = timeout; } @@ -1255,8 +1255,9 @@ SharedBuffer UDPPeer::addSpiltPacket(u8 channel, /* Connection Threads */ /******************************************************************************/ -ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, - float timeout) : +ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, + float timeout) : + Thread("ConnectionSend"), m_connection(NULL), m_max_packet_size(max_packet_size), m_timeout(timeout), @@ -1266,11 +1267,9 @@ ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, { } -void * ConnectionSendThread::Thread() +void * ConnectionSendThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionSend"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionSend thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionSend"); - /* if stop is requested don't stop immediately but try to send all */ /* packets first */ - while(!StopRequested() || packetsQueued()) { + while(!stopRequested() || packetsQueued()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); m_iteration_packets_avaialble = m_max_data_packets_per_iteration; /* wait for trigger or timeout */ - m_send_sleep_semaphore.Wait(50); + m_send_sleep_semaphore.wait(50); /* remove all triggers */ - while(m_send_sleep_semaphore.Wait(0)) {} + while(m_send_sleep_semaphore.wait(0)) {} lasttime = curtime; curtime = porting::getTimeMs(); @@ -1328,7 +1325,7 @@ void * ConnectionSendThread::Thread() void ConnectionSendThread::Trigger() { - m_send_sleep_semaphore.Post(); + m_send_sleep_semaphore.post(); } bool ConnectionSendThread::packetsQueued() @@ -1984,7 +1981,7 @@ void ConnectionSendThread::sendPackets(float dtime) } else if ( ( peer->m_increment_packets_remaining > 0) || - (StopRequested())) { + (stopRequested())) { rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_increment_packets_remaining--; @@ -2014,15 +2011,14 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum, } ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) : + Thread("ConnectionReceive"), m_connection(NULL) { } -void * ConnectionReceiveThread::Thread() +void * ConnectionReceiveThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionReceive"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionReceive thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionReceive"); - #ifdef DEBUG_CONNECTION_KBPS u32 curtime = porting::getTimeMs(); u32 lasttime = curtime; float debug_print_timer = 0.0; #endif - while(!StopRequested()) { + while(!stopRequested()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); @@ -2684,8 +2678,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_sendThread.setParent(this); m_receiveThread.setParent(this); - m_sendThread.Start(); - m_receiveThread.Start(); + m_sendThread.start(); + m_receiveThread.start(); } @@ -2694,8 +2688,8 @@ Connection::~Connection() { m_shutting_down = true; // request threads to stop - m_sendThread.Stop(); - m_receiveThread.Stop(); + m_sendThread.stop(); + m_receiveThread.stop(); //TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce @@ -2703,8 +2697,8 @@ Connection::~Connection() m_sendThread.setPeerTimeout(0.5); // wait for threads to finish - m_sendThread.Wait(); - m_receiveThread.Wait(); + m_sendThread.wait(); + m_receiveThread.wait(); // Delete peers for(std::map::iterator @@ -2724,7 +2718,7 @@ void Connection::putEvent(ConnectionEvent &e) PeerHelper Connection::getPeer(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2739,7 +2733,7 @@ PeerHelper Connection::getPeer(u16 peer_id) PeerHelper Connection::getPeerNoEx(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2755,7 +2749,7 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id) /* find peer_id for address */ u16 Connection::lookupPeer(Address& sender) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator j; j = m_peers.begin(); for(; j != m_peers.end(); ++j) @@ -2794,7 +2788,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout) /* lock list as short as possible */ { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.find(peer_id) == m_peers.end()) return false; peer = m_peers[peer_id]; @@ -2852,7 +2846,7 @@ void Connection::Connect(Address address) bool Connection::Connected() { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.size() != 1) return false; @@ -2987,7 +2981,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) /* Find an unused peer id */ - JMutexAutoLock lock(m_peers_mutex); + MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; for(;;) { // Check if exists @@ -3038,9 +3032,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { - m_info_mutex.Lock(); + m_info_mutex.lock(); out<id] = peer; m_peer_ids.push_back(peer->id); } diff --git a/src/network/connection.h b/src/network/connection.h index 15ea7e20f..fe2c9819d 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -349,7 +349,7 @@ private: u16 m_oldest_non_answered_ack; - JMutex m_list_mutex; + Mutex m_list_mutex; }; /* @@ -372,7 +372,7 @@ private: // Key is seqnum std::map m_buf; - JMutex m_map_mutex; + Mutex m_map_mutex; }; struct OutgoingPacket @@ -519,32 +519,32 @@ public: void UpdateTimers(float dtime, bool legacy_peer); const float getCurrentDownloadRateKB() - { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return cur_kbps; }; const float getMaxDownloadRateKB() - { JMutexAutoLock lock(m_internal_mutex); return max_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return max_kbps; }; const float getCurrentLossRateKB() - { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; + { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; const float getMaxLossRateKB() - { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; + { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; const float getCurrentIncomingRateKB() - { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; const float getMaxIncomingRateKB() - { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; const float getAvgDownloadRateKB() - { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return avg_kbps; }; const float getAvgLossRateKB() - { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; + { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; const float getAvgIncomingRateKB() - { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; + { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; const unsigned int getWindowSize() const { return window_size; }; void setWindowSize(unsigned int size) { window_size = size; }; private: - JMutex m_internal_mutex; + Mutex m_internal_mutex; int window_size; u16 next_incoming_seqnum; @@ -675,7 +675,7 @@ class Peer { }; virtual ~Peer() { - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); FATAL_ERROR_IF(m_usage != 0, "Reference counting failure"); }; @@ -692,15 +692,15 @@ class Peer { virtual bool getAddress(MTProtocols type, Address& toset) = 0; void ResetTimeout() - {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; }; + {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; }; bool isTimedOut(float timeout); void setSentWithID() - { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; }; + { MutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; }; bool hasSentWithID() - { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; }; + { MutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; }; unsigned int m_increment_packets_remaining; unsigned int m_increment_bytes_remaining; @@ -744,7 +744,7 @@ class Peer { bool IncUseCount(); void DecUseCount(); - JMutex m_exclusive_access_mutex; + Mutex m_exclusive_access_mutex; bool m_pending_deletion; @@ -826,10 +826,10 @@ protected: unsigned int maxtransfer); float getResendTimeout() - { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } + { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } void setResendTimeout(float timeout) - { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } bool Ping(float dtime,SharedBuffer& data); Channel channels[CHANNEL_COUNT]; @@ -910,14 +910,14 @@ struct ConnectionEvent } }; -class ConnectionSendThread : public JThread { +class ConnectionSendThread : public Thread { public: friend class UDPPeer; ConnectionSendThread(unsigned int max_packet_size, float timeout); - void * Thread (); + void *run(); void Trigger(); @@ -961,7 +961,7 @@ private: unsigned int m_max_packet_size; float m_timeout; std::queue m_outgoing_queue; - JSemaphore m_send_sleep_semaphore; + Semaphore m_send_sleep_semaphore; unsigned int m_iteration_packets_avaialble; unsigned int m_max_commands_per_iteration; @@ -969,24 +969,24 @@ private: unsigned int m_max_packets_requeued; }; -class ConnectionReceiveThread : public JThread { +class ConnectionReceiveThread : public Thread { public: ConnectionReceiveThread(unsigned int max_packet_size); - void * Thread (); + void *run(); - void setParent(Connection* parent) { - assert(parent != NULL); // Pre-condition + void setParent(Connection *parent) { + assert(parent); // Pre-condition m_connection = parent; } private: - void receive (); + void receive(); // Returns next data from a buffer if possible // If found, returns true; if not, false. // If found, sets peer_id and dst - bool getFromBuffers (u16 &peer_id, SharedBuffer &dst); + bool getFromBuffers(u16 &peer_id, SharedBuffer &dst); bool checkIncomingBuffers(Channel *channel, u16 &peer_id, SharedBuffer &dst); @@ -1054,7 +1054,7 @@ protected: std::list getPeerIDs() { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); return m_peer_ids; } @@ -1075,12 +1075,12 @@ private: std::map m_peers; std::list m_peer_ids; - JMutex m_peers_mutex; + Mutex m_peers_mutex; ConnectionSendThread m_sendThread; ConnectionReceiveThread m_receiveThread; - JMutex m_info_mutex; + Mutex m_info_mutex; // Backwards compatibility PeerHandler *m_bc_peerhandler; -- cgit v1.2.3