From 05b54a8d18051a3452c405f58bee5852f3d3a27b Mon Sep 17 00:00:00 2001 From: sfan5 Date: Fri, 17 Sep 2021 18:14:25 +0200 Subject: Shave off buffer copies in networking code (#11607) --- src/network/connection.cpp | 72 ++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 34 deletions(-) (limited to 'src/network/connection.cpp') diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 0ba8c36b2..a4970954f 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -200,17 +200,12 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum) return i; } -RPBSearchResult ReliablePacketBuffer::notFound() -{ - return m_list.end(); -} - bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; - const BufferedPacket &p = *m_list.begin(); + const BufferedPacket &p = m_list.front(); result = readU16(&p.data[BASE_HEADER_SIZE + 1]); return true; } @@ -220,14 +215,14 @@ BufferedPacket ReliablePacketBuffer::popFirst() MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); - BufferedPacket p = *m_list.begin(); - m_list.erase(m_list.begin()); + BufferedPacket p = std::move(m_list.front()); + m_list.pop_front(); if (m_list.empty()) { m_oldest_non_answered_ack = 0; } else { m_oldest_non_answered_ack = - readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); + readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]); } return p; } @@ -241,15 +236,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) << " not found in reliable buffer"<data[BASE_HEADER_SIZE+1])); - m_oldest_non_answered_ack = s; - } + BufferedPacket p = std::move(*r); m_list.erase(r); @@ -257,12 +244,12 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) m_oldest_non_answered_ack = 0; } else { m_oldest_non_answered_ack = - readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); + readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]); } return p; } -void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) +void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected) { MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { @@ -355,7 +342,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) } /* update last packet number */ - m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); + m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]); } void ReliablePacketBuffer::incrementTimeouts(float dtime) @@ -367,17 +354,19 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) } } -std::list ReliablePacketBuffer::getTimedOuts(float timeout, - unsigned int max_packets) +std::list + ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets) { MutexAutoLock listlock(m_list_mutex); std::list timed_outs; for (BufferedPacket &bufferedPacket : m_list) { if (bufferedPacket.time >= timeout) { + // caller will resend packet so reset time and increase counter + bufferedPacket.time = 0.0f; + bufferedPacket.resend_count++; + timed_outs.push_back(bufferedPacket); - //this packet will be sent right afterwards reset timeout here - bufferedPacket.time = 0.0f; if (timed_outs.size() >= max_packets) break; } @@ -1051,20 +1040,20 @@ bool UDPPeer::processReliableSendCommand( m_connection->GetProtocolID(), m_connection->GetPeerID(), c.channelnum); - toadd.push(p); + toadd.push(std::move(p)); } if (have_sequence_number) { volatile u16 pcount = 0; while (!toadd.empty()) { - BufferedPacket p = toadd.front(); + BufferedPacket p = std::move(toadd.front()); toadd.pop(); // LOG(dout_con<getDesc() // << " queuing reliable packet for peer_id: " << c.peer_id // << " channel: " << (c.channelnum&0xFF) // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) // << std::endl) - chan.queued_reliables.push(p); + chan.queued_reliables.push(std::move(p)); pcount++; } sanity_check(chan.queued_reliables.size() < 0xFFFF); @@ -1208,12 +1197,19 @@ Connection::~Connection() } /* Internal stuff */ -void Connection::putEvent(ConnectionEvent &e) + +void Connection::putEvent(const ConnectionEvent &e) { assert(e.type != CONNEVENT_NONE); // Pre-condition m_event_queue.push_back(e); } +void Connection::putEvent(ConnectionEvent &&e) +{ + assert(e.type != CONNEVENT_NONE); // Pre-condition + m_event_queue.push_back(std::move(e)); +} + void Connection::TriggerSend() { m_sendThread->Trigger(); @@ -1299,7 +1295,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms) } } -void Connection::putCommand(ConnectionCommand &c) +void Connection::putCommand(const ConnectionCommand &c) { if (!m_shutting_down) { m_command_queue.push_back(c); @@ -1307,6 +1303,14 @@ void Connection::putCommand(ConnectionCommand &c) } } +void Connection::putCommand(ConnectionCommand &&c) +{ + if (!m_shutting_down) { + m_command_queue.push_back(std::move(c)); + m_sendThread->Trigger(); + } +} + void Connection::Serve(Address bind_addr) { ConnectionCommand c; @@ -1408,7 +1412,7 @@ void Connection::Send(session_t peer_id, u8 channelnum, ConnectionCommand c; c.send(peer_id, channelnum, pkt, reliable); - putCommand(c); + putCommand(std::move(c)); } Address Connection::GetPeerAddress(session_t peer_id) @@ -1508,12 +1512,12 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) << "createPeer(): giving peer_id=" << peer_id_new << std::endl); ConnectionCommand cmd; - SharedBuffer reply(4); + Buffer reply(4); writeU8(&reply[0], PACKET_TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU16(&reply[2], peer_id_new); cmd.createPeer(peer_id_new,reply); - putCommand(cmd); + putCommand(std::move(cmd)); // Create peer addition event ConnectionEvent e; @@ -1560,7 +1564,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) writeU16(&ack[2], seqnum); c.ack(peer_id, channelnum, ack); - putCommand(c); + putCommand(std::move(c)); m_sendThread->Trigger(); } -- cgit v1.2.3