aboutsummaryrefslogtreecommitdiff
path: root/src/network/connectionthreads.cpp
diff options
context:
space:
mode:
authorSmallJoker <SmallJoker@users.noreply.github.com>2021-12-01 20:22:33 +0100
committerGitHub <noreply@github.com>2021-12-01 20:22:33 +0100
commit57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c (patch)
tree6efa0044d48f5e241540b412ef4c7fe63edec570 /src/network/connectionthreads.cpp
parent1dc1305ada07da8c2a278b46a34d58af86184af9 (diff)
downloadminetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.tar.gz
minetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.tar.bz2
minetest-57a59ae92d4bbfa4fdd60d7acd72c6440f63a49c.zip
Network: Delete copy constructor and use std::move instead (#11642)
This is a follow-up change which disables class copies where possible to avoid unnecessary memory movements.
Diffstat (limited to 'src/network/connectionthreads.cpp')
-rw-r--r--src/network/connectionthreads.cpp190
1 files changed, 92 insertions, 98 deletions
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
index a306ced9b..dca065ae1 100644
--- a/src/network/connectionthreads.cpp
+++ b/src/network/connectionthreads.cpp
@@ -50,11 +50,11 @@ std::mutex log_conthread_mutex;
#define WINDOW_SIZE 5
-static session_t readPeerId(u8 *packetdata)
+static session_t readPeerId(const u8 *packetdata)
{
return readU16(&packetdata[4]);
}
-static u8 readChannel(u8 *packetdata)
+static u8 readChannel(const u8 *packetdata)
{
return readU8(&packetdata[6]);
}
@@ -114,9 +114,9 @@ void *ConnectionSendThread::run()
}
/* translate commands to packets */
- ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
- while (c.type != CONNCMD_NONE) {
- if (c.reliable)
+ auto c = m_connection->m_command_queue.pop_frontNoEx(0);
+ while (c && c->type != CONNCMD_NONE) {
+ if (c->reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
@@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size();
for (const auto &k : timed_outs) {
- u8 channelnum = readChannel(*k.data);
- u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
+ u8 channelnum = readChannel(k->data);
+ u16 seqnum = k->getSeqnum();
- channel.UpdateBytesLost(k.data.getSize());
+ channel.UpdateBytesLost(k->size());
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
- << k.address.serializeString()
+ << k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
- << "count=" << k.resend_count
+ << "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
- rawSend(k);
+ rawSend(k.get());
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
@@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime)
}
}
-void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+void ConnectionSendThread::rawSend(const BufferedPacket *p)
{
try {
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
+ m_connection->m_udpSocket.Send(p->address, p->data, p->size());
LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
+ << " rawSend: " << p->size()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
- << packet.address.serializeString() << std::endl);
+ << p->address.serializeString() << std::endl);
}
}
-void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
{
try {
- p.absolute_send_time = porting::getTimeMs();
+ p->absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
@@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
}
// Send the packet
- rawSend(p);
+ rawSend(p.get());
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
@@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
- bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+ bool have_seqnum = false;
+ const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
- if (!have_sequence_number_for_raw_packet)
+ if (!have_seqnum)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
@@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, reliable,
+ BufferedPacketPtr p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
+ if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << (u32)channelnum
@@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(std::move(p));
+ channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, data,
+ BufferedPacketPtr p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
- rawSend(p);
+ rawSend(p.get());
return true;
}
@@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
return false;
}
-void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
{
- assert(c.reliable); // Pre-condition
+ assert(c->reliable); // Pre-condition
- switch (c.type) {
+ switch (c->type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
@@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
@@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c.type << std::endl);
+ << " Invalid reliable command type: " << c->type << std::endl);
}
}
-void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
{
+ const ConnectionCommand &c = *c_ptr;
assert(!c.reliable); // Pre-condition
switch (c.type) {
@@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address)
}
catch (SocketException &e) {
// Create event
- ConnectionEvent ce;
- ce.bindFailed();
- m_connection->putEvent(ce);
+ m_connection->putEvent(ConnectionEvent::bindFailed());
}
}
@@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address)
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
- ConnectionEvent e;
- e.peerAdded(peer->id, peer->address);
- m_connection->putEvent(e);
+ m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
Address bind_addr;
@@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
}
}
-void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
{
- PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+ PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
if (!peer)
return;
@@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data
}
}
-void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
{
std::vector<session_t> peerids = m_connection->getPeerIDs();
@@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime)
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
- u16 next_to_ack = 0;
+ // Reduces logging verbosity
+ if (channel.queued_reliables.empty())
+ continue;
+
+ u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
@@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime)
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
- BufferedPacket p = std::move(channel.queued_reliables.front());
+ BufferedPacketPtr p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
- << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << ", seqnum: " << p->getSeqnum()
<< std::endl);
sendAsPacketReliable(p, &channel);
@@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
try {
// First, see if there any buffered packets we can process now
if (packet_queued) {
- bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
- while (data_left) {
+ while (true) {
try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
- }
+ if (!getFromBuffers(peer_id, resultdata))
+ break;
+
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
@@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return;
if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
@@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
}
@@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
if (!peer)
continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
+ if (!p)
continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ for (Channel &channel : p->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
@@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
- if (firstseqnum == channel->readNextIncomingSeqNum()) {
- BufferedPacket p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
+ return false;
- LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
+ if (firstseqnum != channel->readNextIncomingSeqNum())
+ return false;
- channel->incNextIncomingSeqNum();
+ BufferedPacketPtr p = channel->incoming_reliables.popFirst();
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
+ peer_id = readPeerId(p->data); // Carried over to caller function
+ u8 channelnum = readChannel(p->data);
+ u16 seqnum = p->getSeqnum();
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- }
- return false;
+ LOG(dout_con << m_connection->getDesc()
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum
+ << " peer_id=" << peer_id
+ << " channel=" << ((int) channelnum & 0xff)
+ << std::endl);
+
+ channel->incNextIncomingSeqNum();
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p->size() - headers_size);
+ memcpy(*payload, &p->data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
@@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
- u8 controltype = readU8(&(packetdata[1]));
+ ControlType controltype = (ControlType)readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
@@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
<< seqnum << " ]" << std::endl);
try {
- BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+ BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// the rtt calculation will be a bit off for re-sent packets but that's okay
{
@@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
- if (current_time > p.absolute_send_time) {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
+ if (current_time > p->absolute_send_time) {
+ float rtt = (current_time - p->absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- } else if (p.totaltime > 0) {
- float rtt = p.totaltime;
+ } else if (p->totaltime > 0) {
+ float rtt = p->totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
@@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
}
// put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p.data.getSize(), 1);
+ channel->UpdateBytesSent(p->size(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
@@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
- << "INVALID TYPE_CONTROL: invalid controltype="
+ << "INVALID controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
@@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(peer_address,
+ BufferedPacketPtr packet = con::makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
@@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
- u16 seqnum = readU16(&packetdata[1]);
+ const u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
@@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
+ BufferedPacketPtr packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
@@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
- ConnectionCommand discon;
- discon.disconnect_peer(peer->id);
- m_connection->putCommand(discon);
+ m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
@@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}