aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client/client.cpp2
-rw-r--r--src/network/connection.cpp445
-rw-r--r--src/network/connection.h416
-rw-r--r--src/network/connectionthreads.cpp190
-rw-r--r--src/network/connectionthreads.h31
-rw-r--r--src/unittest/test_connection.cpp10
-rw-r--r--src/util/pointer.h58
7 files changed, 600 insertions, 552 deletions
diff --git a/src/client/client.cpp b/src/client/client.cpp
index 45cc62a33..3ee1298ff 100644
--- a/src/client/client.cpp
+++ b/src/client/client.cpp
@@ -877,7 +877,7 @@ void Client::ProcessData(NetworkPacket *pkt)
*/
if(sender_peer_id != PEER_ID_SERVER) {
infostream << "Client::ProcessData(): Discarding data not "
- "coming from server: peer_id=" << sender_peer_id
+ "coming from server: peer_id=" << sender_peer_id << " command=" << pkt->getCommand()
<< std::endl;
return;
}
diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index 548b2e3a0..2d3cf6e88 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -62,18 +62,27 @@ namespace con
#define PING_TIMEOUT 5.0
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+u16 BufferedPacket::getSeqnum() const
+{
+ if (size() < BASE_HEADER_SIZE + 3)
+ return 0; // should never happen
+
+ return readU16(&data[BASE_HEADER_SIZE + 1]);
+}
+
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
- BufferedPacket p(packet_size);
- p.address = address;
- writeU32(&p.data[0], protocol_id);
- writeU16(&p.data[4], sender_peer_id);
- writeU8(&p.data[6], channel);
+ BufferedPacketPtr p(new BufferedPacket(packet_size));
+ p->address = address;
+
+ writeU32(&p->data[0], protocol_id);
+ writeU16(&p->data[4], sender_peer_id);
+ writeU8(&p->data[6], channel);
- memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
+ memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
return p;
}
@@ -169,9 +178,8 @@ void ReliablePacketBuffer::print()
MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
- for (BufferedPacket &bufferedPacket : m_list) {
- u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
- LOG(dout_con<<index<< ":" << s << std::endl);
+ for (BufferedPacketPtr &packet : m_list) {
+ LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
index++;
}
}
@@ -188,16 +196,13 @@ u32 ReliablePacketBuffer::size()
return m_list.size();
}
-RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
+RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
{
- std::list<BufferedPacket>::iterator i = m_list.begin();
- for(; i != m_list.end(); ++i)
- {
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
- if (s == seqnum)
- break;
+ for (auto it = m_list.begin(); it != m_list.end(); ++it) {
+ if ((*it)->getSeqnum() == seqnum)
+ return it;
}
- return i;
+ return m_list.end();
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
@@ -205,54 +210,54 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
- const BufferedPacket &p = m_list.front();
- result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ result = m_list.front()->getSeqnum();
return true;
}
-BufferedPacket ReliablePacketBuffer::popFirst()
+BufferedPacketPtr ReliablePacketBuffer::popFirst()
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
- BufferedPacket p = std::move(m_list.front());
+
+ BufferedPacketPtr p(m_list.front());
m_list.pop_front();
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
- m_oldest_non_answered_ack =
- readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
-BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
+BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
MutexAutoLock listlock(m_list_mutex);
- RPBSearchResult r = findPacket(seqnum);
- if (r == notFound()) {
+ RPBSearchResult r = findPacketNoLock(seqnum);
+ if (r == m_list.end()) {
LOG(dout_con<<"Sequence number: " << seqnum
<< " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer");
}
- BufferedPacket p = std::move(*r);
+ BufferedPacketPtr p(*r);
m_list.erase(r);
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
- m_oldest_non_answered_ack =
- readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
-void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
+void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
- if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+ const BufferedPacket &p = *p_ptr;
+
+ if (p.size() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
"reliable packet" << std::endl;
return;
@@ -263,7 +268,7 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
<< std::endl;
return;
}
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ const u16 seqnum = p.getSeqnum();
if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
@@ -280,44 +285,44 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
// Find the right place for the packet and insert it there
// If list is empty, just add it
- if (m_list.empty())
- {
- m_list.push_back(p);
+ if (m_list.empty()) {
+ m_list.push_back(p_ptr);
m_oldest_non_answered_ack = seqnum;
// Done.
return;
}
// Otherwise find the right place
- std::list<BufferedPacket>::iterator i = m_list.begin();
+ auto it = m_list.begin();
// Find the first packet in the list which has a higher seqnum
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ u16 s = (*it)->getSeqnum();
/* case seqnum is smaller then next_expected seqnum */
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
- while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
/* non wrap around case (at least for incoming and next_expected */
else
{
- while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
if (s == seqnum) {
/* nothing to do this seems to be a resent packet */
/* for paranoia reason data should be compared */
+ auto &i = *it;
if (
- (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
- (i->data.getSize() != p.data.getSize()) ||
+ (i->getSeqnum() != seqnum) ||
+ (i->size() != p.size()) ||
(i->address != p.address)
)
{
@@ -325,51 +330,52 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
fprintf(stderr,
"Duplicated seqnum %d non matching packet detected:\n",
seqnum);
- fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
- readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+ fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
+ i->getSeqnum(), i->size(),
i->address.serializeString().c_str());
- fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
- readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+ fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
+ p.getSeqnum(), p.size(),
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
}
/* insert or push back */
- else if (i != m_list.end()) {
- m_list.insert(i, p);
+ else if (it != m_list.end()) {
+ m_list.insert(it, p_ptr);
} else {
- m_list.push_back(p);
+ m_list.push_back(p_ptr);
}
/* update last packet number */
- m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
MutexAutoLock listlock(m_list_mutex);
- for (BufferedPacket &bufferedPacket : m_list) {
- bufferedPacket.time += dtime;
- bufferedPacket.totaltime += dtime;
+ for (auto &packet : m_list) {
+ packet->time += dtime;
+ packet->totaltime += dtime;
}
}
-std::list<BufferedPacket>
+std::list<ConstSharedPtr<BufferedPacket>>
ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{
MutexAutoLock listlock(m_list_mutex);
- std::list<BufferedPacket> timed_outs;
- for (BufferedPacket &bufferedPacket : m_list) {
- if (bufferedPacket.time >= timeout) {
- // caller will resend packet so reset time and increase counter
- bufferedPacket.time = 0.0f;
- bufferedPacket.resend_count++;
+ std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
+ for (auto &packet : m_list) {
+ if (packet->time < timeout)
+ continue;
- timed_outs.push_back(bufferedPacket);
+ // caller will resend packet so reset time and increase counter
+ packet->time = 0.0f;
+ packet->resend_count++;
- if (timed_outs.size() >= max_packets)
- break;
- }
+ timed_outs.emplace_back(packet);
+
+ if (timed_outs.size() >= max_packets)
+ break;
}
return timed_outs;
}
@@ -428,11 +434,13 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
}
}
-SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
+SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
+ const BufferedPacket &p = *p_ptr;
+
u32 headersize = BASE_HEADER_SIZE + 7;
- if (p.data.getSize() < headersize) {
+ if (p.size() < headersize) {
errorstream << "Invalid data size for split packet" << std::endl;
return SharedBuffer<u8>();
}
@@ -473,7 +481,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
<<std::endl);
// Cut chunk data out of packet
- u32 chunkdatasize = p.data.getSize() - headersize;
+ u32 chunkdatasize = p.size() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
@@ -520,14 +528,67 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
ConnectionCommand
*/
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
- bool reliable_)
+ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
+{
+ return ConnectionCommandPtr(new ConnectionCommand(type));
+}
+
+ConnectionCommandPtr ConnectionCommand::serve(Address address)
+{
+ auto c = create(CONNCMD_SERVE);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::connect(Address address)
{
- type = CONNCMD_SEND;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = pkt->oldForgePacket();
- reliable = reliable_;
+ auto c = create(CONNCMD_CONNECT);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect()
+{
+ return create(CONNCMD_DISCONNECT);
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
+{
+ auto c = create(CONNCMD_DISCONNECT_PEER);
+ c->peer_id = peer_id;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
+ NetworkPacket *pkt, bool reliable)
+{
+ auto c = create(CONNCMD_SEND);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = reliable;
+ c->data = pkt->oldForgePacket();
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
+{
+ auto c = create(CONCMD_ACK);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = false;
+ data.copyTo(c->data);
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
+{
+ auto c = create(CONCMD_CREATE_PEER);
+ c->peer_id = peer_id;
+ c->channelnum = 0;
+ c->reliable = true;
+ c->raw = true;
+ data.copyTo(c->data);
+ return c;
}
/*
@@ -562,39 +623,38 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
MutexAutoLock internal(m_internal_mutex);
+
u16 retval = next_outgoing_seqnum;
- u16 lowest_unacked_seqnumber;
+ successful = false;
/* shortcut if there ain't any packet in outgoing list */
- if (outgoing_reliables_sent.empty())
- {
+ if (outgoing_reliables_sent.empty()) {
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
- if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
- {
+ u16 lowest_unacked_seqnumber;
+ if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
- successful = false;
return 0;
}
- }
- else {
+ } else {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
m_window_size) {
- successful = false;
return 0;
}
}
}
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
@@ -946,45 +1006,45 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
return false;
}
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
+void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return;
- Channel &chan = channels[c.channelnum];
+ Channel &chan = channels[c->channelnum];
if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
- (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
+ (chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
- <<" processing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() << std::endl);
- if (!processReliableSendCommand(c,max_packet_size)) {
- chan.queued_commands.push_back(c);
- }
- }
- else {
+ <<" processing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() << std::endl);
+ if (processReliableSendCommand(c, max_packet_size))
+ return;
+ } else {
LOG(dout_con<<m_connection->getDesc()
- <<" Queueing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() <<std::endl);
- chan.queued_commands.push_back(c);
- if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+ <<" Queueing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() <<std::endl);
+
+ if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
LOG(derr_con << m_connection->getDesc()
- << "Possible packet stall to peer id: " << c.peer_id
+ << "Possible packet stall to peer id: " << c->peer_id
<< " queued_commands=" << chan.queued_commands.size()
<< std::endl);
}
}
+ chan.queued_commands.push_back(c);
}
bool UDPPeer::processReliableSendCommand(
- ConnectionCommand &c,
+ ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return true;
+ const auto &c = *c_ptr;
Channel &chan = channels[c.channelnum];
u32 chunksize_max = max_packet_size
@@ -1003,9 +1063,9 @@ bool UDPPeer::processReliableSendCommand(
chan.setNextSplitSeqNum(split_sequence_number);
}
- bool have_sequence_number = true;
+ bool have_sequence_number = false;
bool have_initial_sequence_number = false;
- std::queue<BufferedPacket> toadd;
+ std::queue<BufferedPacketPtr> toadd;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
@@ -1024,25 +1084,23 @@ bool UDPPeer::processReliableSendCommand(
SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(address, reliable,
+ BufferedPacketPtr p = con::makePacket(address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);
- toadd.push(std::move(p));
+ toadd.push(p);
}
if (have_sequence_number) {
- volatile u16 pcount = 0;
while (!toadd.empty()) {
- BufferedPacket p = std::move(toadd.front());
+ BufferedPacketPtr p = toadd.front();
toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- chan.queued_reliables.push(std::move(p));
- pcount++;
+ chan.queued_reliables.push(p);
}
sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
@@ -1051,6 +1109,7 @@ bool UDPPeer::processReliableSendCommand(
volatile u16 packets_available = toadd.size();
/* we didn't get a single sequence number no need to fill queue */
if (!have_initial_sequence_number) {
+ LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
return false;
}
@@ -1096,18 +1155,18 @@ void UDPPeer::RunCommandQueues(
(channel.queued_reliables.size() < maxtransfer) &&
(commands_processed < maxcommands)) {
try {
- ConnectionCommand c = channel.queued_commands.front();
+ ConnectionCommandPtr c = channel.queued_commands.front();
LOG(dout_con << m_connection->getDesc()
<< " processing queued reliable command " << std::endl);
// Packet is processed, remove it from queue
- if (processReliableSendCommand(c,max_packet_size)) {
+ if (processReliableSendCommand(c, max_packet_size)) {
channel.queued_commands.pop_front();
} else {
LOG(dout_con << m_connection->getDesc()
- << " Failed to queue packets for peer_id: " << c.peer_id
- << ", delaying sending of " << c.data.getSize()
+ << " Failed to queue packets for peer_id: " << c->peer_id
+ << ", delaying sending of " << c->data.getSize()
<< " bytes" << std::endl);
}
}
@@ -1130,7 +1189,7 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
channels[channel].setNextSplitSeqNum(seqnum);
}
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
@@ -1138,6 +1197,63 @@ SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd
}
/*
+ ConnectionEvent
+*/
+
+const char *ConnectionEvent::describe() const
+{
+ switch(type) {
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ case CONNEVENT_BIND_FAILED:
+ return "CONNEVENT_BIND_FAILED";
+ }
+ return "Invalid ConnectionEvent";
+}
+
+
+ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
+{
+ return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
+}
+
+ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
+{
+ auto e = create(CONNEVENT_DATA_RECEIVED);
+ e->peer_id = peer_id;
+ data.copyTo(e->data);
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
+{
+ auto e = create(CONNEVENT_PEER_ADDED);
+ e->peer_id = peer_id;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
+{
+ auto e = create(CONNEVENT_PEER_REMOVED);
+ e->peer_id = peer_id;
+ e->timeout = is_timeout;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::bindFailed()
+{
+ return create(CONNEVENT_BIND_FAILED);
+}
+
+/*
Connection
*/
@@ -1186,18 +1302,12 @@ Connection::~Connection()
/* Internal stuff */
-void Connection::putEvent(const ConnectionEvent &e)
+void Connection::putEvent(ConnectionEventPtr e)
{
- assert(e.type != CONNEVENT_NONE); // Pre-condition
+ assert(e->type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}
-void Connection::putEvent(ConnectionEvent &&e)
-{
- assert(e.type != CONNEVENT_NONE); // Pre-condition
- m_event_queue.push_back(std::move(e));
-}
-
void Connection::TriggerSend()
{
m_sendThread->Trigger();
@@ -1260,11 +1370,9 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
Address peer_address;
//any peer has a primary address this never fails!
peer->getAddress(MTP_PRIMARY, peer_address);
- // Create event
- ConnectionEvent e;
- e.peerRemoved(peer_id, timeout, peer_address);
- putEvent(e);
+ // Create event
+ putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
peer->Drop();
return true;
@@ -1272,18 +1380,16 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
/* Interface */
-ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
{
try {
return m_event_queue.pop_front(timeout_ms);
} catch(ItemNotFoundException &ex) {
- ConnectionEvent e;
- e.type = CONNEVENT_NONE;
- return e;
+ return ConnectionEvent::create(CONNEVENT_NONE);
}
}
-void Connection::putCommand(const ConnectionCommand &c)
+void Connection::putCommand(ConnectionCommandPtr c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
@@ -1291,26 +1397,14 @@ void Connection::putCommand(const ConnectionCommand &c)
}
}
-void Connection::putCommand(ConnectionCommand &&c)
-{
- if (!m_shutting_down) {
- m_command_queue.push_back(std::move(c));
- m_sendThread->Trigger();
- }
-}
-
void Connection::Serve(Address bind_addr)
{
- ConnectionCommand c;
- c.serve(bind_addr);
- putCommand(c);
+ putCommand(ConnectionCommand::serve(bind_addr));
}
void Connection::Connect(Address address)
{
- ConnectionCommand c;
- c.connect(address);
- putCommand(c);
+ putCommand(ConnectionCommand::connect(address));
}
bool Connection::Connected()
@@ -1332,9 +1426,7 @@ bool Connection::Connected()
void Connection::Disconnect()
{
- ConnectionCommand c;
- c.disconnect();
- putCommand(c);
+ putCommand(ConnectionCommand::disconnect());
}
bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
@@ -1345,11 +1437,15 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
This is not considered to be a problem (is it?)
*/
for(;;) {
- ConnectionEvent e = waitEvent(timeout);
- if (e.type != CONNEVENT_NONE)
+ ConnectionEventPtr e_ptr = waitEvent(timeout);
+ const ConnectionEvent &e = *e_ptr;
+
+ if (e.type != CONNEVENT_NONE) {
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
- switch(e.type) {
+ }
+
+ switch (e.type) {
case CONNEVENT_NONE:
return false;
case CONNEVENT_DATA_RECEIVED:
@@ -1397,10 +1493,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
- ConnectionCommand c;
-
- c.send(peer_id, channelnum, pkt, reliable);
- putCommand(std::move(c));
+ putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
}
Address Connection::GetPeerAddress(session_t peer_id)
@@ -1499,41 +1592,31 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
LOG(dout_con << getDesc()
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl);
- ConnectionCommand cmd;
- Buffer<u8> reply(4);
- writeU8(&reply[0], PACKET_TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
- writeU16(&reply[2], peer_id_new);
- cmd.createPeer(peer_id_new,reply);
- putCommand(std::move(cmd));
+ {
+ Buffer<u8> reply(4);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+ writeU16(&reply[2], peer_id_new);
+ putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
+ }
// Create peer addition event
- ConnectionEvent e;
- e.peerAdded(peer_id_new, sender);
- putEvent(e);
+ putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
// We're now talking to a valid peer_id
return peer_id_new;
}
-void Connection::PrintInfo(std::ostream &out)
-{
- m_info_mutex.lock();
- out<<getDesc()<<": ";
- m_info_mutex.unlock();
-}
-
const std::string Connection::getDesc()
{
+ MutexAutoLock _(m_info_mutex);
return std::string("con(")+
itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
}
void Connection::DisconnectPeer(session_t peer_id)
{
- ConnectionCommand discon;
- discon.disconnect_peer(peer_id);
- putCommand(discon);
+ putCommand(ConnectionCommand::disconnect_peer(peer_id));
}
void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
@@ -1545,14 +1628,12 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
" channel: " << (channelnum & 0xFF) <<
" seqnum: " << seqnum << std::endl);
- ConnectionCommand c;
SharedBuffer<u8> ack(4);
writeU8(&ack[0], PACKET_TYPE_CONTROL);
writeU8(&ack[1], CONTROLTYPE_ACK);
writeU16(&ack[2], seqnum);
- c.ack(peer_id, channelnum, ack);
- putCommand(std::move(c));
+ putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
m_sendThread->Trigger();
}
diff --git a/src/network/connection.h b/src/network/connection.h
index ea74ffb1c..1afb4ae84 100644
--- a/src/network/connection.h
+++ b/src/network/connection.h
@@ -32,6 +32,95 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <vector>
#include <map>
+#define MAX_UDP_PEERS 65535
+
+/*
+=== NOTES ===
+
+A packet is sent through a channel to a peer with a basic header:
+ Header (7 bytes):
+ [0] u32 protocol_id
+ [4] session_t sender_peer_id
+ [6] u8 channel
+sender_peer_id:
+ Unique to each peer.
+ value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
+ value 1 (PEER_ID_SERVER) is reserved for server
+ these constants are defined in constants.h
+channel:
+ Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
+*/
+#define BASE_HEADER_SIZE 7
+#define CHANNEL_COUNT 3
+
+/*
+Packet types:
+
+CONTROL: This is a packet used by the protocol.
+- When this is processed, nothing is handed to the user.
+ Header (2 byte):
+ [0] u8 type
+ [1] u8 controltype
+controltype and data description:
+ CONTROLTYPE_ACK
+ [2] u16 seqnum
+ CONTROLTYPE_SET_PEER_ID
+ [2] session_t peer_id_new
+ CONTROLTYPE_PING
+ - There is no actual reply, but this can be sent in a reliable
+ packet to get a reply
+ CONTROLTYPE_DISCO
+*/
+enum ControlType : u8 {
+ CONTROLTYPE_ACK = 0,
+ CONTROLTYPE_SET_PEER_ID = 1,
+ CONTROLTYPE_PING = 2,
+ CONTROLTYPE_DISCO = 3,
+};
+
+/*
+ORIGINAL: This is a plain packet with no control and no error
+checking at all.
+- When this is processed, it is directly handed to the user.
+ Header (1 byte):
+ [0] u8 type
+*/
+//#define TYPE_ORIGINAL 1
+#define ORIGINAL_HEADER_SIZE 1
+
+/*
+SPLIT: These are sequences of packets forming one bigger piece of
+data.
+- When processed and all the packet_nums 0...packet_count-1 are
+ present (this should be buffered), the resulting data shall be
+ directly handed to the user.
+- If the data fails to come up in a reasonable time, the buffer shall
+ be silently discarded.
+- These can be sent as-is or atop of a RELIABLE packet stream.
+ Header (7 bytes):
+ [0] u8 type
+ [1] u16 seqnum
+ [3] u16 chunk_count
+ [5] u16 chunk_num
+*/
+//#define TYPE_SPLIT 2
+
+/*
+RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
+and they shall be delivered in the same order as sent. This is done
+with a buffer in the receiving and transmitting end.
+- When this is processed, the contents of each packet is recursively
+ processed as packets.
+ Header (3 bytes):
+ [0] u8 type
+ [1] u16 seqnum
+
+*/
+//#define TYPE_RELIABLE 3
+#define RELIABLE_HEADER_SIZE 3
+#define SEQNUM_INITIAL 65500
+#define SEQNUM_MAX 65535
+
class NetworkPacket;
namespace con
@@ -46,9 +135,13 @@ typedef enum MTProtocols {
MTP_MINETEST_RELIABLE_UDP
} MTProtocols;
-#define MAX_UDP_PEERS 65535
-
-#define SEQNUM_MAX 65535
+enum PacketType : u8 {
+ PACKET_TYPE_CONTROL = 0,
+ PACKET_TYPE_ORIGINAL = 1,
+ PACKET_TYPE_SPLIT = 2,
+ PACKET_TYPE_RELIABLE = 3,
+ PACKET_TYPE_MAX
+};
inline bool seqnum_higher(u16 totest, u16 base)
{
@@ -85,24 +178,40 @@ static inline float CALC_DTIME(u64 lasttime, u64 curtime)
return MYMAX(MYMIN(value,0.1),0.0);
}
-struct BufferedPacket
-{
- BufferedPacket(u8 *a_data, u32 a_size):
- data(a_data, a_size)
- {}
- BufferedPacket(u32 a_size):
- data(a_size)
- {}
- Buffer<u8> data; // Data of the packet, including headers
+/*
+ Struct for all kinds of packets. Includes following data:
+ BASE_HEADER
+ u8[] packet data (usually copied from SharedBuffer<u8>)
+*/
+struct BufferedPacket {
+ BufferedPacket(u32 a_size)
+ {
+ m_data.resize(a_size);
+ data = &m_data[0];
+ }
+
+ DISABLE_CLASS_COPY(BufferedPacket)
+
+ u16 getSeqnum() const;
+
+ inline const size_t size() const { return m_data.size(); }
+
+ u8 *data; // Direct memory access
float time = 0.0f; // Seconds from buffering the packet or re-sending
float totaltime = 0.0f; // Seconds from buffering the packet
u64 absolute_send_time = -1;
Address address; // Sender or destination
unsigned int resend_count = 0;
+
+private:
+ std::vector<u8> m_data; // Data of the packet, including headers
};
+typedef std::shared_ptr<BufferedPacket> BufferedPacketPtr;
+
+
// This adds the base headers to the data and makes a packet out of it
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel);
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
@@ -137,100 +246,11 @@ private:
};
/*
-=== NOTES ===
-
-A packet is sent through a channel to a peer with a basic header:
- Header (7 bytes):
- [0] u32 protocol_id
- [4] session_t sender_peer_id
- [6] u8 channel
-sender_peer_id:
- Unique to each peer.
- value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
- value 1 (PEER_ID_SERVER) is reserved for server
- these constants are defined in constants.h
-channel:
- Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
-*/
-#define BASE_HEADER_SIZE 7
-#define CHANNEL_COUNT 3
-/*
-Packet types:
-
-CONTROL: This is a packet used by the protocol.
-- When this is processed, nothing is handed to the user.
- Header (2 byte):
- [0] u8 type
- [1] u8 controltype
-controltype and data description:
- CONTROLTYPE_ACK
- [2] u16 seqnum
- CONTROLTYPE_SET_PEER_ID
- [2] session_t peer_id_new
- CONTROLTYPE_PING
- - There is no actual reply, but this can be sent in a reliable
- packet to get a reply
- CONTROLTYPE_DISCO
-*/
-//#define TYPE_CONTROL 0
-#define CONTROLTYPE_ACK 0
-#define CONTROLTYPE_SET_PEER_ID 1
-#define CONTROLTYPE_PING 2
-#define CONTROLTYPE_DISCO 3
-
-/*
-ORIGINAL: This is a plain packet with no control and no error
-checking at all.
-- When this is processed, it is directly handed to the user.
- Header (1 byte):
- [0] u8 type
-*/
-//#define TYPE_ORIGINAL 1
-#define ORIGINAL_HEADER_SIZE 1
-/*
-SPLIT: These are sequences of packets forming one bigger piece of
-data.
-- When processed and all the packet_nums 0...packet_count-1 are
- present (this should be buffered), the resulting data shall be
- directly handed to the user.
-- If the data fails to come up in a reasonable time, the buffer shall
- be silently discarded.
-- These can be sent as-is or atop of a RELIABLE packet stream.
- Header (7 bytes):
- [0] u8 type
- [1] u16 seqnum
- [3] u16 chunk_count
- [5] u16 chunk_num
-*/
-//#define TYPE_SPLIT 2
-/*
-RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
-and they shall be delivered in the same order as sent. This is done
-with a buffer in the receiving and transmitting end.
-- When this is processed, the contents of each packet is recursively
- processed as packets.
- Header (3 bytes):
- [0] u8 type
- [1] u16 seqnum
-
-*/
-//#define TYPE_RELIABLE 3
-#define RELIABLE_HEADER_SIZE 3
-#define SEQNUM_INITIAL 65500
-
-enum PacketType: u8 {
- PACKET_TYPE_CONTROL = 0,
- PACKET_TYPE_ORIGINAL = 1,
- PACKET_TYPE_SPLIT = 2,
- PACKET_TYPE_RELIABLE = 3,
- PACKET_TYPE_MAX
-};
-/*
A buffer which stores reliable packets and sorts them internally
for fast access to the smallest one.
*/
-typedef std::list<BufferedPacket>::iterator RPBSearchResult;
+typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
class ReliablePacketBuffer
{
@@ -239,12 +259,12 @@ public:
bool getFirstSeqnum(u16& result);
- BufferedPacket popFirst();
- BufferedPacket popSeqnum(u16 seqnum);
- void insert(const BufferedPacket &p, u16 next_expected);
+ BufferedPacketPtr popFirst();
+ BufferedPacketPtr popSeqnum(u16 seqnum);
+ void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
void incrementTimeouts(float dtime);
- std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
+ std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
void print();
bool empty();
@@ -252,10 +272,9 @@ public:
private:
- RPBSearchResult findPacket(u16 seqnum); // does not perform locking
- inline RPBSearchResult notFound() { return m_list.end(); }
+ RPBSearchResult findPacketNoLock(u16 seqnum);
- std::list<BufferedPacket> m_list;
+ std::list<BufferedPacketPtr> m_list;
u16 m_oldest_non_answered_ack;
@@ -274,7 +293,7 @@ public:
Returns a reference counted buffer of length != 0 when a full split
packet is constructed. If not, returns one of length 0.
*/
- SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
+ SharedBuffer<u8> insert(BufferedPacketPtr &p_ptr, bool reliable);
void removeUnreliableTimedOuts(float dtime, float timeout);
@@ -285,25 +304,6 @@ private:
std::mutex m_map_mutex;
};
-struct OutgoingPacket
-{
- session_t peer_id;
- u8 channelnum;
- SharedBuffer<u8> data;
- bool reliable;
- bool ack;
-
- OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
- bool reliable_,bool ack_=false):
- peer_id(peer_id_),
- channelnum(channelnum_),
- data(data_),
- reliable(reliable_),
- ack(ack_)
- {
- }
-};
-
enum ConnectionCommandType{
CONNCMD_NONE,
CONNCMD_SERVE,
@@ -316,9 +316,13 @@ enum ConnectionCommandType{
CONCMD_CREATE_PEER
};
+struct ConnectionCommand;
+typedef std::shared_ptr<ConnectionCommand> ConnectionCommandPtr;
+
+// This is very similar to ConnectionEvent
struct ConnectionCommand
{
- enum ConnectionCommandType type = CONNCMD_NONE;
+ const ConnectionCommandType type;
Address address;
session_t peer_id = PEER_ID_INEXISTENT;
u8 channelnum = 0;
@@ -326,48 +330,21 @@ struct ConnectionCommand
bool reliable = false;
bool raw = false;
- ConnectionCommand() = default;
-
- void serve(Address address_)
- {
- type = CONNCMD_SERVE;
- address = address_;
- }
- void connect(Address address_)
- {
- type = CONNCMD_CONNECT;
- address = address_;
- }
- void disconnect()
- {
- type = CONNCMD_DISCONNECT;
- }
- void disconnect_peer(session_t peer_id_)
- {
- type = CONNCMD_DISCONNECT_PEER;
- peer_id = peer_id_;
- }
+ DISABLE_CLASS_COPY(ConnectionCommand);
- void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
+ static ConnectionCommandPtr serve(Address address);
+ static ConnectionCommandPtr connect(Address address);
+ static ConnectionCommandPtr disconnect();
+ static ConnectionCommandPtr disconnect_peer(session_t peer_id);
+ static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
+ static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
+ static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
- void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
- {
- type = CONCMD_ACK;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = data_;
- reliable = false;
- }
+private:
+ ConnectionCommand(ConnectionCommandType type_) :
+ type(type_) {}
- void createPeer(session_t peer_id_, const Buffer<u8> &data_)
- {
- type = CONCMD_CREATE_PEER;
- peer_id = peer_id_;
- data = data_;
- channelnum = 0;
- reliable = true;
- raw = true;
- }
+ static ConnectionCommandPtr create(ConnectionCommandType type);
};
/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
@@ -402,10 +379,10 @@ public:
ReliablePacketBuffer outgoing_reliables_sent;
//queued reliable packets
- std::queue<BufferedPacket> queued_reliables;
+ std::queue<BufferedPacketPtr> queued_reliables;
//queue commands prior splitting to packets
- std::deque<ConnectionCommand> queued_commands;
+ std::deque<ConnectionCommandPtr> queued_commands;
IncomingSplitBuffer incoming_splits;
@@ -514,7 +491,7 @@ class Peer {
public:
friend class PeerHelper;
- Peer(Address address_,u16 id_,Connection* connection) :
+ Peer(Address address_,session_t id_,Connection* connection) :
id(id_),
m_connection(connection),
address(address_),
@@ -528,11 +505,11 @@ class Peer {
};
// Unique id of the peer
- u16 id;
+ const session_t id;
void Drop();
- virtual void PutReliableSendCommand(ConnectionCommand &c,
+ virtual void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size) {};
virtual bool getAddress(MTProtocols type, Address& toset) = 0;
@@ -549,7 +526,7 @@ class Peer {
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
- virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ virtual SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
errorstream << "Peer::addSplitPacket called,"
@@ -586,7 +563,7 @@ class Peer {
bool IncUseCount();
void DecUseCount();
- std::mutex m_exclusive_access_mutex;
+ mutable std::mutex m_exclusive_access_mutex;
bool m_pending_deletion = false;
@@ -634,7 +611,7 @@ public:
UDPPeer(u16 a_id, Address a_address, Connection* connection);
virtual ~UDPPeer() = default;
- void PutReliableSendCommand(ConnectionCommand &c,
+ void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size);
bool getAddress(MTProtocols type, Address& toset);
@@ -642,7 +619,7 @@ public:
u16 getNextSplitSequenceNumber(u8 channel);
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
- SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable);
protected:
@@ -671,7 +648,7 @@ private:
float resend_timeout = 0.5;
bool processReliableSendCommand(
- ConnectionCommand &c,
+ ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size);
};
@@ -679,7 +656,7 @@ private:
Connection
*/
-enum ConnectionEventType{
+enum ConnectionEventType {
CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED,
CONNEVENT_PEER_ADDED,
@@ -687,56 +664,32 @@ enum ConnectionEventType{
CONNEVENT_BIND_FAILED,
};
+struct ConnectionEvent;
+typedef std::shared_ptr<ConnectionEvent> ConnectionEventPtr;
+
+// This is very similar to ConnectionCommand
struct ConnectionEvent
{
- enum ConnectionEventType type = CONNEVENT_NONE;
+ const ConnectionEventType type;
session_t peer_id = 0;
Buffer<u8> data;
bool timeout = false;
Address address;
- ConnectionEvent() = default;
+ // We don't want to copy "data"
+ DISABLE_CLASS_COPY(ConnectionEvent);
- const char *describe() const
- {
- switch(type) {
- case CONNEVENT_NONE:
- return "CONNEVENT_NONE";
- case CONNEVENT_DATA_RECEIVED:
- return "CONNEVENT_DATA_RECEIVED";
- case CONNEVENT_PEER_ADDED:
- return "CONNEVENT_PEER_ADDED";
- case CONNEVENT_PEER_REMOVED:
- return "CONNEVENT_PEER_REMOVED";
- case CONNEVENT_BIND_FAILED:
- return "CONNEVENT_BIND_FAILED";
- }
- return "Invalid ConnectionEvent";
- }
+ static ConnectionEventPtr create(ConnectionEventType type);
+ static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer<u8> &data);
+ static ConnectionEventPtr peerAdded(session_t peer_id, Address address);
+ static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address);
+ static ConnectionEventPtr bindFailed();
- void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
- {
- type = CONNEVENT_DATA_RECEIVED;
- peer_id = peer_id_;
- data = data_;
- }
- void peerAdded(session_t peer_id_, Address address_)
- {
- type = CONNEVENT_PEER_ADDED;
- peer_id = peer_id_;
- address = address_;
- }
- void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
- {
- type = CONNEVENT_PEER_REMOVED;
- peer_id = peer_id_;
- timeout = timeout_;
- address = address_;
- }
- void bindFailed()
- {
- type = CONNEVENT_BIND_FAILED;
- }
+ const char *describe() const;
+
+private:
+ ConnectionEvent(ConnectionEventType type_) :
+ type(type_) {}
};
class PeerHandler;
@@ -752,10 +705,9 @@ public:
~Connection();
/* Interface */
- ConnectionEvent waitEvent(u32 timeout_ms);
- // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
- void putCommand(const ConnectionCommand &c);
- void putCommand(ConnectionCommand &&c);
+ ConnectionEventPtr waitEvent(u32 timeout_ms);
+
+ void putCommand(ConnectionCommandPtr c);
void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr);
@@ -785,8 +737,6 @@ protected:
void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
- void PrintInfo(std::ostream &out);
-
std::vector<session_t> getPeerIDs()
{
MutexAutoLock peerlock(m_peers_mutex);
@@ -795,13 +745,11 @@ protected:
UDPSocket m_udpSocket;
// Command queue: user -> SendThread
- MutexedQueue<ConnectionCommand> m_command_queue;
+ MutexedQueue<ConnectionCommandPtr> m_command_queue;
bool Receive(NetworkPacket *pkt, u32 timeout);
- // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
- void putEvent(const ConnectionEvent &e);
- void putEvent(ConnectionEvent &&e);
+ void putEvent(ConnectionEventPtr e);
void TriggerSend();
@@ -811,7 +759,7 @@ protected:
}
private:
// Event queue: ReceiveThread -> user
- MutexedQueue<ConnectionEvent> m_event_queue;
+ MutexedQueue<ConnectionEventPtr> m_event_queue;
session_t m_peer_id = 0;
u32 m_protocol_id;
@@ -823,7 +771,7 @@ private:
std::unique_ptr<ConnectionSendThread> m_sendThread;
std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
- std::mutex m_info_mutex;
+ mutable std::mutex m_info_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
index a306ced9b..dca065ae1 100644
--- a/src/network/connectionthreads.cpp
+++ b/src/network/connectionthreads.cpp
@@ -50,11 +50,11 @@ std::mutex log_conthread_mutex;
#define WINDOW_SIZE 5
-static session_t readPeerId(u8 *packetdata)
+static session_t readPeerId(const u8 *packetdata)
{
return readU16(&packetdata[4]);
}
-static u8 readChannel(u8 *packetdata)
+static u8 readChannel(const u8 *packetdata)
{
return readU8(&packetdata[6]);
}
@@ -114,9 +114,9 @@ void *ConnectionSendThread::run()
}
/* translate commands to packets */
- ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
- while (c.type != CONNCMD_NONE) {
- if (c.reliable)
+ auto c = m_connection->m_command_queue.pop_frontNoEx(0);
+ while (c && c->type != CONNCMD_NONE) {
+ if (c->reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
@@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size();
for (const auto &k : timed_outs) {
- u8 channelnum = readChannel(*k.data);
- u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
+ u8 channelnum = readChannel(k->data);
+ u16 seqnum = k->getSeqnum();
- channel.UpdateBytesLost(k.data.getSize());
+ channel.UpdateBytesLost(k->size());
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
- << k.address.serializeString()
+ << k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
- << "count=" << k.resend_count
+ << "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
- rawSend(k);
+ rawSend(k.get());
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
@@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime)
}
}
-void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+void ConnectionSendThread::rawSend(const BufferedPacket *p)
{
try {
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
+ m_connection->m_udpSocket.Send(p->address, p->data, p->size());
LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
+ << " rawSend: " << p->size()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
- << packet.address.serializeString() << std::endl);
+ << p->address.serializeString() << std::endl);
}
}
-void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
{
try {
- p.absolute_send_time = porting::getTimeMs();
+ p->absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
@@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
}
// Send the packet
- rawSend(p);
+ rawSend(p.get());
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
@@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
- bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+ bool have_seqnum = false;
+ const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
- if (!have_sequence_number_for_raw_packet)
+ if (!have_seqnum)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
@@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, reliable,
+ BufferedPacketPtr p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
+ if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << (u32)channelnum
@@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(std::move(p));
+ channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, data,
+ BufferedPacketPtr p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
- rawSend(p);
+ rawSend(p.get());
return true;
}
@@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
return false;
}
-void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
{
- assert(c.reliable); // Pre-condition
+ assert(c->reliable); // Pre-condition
- switch (c.type) {
+ switch (c->type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
@@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
@@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c.type << std::endl);
+ << " Invalid reliable command type: " << c->type << std::endl);
}
}
-void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
{
+ const ConnectionCommand &c = *c_ptr;
assert(!c.reliable); // Pre-condition
switch (c.type) {
@@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address)
}
catch (SocketException &e) {
// Create event
- ConnectionEvent ce;
- ce.bindFailed();
- m_connection->putEvent(ce);
+ m_connection->putEvent(ConnectionEvent::bindFailed());
}
}
@@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address)
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
- ConnectionEvent e;
- e.peerAdded(peer->id, peer->address);
- m_connection->putEvent(e);
+ m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
Address bind_addr;
@@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
}
}
-void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
{
- PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+ PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
if (!peer)
return;
@@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data
}
}
-void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
{
std::vector<session_t> peerids = m_connection->getPeerIDs();
@@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime)
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
- u16 next_to_ack = 0;
+ // Reduces logging verbosity
+ if (channel.queued_reliables.empty())
+ continue;
+
+ u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
@@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime)
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
- BufferedPacket p = std::move(channel.queued_reliables.front());
+ BufferedPacketPtr p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
- << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << ", seqnum: " << p->getSeqnum()
<< std::endl);
sendAsPacketReliable(p, &channel);
@@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
try {
// First, see if there any buffered packets we can process now
if (packet_queued) {
- bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
- while (data_left) {
+ while (true) {
try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
- }
+ if (!getFromBuffers(peer_id, resultdata))
+ break;
+
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
@@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return;
if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
@@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
}
@@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
if (!peer)
continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
+ if (!p)
continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ for (Channel &channel : p->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
@@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
- if (firstseqnum == channel->readNextIncomingSeqNum()) {
- BufferedPacket p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
+ return false;
- LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
+ if (firstseqnum != channel->readNextIncomingSeqNum())
+ return false;
- channel->incNextIncomingSeqNum();
+ BufferedPacketPtr p = channel->incoming_reliables.popFirst();
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
+ peer_id = readPeerId(p->data); // Carried over to caller function
+ u8 channelnum = readChannel(p->data);
+ u16 seqnum = p->getSeqnum();
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- }
- return false;
+ LOG(dout_con << m_connection->getDesc()
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum
+ << " peer_id=" << peer_id
+ << " channel=" << ((int) channelnum & 0xff)
+ << std::endl);
+
+ channel->incNextIncomingSeqNum();
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p->size() - headers_size);
+ memcpy(*payload, &p->data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
@@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
- u8 controltype = readU8(&(packetdata[1]));
+ ControlType controltype = (ControlType)readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
@@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
<< seqnum << " ]" << std::endl);
try {
- BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+ BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// the rtt calculation will be a bit off for re-sent packets but that's okay
{
@@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
- if (current_time > p.absolute_send_time) {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
+ if (current_time > p->absolute_send_time) {
+ float rtt = (current_time - p->absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- } else if (p.totaltime > 0) {
- float rtt = p.totaltime;
+ } else if (p->totaltime > 0) {
+ float rtt = p->totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
@@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
}
// put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p.data.getSize(), 1);
+ channel->UpdateBytesSent(p->size(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
@@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
- << "INVALID TYPE_CONTROL: invalid controltype="
+ << "INVALID controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
@@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(peer_address,
+ BufferedPacketPtr packet = con::makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
@@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
- u16 seqnum = readU16(&packetdata[1]);
+ const u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
@@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
+ BufferedPacketPtr packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
@@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
- ConnectionCommand discon;
- discon.disconnect_peer(peer->id);
- m_connection->putCommand(discon);
+ m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
@@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}
diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h
index 612407c3b..c2e2dae12 100644
--- a/src/network/connectionthreads.h
+++ b/src/network/connectionthreads.h
@@ -29,6 +29,25 @@ namespace con
class Connection;
+struct OutgoingPacket
+{
+ session_t peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+ bool ack;
+
+ OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
+ bool reliable_,bool ack_=false):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_),
+ ack(ack_)
+ {
+ }
+};
+
class ConnectionSendThread : public Thread
{
@@ -51,27 +70,27 @@ public:
private:
void runTimeouts(float dtime);
- void rawSend(const BufferedPacket &packet);
+ void rawSend(const BufferedPacket *p);
bool rawSendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool reliable);
- void processReliableCommand(ConnectionCommand &c);
- void processNonReliableCommand(ConnectionCommand &c);
+ void processReliableCommand(ConnectionCommandPtr &c);
+ void processNonReliableCommand(ConnectionCommandPtr &c);
void serve(Address bind_address);
void connect(Address address);
void disconnect();
void disconnect_peer(session_t peer_id);
void send(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data);
- void sendReliable(ConnectionCommand &c);
+ void sendReliable(ConnectionCommandPtr &c);
void sendToAll(u8 channelnum, const SharedBuffer<u8> &data);
- void sendToAllReliable(ConnectionCommand &c);
+ void sendToAllReliable(ConnectionCommandPtr &c);
void sendPackets(float dtime);
void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data,
bool ack = false);
- void sendAsPacketReliable(BufferedPacket &p, Channel *channel);
+ void sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel);
bool packetsQueued();
diff --git a/src/unittest/test_connection.cpp b/src/unittest/test_connection.cpp
index 23b7e9105..04fea90d6 100644
--- a/src/unittest/test_connection.cpp
+++ b/src/unittest/test_connection.cpp
@@ -124,7 +124,7 @@ void TestConnection::testHelpers()
Address a(127,0,0,1, 10);
const u16 seqnum = 34352;
- con::BufferedPacket p1 = con::makePacket(a, data1,
+ con::BufferedPacketPtr p1 = con::makePacket(a, data1,
proto_id, peer_id, channel);
/*
We should now have a packet with this data:
@@ -135,10 +135,10 @@ void TestConnection::testHelpers()
Data:
[7] u8 data1[0]
*/
- UASSERT(readU32(&p1.data[0]) == proto_id);
- UASSERT(readU16(&p1.data[4]) == peer_id);
- UASSERT(readU8(&p1.data[6]) == channel);
- UASSERT(readU8(&p1.data[7]) == data1[0]);
+ UASSERT(readU32(&p1->data[0]) == proto_id);
+ UASSERT(readU16(&p1->data[4]) == peer_id);
+ UASSERT(readU8(&p1->data[6]) == channel);
+ UASSERT(readU8(&p1->data[7]) == data1[0]);
//infostream<<"initial data1[0]="<<((u32)data1[0]&0xff)<<std::endl;
diff --git a/src/util/pointer.h b/src/util/pointer.h
index 7fc5de551..245ac85bf 100644
--- a/src/util/pointer.h
+++ b/src/util/pointer.h
@@ -22,6 +22,21 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "irrlichttypes.h"
#include "debug.h" // For assert()
#include <cstring>
+#include <memory> // std::shared_ptr
+
+
+template<typename T> class ConstSharedPtr {
+public:
+ ConstSharedPtr(T *ptr) : ptr(ptr) {}
+ ConstSharedPtr(const std::shared_ptr<T> &ptr) : ptr(ptr) {}
+
+ const T* get() const noexcept { return ptr.get(); }
+ const T& operator*() const noexcept { return *ptr.get(); }
+ const T* operator->() const noexcept { return ptr.get(); }
+
+private:
+ std::shared_ptr<T> ptr;
+};
template <typename T>
class Buffer
@@ -40,17 +55,11 @@ public:
else
data = NULL;
}
- Buffer(const Buffer &buffer)
- {
- m_size = buffer.m_size;
- if(m_size != 0)
- {
- data = new T[buffer.m_size];
- memcpy(data, buffer.data, buffer.m_size);
- }
- else
- data = NULL;
- }
+
+ // Disable class copy
+ Buffer(const Buffer &) = delete;
+ Buffer &operator=(const Buffer &) = delete;
+
Buffer(Buffer &&buffer)
{
m_size = buffer.m_size;
@@ -81,21 +90,6 @@ public:
drop();
}
- Buffer& operator=(const Buffer &buffer)
- {
- if(this == &buffer)
- return *this;
- drop();
- m_size = buffer.m_size;
- if(m_size != 0)
- {
- data = new T[buffer.m_size];
- memcpy(data, buffer.data, buffer.m_size);
- }
- else
- data = NULL;
- return *this;
- }
Buffer& operator=(Buffer &&buffer)
{
if(this == &buffer)
@@ -113,6 +107,18 @@ public:
return *this;
}
+ void copyTo(Buffer &buffer) const
+ {
+ buffer.drop();
+ buffer.m_size = m_size;
+ if (m_size != 0) {
+ buffer.data = new T[m_size];
+ memcpy(buffer.data, data, m_size);
+ } else {
+ buffer.data = nullptr;
+ }
+ }
+
T & operator[](unsigned int i) const
{
return data[i];