diff options
author | SmallJoker <SmallJoker@users.noreply.github.com> | 2021-12-01 20:22:33 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-01 20:22:33 +0100 |
commit | 57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c (patch) | |
tree | 6efa0044d48f5e241540b412ef4c7fe63edec570 /src/network/connectionthreads.cpp | |
parent | 1dc1305ada07da8c2a278b46a34d58af86184af9 (diff) | |
download | minetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.tar.gz minetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.tar.bz2 minetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.zip |
Network: Delete copy constructor and use std::move instead (#11642)
This is a follow-up change which disables class copies where possible to avoid unnecessary memory movements.
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r-- | src/network/connectionthreads.cpp | 190 |
1 files changed, 92 insertions, 98 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index a306ced9b..dca065ae1 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -50,11 +50,11 @@ std::mutex log_conthread_mutex; #define WINDOW_SIZE 5 -static session_t readPeerId(u8 *packetdata) +static session_t readPeerId(const u8 *packetdata) { return readU16(&packetdata[4]); } -static u8 readChannel(u8 *packetdata) +static u8 readChannel(const u8 *packetdata) { return readU8(&packetdata[6]); } @@ -114,9 +114,9 @@ void *ConnectionSendThread::run() } /* translate commands to packets */ - ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0); - while (c.type != CONNCMD_NONE) { - if (c.reliable) + auto c = m_connection->m_command_queue.pop_frontNoEx(0); + while (c && c->type != CONNCMD_NONE) { + if (c->reliable) processReliableCommand(c); else processNonReliableCommand(c); @@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime) m_iteration_packets_avaialble -= timed_outs.size(); for (const auto &k : timed_outs) { - u8 channelnum = readChannel(*k.data); - u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1])); + u8 channelnum = readChannel(k->data); + u16 seqnum = k->getSeqnum(); - channel.UpdateBytesLost(k.data.getSize()); + channel.UpdateBytesLost(k->size()); LOG(derr_con << m_connection->getDesc() << "RE-SENDING timed-out RELIABLE to " - << k.address.serializeString() + << k->address.serializeString() << "(t/o=" << resend_timeout << "): " - << "count=" << k.resend_count + << "count=" << k->resend_count << ", channel=" << ((int) channelnum & 0xff) << ", seqnum=" << seqnum << std::endl); - rawSend(k); + rawSend(k.get()); // do not handle rtt here as we can't decide if this packet was // lost or really takes more time to transmit @@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime) } } -void ConnectionSendThread::rawSend(const BufferedPacket &packet) +void ConnectionSendThread::rawSend(const BufferedPacket *p) { try { - m_connection->m_udpSocket.Send(packet.address, *packet.data, - packet.data.getSize()); + m_connection->m_udpSocket.Send(p->address, p->data, p->size()); LOG(dout_con << m_connection->getDesc() - << " rawSend: " << packet.data.getSize() + << " rawSend: " << p->size() << " bytes sent" << std::endl); } catch (SendFailedException &e) { LOG(derr_con << m_connection->getDesc() << "Connection::rawSend(): SendFailedException: " - << packet.address.serializeString() << std::endl); + << p->address.serializeString() << std::endl); } } -void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel) +void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel) { try { - p.absolute_send_time = porting::getTimeMs(); + p->absolute_send_time = porting::getTimeMs(); // Buffer the packet channel->outgoing_reliables_sent.insert(p, (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE) @@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan } // Send the packet - rawSend(p); + rawSend(p.get()); } bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, @@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]); if (reliable) { - bool have_sequence_number_for_raw_packet = true; - u16 seqnum = - channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet); + bool have_seqnum = false; + const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum); - if (!have_sequence_number_for_raw_packet) + if (!have_seqnum) return false; SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum); @@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address); // Add base headers and make a packet - BufferedPacket p = con::makePacket(peer_address, reliable, + BufferedPacketPtr p = con::makePacket(peer_address, reliable, m_connection->GetProtocolID(), m_connection->GetPeerID(), channelnum); // first check if our send window is already maxed out - if (channel->outgoing_reliables_sent.size() - < channel->getWindowSize()) { + if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) { LOG(dout_con << m_connection->getDesc() << " INFO: sending a reliable packet to peer_id " << peer_id << " channel: " << (u32)channelnum @@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, << " INFO: queueing reliable packet for peer_id: " << peer_id << " channel: " << (u32)channelnum << " seqnum: " << seqnum << std::endl); - channel->queued_reliables.push(std::move(p)); + channel->queued_reliables.push(p); return false; } Address peer_address; if (peer->getAddress(MTP_UDP, peer_address)) { // Add base headers and make a packet - BufferedPacket p = con::makePacket(peer_address, data, + BufferedPacketPtr p = con::makePacket(peer_address, data, m_connection->GetProtocolID(), m_connection->GetPeerID(), channelnum); // Send the packet - rawSend(p); + rawSend(p.get()); return true; } @@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, return false; } -void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) +void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c) { - assert(c.reliable); // Pre-condition + assert(c->reliable); // Pre-condition - switch (c.type) { + switch (c->type) { case CONNCMD_NONE: LOG(dout_con << m_connection->getDesc() << "UDP processing reliable CONNCMD_NONE" << std::endl); @@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) case CONCMD_CREATE_PEER: LOG(dout_con << m_connection->getDesc() << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl); - if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) { + if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) { /* put to queue if we couldn't send it immediately */ sendReliable(c); } @@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) FATAL_ERROR("Got command that shouldn't be reliable as reliable command"); default: LOG(dout_con << m_connection->getDesc() - << " Invalid reliable command type: " << c.type << std::endl); + << " Invalid reliable command type: " << c->type << std::endl); } } -void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c) +void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr) { + const ConnectionCommand &c = *c_ptr; assert(!c.reliable); // Pre-condition switch (c.type) { @@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address) } catch (SocketException &e) { // Create event - ConnectionEvent ce; - ce.bindFailed(); - m_connection->putEvent(ce); + m_connection->putEvent(ConnectionEvent::bindFailed()); } } @@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address) UDPPeer *peer = m_connection->createServerPeer(address); // Create event - ConnectionEvent e; - e.peerAdded(peer->id, peer->address); - m_connection->putEvent(e); + m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address)); Address bind_addr; @@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum, } } -void ConnectionSendThread::sendReliable(ConnectionCommand &c) +void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c) { - PeerHelper peer = m_connection->getPeerNoEx(c.peer_id); + PeerHelper peer = m_connection->getPeerNoEx(c->peer_id); if (!peer) return; @@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data } } -void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c) +void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c) { std::vector<session_t> peerids = m_connection->getPeerIDs(); @@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime) // first send queued reliable packets for all peers (if possible) for (unsigned int i = 0; i < CHANNEL_COUNT; i++) { Channel &channel = udpPeer->channels[i]; - u16 next_to_ack = 0; + // Reduces logging verbosity + if (channel.queued_reliables.empty()) + continue; + + u16 next_to_ack = 0; channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack); u16 next_to_receive = 0; channel.incoming_reliables.getFirstSeqnum(next_to_receive); @@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime) channel.outgoing_reliables_sent.size() < channel.getWindowSize() && peer->m_increment_packets_remaining > 0) { - BufferedPacket p = std::move(channel.queued_reliables.front()); + BufferedPacketPtr p = channel.queued_reliables.front(); channel.queued_reliables.pop(); LOG(dout_con << m_connection->getDesc() << " INFO: sending a queued reliable packet " << " channel: " << i - << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1]) + << ", seqnum: " << p->getSeqnum() << std::endl); sendAsPacketReliable(p, &channel); @@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, try { // First, see if there any buffered packets we can process now if (packet_queued) { - bool data_left = true; session_t peer_id; SharedBuffer<u8> resultdata; - while (data_left) { + while (true) { try { - data_left = getFromBuffers(peer_id, resultdata); - if (data_left) { - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(std::move(e)); - } + if (!getFromBuffers(peer_id, resultdata)) + break; + + m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata)); } catch (ProcessedSilentlyException &e) { /* try reading again */ @@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, return; if ((received_size < BASE_HEADER_SIZE) || - (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { + (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { LOG(derr_con << m_connection->getDesc() << "Receive(): Invalid incoming packet, " << "size: " << received_size @@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata, << ", channel: " << (u32)channelnum << ", returned " << resultdata.getSize() << " bytes" << std::endl); - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(std::move(e)); + m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata)); } catch (ProcessedSilentlyException &e) { } @@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8 if (!peer) continue; - if (dynamic_cast<UDPPeer *>(&peer) == 0) + UDPPeer *p = dynamic_cast<UDPPeer *>(&peer); + if (!p) continue; - for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) { + for (Channel &channel : p->channels) { if (checkIncomingBuffers(&channel, peer_id, dst)) { return true; } @@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, session_t &peer_id, SharedBuffer<u8> &dst) { u16 firstseqnum = 0; - if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) { - if (firstseqnum == channel->readNextIncomingSeqNum()) { - BufferedPacket p = channel->incoming_reliables.popFirst(); - peer_id = readPeerId(*p.data); - u8 channelnum = readChannel(*p.data); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); + if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum)) + return false; - LOG(dout_con << m_connection->getDesc() - << "UNBUFFERING TYPE_RELIABLE" - << " seqnum=" << seqnum - << " peer_id=" << peer_id - << " channel=" << ((int) channelnum & 0xff) - << std::endl); + if (firstseqnum != channel->readNextIncomingSeqNum()) + return false; - channel->incNextIncomingSeqNum(); + BufferedPacketPtr p = channel->incoming_reliables.popFirst(); - u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; - // Get out the inside packet and re-process it - SharedBuffer<u8> payload(p.data.getSize() - headers_size); - memcpy(*payload, &p.data[headers_size], payload.getSize()); + peer_id = readPeerId(p->data); // Carried over to caller function + u8 channelnum = readChannel(p->data); + u16 seqnum = p->getSeqnum(); - dst = processPacket(channel, payload, peer_id, channelnum, true); - return true; - } - } - return false; + LOG(dout_con << m_connection->getDesc() + << "UNBUFFERING TYPE_RELIABLE" + << " seqnum=" << seqnum + << " peer_id=" << peer_id + << " channel=" << ((int) channelnum & 0xff) + << std::endl); + + channel->incNextIncomingSeqNum(); + + u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE; + // Get out the inside packet and re-process it + SharedBuffer<u8> payload(p->size() - headers_size); + memcpy(*payload, &p->data[headers_size], payload.getSize()); + + dst = processPacket(channel, payload, peer_id, channelnum, true); + return true; } SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel, @@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan if (packetdata.getSize() < 2) throw InvalidIncomingDataException("packetdata.getSize() < 2"); - u8 controltype = readU8(&(packetdata[1])); + ControlType controltype = (ControlType)readU8(&(packetdata[1])); if (controltype == CONTROLTYPE_ACK) { assert(channel != NULL); @@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan << seqnum << " ]" << std::endl); try { - BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum); + BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum); // the rtt calculation will be a bit off for re-sent packets but that's okay { @@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan // a overflow is quite unlikely but as it'd result in major // rtt miscalculation we handle it here - if (current_time > p.absolute_send_time) { - float rtt = (current_time - p.absolute_send_time) / 1000.0; + if (current_time > p->absolute_send_time) { + float rtt = (current_time - p->absolute_send_time) / 1000.0; // Let peer calculate stuff according to it // (avg_rtt and resend_timeout) dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt); - } else if (p.totaltime > 0) { - float rtt = p.totaltime; + } else if (p->totaltime > 0) { + float rtt = p->totaltime; // Let peer calculate stuff according to it // (avg_rtt and resend_timeout) @@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan } // put bytes for max bandwidth calculation - channel->UpdateBytesSent(p.data.getSize(), 1); + channel->UpdateBytesSent(p->size(), 1); if (channel->outgoing_reliables_sent.size() == 0) m_connection->TriggerSend(); } catch (NotFoundException &e) { @@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan throw ProcessedSilentlyException("Got a DISCO"); } else { LOG(derr_con << m_connection->getDesc() - << "INVALID TYPE_CONTROL: invalid controltype=" + << "INVALID controltype=" << ((int) controltype & 0xff) << std::endl); throw InvalidIncomingDataException("Invalid control type"); } @@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe if (peer->getAddress(MTP_UDP, peer_address)) { // We have to create a packet again for buffering // This isn't actually too bad an idea. - BufferedPacket packet = makePacket(peer_address, + BufferedPacketPtr packet = con::makePacket(peer_address, packetdata, m_connection->GetProtocolID(), peer->id, @@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha if (packetdata.getSize() < RELIABLE_HEADER_SIZE) throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE"); - u16 seqnum = readU16(&packetdata[1]); + const u16 seqnum = readU16(&packetdata[1]); bool is_future_packet = false; bool is_old_packet = false; @@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha // This one comes later, buffer it. // Actually we have to make a packet to buffer one. // Well, we have all the ingredients, so just do it. - BufferedPacket packet = con::makePacket( + BufferedPacketPtr packet = con::makePacket( peer_address, packetdata, m_connection->GetProtocolID(), @@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha throw ProcessedQueued("Buffered future reliable packet"); } catch (AlreadyExistsException &e) { } catch (IncomingDataCorruption &e) { - ConnectionCommand discon; - discon.disconnect_peer(peer->id); - m_connection->putCommand(discon); + m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id)); LOG(derr_con << m_connection->getDesc() << "INVALID, TYPE_RELIABLE peer_id: " << peer->id @@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha u16 queued_seqnum = 0; if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) { if (queued_seqnum == seqnum) { - BufferedPacket queued_packet = channel->incoming_reliables.popFirst(); + BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst(); /** TODO find a way to verify the new against the old packet */ } } |