summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPerttu Ahola <celeron55@gmail.com>2011-10-20 23:04:09 +0300
committerPerttu Ahola <celeron55@gmail.com>2011-10-20 23:04:09 +0300
commit4b6138e69b65271b0e568f821a4d1bd285affedd (patch)
tree003fd33f969e5a9bf0bc720bda7f869d1f9c1f45 /src
parentb6fcbc5fbaba4a7faa65f792b16e47a405fa4ebf (diff)
downloadminetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.gz
minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.tar.bz2
minetest-4b6138e69b65271b0e568f821a4d1bd285affedd.zip
Improve Connection with threading and some kind of congestion control
Diffstat (limited to 'src')
-rw-r--r--src/client.cpp23
-rw-r--r--src/client.h10
-rw-r--r--src/connection.cpp1368
-rw-r--r--src/connection.h281
-rw-r--r--src/defaultsettings.cpp2
-rw-r--r--src/main.cpp2
-rw-r--r--src/map.cpp2
-rw-r--r--src/server.cpp60
-rw-r--r--src/server.h4
-rw-r--r--src/servercommand.cpp23
-rw-r--r--src/test.cpp48
11 files changed, 1135 insertions, 688 deletions
diff --git a/src/client.cpp b/src/client.cpp
index 5ec53a524..a777293a3 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -246,7 +246,7 @@ void Client::connect(Address address)
{
DSTACK(__FUNCTION_NAME);
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
- m_con.setTimeoutMs(0);
+ m_con.SetTimeoutMs(0);
m_con.Connect(address);
}
@@ -563,8 +563,8 @@ void Client::step(float dtime)
counter = 0.0;
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
// connectedAndInitialized() is true, peer exists.
- con::Peer *peer = m_con.GetPeer(PEER_ID_SERVER);
- infostream<<"Client: avg_rtt="<<peer->avg_rtt<<std::endl;
+ float avg_rtt = m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+ infostream<<"Client: avg_rtt="<<avg_rtt<<std::endl;
}
}
@@ -709,14 +709,6 @@ void Client::ProcessData(u8 *data, u32 datasize, u16 sender_peer_id)
return;
}
- con::Peer *peer;
- {
- //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
- // All data is coming from the server
- // PeerNotFoundException is handled by caller.
- peer = m_con.GetPeer(PEER_ID_SERVER);
- }
-
u8 ser_version = m_server_ser_ver;
//infostream<<"Client received command="<<(int)command<<std::endl;
@@ -2168,9 +2160,10 @@ ClientEvent Client::getClientEvent()
float Client::getRTT(void)
{
- con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
- if(!peer)
- return 0.0;
- return peer->avg_rtt;
+ try{
+ return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+ } catch(con::PeerNotFoundException &e){
+ return 1337;
+ }
}
diff --git a/src/client.h b/src/client.h
index 52dd66ca2..8585f6d4a 100644
--- a/src/client.h
+++ b/src/client.h
@@ -251,11 +251,11 @@ public:
float getAvgRtt()
{
- //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
- con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
- if(peer == NULL)
- return 0.0;
- return peer->avg_rtt;
+ try{
+ return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+ } catch(con::PeerNotFoundException){
+ return 1337;
+ }
}
bool getChatMessage(std::wstring &message)
diff --git a/src/connection.cpp b/src/connection.cpp
index 623994c4c..1c424839f 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -20,6 +20,8 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "connection.h"
#include "main.h"
#include "serialization.h"
+#include "log.h"
+#include "porting.h"
namespace con
{
@@ -439,16 +441,19 @@ Channel::~Channel()
Peer
*/
-Peer::Peer(u16 a_id, Address a_address)
+Peer::Peer(u16 a_id, Address a_address):
+ address(a_address),
+ id(a_id),
+ timeout_counter(0.0),
+ ping_timer(0.0),
+ resend_timeout(0.5),
+ avg_rtt(-1.0),
+ has_sent_with_id(false),
+ m_sendtime_accu(0),
+ m_max_packets_per_second(10),
+ m_num_sent(0),
+ m_max_num_sent(0)
{
- id = a_id;
- address = a_address;
- timeout_counter = 0.0;
- //resend_timeout = RESEND_TIMEOUT_MINIMUM;
- ping_timer = 0.0;
- resend_timeout = 0.5;
- avg_rtt = -1.0;
- has_sent_with_id = false;
}
Peer::~Peer()
{
@@ -456,6 +461,19 @@ Peer::~Peer()
void Peer::reportRTT(float rtt)
{
+ if(rtt >= 0.0){
+ if(rtt < 0.01){
+ if(m_max_packets_per_second < 100)
+ m_max_packets_per_second += 10;
+ } else if(rtt < 0.2){
+ if(m_max_packets_per_second < 100)
+ m_max_packets_per_second += 2;
+ } else {
+ if(m_max_packets_per_second > 5)
+ m_max_packets_per_second *= 0.5;
+ }
+ }
+
if(rtt < -0.999)
{}
else if(avg_rtt < 0.0)
@@ -485,461 +503,210 @@ void Peer::reportRTT(float rtt)
Connection
*/
-Connection::Connection(
- u32 protocol_id,
- u32 max_packet_size,
- float timeout,
- PeerHandler *peerhandler
-)
+Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
+ m_protocol_id(protocol_id),
+ m_max_packet_size(max_packet_size),
+ m_timeout(timeout),
+ m_peer_id(0),
+ m_bc_peerhandler(NULL),
+ m_bc_receive_timeout(0),
+ m_indentation(0)
{
- assert(peerhandler != NULL);
+ m_socket.setTimeoutMs(5);
- m_protocol_id = protocol_id;
- m_max_packet_size = max_packet_size;
- m_timeout = timeout;
- m_peer_id = PEER_ID_INEXISTENT;
- //m_waiting_new_peer_id = false;
- m_indentation = 0;
- m_peerhandler = peerhandler;
+ Start();
}
-Connection::~Connection()
+Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+ PeerHandler *peerhandler):
+ m_protocol_id(protocol_id),
+ m_max_packet_size(max_packet_size),
+ m_timeout(timeout),
+ m_peer_id(0),
+ m_bc_peerhandler(peerhandler),
+ m_bc_receive_timeout(0),
+ m_indentation(0)
{
- // Clear peers
- core::map<u16, Peer*>::Iterator j;
- j = m_peers.getIterator();
- for(; j.atEnd() == false; j++)
- {
- Peer *peer = j.getNode()->getValue();
- delete peer;
- }
-}
+ m_socket.setTimeoutMs(5);
-void Connection::Serve(unsigned short port)
-{
- m_socket.Bind(port);
- m_peer_id = PEER_ID_SERVER;
+ Start();
}
-void Connection::Connect(Address address)
-{
- core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
- if(node != NULL){
- throw ConnectionException("Already connected to a server");
- }
- Peer *peer = new Peer(PEER_ID_SERVER, address);
- m_peers.insert(peer->id, peer);
- m_peerhandler->peerAdded(peer);
-
- m_socket.Bind(0);
-
- // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
- m_peer_id = PEER_ID_INEXISTENT;
- SharedBuffer<u8> data(0);
- Send(PEER_ID_SERVER, 0, data, true);
-
- //m_waiting_new_peer_id = true;
-}
-
-void Connection::Disconnect()
+Connection::~Connection()
{
- // Create and send DISCO packet
- SharedBuffer<u8> data(2);
- writeU8(&data[0], TYPE_CONTROL);
- writeU8(&data[1], CONTROLTYPE_DISCO);
-
- // Send to all
- core::map<u16, Peer*>::Iterator j;
- j = m_peers.getIterator();
- for(; j.atEnd() == false; j++)
- {
- Peer *peer = j.getNode()->getValue();
- SendAsPacket(peer->id, 0, data, false);
- }
+ stop();
}
-bool Connection::Connected()
-{
- if(m_peers.size() != 1)
- return false;
-
- core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
- if(node == NULL)
- return false;
-
- if(m_peer_id == PEER_ID_INEXISTENT)
- return false;
-
- return true;
-}
+/* Internal stuff */
-SharedBuffer<u8> Channel::ProcessPacket(
- SharedBuffer<u8> packetdata,
- Connection *con,
- u16 peer_id,
- u8 channelnum,
- bool reliable)
+void * Connection::Thread()
{
- IndentationRaiser iraiser(&(con->m_indentation));
-
- if(packetdata.getSize() < 1)
- throw InvalidIncomingDataException("packetdata.getSize() < 1");
+ ThreadStarted();
+ log_register_thread("Connection");
- u8 type = readU8(&packetdata[0]);
+ dout_con<<"Connection thread started"<<std::endl;
- if(type == TYPE_CONTROL)
- {
- if(packetdata.getSize() < 2)
- throw InvalidIncomingDataException("packetdata.getSize() < 2");
-
- u8 controltype = readU8(&packetdata[1]);
+ u32 curtime = porting::getTimeMs();
+ u32 lasttime = curtime;
- if(controltype == CONTROLTYPE_ACK)
- {
- if(packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (ACK header size)");
-
- u16 seqnum = readU16(&packetdata[2]);
- con->PrintInfo();
- dout_con<<"Got CONTROLTYPE_ACK: channelnum="
- <<((int)channelnum&0xff)<<", peer_id="<<peer_id
- <<", seqnum="<<seqnum<<std::endl;
-
- try{
- BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
- // Get round trip time
- float rtt = p.totaltime;
-
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- Peer *peer = con->GetPeer(peer_id);
- peer->reportRTT(rtt);
-
- //con->PrintInfo(dout_con);
- //dout_con<<"RTT = "<<rtt<<std::endl;
-
- /*dout_con<<"OUTGOING: ";
- con->PrintInfo();
- outgoing_reliables.print();
- dout_con<<std::endl;*/
- }
- catch(NotFoundException &e){
- con->PrintInfo(derr_con);
- derr_con<<"WARNING: ACKed packet not "
- "in outgoing queue"
- <<std::endl;
- }
-
- throw ProcessedSilentlyException("Got an ACK");
- }
- else if(controltype == CONTROLTYPE_SET_PEER_ID)
- {
- if(packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
- u16 peer_id_new = readU16(&packetdata[2]);
- con->PrintInfo();
- dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
-
- if(con->GetPeerID() != PEER_ID_INEXISTENT)
- {
- con->PrintInfo(derr_con);
- derr_con<<"WARNING: Not changing"
- " existing peer id."<<std::endl;
- }
- else
- {
- dout_con<<"changing."<<std::endl;
- con->SetPeerID(peer_id_new);
- }
- throw ProcessedSilentlyException("Got a SET_PEER_ID");
- }
- else if(controltype == CONTROLTYPE_PING)
- {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- con->PrintInfo();
- dout_con<<"PING"<<std::endl;
- throw ProcessedSilentlyException("Got a PING");
- }
- else if(controltype == CONTROLTYPE_DISCO)
- {
- // Just ignore it, the incoming data already reset
- // the timeout counter
- con->PrintInfo();
- dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
-
- if(con->deletePeer(peer_id, false) == false)
- {
- con->PrintInfo(derr_con);
- derr_con<<"DISCO: Peer not found"<<std::endl;
- }
-
- throw ProcessedSilentlyException("Got a DISCO");
- }
- else{
- con->PrintInfo(derr_con);
- derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
- <<((int)controltype&0xff)<<std::endl;
- throw InvalidIncomingDataException("Invalid control type");
- }
- }
- else if(type == TYPE_ORIGINAL)
- {
- if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
- con->PrintInfo();
- dout_con<<"RETURNING TYPE_ORIGINAL to user"
- <<std::endl;
- // Get the inside packet out and return it
- SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
- memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
- return payload;
- }
- else if(type == TYPE_SPLIT)
+ while(getRun())
{
- // We have to create a packet again for buffering
- // This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(
- con->GetPeer(peer_id)->address,
- packetdata,
- con->GetProtocolID(),
- peer_id,
- channelnum);
- // Buffer the packet
- SharedBuffer<u8> data = incoming_splits.insert(packet, reliable);
- if(data.getSize() != 0)
- {
- con->PrintInfo();
- dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
- <<"size="<<data.getSize()<<std::endl;
- return data;
- }
- con->PrintInfo();
- dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
- throw ProcessedSilentlyException("Buffered a split packet chunk");
- }
- else if(type == TYPE_RELIABLE)
- {
- // Recursive reliable packets not allowed
- assert(reliable == false);
-
- if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
-
- u16 seqnum = readU16(&packetdata[1]);
-
- bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
- bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
+ BEGIN_DEBUG_EXCEPTION_HANDLER
- con->PrintInfo();
- if(is_future_packet)
- dout_con<<"BUFFERING";
- else if(is_old_packet)
- dout_con<<"OLD";
- else
- dout_con<<"RECUR";
- dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
- <<" next="<<next_incoming_seqnum;
- dout_con<<" [sending CONTROLTYPE_ACK"
- " to peer_id="<<peer_id<<"]";
- dout_con<<std::endl;
+ lasttime = curtime;
+ curtime = porting::getTimeMs();
+ float dtime = (float)(curtime - lasttime) / 1000.;
+ if(dtime > 0.1)
+ dtime = 0.1;
+ if(dtime < 0.0)
+ dtime = 0.0;
- //DEBUG
- //assert(incoming_reliables.size() < 100);
-
- // Send a CONTROLTYPE_ACK
- SharedBuffer<u8> reply(4);
- writeU8(&reply[0], TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_ACK);
- writeU16(&reply[2], seqnum);
- con->SendAsPacket(peer_id, channelnum, reply, false);
-
- //if(seqnum_higher(seqnum, next_incoming_seqnum))
- if(is_future_packet)
- {
- /*con->PrintInfo();
- dout_con<<"Buffering reliable packet (seqnum="
- <<seqnum<<")"<<std::endl;*/
-
- // This one comes later, buffer it.
- // Actually we have to make a packet to buffer one.
- // Well, we have all the ingredients, so just do it.
- BufferedPacket packet = makePacket(
- con->GetPeer(peer_id)->address,
- packetdata,
- con->GetProtocolID(),
- peer_id,
- channelnum);
- try{
- incoming_reliables.insert(packet);
-
- /*con->PrintInfo();
- dout_con<<"INCOMING: ";
- incoming_reliables.print();
- dout_con<<std::endl;*/
- }
- catch(AlreadyExistsException &e)
- {
- }
+ runTimeouts(dtime);
- throw ProcessedSilentlyException("Buffered future reliable packet");
- }
- //else if(seqnum_higher(next_incoming_seqnum, seqnum))
- else if(is_old_packet)
- {
- // An old packet, dump it
- throw InvalidIncomingDataException("Got an old reliable packet");
+ while(m_command_queue.size() != 0){
+ ConnectionCommand c = m_command_queue.pop_front();
+ processCommand(c);
}
- next_incoming_seqnum++;
-
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
- memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
+ send(dtime);
- return ProcessPacket(payload, con, peer_id, channelnum, true);
- }
- else
- {
- con->PrintInfo(derr_con);
- derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
- throw InvalidIncomingDataException("Invalid packet type");
+ receive();
+
+ END_DEBUG_EXCEPTION_HANDLER(derr_con);
}
-
- // We should never get here.
- // If you get here, add an exception or a return to some of the
- // above conditionals.
- assert(0);
- throw BaseException("Error in Channel::ProcessPacket()");
+
+ return NULL;
}
-SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
- u16 &peer_id)
+void Connection::putEvent(ConnectionEvent &e)
{
- u16 firstseqnum = 0;
- // Clear old packets from start of buffer
- try{
- for(;;){
- firstseqnum = incoming_reliables.getFirstSeqnum();
- if(seqnum_higher(next_incoming_seqnum, firstseqnum))
- incoming_reliables.popFirst();
- else
- break;
- }
- // This happens if all packets are old
- }catch(con::NotFoundException)
- {}
-
- if(incoming_reliables.empty() == false)
- {
- if(firstseqnum == next_incoming_seqnum)
- {
- BufferedPacket p = incoming_reliables.popFirst();
-
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
-
- con->PrintInfo();
- dout_con<<"UNBUFFERING TYPE_RELIABLE"
- <<" seqnum="<<seqnum
- <<" peer_id="<<peer_id
- <<" channel="<<((int)channelnum&0xff)
- <<std::endl;
-
- next_incoming_seqnum++;
-
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
+ assert(e.type != CONNEVENT_NONE);
+ m_event_queue.push_back(e);
+}
- return ProcessPacket(payload, con, peer_id, channelnum, true);
- }
+void Connection::processCommand(ConnectionCommand &c)
+{
+ switch(c.type){
+ case CONNCMD_NONE:
+ dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
+ return;
+ case CONNCMD_SERVE:
+ dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
+ <<c.port<<std::endl;
+ serve(c.port);
+ return;
+ case CONNCMD_CONNECT:
+ dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
+ connect(c.address);
+ return;
+ case CONNCMD_DISCONNECT:
+ dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
+ disconnect();
+ return;
+ case CONNCMD_SEND:
+ dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
+ send(c.peer_id, c.channelnum, c.data, c.reliable);
+ return;
+ case CONNCMD_SEND_TO_ALL:
+ dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
+ sendToAll(c.channelnum, c.data, c.reliable);
+ return;
+ case CONNCMD_DELETE_PEER:
+ dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
+ deletePeer(c.peer_id, false);
+ return;
}
-
- throw NoIncomingDataException("No relevant data in buffers");
}
-SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
+void Connection::send(float dtime)
{
- core::map<u16, Peer*>::Iterator j;
- j = m_peers.getIterator();
- for(; j.atEnd() == false; j++)
+ for(core::map<u16, Peer*>::Iterator
+ j = m_peers.getIterator();
+ j.atEnd() == false; j++)
{
Peer *peer = j.getNode()->getValue();
- for(u16 i=0; i<CHANNEL_COUNT; i++)
- {
- Channel *channel = &peer->channels[i];
- try{
- SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
- (this, peer_id);
-
- return resultdata;
- }
- catch(NoIncomingDataException &e)
- {
- }
- catch(InvalidIncomingDataException &e)
- {
- }
- catch(ProcessedSilentlyException &e)
- {
- }
+ peer->m_sendtime_accu += dtime;
+ peer->m_num_sent = 0;
+ peer->m_max_num_sent = peer->m_sendtime_accu *
+ peer->m_max_packets_per_second;
+ }
+ Queue<OutgoingPacket> postponed_packets;
+ while(m_outgoing_queue.size() != 0){
+ OutgoingPacket packet = m_outgoing_queue.pop_front();
+ Peer *peer = getPeerNoEx(packet.peer_id);
+ if(!peer)
+ continue;
+ if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
+ postponed_packets.push_back(packet);
+ } else if(peer->m_num_sent < peer->m_max_num_sent){
+ rawSendAsPacket(packet.peer_id, packet.channelnum,
+ packet.data, packet.reliable);
+ peer->m_num_sent++;
+ } else {
+ postponed_packets.push_back(packet);
}
}
- throw NoIncomingDataException("No relevant data in buffers");
+ while(postponed_packets.size() != 0){
+ m_outgoing_queue.push_back(postponed_packets.pop_front());
+ }
+ for(core::map<u16, Peer*>::Iterator
+ j = m_peers.getIterator();
+ j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ peer->m_sendtime_accu -= (float)peer->m_num_sent /
+ peer->m_max_packets_per_second;
+ if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
+ peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
+ }
}
-u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
+// Receive packets from the network and buffers and create ConnectionEvents
+void Connection::receive()
{
- /*
- Receive a packet from the network
- */
-
+ u32 datasize = 100000;
// TODO: We can not know how many layers of header there are.
// For now, just assume there are no other than the base headers.
u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
Buffer<u8> packetdata(packet_maxsize);
+
+ bool single_wait_done = false;
for(;;)
{
- try
- {
- /*
- Check if some buffer has relevant data
- */
- try{
- SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
-
- if(datasize < resultdata.getSize())
- throw InvalidIncomingDataException
- ("Buffer too small for received data");
-
- memcpy(data, *resultdata, resultdata.getSize());
- return resultdata.getSize();
- }
- catch(NoIncomingDataException &e)
+ try{
+ /* Check if some buffer has relevant data */
{
+ u16 peer_id;
+ SharedBuffer<u8> resultdata;
+ bool got = getFromBuffers(peer_id, resultdata);
+ if(got){
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ putEvent(e);
+ continue;
+ }
}
-
- Address sender;
+
+ if(single_wait_done){
+ if(m_socket.WaitData(0) == false)
+ break;
+ }
+
+ single_wait_done = true;
+ Address sender;
s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
if(received_size < 0)
- throw NoIncomingDataException("No incoming data");
+ break;
if(received_size < BASE_HEADER_SIZE)
- throw InvalidIncomingDataException("No full header received");
+ continue;
if(readU32(&packetdata[0]) != m_protocol_id)
- throw InvalidIncomingDataException("Invalid protocol id");
+ continue;
- peer_id = readPeerId(*packetdata);
+ u16 peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
if(channelnum > CHANNEL_COUNT-1){
PrintInfo(derr_con);
@@ -999,17 +766,23 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
/*
Find an unused peer id
*/
+ bool out_of_ids = false;
for(;;)
{
// Check if exists
if(m_peers.find(peer_id_new) == NULL)
break;
// Check for overflow
- if(peer_id_new == 65535)
- throw ConnectionException
- ("Connection ran out of peer ids");
+ if(peer_id_new == 65535){
+ out_of_ids = true;
+ break;
+ }
peer_id_new++;
}
+ if(out_of_ids){
+ errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
+ continue;
+ }
PrintInfo();
dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
@@ -1018,14 +791,18 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
// Create a peer
Peer *peer = new Peer(peer_id_new, sender);
m_peers.insert(peer->id, peer);
- m_peerhandler->peerAdded(peer);
+
+ // Create peer addition event
+ ConnectionEvent e;
+ e.peerAdded(peer_id_new, sender);
+ putEvent(e);
// Create CONTROL packet to tell the peer id to the new peer.
SharedBuffer<u8> reply(4);
writeU8(&reply[0], TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
- SendAsPacket(peer_id_new, 0, reply, true);
+ sendAsPacket(peer_id_new, 0, reply, true);
// We're now talking to a valid peer_id
peer_id = peer_id_new;
@@ -1053,12 +830,7 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
PrintInfo(derr_con);
derr_con<<"Peer "<<peer_id<<" sending from different address."
" Ignoring."<<std::endl;
- throw InvalidIncomingDataException
- ("Peer sending from different address");
- /*// If there is more data, receive again
- if(m_socket.WaitData(0) == true)
- continue;
- throw NoIncomingDataException("No incoming data (2)");*/
+ continue;
}
peer->timeout_counter = 0.0;
@@ -1074,8 +846,8 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
try{
// Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = channel->ProcessPacket
- (strippeddata, this, peer_id, channelnum);
+ SharedBuffer<u8> resultdata = processPacket
+ (channel, strippeddata, peer_id, channelnum, false);
PrintInfo();
dout_con<<"ProcessPacket returned data of size "
@@ -1085,123 +857,20 @@ u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
throw InvalidIncomingDataException
("Buffer too small for received data");
- memcpy(data, *resultdata, resultdata.getSize());
- return resultdata.getSize();
- }
- catch(ProcessedSilentlyException &e)
- {
- // If there is more data, receive again
- if(m_socket.WaitData(0) == true)
- continue;
- }
- throw NoIncomingDataException("No incoming data (2)");
- } // try
- catch(InvalidIncomingDataException &e)
- {
- // If there is more data, receive again
- if(m_socket.WaitData(0) == true)
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ putEvent(e);
continue;
- }
- catch(SendFailedException &e)
- {
- }
- } // for
-}
-
-void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
-{
- core::map<u16, Peer*>::Iterator j;
- j = m_peers.getIterator();
- for(; j.atEnd() == false; j++)
- {
- Peer *peer = j.getNode()->getValue();
- Send(peer->id, channelnum, data, reliable);
- }
-}
-
-void Connection::Send(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable)
-{
- assert(channelnum < CHANNEL_COUNT);
-
- Peer *peer = GetPeerNoEx(peer_id);
- if(peer == NULL)
- return;
- Channel *channel = &(peer->channels[channelnum]);
-
- u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
- if(reliable)
- chunksize_max -= RELIABLE_HEADER_SIZE;
-
- core::list<SharedBuffer<u8> > originals;
- originals = makeAutoSplitPacket(data, chunksize_max,
- channel->next_outgoing_split_seqnum);
-
- core::list<SharedBuffer<u8> >::Iterator i;
- i = originals.begin();
- for(; i != originals.end(); i++)
- {
- SharedBuffer<u8> original = *i;
-
- SendAsPacket(peer_id, channelnum, original, reliable);
- }
-}
-
-void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable)
-{
- Peer *peer = GetPeer(peer_id);
- Channel *channel = &(peer->channels[channelnum]);
-
- if(reliable)
- {
- u16 seqnum = channel->next_outgoing_seqnum;
- channel->next_outgoing_seqnum++;
-
- SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
-
- // Add base headers and make a packet
- BufferedPacket p = makePacket(peer->address, reliable,
- m_protocol_id, m_peer_id, channelnum);
-
- try{
- // Buffer the packet
- channel->outgoing_reliables.insert(p);
- }
- catch(AlreadyExistsException &e)
- {
- PrintInfo(derr_con);
- derr_con<<"WARNING: Going to send a reliable packet "
- "seqnum="<<seqnum<<" that is already "
- "in outgoing buffer"<<std::endl;
- //assert(0);
+ }catch(ProcessedSilentlyException &e){
}
-
- // Send the packet
- RawSend(p);
- }
- else
- {
- // Add base headers and make a packet
- BufferedPacket p = makePacket(peer->address, data,
- m_protocol_id, m_peer_id, channelnum);
-
- // Send the packet
- RawSend(p);
+ }catch(InvalidIncomingDataException &e){
}
-}
-
-void Connection::RawSend(const BufferedPacket &packet)
-{
- try{
- m_socket.Send(packet.address, *packet.data, packet.data.getSize());
- } catch(SendFailedException &e){
- derr_con<<"Connection::RawSend(): SendFailedException: "
- <<packet.address.serializeString()<<std::endl;
+ catch(ProcessedSilentlyException &e){
}
+ } // for
}
-void Connection::RunTimeouts(float dtime)
+void Connection::runTimeouts(float dtime)
{
core::list<u16> timeouted_peers;
core::map<u16, Peer*>::Iterator j;
@@ -1278,7 +947,7 @@ void Connection::RunTimeouts(float dtime)
<<", seqnum="<<seqnum
<<std::endl;
- RawSend(*j);
+ rawSend(*j);
// Enlarge avg_rtt and resend_timeout:
// The rtt will be at least the timeout.
@@ -1298,7 +967,7 @@ void Connection::RunTimeouts(float dtime)
SharedBuffer<u8> data(2);
writeU8(&data[0], TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_PING);
- SendAsPacket(peer->id, 0, data, true);
+ rawSendAsPacket(peer->id, 0, data, true);
peer->ping_timer = 0.0;
}
@@ -1317,12 +986,167 @@ nextpeer:
}
}
-Peer* Connection::GetPeer(u16 peer_id)
+void Connection::serve(u16 port)
+{
+ dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
+ m_socket.Bind(port);
+ m_peer_id = PEER_ID_SERVER;
+}
+
+void Connection::connect(Address address)
+{
+ dout_con<<getDesc()<<" connecting to "<<address.serializeString()
+ <<":"<<address.getPort()<<std::endl;
+
+ core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
+ if(node != NULL){
+ throw ConnectionException("Already connected to a server");
+ }
+
+ Peer *peer = new Peer(PEER_ID_SERVER, address);
+ m_peers.insert(peer->id, peer);
+
+ // Create event
+ ConnectionEvent e;
+ e.peerAdded(peer->id, peer->address);
+ putEvent(e);
+
+ m_socket.Bind(0);
+
+ // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
+ m_peer_id = PEER_ID_INEXISTENT;
+ SharedBuffer<u8> data(0);
+ Send(PEER_ID_SERVER, 0, data, true);
+}
+
+void Connection::disconnect()
+{
+ dout_con<<getDesc()<<" disconnecting"<<std::endl;
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+
+ // Send to all
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ rawSendAsPacket(peer->id, 0, data, false);
+ }
+}
+
+void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+{
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ send(peer->id, channelnum, data, reliable);
+ }
+}
+
+void Connection::send(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
+
+ assert(channelnum < CHANNEL_COUNT);
+
+ Peer *peer = getPeerNoEx(peer_id);
+ if(peer == NULL)
+ return;
+ Channel *channel = &(peer->channels[channelnum]);
+
+ u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
+ if(reliable)
+ chunksize_max -= RELIABLE_HEADER_SIZE;
+
+ core::list<SharedBuffer<u8> > originals;
+ originals = makeAutoSplitPacket(data, chunksize_max,
+ channel->next_outgoing_split_seqnum);
+
+ core::list<SharedBuffer<u8> >::Iterator i;
+ i = originals.begin();
+ for(; i != originals.end(); i++)
+ {
+ SharedBuffer<u8> original = *i;
+
+ sendAsPacket(peer_id, channelnum, original, reliable);
+ }
+}
+
+void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ OutgoingPacket packet(peer_id, channelnum, data, reliable);
+ m_outgoing_queue.push_back(packet);
+}
+
+void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ Peer *peer = getPeerNoEx(peer_id);
+ if(!peer)
+ return;
+ Channel *channel = &(peer->channels[channelnum]);
+
+ if(reliable)
+ {
+ u16 seqnum = channel->next_outgoing_seqnum;
+ channel->next_outgoing_seqnum++;
+
+ SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+
+ // Add base headers and make a packet
+ BufferedPacket p = makePacket(peer->address, reliable,
+ m_protocol_id, m_peer_id, channelnum);
+
+ try{
+ // Buffer the packet
+ channel->outgoing_reliables.insert(p);
+ }
+ catch(AlreadyExistsException &e)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: Going to send a reliable packet "
+ "seqnum="<<seqnum<<" that is already "
+ "in outgoing buffer"<<std::endl;
+ //assert(0);
+ }
+
+ // Send the packet
+ rawSend(p);
+ }
+ else
+ {
+ // Add base headers and make a packet
+ BufferedPacket p = makePacket(peer->address, data,
+ m_protocol_id, m_peer_id, channelnum);
+
+ // Send the packet
+ rawSend(p);
+ }
+}
+
+void Connection::rawSend(const BufferedPacket &packet)
+{
+ try{
+ m_socket.Send(packet.address, *packet.data, packet.data.getSize());
+ } catch(SendFailedException &e){
+ derr_con<<"Connection::rawSend(): SendFailedException: "
+ <<packet.address.serializeString()<<std::endl;
+ }
+}
+
+Peer* Connection::getPeer(u16 peer_id)
{
core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
if(node == NULL){
- // Peer not found
throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
}
@@ -1332,7 +1156,7 @@ Peer* Connection::GetPeer(u16 peer_id)
return node->getValue();
}
-Peer* Connection::GetPeerNoEx(u16 peer_id)
+Peer* Connection::getPeerNoEx(u16 peer_id)
{
core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
@@ -1346,7 +1170,7 @@ Peer* Connection::GetPeerNoEx(u16 peer_id)
return node->getValue();
}
-core::list<Peer*> Connection::GetPeers()
+core::list<Peer*> Connection::getPeers()
{
core::list<Peer*> list;
core::map<u16, Peer*>::Iterator j;
@@ -1359,23 +1183,474 @@ core::list<Peer*> Connection::GetPeers()
return list;
}
+bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ for(u16 i=0; i<CHANNEL_COUNT; i++)
+ {
+ Channel *channel = &peer->channels[i];
+ SharedBuffer<u8> resultdata;
+ bool got = checkIncomingBuffers(channel, peer_id, resultdata);
+ if(got){
+ dst = resultdata;
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst)
+{
+ u16 firstseqnum = 0;
+ // Clear old packets from start of buffer
+ try{
+ for(;;){
+ firstseqnum = channel->incoming_reliables.getFirstSeqnum();
+ if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
+ channel->incoming_reliables.popFirst();
+ else
+ break;
+ }
+ // This happens if all packets are old
+ }catch(con::NotFoundException)
+ {}
+
+ if(channel->incoming_reliables.empty() == false)
+ {
+ if(firstseqnum == channel->next_incoming_seqnum)
+ {
+ BufferedPacket p = channel->incoming_reliables.popFirst();
+
+ peer_id = readPeerId(*p.data);
+ u8 channelnum = readChannel(*p.data);
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+
+ PrintInfo();
+ dout_con<<"UNBUFFERING TYPE_RELIABLE"
+ <<" seqnum="<<seqnum
+ <<" peer_id="<<peer_id
+ <<" channel="<<((int)channelnum&0xff)
+ <<std::endl;
+
+ channel->next_incoming_seqnum++;
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p.data.getSize() - headers_size);
+ memcpy(*payload, &p.data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
+ }
+ }
+ return false;
+}
+
+SharedBuffer<u8> Connection::processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable)
+{
+ IndentationRaiser iraiser(&(m_indentation));
+
+ if(packetdata.getSize() < 1)
+ throw InvalidIncomingDataException("packetdata.getSize() < 1");
+
+ u8 type = readU8(&packetdata[0]);
+
+ if(type == TYPE_CONTROL)
+ {
+ if(packetdata.getSize() < 2)
+ throw InvalidIncomingDataException("packetdata.getSize() < 2");
+
+ u8 controltype = readU8(&packetdata[1]);
+
+ if(controltype == CONTROLTYPE_ACK)
+ {
+ if(packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (ACK header size)");
+
+ u16 seqnum = readU16(&packetdata[2]);
+ PrintInfo();
+ dout_con<<"Got CONTROLTYPE_ACK: channelnum="
+ <<((int)channelnum&0xff)<<", peer_id="<<peer_id
+ <<", seqnum="<<seqnum<<std::endl;
+
+ try{
+ BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
+ // Get round trip time
+ float rtt = p.totaltime;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ Peer *peer = getPeer(peer_id);
+ peer->reportRTT(rtt);
+
+ //PrintInfo(dout_con);
+ //dout_con<<"RTT = "<<rtt<<std::endl;
+
+ /*dout_con<<"OUTGOING: ";
+ PrintInfo();
+ channel->outgoing_reliables.print();
+ dout_con<<std::endl;*/
+ }
+ catch(NotFoundException &e){
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: ACKed packet not "
+ "in outgoing queue"
+ <<std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got an ACK");
+ }
+ else if(controltype == CONTROLTYPE_SET_PEER_ID)
+ {
+ if(packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+ u16 peer_id_new = readU16(&packetdata[2]);
+ PrintInfo();
+ dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
+
+ if(GetPeerID() != PEER_ID_INEXISTENT)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: Not changing"
+ " existing peer id."<<std::endl;
+ }
+ else
+ {
+ dout_con<<"changing."<<std::endl;
+ SetPeerID(peer_id_new);
+ }
+ throw ProcessedSilentlyException("Got a SET_PEER_ID");
+ }
+ else if(controltype == CONTROLTYPE_PING)
+ {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ PrintInfo();
+ dout_con<<"PING"<<std::endl;
+ throw ProcessedSilentlyException("Got a PING");
+ }
+ else if(controltype == CONTROLTYPE_DISCO)
+ {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ PrintInfo();
+ dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
+
+ if(deletePeer(peer_id, false) == false)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"DISCO: Peer not found"<<std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got a DISCO");
+ }
+ else{
+ PrintInfo(derr_con);
+ derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
+ <<((int)controltype&0xff)<<std::endl;
+ throw InvalidIncomingDataException("Invalid control type");
+ }
+ }
+ else if(type == TYPE_ORIGINAL)
+ {
+ if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
+ PrintInfo();
+ dout_con<<"RETURNING TYPE_ORIGINAL to user"
+ <<std::endl;
+ // Get the inside packet out and return it
+ SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
+ memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
+ return payload;
+ }
+ else if(type == TYPE_SPLIT)
+ {
+ // We have to create a packet again for buffering
+ // This isn't actually too bad an idea.
+ BufferedPacket packet = makePacket(
+ getPeer(peer_id)->address,
+ packetdata,
+ GetProtocolID(),
+ peer_id,
+ channelnum);
+ // Buffer the packet
+ SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
+ if(data.getSize() != 0)
+ {
+ PrintInfo();
+ dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
+ <<"size="<<data.getSize()<<std::endl;
+ return data;
+ }
+ PrintInfo();
+ dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
+ throw ProcessedSilentlyException("Buffered a split packet chunk");
+ }
+ else if(type == TYPE_RELIABLE)
+ {
+ // Recursive reliable packets not allowed
+ assert(reliable == false);
+
+ if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
+
+ u16 seqnum = readU16(&packetdata[1]);
+
+ bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
+ bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
+
+ PrintInfo();
+ if(is_future_packet)
+ dout_con<<"BUFFERING";
+ else if(is_old_packet)
+ dout_con<<"OLD";
+ else
+ dout_con<<"RECUR";
+ dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
+ <<" next="<<channel->next_incoming_seqnum;
+ dout_con<<" [sending CONTROLTYPE_ACK"
+ " to peer_id="<<peer_id<<"]";
+ dout_con<<std::endl;
+
+ //DEBUG
+ //assert(channel->incoming_reliables.size() < 100);
+
+ // Send a CONTROLTYPE_ACK
+ SharedBuffer<u8> reply(4);
+ writeU8(&reply[0], TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_ACK);
+ writeU16(&reply[2], seqnum);
+ rawSendAsPacket(peer_id, channelnum, reply, false);
+
+ //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
+ if(is_future_packet)
+ {
+ /*PrintInfo();
+ dout_con<<"Buffering reliable packet (seqnum="
+ <<seqnum<<")"<<std::endl;*/
+
+ // This one comes later, buffer it.
+ // Actually we have to make a packet to buffer one.
+ // Well, we have all the ingredients, so just do it.
+ BufferedPacket packet = makePacket(
+ getPeer(peer_id)->address,
+ packetdata,
+ GetProtocolID(),
+ peer_id,
+ channelnum);
+ try{
+ channel->incoming_reliables.insert(packet);
+
+ /*PrintInfo();
+ dout_con<<"INCOMING: ";
+ channel->incoming_reliables.print();
+ dout_con<<std::endl;*/
+ }
+ catch(AlreadyExistsException &e)
+ {
+ }
+
+ throw ProcessedSilentlyException("Buffered future reliable packet");
+ }
+ //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
+ else if(is_old_packet)
+ {
+ // An old packet, dump it
+ throw InvalidIncomingDataException("Got an old reliable packet");
+ }
+
+ channel->next_incoming_seqnum++;
+
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
+ memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
+
+ return processPacket(channel, payload, peer_id, channelnum, true);
+ }
+ else
+ {
+ PrintInfo(derr_con);
+ derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
+ throw InvalidIncomingDataException("Invalid packet type");
+ }
+
+ // We should never get here.
+ // If you get here, add an exception or a return to some of the
+ // above conditionals.
+ assert(0);
+ throw BaseException("Error in Channel::ProcessPacket()");
+}
+
bool Connection::deletePeer(u16 peer_id, bool timeout)
{
if(m_peers.find(peer_id) == NULL)
return false;
- m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
+
+ Peer *peer = m_peers[peer_id];
+
+ // Create event
+ ConnectionEvent e;
+ e.peerRemoved(peer_id, timeout, peer->address);
+ putEvent(e);
+
delete m_peers[peer_id];
m_peers.remove(peer_id);
return true;
}
+/* Interface */
+
+ConnectionEvent Connection::getEvent()
+{
+ if(m_event_queue.size() == 0){
+ ConnectionEvent e;
+ e.type = CONNEVENT_NONE;
+ return e;
+ }
+ return m_event_queue.pop_front();
+}
+
+ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+{
+ try{
+ return m_event_queue.pop_front(timeout_ms);
+ } catch(ItemNotFoundException &e){
+ ConnectionEvent e;
+ e.type = CONNEVENT_NONE;
+ return e;
+ }
+}
+
+void Connection::putCommand(ConnectionCommand &c)
+{
+ m_command_queue.push_back(c);
+}
+
+void Connection::Serve(unsigned short port)
+{
+ ConnectionCommand c;
+ c.serve(port);
+ putCommand(c);
+}
+
+void Connection::Connect(Address address)
+{
+ ConnectionCommand c;
+ c.connect(address);
+ putCommand(c);
+}
+
+bool Connection::Connected()
+{
+ JMutexAutoLock peerlock(m_peers_mutex);
+
+ if(m_peers.size() != 1)
+ return false;
+
+ core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
+ if(node == NULL)
+ return false;
+
+ if(m_peer_id == PEER_ID_INEXISTENT)
+ return false;
+
+ return true;
+}
+
+void Connection::Disconnect()
+{
+ ConnectionCommand c;
+ c.disconnect();
+ putCommand(c);
+}
+
+u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
+{
+ for(;;){
+ ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+ if(e.type != CONNEVENT_NONE)
+ dout_con<<getDesc()<<": Receive: got event: "
+ <<e.describe()<<std::endl;
+ switch(e.type){
+ case CONNEVENT_NONE:
+ throw NoIncomingDataException("No incoming data");
+ case CONNEVENT_DATA_RECEIVED:
+ peer_id = e.peer_id;
+ memcpy(data, *e.data, e.data.getSize());
+ return e.data.getSize();
+ case CONNEVENT_PEER_ADDED: {
+ Peer tmp(e.peer_id, e.address);
+ if(m_bc_peerhandler)
+ m_bc_peerhandler->peerAdded(&tmp);
+ continue; }
+ case CONNEVENT_PEER_REMOVED: {
+ Peer tmp(e.peer_id, e.address);
+ if(m_bc_peerhandler)
+ m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
+ continue; }
+ }
+ }
+ throw NoIncomingDataException("No incoming data");
+}
+
+void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+{
+ assert(channelnum < CHANNEL_COUNT);
+
+ ConnectionCommand c;
+ c.sendToAll(channelnum, data, reliable);
+ putCommand(c);
+}
+
+void Connection::Send(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ assert(channelnum < CHANNEL_COUNT);
+
+ ConnectionCommand c;
+ c.send(peer_id, channelnum, data, reliable);
+ putCommand(c);
+}
+
+void Connection::RunTimeouts(float dtime)
+{
+ // No-op
+}
+
+Address Connection::GetPeerAddress(u16 peer_id)
+{
+ JMutexAutoLock peerlock(m_peers_mutex);
+ return getPeer(peer_id)->address;
+}
+
+float Connection::GetPeerAvgRTT(u16 peer_id)
+{
+ JMutexAutoLock peerlock(m_peers_mutex);
+ return getPeer(peer_id)->avg_rtt;
+}
+
+void Connection::DeletePeer(u16 peer_id)
+{
+ ConnectionCommand c;
+ c.deletePeer(peer_id);
+ putCommand(c);
+}
+
void Connection::PrintInfo(std::ostream &out)
{
- out<<m_socket.GetHandle();
- out<<" ";
- out<<"con "<<m_peer_id<<": ";
- for(s16 i=0; i<(s16)m_indentation-1; i++)
- out<<" ";
+ out<<getDesc()<<": ";
}
void Connection::PrintInfo()
@@ -1383,5 +1658,10 @@ void Connection::PrintInfo()
PrintInfo(dout_con);
}
+std::string Connection::getDesc()
+{
+ return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
+}
+
} // namespace
diff --git a/src/connection.h b/src/connection.h
index 6eb2f2824..570bc92ab 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -319,28 +319,6 @@ struct Channel
{
Channel();
~Channel();
- /*
- Processes a packet with the basic header stripped out.
- Parameters:
- packetdata: Data in packet (with no base headers)
- con: The connection to which the channel is associated
- (used for sending back stuff (ACKs))
- peer_id: peer id of the sender of the packet in question
- channelnum: channel on which the packet was sent
- reliable: true if recursing into a reliable packet
- */
- SharedBuffer<u8> ProcessPacket(
- SharedBuffer<u8> packetdata,
- Connection *con,
- u16 peer_id,
- u8 channelnum,
- bool reliable=false);
-
- // Returns next data from a buffer if possible
- // throws a NoIncomingDataException if no data is available
- // If found, sets peer_id
- SharedBuffer<u8> CheckIncomingBuffers(Connection *con,
- u16 &peer_id);
u16 next_outgoing_seqnum;
u16 next_incoming_seqnum;
@@ -412,78 +390,237 @@ public:
// with the id we have given to it
bool has_sent_with_id;
+ float m_sendtime_accu;
+ float m_max_packets_per_second;
+ int m_num_sent;
+ int m_max_num_sent;
+
private:
};
-class Connection
+/*
+ Connection
+*/
+
+struct OutgoingPacket
+{
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+
+ OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+ bool reliable_):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_)
+ {
+ }
+};
+
+enum ConnectionEventType{
+ CONNEVENT_NONE,
+ CONNEVENT_DATA_RECEIVED,
+ CONNEVENT_PEER_ADDED,
+ CONNEVENT_PEER_REMOVED,
+};
+
+struct ConnectionEvent
+{
+ enum ConnectionEventType type;
+ u16 peer_id;
+ SharedBuffer<u8> data;
+ bool timeout;
+ Address address;
+
+ ConnectionEvent(): type(CONNEVENT_NONE) {}
+
+ std::string describe()
+ {
+ switch(type){
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ }
+ return "Invalid ConnectionEvent";
+ }
+
+ void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONNEVENT_DATA_RECEIVED;
+ peer_id = peer_id_;
+ data = data_;
+ }
+ void peerAdded(u16 peer_id_, Address address_)
+ {
+ type = CONNEVENT_PEER_ADDED;
+ peer_id = peer_id_;
+ address = address_;
+ }
+ void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
+ {
+ type = CONNEVENT_PEER_REMOVED;
+ peer_id = peer_id_;
+ timeout = timeout_;
+ address = address_;
+ }
+};
+
+enum ConnectionCommandType{
+ CONNCMD_NONE,
+ CONNCMD_SERVE,
+ CONNCMD_CONNECT,
+ CONNCMD_DISCONNECT,
+ CONNCMD_SEND,
+ CONNCMD_SEND_TO_ALL,
+ CONNCMD_DELETE_PEER,
+};
+
+struct ConnectionCommand
+{
+ enum ConnectionCommandType type;
+ u16 port;
+ Address address;
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+
+ ConnectionCommand(): type(CONNCMD_NONE) {}
+
+ void serve(u16 port_)
+ {
+ type = CONNCMD_SERVE;
+ port = port_;
+ }
+ void connect(Address address_)
+ {
+ type = CONNCMD_CONNECT;
+ address = address_;
+ }
+ void disconnect()
+ {
+ type = CONNCMD_DISCONNECT;
+ }
+ void send(u16 peer_id_, u8 channelnum_,
+ SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND_TO_ALL;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void deletePeer(u16 peer_id_)
+ {
+ type = CONNCMD_DELETE_PEER;
+ peer_id = peer_id_;
+ }
+};
+
+class Connection: public SimpleThread
{
public:
- Connection(
- u32 protocol_id,
- u32 max_packet_size,
- float timeout,
- PeerHandler *peerhandler
- );
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout);
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+ PeerHandler *peerhandler);
~Connection();
- void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); }
- // Start being a server
+ void * Thread();
+
+ /* Interface */
+
+ ConnectionEvent getEvent();
+ ConnectionEvent waitEvent(u32 timeout_ms);
+ void putCommand(ConnectionCommand &c);
+
+ void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
void Serve(unsigned short port);
- // Connect to a server
void Connect(Address address);
bool Connected();
-
void Disconnect();
-
- // Sets peer_id
- SharedBuffer<u8> GetFromBuffers(u16 &peer_id);
-
- // The peer_id of sender is stored in peer_id
- // Return value: I guess this always throws an exception or
- // actually gets data
- // May call PeerHandler methods
u32 Receive(u16 &peer_id, u8 *data, u32 datasize);
-
- // These will automatically package the data as an original or split
void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
- // Send data as a packet; it will be wrapped in base header and
- // optionally to a reliable packet.
- void SendAsPacket(u16 peer_id, u8 channelnum,
+ void RunTimeouts(float dtime); // dummy
+ u16 GetPeerID(){ return m_peer_id; }
+ Address GetPeerAddress(u16 peer_id);
+ float GetPeerAvgRTT(u16 peer_id);
+ void DeletePeer(u16 peer_id);
+
+private:
+ void putEvent(ConnectionEvent &e);
+ void processCommand(ConnectionCommand &c);
+ void send(float dtime);
+ void receive();
+ void runTimeouts(float dtime);
+ void serve(u16 port);
+ void connect(Address address);
+ void disconnect();
+ void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
+ void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
+ void sendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
- // Sends a raw packet
- void RawSend(const BufferedPacket &packet);
+ void rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable);
+ void rawSend(const BufferedPacket &packet);
+ Peer* getPeer(u16 peer_id);
+ Peer* getPeerNoEx(u16 peer_id);
+ core::list<Peer*> getPeers();
+ bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
+ // Returns next data from a buffer if possible
+ // If found, returns true; if not, false.
+ // If found, sets peer_id and dst
+ bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst);
+ /*
+ Processes a packet with the basic header stripped out.
+ Parameters:
+ packetdata: Data in packet (with no base headers)
+ peer_id: peer id of the sender of the packet in question
+ channelnum: channel on which the packet was sent
+ reliable: true if recursing into a reliable packet
+ */
+ SharedBuffer<u8> processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable);
+ bool deletePeer(u16 peer_id, bool timeout);
- // May call PeerHandler methods
- void RunTimeouts(float dtime);
-
- // Can throw a PeerNotFoundException
- Peer* GetPeer(u16 peer_id);
- // returns NULL if failed
- Peer* GetPeerNoEx(u16 peer_id);
- core::list<Peer*> GetPeers();
+ Queue<OutgoingPacket> m_outgoing_queue;
+ MutexedQueue<ConnectionEvent> m_event_queue;
+ MutexedQueue<ConnectionCommand> m_command_queue;
- // Calls PeerHandler::deletingPeer
- // Returns false if peer was not found
- bool deletePeer(u16 peer_id, bool timeout);
+ u32 m_protocol_id;
+ u32 m_max_packet_size;
+ float m_timeout;
+ UDPSocket m_socket;
+ u16 m_peer_id;
+
+ core::map<u16, Peer*> m_peers;
+ JMutex m_peers_mutex;
+ // Backwards compatibility
+ PeerHandler *m_bc_peerhandler;
+ int m_bc_receive_timeout;
+
void SetPeerID(u16 id){ m_peer_id = id; }
- u16 GetPeerID(){ return m_peer_id; }
u32 GetProtocolID(){ return m_protocol_id; }
-
- // For debug printing
void PrintInfo(std::ostream &out);
void PrintInfo();
+ std::string getDesc();
u16 m_indentation;
-
-private:
- u32 m_protocol_id;
- float m_timeout;
- PeerHandler *m_peerhandler;
- core::map<u16, Peer*> m_peers;
- u16 m_peer_id;
- //bool m_waiting_new_peer_id;
- u32 m_max_packet_size;
- UDPSocket m_socket;
};
} // namespace
diff --git a/src/defaultsettings.cpp b/src/defaultsettings.cpp
index 586eaa282..7f0d46a10 100644
--- a/src/defaultsettings.cpp
+++ b/src/defaultsettings.cpp
@@ -100,7 +100,7 @@ void set_default_settings(Settings *settings)
//settings->setDefault("max_simultaneous_block_sends_per_client", "1");
// This causes frametime jitter on client side, or does it?
settings->setDefault("max_simultaneous_block_sends_per_client", "2");
- settings->setDefault("max_simultaneous_block_sends_server_total", "2");
+ settings->setDefault("max_simultaneous_block_sends_server_total", "8");
settings->setDefault("max_block_send_distance", "7");
settings->setDefault("max_block_generate_distance", "5");
settings->setDefault("time_send_interval", "20");
diff --git a/src/main.cpp b/src/main.cpp
index df1347f12..bc44775bd 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -481,6 +481,8 @@ MainGameCallback *g_gamecallback = NULL;
// Connection
std::ostream *dout_con_ptr = &dummyout;
std::ostream *derr_con_ptr = &verbosestream;
+//std::ostream *dout_con_ptr = &infostream;
+//std::ostream *derr_con_ptr = &errorstream;
// Server
std::ostream *dout_server_ptr = &infostream;
diff --git a/src/map.cpp b/src/map.cpp
index ba4130ca2..8aad4e539 100644
--- a/src/map.cpp
+++ b/src/map.cpp
@@ -21,7 +21,9 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "mapsector.h"
#include "mapblock.h"
#include "main.h"
+#ifndef SERVER
#include "client.h"
+#endif
#include "filesys.h"
#include "utility.h"
#include "voxel.h"
diff --git a/src/server.cpp b/src/server.cpp
index 1a441e819..37ba65a95 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1074,7 +1074,7 @@ void Server::start(unsigned short port)
m_thread.stop();
// Initialize connection
- m_con.setTimeoutMs(30);
+ m_con.SetTimeoutMs(30);
m_con.Serve(port);
// Start thread
@@ -1823,9 +1823,18 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
JMutexAutoLock envlock(m_env_mutex);
JMutexAutoLock conlock(m_con_mutex);
- con::Peer *peer;
try{
- peer = m_con.GetPeer(peer_id);
+ Address address = m_con.GetPeerAddress(peer_id);
+
+ // drop player if is ip is banned
+ if(m_banmanager.isIpBanned(address.serializeString())){
+ SendAccessDenied(m_con, peer_id,
+ L"Your ip is banned. Banned name was "
+ +narrow_to_wide(m_banmanager.getBanName(
+ address.serializeString())));
+ m_con.DeletePeer(peer_id);
+ return;
+ }
}
catch(con::PeerNotFoundException &e)
{
@@ -1834,17 +1843,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
return;
}
- // drop player if is ip is banned
- if(m_banmanager.isIpBanned(peer->address.serializeString())){
- SendAccessDenied(m_con, peer_id,
- L"Your ip is banned. Banned name was "
- +narrow_to_wide(m_banmanager.getBanName(
- peer->address.serializeString())));
- m_con.deletePeer(peer_id, false);
- return;
- }
-
- u8 peer_ser_ver = getClient(peer->id)->serialization_version;
+ u8 peer_ser_ver = getClient(peer_id)->serialization_version;
try
{
@@ -1865,7 +1864,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
return;
infostream<<"Server: Got TOSERVER_INIT from "
- <<peer->id<<std::endl;
+ <<peer_id<<std::endl;
// First byte after command is maximum supported
// serialization version
@@ -1878,7 +1877,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
deployed = SER_FMT_VER_INVALID;
//peer->serialization_version = deployed;
- getClient(peer->id)->pending_serialization_version = deployed;
+ getClient(peer_id)->pending_serialization_version = deployed;
if(deployed == SER_FMT_VER_INVALID)
{
@@ -1900,7 +1899,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
net_proto_version = readU16(&data[2+1+PLAYERNAME_SIZE+PASSWORD_SIZE]);
}
- getClient(peer->id)->net_proto_version = net_proto_version;
+ getClient(peer_id)->net_proto_version = net_proto_version;
if(net_proto_version == 0)
{
@@ -2045,11 +2044,11 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
if(command == TOSERVER_INIT2)
{
infostream<<"Server: Got TOSERVER_INIT2 from "
- <<peer->id<<std::endl;
+ <<peer_id<<std::endl;
- getClient(peer->id)->serialization_version
- = getClient(peer->id)->pending_serialization_version;
+ getClient(peer_id)->serialization_version
+ = getClient(peer_id)->pending_serialization_version;
/*
Send some initialization data
@@ -2059,8 +2058,8 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
SendPlayerInfos();
// Send inventory to player
- UpdateCrafting(peer->id);
- SendInventory(peer->id);
+ UpdateCrafting(peer_id);
+ SendInventory(peer_id);
// Send player items to all players
SendPlayerItems();
@@ -2074,7 +2073,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
{
SharedBuffer<u8> data = makePacket_TOCLIENT_TIME_OF_DAY(
m_env.getTimeOfDay());
- m_con.Send(peer->id, 0, data, true);
+ m_con.Send(peer_id, 0, data, true);
}
// Send information about server to player in chat
@@ -2095,7 +2094,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
}
// Warnings about protocol version can be issued here
- if(getClient(peer->id)->net_proto_version < PROTOCOL_VERSION)
+ if(getClient(peer_id)->net_proto_version < PROTOCOL_VERSION)
{
SendChatMessage(peer_id, L"# Server: WARNING: YOUR CLIENT IS OLD AND MAY WORK PROPERLY WITH THIS SERVER");
}
@@ -2402,7 +2401,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
else if(action == 2)
{
#if 0
- RemoteClient *client = getClient(peer->id);
+ RemoteClient *client = getClient(peer_id);
JMutexAutoLock digmutex(client->m_dig_mutex);
client->m_dig_tool_item = -1;
#endif
@@ -2685,7 +2684,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
}
// Reset build time counter
- getClient(peer->id)->m_time_from_building = 0.0;
+ getClient(peer_id)->m_time_from_building = 0.0;
// Create node data
MaterialItem *mitem = (MaterialItem*)item;
@@ -3428,11 +3427,10 @@ core::list<PlayerInfo> Server::getPlayerInfo()
Player *player = *i;
try{
- con::Peer *peer = m_con.GetPeer(player->peer_id);
- // Copy info from peer to info struct
- info.id = peer->id;
- info.address = peer->address;
- info.avg_rtt = peer->avg_rtt;
+ // Copy info from connection to info struct
+ info.id = player->peer_id;
+ info.address = m_con.GetPeerAddress(player->peer_id);
+ info.avg_rtt = m_con.GetPeerAvgRTT(player->peer_id);
}
catch(con::PeerNotFoundException &e)
{
diff --git a/src/server.h b/src/server.h
index dac7e2826..b238bec26 100644
--- a/src/server.h
+++ b/src/server.h
@@ -468,9 +468,9 @@ public:
return m_banmanager.getBanDescription(ip_or_name);
}
- con::Peer* getPeerNoEx(u16 peer_id)
+ Address getPeerAddress(u16 peer_id)
{
- return m_con.GetPeerNoEx(peer_id);
+ return m_con.GetPeerAddress(peer_id);
}
// Envlock and conlock should be locked when calling this
diff --git a/src/servercommand.cpp b/src/servercommand.cpp
index a09003960..156598940 100644
--- a/src/servercommand.cpp
+++ b/src/servercommand.cpp
@@ -250,20 +250,19 @@ void cmd_banunban(std::wostringstream &os, ServerCommandContext *ctx)
os<<L"-!- No such player";
return;
}
-
- con::Peer *peer = ctx->server->getPeerNoEx(player->peer_id);
- if(peer == NULL)
- {
+
+ try{
+ Address address = ctx->server->getPeerAddress(player->peer_id);
+ std::string ip_string = address.serializeString();
+ ctx->server->setIpBanned(ip_string, player->getName());
+ os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
+ <<narrow_to_wide(player->getName());
+
+ actionstream<<ctx->player->getName()<<" bans "
+ <<player->getName()<<" / "<<ip_string<<std::endl;
+ } catch(con::PeerNotFoundException){
dstream<<__FUNCTION_NAME<<": peer was not found"<<std::endl;
- return;
}
- std::string ip_string = peer->address.serializeString();
- ctx->server->setIpBanned(ip_string, player->getName());
- os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
- <<narrow_to_wide(player->getName());
-
- actionstream<<ctx->player->getName()<<" bans "
- <<player->getName()<<" / "<<ip_string<<std::endl;
}
else
{
diff --git a/src/test.cpp b/src/test.cpp
index 6b9ef4b6f..c1f04b2ef 100644
--- a/src/test.cpp
+++ b/src/test.cpp
@@ -819,7 +819,10 @@ struct TestConnection
/*
Test some real connections
+
+ NOTE: This mostly tests the legacy interface.
*/
+
u32 proto_id = 0xad26846a;
Handler hand_server("server");
@@ -843,11 +846,30 @@ struct TestConnection
sleep_ms(50);
+ // Client should not have added client yet
+ assert(hand_client.count == 0);
+
+ try
+ {
+ u16 peer_id;
+ u8 data[100];
+ infostream<<"** running client.Receive()"<<std::endl;
+ u32 size = client.Receive(peer_id, data, 100);
+ infostream<<"** Client received: peer_id="<<peer_id
+ <<", size="<<size
+ <<std::endl;
+ }
+ catch(con::NoIncomingDataException &e)
+ {
+ }
+
// Client should have added server now
assert(hand_client.count == 1);
assert(hand_client.last_id == 1);
- // But server should not have added client
+ // Server should not have added client yet
assert(hand_server.count == 0);
+
+ sleep_ms(50);
try
{
@@ -930,7 +952,7 @@ struct TestConnection
}
u16 peer_id_client = 2;
-
+#if 0
{
/*
Send consequent packets in different order
@@ -941,13 +963,13 @@ struct TestConnection
SharedBuffer<u8> data2 = SharedBufferFromString("Hello2");
Address client_address =
- server.GetPeer(peer_id_client)->address;
+ server.GetPeerAddress(peer_id_client);
infostream<<"*** Sending packets in wrong order (2,1,2)"
<<std::endl;
u8 chn = 0;
- con::Channel *ch = &server.GetPeer(peer_id_client)->channels[chn];
+ con::Channel *ch = &server.getPeer(peer_id_client)->channels[chn];
u16 sn = ch->next_outgoing_seqnum;
ch->next_outgoing_seqnum = sn+1;
server.Send(peer_id_client, chn, data2, true);
@@ -1004,6 +1026,7 @@ struct TestConnection
}
assert(got_exception);
}
+#endif
{
const int datasize = 30000;
SharedBuffer<u8> data1(datasize);
@@ -1022,12 +1045,25 @@ struct TestConnection
server.Send(peer_id_client, 0, data1, true);
- sleep_ms(50);
+ sleep_ms(3000);
u8 recvdata[datasize + 1000];
infostream<<"** running client.Receive()"<<std::endl;
u16 peer_id = 132;
- u16 size = client.Receive(peer_id, recvdata, datasize + 1000);
+ u16 size = 0;
+ bool received = false;
+ u32 timems0 = porting::getTimeMs();
+ for(;;){
+ if(porting::getTimeMs() - timems0 > 5000)
+ break;
+ try{
+ size = client.Receive(peer_id, recvdata, datasize + 1000);
+ received = true;
+ }catch(con::NoIncomingDataException &e){
+ }
+ sleep_ms(10);
+ }
+ assert(received);
infostream<<"** Client received: peer_id="<<peer_id
<<", size="<<size
<<std::endl;