aboutsummaryrefslogtreecommitdiff
path: root/src/network
diff options
context:
space:
mode:
Diffstat (limited to 'src/network')
-rw-r--r--src/network/address.cpp119
-rw-r--r--src/network/address.h35
-rw-r--r--src/network/clientopcodes.cpp2
-rw-r--r--src/network/clientpackethandler.cpp250
-rw-r--r--src/network/connection.cpp477
-rw-r--r--src/network/connection.h446
-rw-r--r--src/network/connectionthreads.cpp190
-rw-r--r--src/network/connectionthreads.h31
-rw-r--r--src/network/networkpacket.h2
-rw-r--r--src/network/networkprotocol.h17
-rw-r--r--src/network/serveropcodes.cpp4
-rw-r--r--src/network/serverpackethandler.cpp128
-rw-r--r--src/network/socket.cpp74
-rw-r--r--src/network/socket.h14
14 files changed, 925 insertions, 864 deletions
diff --git a/src/network/address.cpp b/src/network/address.cpp
index 05678aa62..90e561802 100644
--- a/src/network/address.cpp
+++ b/src/network/address.cpp
@@ -87,38 +87,31 @@ Address::Address(const IPv6AddressBytes *ipv6_bytes, u16 port)
setPort(port);
}
-// Equality (address family, address and port must be equal)
-bool Address::operator==(const Address &address)
+// Equality (address family, IP and port must be equal)
+bool Address::operator==(const Address &other)
{
- if (address.m_addr_family != m_addr_family || address.m_port != m_port)
+ if (other.m_addr_family != m_addr_family || other.m_port != m_port)
return false;
if (m_addr_family == AF_INET) {
- return m_address.ipv4.sin_addr.s_addr ==
- address.m_address.ipv4.sin_addr.s_addr;
+ return m_address.ipv4.s_addr == other.m_address.ipv4.s_addr;
}
if (m_addr_family == AF_INET6) {
- return memcmp(m_address.ipv6.sin6_addr.s6_addr,
- address.m_address.ipv6.sin6_addr.s6_addr, 16) == 0;
+ return memcmp(m_address.ipv6.s6_addr,
+ other.m_address.ipv6.s6_addr, 16) == 0;
}
return false;
}
-bool Address::operator!=(const Address &address)
-{
- return !(*this == address);
-}
-
void Address::Resolve(const char *name)
{
if (!name || name[0] == 0) {
- if (m_addr_family == AF_INET) {
- setAddress((u32)0);
- } else if (m_addr_family == AF_INET6) {
- setAddress((IPv6AddressBytes *)0);
- }
+ if (m_addr_family == AF_INET)
+ setAddress(static_cast<u32>(0));
+ else if (m_addr_family == AF_INET6)
+ setAddress(static_cast<IPv6AddressBytes*>(nullptr));
return;
}
@@ -126,9 +119,6 @@ void Address::Resolve(const char *name)
memset(&hints, 0, sizeof(hints));
// Setup hints
- hints.ai_socktype = 0;
- hints.ai_protocol = 0;
- hints.ai_flags = 0;
if (g_settings->getBool("enable_ipv6")) {
// AF_UNSPEC allows both IPv6 and IPv4 addresses to be returned
hints.ai_family = AF_UNSPEC;
@@ -145,14 +135,13 @@ void Address::Resolve(const char *name)
if (resolved->ai_family == AF_INET) {
struct sockaddr_in *t = (struct sockaddr_in *)resolved->ai_addr;
m_addr_family = AF_INET;
- m_address.ipv4 = *t;
+ m_address.ipv4 = t->sin_addr;
} else if (resolved->ai_family == AF_INET6) {
struct sockaddr_in6 *t = (struct sockaddr_in6 *)resolved->ai_addr;
m_addr_family = AF_INET6;
- m_address.ipv6 = *t;
+ m_address.ipv6 = t->sin6_addr;
} else {
- freeaddrinfo(resolved);
- throw ResolveError("");
+ m_addr_family = 0;
}
freeaddrinfo(resolved);
}
@@ -163,47 +152,37 @@ std::string Address::serializeString() const
// windows XP doesnt have inet_ntop, maybe use better func
#ifdef _WIN32
if (m_addr_family == AF_INET) {
- u8 a, b, c, d;
- u32 addr;
- addr = ntohl(m_address.ipv4.sin_addr.s_addr);
- a = (addr & 0xFF000000) >> 24;
- b = (addr & 0x00FF0000) >> 16;
- c = (addr & 0x0000FF00) >> 8;
- d = (addr & 0x000000FF);
- return itos(a) + "." + itos(b) + "." + itos(c) + "." + itos(d);
+ return inet_ntoa(m_address.ipv4);
} else if (m_addr_family == AF_INET6) {
std::ostringstream os;
+ os << std::hex;
for (int i = 0; i < 16; i += 2) {
- u16 section = (m_address.ipv6.sin6_addr.s6_addr[i] << 8) |
- (m_address.ipv6.sin6_addr.s6_addr[i + 1]);
- os << std::hex << section;
+ u16 section = (m_address.ipv6.s6_addr[i] << 8) |
+ (m_address.ipv6.s6_addr[i + 1]);
+ os << section;
if (i < 14)
os << ":";
}
return os.str();
- } else
- return std::string("");
+ } else {
+ return "";
+ }
#else
char str[INET6_ADDRSTRLEN];
- if (inet_ntop(m_addr_family,
- (m_addr_family == AF_INET)
- ? (void *)&(m_address.ipv4.sin_addr)
- : (void *)&(m_address.ipv6.sin6_addr),
- str, INET6_ADDRSTRLEN) == NULL) {
- return std::string("");
- }
- return std::string(str);
+ if (inet_ntop(m_addr_family, (void*) &m_address, str, sizeof(str)) == nullptr)
+ return "";
+ return str;
#endif
}
-struct sockaddr_in Address::getAddress() const
+struct in_addr Address::getAddress() const
{
- return m_address.ipv4; // NOTE: NO PORT INCLUDED, use getPort()
+ return m_address.ipv4;
}
-struct sockaddr_in6 Address::getAddress6() const
+struct in6_addr Address::getAddress6() const
{
- return m_address.ipv6; // NOTE: NO PORT INCLUDED, use getPort()
+ return m_address.ipv6;
}
u16 Address::getPort() const
@@ -211,52 +190,39 @@ u16 Address::getPort() const
return m_port;
}
-int Address::getFamily() const
-{
- return m_addr_family;
-}
-
-bool Address::isIPv6() const
-{
- return m_addr_family == AF_INET6;
-}
-
bool Address::isZero() const
{
if (m_addr_family == AF_INET) {
- return m_address.ipv4.sin_addr.s_addr == 0;
+ return m_address.ipv4.s_addr == 0;
}
if (m_addr_family == AF_INET6) {
static const char zero[16] = {0};
- return memcmp(m_address.ipv6.sin6_addr.s6_addr, zero, 16) == 0;
+ return memcmp(m_address.ipv6.s6_addr, zero, 16) == 0;
}
+
return false;
}
void Address::setAddress(u32 address)
{
m_addr_family = AF_INET;
- m_address.ipv4.sin_family = AF_INET;
- m_address.ipv4.sin_addr.s_addr = htonl(address);
+ m_address.ipv4.s_addr = htonl(address);
}
void Address::setAddress(u8 a, u8 b, u8 c, u8 d)
{
- m_addr_family = AF_INET;
- m_address.ipv4.sin_family = AF_INET;
- u32 addr = htonl((a << 24) | (b << 16) | (c << 8) | d);
- m_address.ipv4.sin_addr.s_addr = addr;
+ u32 addr = (a << 24) | (b << 16) | (c << 8) | d;
+ setAddress(addr);
}
void Address::setAddress(const IPv6AddressBytes *ipv6_bytes)
{
m_addr_family = AF_INET6;
- m_address.ipv6.sin6_family = AF_INET6;
if (ipv6_bytes)
- memcpy(m_address.ipv6.sin6_addr.s6_addr, ipv6_bytes->bytes, 16);
+ memcpy(m_address.ipv6.s6_addr, ipv6_bytes->bytes, 16);
else
- memset(m_address.ipv6.sin6_addr.s6_addr, 0, 16);
+ memset(m_address.ipv6.s6_addr, 0, 16);
}
void Address::setPort(u16 port)
@@ -268,23 +234,26 @@ void Address::print(std::ostream *s) const
{
if (m_addr_family == AF_INET6)
*s << "[" << serializeString() << "]:" << m_port;
- else
+ else if (m_addr_family == AF_INET)
*s << serializeString() << ":" << m_port;
+ else
+ *s << "(undefined)";
}
bool Address::isLocalhost() const
{
if (isIPv6()) {
- static const unsigned char localhost_bytes[] = {
+ static const u8 localhost_bytes[] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1};
- static const unsigned char mapped_ipv4_localhost[] = {
+ static const u8 mapped_ipv4_localhost[] = {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 0x7f, 0, 0, 0};
- auto addr = m_address.ipv6.sin6_addr.s6_addr;
+ auto addr = m_address.ipv6.s6_addr;
return memcmp(addr, localhost_bytes, 16) == 0 ||
memcmp(addr, mapped_ipv4_localhost, 13) == 0;
}
- return (m_address.ipv4.sin_addr.s_addr & 0xFF) == 0x7f;
+ auto addr = ntohl(m_address.ipv4.s_addr);
+ return (addr >> 24) == 0x7f;
}
diff --git a/src/network/address.h b/src/network/address.h
index 4329c84a8..c2f5f2eef 100644
--- a/src/network/address.h
+++ b/src/network/address.h
@@ -36,9 +36,8 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "irrlichttypes.h"
#include "networkexceptions.h"
-class IPv6AddressBytes
+struct IPv6AddressBytes
{
-public:
u8 bytes[16];
IPv6AddressBytes() { memset(bytes, 0, 16); }
};
@@ -50,30 +49,34 @@ public:
Address(u32 address, u16 port);
Address(u8 a, u8 b, u8 c, u8 d, u16 port);
Address(const IPv6AddressBytes *ipv6_bytes, u16 port);
+
bool operator==(const Address &address);
- bool operator!=(const Address &address);
+ bool operator!=(const Address &address) { return !(*this == address); }
+
+ struct in_addr getAddress() const;
+ struct in6_addr getAddress6() const;
+ u16 getPort() const;
+ int getFamily() const { return m_addr_family; }
+ bool isIPv6() const { return m_addr_family == AF_INET6; }
+ bool isZero() const;
+ void print(std::ostream *s) const;
+ std::string serializeString() const;
+ bool isLocalhost() const;
+
// Resolve() may throw ResolveError (address is unchanged in this case)
void Resolve(const char *name);
- struct sockaddr_in getAddress() const;
- unsigned short getPort() const;
+
void setAddress(u32 address);
void setAddress(u8 a, u8 b, u8 c, u8 d);
void setAddress(const IPv6AddressBytes *ipv6_bytes);
- struct sockaddr_in6 getAddress6() const;
- int getFamily() const;
- bool isIPv6() const;
- bool isZero() const;
- void setPort(unsigned short port);
- void print(std::ostream *s) const;
- std::string serializeString() const;
- bool isLocalhost() const;
+ void setPort(u16 port);
private:
- unsigned int m_addr_family = 0;
+ unsigned short m_addr_family = 0;
union
{
- struct sockaddr_in ipv4;
- struct sockaddr_in6 ipv6;
+ struct in_addr ipv4;
+ struct in6_addr ipv6;
} m_address;
u16 m_port = 0; // Port is separate from sockaddr structures
};
diff --git a/src/network/clientopcodes.cpp b/src/network/clientopcodes.cpp
index 55cfdd4dc..a98a5e7d1 100644
--- a/src/network/clientopcodes.cpp
+++ b/src/network/clientopcodes.cpp
@@ -204,7 +204,7 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
null_command_factory, // 0x3e
null_command_factory, // 0x3f
{ "TOSERVER_REQUEST_MEDIA", 1, true }, // 0x40
- null_command_factory, // 0x41
+ { "TOSERVER_HAVE_MEDIA", 2, true }, // 0x41
null_command_factory, // 0x42
{ "TOSERVER_CLIENT_READY", 1, true }, // 0x43
null_command_factory, // 0x44
diff --git a/src/network/clientpackethandler.cpp b/src/network/clientpackethandler.cpp
index 44bd81dac..48ad60ac6 100644
--- a/src/network/clientpackethandler.cpp
+++ b/src/network/clientpackethandler.cpp
@@ -43,6 +43,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "tileanimation.h"
#include "gettext.h"
#include "skyparams.h"
+#include <memory>
void Client::handleCommand_Deprecated(NetworkPacket* pkt)
{
@@ -88,7 +89,7 @@ void Client::handleCommand_Hello(NetworkPacket* pkt)
// This is only neccessary though when we actually want to add casing support
if (m_chosen_auth_mech != AUTH_MECHANISM_NONE) {
- // we recieved a TOCLIENT_HELLO while auth was already going on
+ // we received a TOCLIENT_HELLO while auth was already going on
errorstream << "Client: TOCLIENT_HELLO while auth was already going on"
<< "(chosen_mech=" << m_chosen_auth_mech << ")." << std::endl;
if (m_chosen_auth_mech == AUTH_MECHANISM_SRP ||
@@ -156,7 +157,7 @@ void Client::handleCommand_AcceptSudoMode(NetworkPacket* pkt)
m_password = m_new_password;
- verbosestream << "Client: Recieved TOCLIENT_ACCEPT_SUDO_MODE." << std::endl;
+ verbosestream << "Client: Received TOCLIENT_ACCEPT_SUDO_MODE." << std::endl;
// send packet to actually set the password
startAuth(AUTH_MECHANISM_FIRST_SRP);
@@ -261,7 +262,7 @@ void Client::handleCommand_NodemetaChanged(NetworkPacket *pkt)
return;
std::istringstream is(pkt->readLongString(), std::ios::binary);
- std::stringstream sstr;
+ std::stringstream sstr(std::ios::binary | std::ios::in | std::ios::out);
decompressZlib(is, sstr);
NodeMetadataList meta_updates_list(false);
@@ -670,21 +671,19 @@ void Client::handleCommand_AnnounceMedia(NetworkPacket* pkt)
m_media_downloader->addFile(name, sha1_raw);
}
- try {
+ {
std::string str;
-
*pkt >> str;
Strfnd sf(str);
- while(!sf.at_end()) {
+ while (!sf.at_end()) {
std::string baseurl = trim(sf.next(","));
- if (!baseurl.empty())
+ if (!baseurl.empty()) {
+ m_remote_media_servers.emplace_back(baseurl);
m_media_downloader->addRemoteServer(baseurl);
+ }
}
}
- catch(SerializationError& e) {
- // not supported by server or turned off
- }
m_media_downloader->step(this);
}
@@ -716,31 +715,38 @@ void Client::handleCommand_Media(NetworkPacket* pkt)
if (num_files == 0)
return;
- if (!m_media_downloader || !m_media_downloader->isStarted()) {
- const char *problem = m_media_downloader ?
- "media has not been requested" :
- "all media has been received already";
- errorstream << "Client: Received media but "
- << problem << "! "
- << " bunch " << bunch_i << "/" << num_bunches
- << " files=" << num_files
- << " size=" << pkt->getSize() << std::endl;
- return;
- }
+ bool init_phase = m_media_downloader && m_media_downloader->isStarted();
- // Mesh update thread must be stopped while
- // updating content definitions
- sanity_check(!m_mesh_update_thread.isRunning());
+ if (init_phase) {
+ // Mesh update thread must be stopped while
+ // updating content definitions
+ sanity_check(!m_mesh_update_thread.isRunning());
+ }
- for (u32 i=0; i < num_files; i++) {
- std::string name;
+ for (u32 i = 0; i < num_files; i++) {
+ std::string name, data;
*pkt >> name;
+ data = pkt->readLongString();
- std::string data = pkt->readLongString();
-
- m_media_downloader->conventionalTransferDone(
- name, data, this);
+ bool ok = false;
+ if (init_phase) {
+ ok = m_media_downloader->conventionalTransferDone(name, data, this);
+ } else {
+ // Check pending dynamic transfers, one of them must be it
+ for (const auto &it : m_pending_media_downloads) {
+ if (it.second->conventionalTransferDone(name, data, this)) {
+ ok = true;
+ break;
+ }
+ }
+ }
+ if (!ok) {
+ errorstream << "Client: Received media \"" << name
+ << "\" but no downloads pending. " << num_bunches << " bunches, "
+ << num_files << " in this one. (init_phase=" << init_phase
+ << ")" << std::endl;
+ }
}
}
@@ -755,12 +761,11 @@ void Client::handleCommand_NodeDef(NetworkPacket* pkt)
// Decompress node definitions
std::istringstream tmp_is(pkt->readLongString(), std::ios::binary);
- std::ostringstream tmp_os;
+ std::stringstream tmp_os(std::ios::binary | std::ios::in | std::ios::out);
decompressZlib(tmp_is, tmp_os);
// Deserialize node definitions
- std::istringstream tmp_is2(tmp_os.str());
- m_nodedef->deSerialize(tmp_is2);
+ m_nodedef->deSerialize(tmp_os);
m_nodedef_received = true;
}
@@ -775,12 +780,11 @@ void Client::handleCommand_ItemDef(NetworkPacket* pkt)
// Decompress item definitions
std::istringstream tmp_is(pkt->readLongString(), std::ios::binary);
- std::ostringstream tmp_os;
+ std::stringstream tmp_os(std::ios::binary | std::ios::in | std::ios::out);
decompressZlib(tmp_is, tmp_os);
// Deserialize node definitions
- std::istringstream tmp_is2(tmp_os.str());
- m_itemdef->deSerialize(tmp_is2);
+ m_itemdef->deSerialize(tmp_os);
m_itemdef_received = true;
}
@@ -1041,9 +1045,6 @@ void Client::handleCommand_DeleteParticleSpawner(NetworkPacket* pkt)
void Client::handleCommand_HudAdd(NetworkPacket* pkt)
{
- std::string datastring(pkt->getString(0), pkt->getSize());
- std::istringstream is(datastring, std::ios_base::binary);
-
u32 server_id;
u8 type;
v2f pos;
@@ -1059,6 +1060,7 @@ void Client::handleCommand_HudAdd(NetworkPacket* pkt)
v2s32 size;
s16 z_index = 0;
std::string text2;
+ u32 style = 0;
*pkt >> server_id >> type >> pos >> name >> scale >> text >> number >> item
>> dir >> align >> offset;
@@ -1067,25 +1069,28 @@ void Client::handleCommand_HudAdd(NetworkPacket* pkt)
*pkt >> size;
*pkt >> z_index;
*pkt >> text2;
+ *pkt >> style;
} catch(PacketError &e) {};
ClientEvent *event = new ClientEvent();
- event->type = CE_HUDADD;
- event->hudadd.server_id = server_id;
- event->hudadd.type = type;
- event->hudadd.pos = new v2f(pos);
- event->hudadd.name = new std::string(name);
- event->hudadd.scale = new v2f(scale);
- event->hudadd.text = new std::string(text);
- event->hudadd.number = number;
- event->hudadd.item = item;
- event->hudadd.dir = dir;
- event->hudadd.align = new v2f(align);
- event->hudadd.offset = new v2f(offset);
- event->hudadd.world_pos = new v3f(world_pos);
- event->hudadd.size = new v2s32(size);
- event->hudadd.z_index = z_index;
- event->hudadd.text2 = new std::string(text2);
+ event->type = CE_HUDADD;
+ event->hudadd = new ClientEventHudAdd();
+ event->hudadd->server_id = server_id;
+ event->hudadd->type = type;
+ event->hudadd->pos = pos;
+ event->hudadd->name = name;
+ event->hudadd->scale = scale;
+ event->hudadd->text = text;
+ event->hudadd->number = number;
+ event->hudadd->item = item;
+ event->hudadd->dir = dir;
+ event->hudadd->align = align;
+ event->hudadd->offset = offset;
+ event->hudadd->world_pos = world_pos;
+ event->hudadd->size = size;
+ event->hudadd->z_index = z_index;
+ event->hudadd->text2 = text2;
+ event->hudadd->style = style;
m_client_event_queue.push(event);
}
@@ -1113,27 +1118,40 @@ void Client::handleCommand_HudChange(NetworkPacket* pkt)
*pkt >> server_id >> stat;
- if (stat == HUD_STAT_POS || stat == HUD_STAT_SCALE ||
- stat == HUD_STAT_ALIGN || stat == HUD_STAT_OFFSET)
- *pkt >> v2fdata;
- else if (stat == HUD_STAT_NAME || stat == HUD_STAT_TEXT || stat == HUD_STAT_TEXT2)
- *pkt >> sdata;
- else if (stat == HUD_STAT_WORLD_POS)
- *pkt >> v3fdata;
- else if (stat == HUD_STAT_SIZE )
- *pkt >> v2s32data;
- else
- *pkt >> intdata;
+ // Keep in sync with:server.cpp -> SendHUDChange
+ switch ((HudElementStat)stat) {
+ case HUD_STAT_POS:
+ case HUD_STAT_SCALE:
+ case HUD_STAT_ALIGN:
+ case HUD_STAT_OFFSET:
+ *pkt >> v2fdata;
+ break;
+ case HUD_STAT_NAME:
+ case HUD_STAT_TEXT:
+ case HUD_STAT_TEXT2:
+ *pkt >> sdata;
+ break;
+ case HUD_STAT_WORLD_POS:
+ *pkt >> v3fdata;
+ break;
+ case HUD_STAT_SIZE:
+ *pkt >> v2s32data;
+ break;
+ default:
+ *pkt >> intdata;
+ break;
+ }
ClientEvent *event = new ClientEvent();
- event->type = CE_HUDCHANGE;
- event->hudchange.id = server_id;
- event->hudchange.stat = (HudElementStat)stat;
- event->hudchange.v2fdata = new v2f(v2fdata);
- event->hudchange.v3fdata = new v3f(v3fdata);
- event->hudchange.sdata = new std::string(sdata);
- event->hudchange.data = intdata;
- event->hudchange.v2s32data = new v2s32(v2s32data);
+ event->type = CE_HUDCHANGE;
+ event->hudchange = new ClientEventHudChange();
+ event->hudchange->id = server_id;
+ event->hudchange->stat = static_cast<HudElementStat>(stat);
+ event->hudchange->v2fdata = v2fdata;
+ event->hudchange->v3fdata = v3fdata;
+ event->hudchange->sdata = sdata;
+ event->hudchange->data = intdata;
+ event->hudchange->v2s32data = v2s32data;
m_client_event_queue.push(event);
}
@@ -1219,19 +1237,17 @@ void Client::handleCommand_HudSetSky(NetworkPacket* pkt)
} catch (...) {}
// Use default skybox settings:
- SkyboxDefaults sky_defaults;
- SunParams sun = sky_defaults.getSunDefaults();
- MoonParams moon = sky_defaults.getMoonDefaults();
- StarParams stars = sky_defaults.getStarDefaults();
+ SunParams sun = SkyboxDefaults::getSunDefaults();
+ MoonParams moon = SkyboxDefaults::getMoonDefaults();
+ StarParams stars = SkyboxDefaults::getStarDefaults();
// Fix for "regular" skies, as color isn't kept:
if (skybox.type == "regular") {
- skybox.sky_color = sky_defaults.getSkyColorDefaults();
+ skybox.sky_color = SkyboxDefaults::getSkyColorDefaults();
skybox.fog_tint_type = "default";
skybox.fog_moon_tint = video::SColor(255, 255, 255, 255);
skybox.fog_sun_tint = video::SColor(255, 255, 255, 255);
- }
- else {
+ } else {
sun.visible = false;
sun.sunrise_visible = false;
moon.visible = false;
@@ -1381,6 +1397,8 @@ void Client::handleCommand_LocalPlayerAnimations(NetworkPacket* pkt)
*pkt >> player->local_animations[2];
*pkt >> player->local_animations[3];
*pkt >> player->local_animation_speed;
+
+ player->last_animation = -1;
}
void Client::handleCommand_EyeOffset(NetworkPacket* pkt)
@@ -1478,46 +1496,72 @@ void Client::handleCommand_PlayerSpeed(NetworkPacket *pkt)
void Client::handleCommand_MediaPush(NetworkPacket *pkt)
{
std::string raw_hash, filename, filedata;
+ u32 token;
bool cached;
*pkt >> raw_hash >> filename >> cached;
- filedata = pkt->readLongString();
+ if (m_proto_ver >= 40)
+ *pkt >> token;
+ else
+ filedata = pkt->readLongString();
- if (raw_hash.size() != 20 || filedata.empty() || filename.empty() ||
+ if (raw_hash.size() != 20 || filename.empty() ||
+ (m_proto_ver < 40 && filedata.empty()) ||
!string_allowed(filename, TEXTURENAME_ALLOWED_CHARS)) {
throw PacketError("Illegal filename, data or hash");
}
- verbosestream << "Server pushes media file \"" << filename << "\" with "
- << filedata.size() << " bytes of data (cached=" << cached
- << ")" << std::endl;
+ verbosestream << "Server pushes media file \"" << filename << "\" ";
+ if (filedata.empty())
+ verbosestream << "to be fetched ";
+ else
+ verbosestream << "with " << filedata.size() << " bytes ";
+ verbosestream << "(cached=" << cached << ")" << std::endl;
if (m_media_pushed_files.count(filename) != 0) {
- // Silently ignore for synchronization purposes
+ // Ignore (but acknowledge). Previously this was for sync purposes,
+ // but even in new versions media cannot be replaced at runtime.
+ if (m_proto_ver >= 40)
+ sendHaveMedia({ token });
return;
}
- // Compute and check checksum of data
- std::string computed_hash;
- {
- SHA1 ctx;
- ctx.addBytes(filedata.c_str(), filedata.size());
- unsigned char *buf = ctx.getDigest();
- computed_hash.assign((char*) buf, 20);
- free(buf);
- }
- if (raw_hash != computed_hash) {
- verbosestream << "Hash of file data mismatches, ignoring." << std::endl;
+ if (!filedata.empty()) {
+ // LEGACY CODEPATH
+ // Compute and check checksum of data
+ std::string computed_hash;
+ {
+ SHA1 ctx;
+ ctx.addBytes(filedata.c_str(), filedata.size());
+ unsigned char *buf = ctx.getDigest();
+ computed_hash.assign((char*) buf, 20);
+ free(buf);
+ }
+ if (raw_hash != computed_hash) {
+ verbosestream << "Hash of file data mismatches, ignoring." << std::endl;
+ return;
+ }
+
+ // Actually load media
+ loadMedia(filedata, filename, true);
+ m_media_pushed_files.insert(filename);
+
+ // Cache file for the next time when this client joins the same server
+ if (cached)
+ clientMediaUpdateCache(raw_hash, filedata);
return;
}
- // Actually load media
- loadMedia(filedata, filename, true);
m_media_pushed_files.insert(filename);
- // Cache file for the next time when this client joins the same server
- if (cached)
- clientMediaUpdateCache(raw_hash, filedata);
+ // create a downloader for this file
+ auto downloader(std::make_shared<SingleMediaDownloader>(cached));
+ m_pending_media_downloads.emplace_back(token, downloader);
+ downloader->addFile(filename, raw_hash);
+ for (const auto &baseurl : m_remote_media_servers)
+ downloader->addRemoteServer(baseurl);
+
+ downloader->step(this);
}
/*
diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index a4970954f..2d3cf6e88 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -62,18 +62,27 @@ namespace con
#define PING_TIMEOUT 5.0
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+u16 BufferedPacket::getSeqnum() const
+{
+ if (size() < BASE_HEADER_SIZE + 3)
+ return 0; // should never happen
+
+ return readU16(&data[BASE_HEADER_SIZE + 1]);
+}
+
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
- BufferedPacket p(packet_size);
- p.address = address;
- writeU32(&p.data[0], protocol_id);
- writeU16(&p.data[4], sender_peer_id);
- writeU8(&p.data[6], channel);
+ BufferedPacketPtr p(new BufferedPacket(packet_size));
+ p->address = address;
+
+ writeU32(&p->data[0], protocol_id);
+ writeU16(&p->data[4], sender_peer_id);
+ writeU8(&p->data[6], channel);
- memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
+ memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
return p;
}
@@ -169,9 +178,8 @@ void ReliablePacketBuffer::print()
MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
- for (BufferedPacket &bufferedPacket : m_list) {
- u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
- LOG(dout_con<<index<< ":" << s << std::endl);
+ for (BufferedPacketPtr &packet : m_list) {
+ LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
index++;
}
}
@@ -188,16 +196,13 @@ u32 ReliablePacketBuffer::size()
return m_list.size();
}
-RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
+RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
{
- std::list<BufferedPacket>::iterator i = m_list.begin();
- for(; i != m_list.end(); ++i)
- {
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
- if (s == seqnum)
- break;
+ for (auto it = m_list.begin(); it != m_list.end(); ++it) {
+ if ((*it)->getSeqnum() == seqnum)
+ return it;
}
- return i;
+ return m_list.end();
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
@@ -205,54 +210,54 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
- const BufferedPacket &p = m_list.front();
- result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ result = m_list.front()->getSeqnum();
return true;
}
-BufferedPacket ReliablePacketBuffer::popFirst()
+BufferedPacketPtr ReliablePacketBuffer::popFirst()
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
- BufferedPacket p = std::move(m_list.front());
+
+ BufferedPacketPtr p(m_list.front());
m_list.pop_front();
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
- m_oldest_non_answered_ack =
- readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
-BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
+BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
MutexAutoLock listlock(m_list_mutex);
- RPBSearchResult r = findPacket(seqnum);
- if (r == notFound()) {
+ RPBSearchResult r = findPacketNoLock(seqnum);
+ if (r == m_list.end()) {
LOG(dout_con<<"Sequence number: " << seqnum
<< " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer");
}
- BufferedPacket p = std::move(*r);
+ BufferedPacketPtr p(*r);
m_list.erase(r);
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
- m_oldest_non_answered_ack =
- readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
-void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
+void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
- if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+ const BufferedPacket &p = *p_ptr;
+
+ if (p.size() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
"reliable packet" << std::endl;
return;
@@ -263,7 +268,7 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
<< std::endl;
return;
}
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ const u16 seqnum = p.getSeqnum();
if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
@@ -280,44 +285,44 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
// Find the right place for the packet and insert it there
// If list is empty, just add it
- if (m_list.empty())
- {
- m_list.push_back(p);
+ if (m_list.empty()) {
+ m_list.push_back(p_ptr);
m_oldest_non_answered_ack = seqnum;
// Done.
return;
}
// Otherwise find the right place
- std::list<BufferedPacket>::iterator i = m_list.begin();
+ auto it = m_list.begin();
// Find the first packet in the list which has a higher seqnum
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ u16 s = (*it)->getSeqnum();
/* case seqnum is smaller then next_expected seqnum */
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
- while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
/* non wrap around case (at least for incoming and next_expected */
else
{
- while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
if (s == seqnum) {
/* nothing to do this seems to be a resent packet */
/* for paranoia reason data should be compared */
+ auto &i = *it;
if (
- (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
- (i->data.getSize() != p.data.getSize()) ||
+ (i->getSeqnum() != seqnum) ||
+ (i->size() != p.size()) ||
(i->address != p.address)
)
{
@@ -325,51 +330,52 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
fprintf(stderr,
"Duplicated seqnum %d non matching packet detected:\n",
seqnum);
- fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
- readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+ fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
+ i->getSeqnum(), i->size(),
i->address.serializeString().c_str());
- fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
- readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+ fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
+ p.getSeqnum(), p.size(),
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
}
/* insert or push back */
- else if (i != m_list.end()) {
- m_list.insert(i, p);
+ else if (it != m_list.end()) {
+ m_list.insert(it, p_ptr);
} else {
- m_list.push_back(p);
+ m_list.push_back(p_ptr);
}
/* update last packet number */
- m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
MutexAutoLock listlock(m_list_mutex);
- for (BufferedPacket &bufferedPacket : m_list) {
- bufferedPacket.time += dtime;
- bufferedPacket.totaltime += dtime;
+ for (auto &packet : m_list) {
+ packet->time += dtime;
+ packet->totaltime += dtime;
}
}
-std::list<BufferedPacket>
+std::list<ConstSharedPtr<BufferedPacket>>
ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{
MutexAutoLock listlock(m_list_mutex);
- std::list<BufferedPacket> timed_outs;
- for (BufferedPacket &bufferedPacket : m_list) {
- if (bufferedPacket.time >= timeout) {
- // caller will resend packet so reset time and increase counter
- bufferedPacket.time = 0.0f;
- bufferedPacket.resend_count++;
+ std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
+ for (auto &packet : m_list) {
+ if (packet->time < timeout)
+ continue;
- timed_outs.push_back(bufferedPacket);
+ // caller will resend packet so reset time and increase counter
+ packet->time = 0.0f;
+ packet->resend_count++;
- if (timed_outs.size() >= max_packets)
- break;
- }
+ timed_outs.emplace_back(packet);
+
+ if (timed_outs.size() >= max_packets)
+ break;
}
return timed_outs;
}
@@ -428,11 +434,13 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
}
}
-SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
+SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
+ const BufferedPacket &p = *p_ptr;
+
u32 headersize = BASE_HEADER_SIZE + 7;
- if (p.data.getSize() < headersize) {
+ if (p.size() < headersize) {
errorstream << "Invalid data size for split packet" << std::endl;
return SharedBuffer<u8>();
}
@@ -473,7 +481,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
<<std::endl);
// Cut chunk data out of packet
- u32 chunkdatasize = p.data.getSize() - headersize;
+ u32 chunkdatasize = p.size() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
@@ -520,14 +528,67 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
ConnectionCommand
*/
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
- bool reliable_)
+ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
+{
+ return ConnectionCommandPtr(new ConnectionCommand(type));
+}
+
+ConnectionCommandPtr ConnectionCommand::serve(Address address)
+{
+ auto c = create(CONNCMD_SERVE);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::connect(Address address)
{
- type = CONNCMD_SEND;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = pkt->oldForgePacket();
- reliable = reliable_;
+ auto c = create(CONNCMD_CONNECT);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect()
+{
+ return create(CONNCMD_DISCONNECT);
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
+{
+ auto c = create(CONNCMD_DISCONNECT_PEER);
+ c->peer_id = peer_id;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
+ NetworkPacket *pkt, bool reliable)
+{
+ auto c = create(CONNCMD_SEND);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = reliable;
+ c->data = pkt->oldForgePacket();
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
+{
+ auto c = create(CONCMD_ACK);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = false;
+ data.copyTo(c->data);
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
+{
+ auto c = create(CONCMD_CREATE_PEER);
+ c->peer_id = peer_id;
+ c->channelnum = 0;
+ c->reliable = true;
+ c->raw = true;
+ data.copyTo(c->data);
+ return c;
}
/*
@@ -562,39 +623,38 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
MutexAutoLock internal(m_internal_mutex);
+
u16 retval = next_outgoing_seqnum;
- u16 lowest_unacked_seqnumber;
+ successful = false;
/* shortcut if there ain't any packet in outgoing list */
- if (outgoing_reliables_sent.empty())
- {
+ if (outgoing_reliables_sent.empty()) {
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
- if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
- {
+ u16 lowest_unacked_seqnumber;
+ if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
- if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
- successful = false;
+ if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
return 0;
}
- }
- else {
+ } else {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
- window_size) {
- successful = false;
+ m_window_size) {
return 0;
}
}
}
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
@@ -666,7 +726,7 @@ void Channel::UpdateTimers(float dtime)
//packet_too_late = current_packet_too_late;
packets_successful = current_packet_successful;
- if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
+ if (current_bytes_transfered > (unsigned int) (m_window_size*512/2)) {
reasonable_amount_of_data_transmitted = true;
}
current_packet_loss = 0;
@@ -681,37 +741,25 @@ void Channel::UpdateTimers(float dtime)
if (packets_successful > 0) {
successful_to_lost_ratio = packet_loss/packets_successful;
} else if (packet_loss > 0) {
- window_size = std::max(
- (window_size - 10),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 10);
done = true;
}
if (!done) {
- if ((successful_to_lost_ratio < 0.01f) &&
- (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+ if (successful_to_lost_ratio < 0.01f) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 100),
- MAX_RELIABLE_WINDOW_SIZE);
- } else if ((successful_to_lost_ratio < 0.05f) &&
- (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+ setWindowSize(m_window_size + 100);
+ } else if (successful_to_lost_ratio < 0.05f) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 50),
- MAX_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size + 50);
} else if (successful_to_lost_ratio > 0.15f) {
- window_size = std::max(
- (window_size - 100),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 100);
} else if (successful_to_lost_ratio > 0.1f) {
- window_size = std::max(
- (window_size - 50),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 50);
}
}
}
@@ -958,45 +1006,45 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
return false;
}
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
+void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return;
- Channel &chan = channels[c.channelnum];
+ Channel &chan = channels[c->channelnum];
if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
- (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
+ (chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
- <<" processing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() << std::endl);
- if (!processReliableSendCommand(c,max_packet_size)) {
- chan.queued_commands.push_back(c);
- }
- }
- else {
+ <<" processing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() << std::endl);
+ if (processReliableSendCommand(c, max_packet_size))
+ return;
+ } else {
LOG(dout_con<<m_connection->getDesc()
- <<" Queueing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() <<std::endl);
- chan.queued_commands.push_back(c);
- if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+ <<" Queueing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() <<std::endl);
+
+ if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
LOG(derr_con << m_connection->getDesc()
- << "Possible packet stall to peer id: " << c.peer_id
+ << "Possible packet stall to peer id: " << c->peer_id
<< " queued_commands=" << chan.queued_commands.size()
<< std::endl);
}
}
+ chan.queued_commands.push_back(c);
}
bool UDPPeer::processReliableSendCommand(
- ConnectionCommand &c,
+ ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return true;
+ const auto &c = *c_ptr;
Channel &chan = channels[c.channelnum];
u32 chunksize_max = max_packet_size
@@ -1015,9 +1063,9 @@ bool UDPPeer::processReliableSendCommand(
chan.setNextSplitSeqNum(split_sequence_number);
}
- bool have_sequence_number = true;
+ bool have_sequence_number = false;
bool have_initial_sequence_number = false;
- std::queue<BufferedPacket> toadd;
+ std::queue<BufferedPacketPtr> toadd;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
@@ -1036,25 +1084,23 @@ bool UDPPeer::processReliableSendCommand(
SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(address, reliable,
+ BufferedPacketPtr p = con::makePacket(address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);
- toadd.push(std::move(p));
+ toadd.push(p);
}
if (have_sequence_number) {
- volatile u16 pcount = 0;
while (!toadd.empty()) {
- BufferedPacket p = std::move(toadd.front());
+ BufferedPacketPtr p = toadd.front();
toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- chan.queued_reliables.push(std::move(p));
- pcount++;
+ chan.queued_reliables.push(p);
}
sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
@@ -1063,6 +1109,7 @@ bool UDPPeer::processReliableSendCommand(
volatile u16 packets_available = toadd.size();
/* we didn't get a single sequence number no need to fill queue */
if (!have_initial_sequence_number) {
+ LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
return false;
}
@@ -1108,18 +1155,18 @@ void UDPPeer::RunCommandQueues(
(channel.queued_reliables.size() < maxtransfer) &&
(commands_processed < maxcommands)) {
try {
- ConnectionCommand c = channel.queued_commands.front();
+ ConnectionCommandPtr c = channel.queued_commands.front();
LOG(dout_con << m_connection->getDesc()
<< " processing queued reliable command " << std::endl);
// Packet is processed, remove it from queue
- if (processReliableSendCommand(c,max_packet_size)) {
+ if (processReliableSendCommand(c, max_packet_size)) {
channel.queued_commands.pop_front();
} else {
LOG(dout_con << m_connection->getDesc()
- << " Failed to queue packets for peer_id: " << c.peer_id
- << ", delaying sending of " << c.data.getSize()
+ << " Failed to queue packets for peer_id: " << c->peer_id
+ << ", delaying sending of " << c->data.getSize()
<< " bytes" << std::endl);
}
}
@@ -1142,7 +1189,7 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
channels[channel].setNextSplitSeqNum(seqnum);
}
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
@@ -1150,6 +1197,63 @@ SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd
}
/*
+ ConnectionEvent
+*/
+
+const char *ConnectionEvent::describe() const
+{
+ switch(type) {
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ case CONNEVENT_BIND_FAILED:
+ return "CONNEVENT_BIND_FAILED";
+ }
+ return "Invalid ConnectionEvent";
+}
+
+
+ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
+{
+ return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
+}
+
+ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
+{
+ auto e = create(CONNEVENT_DATA_RECEIVED);
+ e->peer_id = peer_id;
+ data.copyTo(e->data);
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
+{
+ auto e = create(CONNEVENT_PEER_ADDED);
+ e->peer_id = peer_id;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
+{
+ auto e = create(CONNEVENT_PEER_REMOVED);
+ e->peer_id = peer_id;
+ e->timeout = is_timeout;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::bindFailed()
+{
+ return create(CONNEVENT_BIND_FAILED);
+}
+
+/*
Connection
*/
@@ -1198,18 +1302,12 @@ Connection::~Connection()
/* Internal stuff */
-void Connection::putEvent(const ConnectionEvent &e)
+void Connection::putEvent(ConnectionEventPtr e)
{
- assert(e.type != CONNEVENT_NONE); // Pre-condition
+ assert(e->type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}
-void Connection::putEvent(ConnectionEvent &&e)
-{
- assert(e.type != CONNEVENT_NONE); // Pre-condition
- m_event_queue.push_back(std::move(e));
-}
-
void Connection::TriggerSend()
{
m_sendThread->Trigger();
@@ -1272,11 +1370,9 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
Address peer_address;
//any peer has a primary address this never fails!
peer->getAddress(MTP_PRIMARY, peer_address);
- // Create event
- ConnectionEvent e;
- e.peerRemoved(peer_id, timeout, peer_address);
- putEvent(e);
+ // Create event
+ putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
peer->Drop();
return true;
@@ -1284,18 +1380,16 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
/* Interface */
-ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
{
try {
return m_event_queue.pop_front(timeout_ms);
} catch(ItemNotFoundException &ex) {
- ConnectionEvent e;
- e.type = CONNEVENT_NONE;
- return e;
+ return ConnectionEvent::create(CONNEVENT_NONE);
}
}
-void Connection::putCommand(const ConnectionCommand &c)
+void Connection::putCommand(ConnectionCommandPtr c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
@@ -1303,26 +1397,14 @@ void Connection::putCommand(const ConnectionCommand &c)
}
}
-void Connection::putCommand(ConnectionCommand &&c)
-{
- if (!m_shutting_down) {
- m_command_queue.push_back(std::move(c));
- m_sendThread->Trigger();
- }
-}
-
void Connection::Serve(Address bind_addr)
{
- ConnectionCommand c;
- c.serve(bind_addr);
- putCommand(c);
+ putCommand(ConnectionCommand::serve(bind_addr));
}
void Connection::Connect(Address address)
{
- ConnectionCommand c;
- c.connect(address);
- putCommand(c);
+ putCommand(ConnectionCommand::connect(address));
}
bool Connection::Connected()
@@ -1344,9 +1426,7 @@ bool Connection::Connected()
void Connection::Disconnect()
{
- ConnectionCommand c;
- c.disconnect();
- putCommand(c);
+ putCommand(ConnectionCommand::disconnect());
}
bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
@@ -1357,11 +1437,15 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
This is not considered to be a problem (is it?)
*/
for(;;) {
- ConnectionEvent e = waitEvent(timeout);
- if (e.type != CONNEVENT_NONE)
+ ConnectionEventPtr e_ptr = waitEvent(timeout);
+ const ConnectionEvent &e = *e_ptr;
+
+ if (e.type != CONNEVENT_NONE) {
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
- switch(e.type) {
+ }
+
+ switch (e.type) {
case CONNEVENT_NONE:
return false;
case CONNEVENT_DATA_RECEIVED:
@@ -1409,10 +1493,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
- ConnectionCommand c;
-
- c.send(peer_id, channelnum, pkt, reliable);
- putCommand(std::move(c));
+ putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
}
Address Connection::GetPeerAddress(session_t peer_id)
@@ -1511,41 +1592,31 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
LOG(dout_con << getDesc()
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl);
- ConnectionCommand cmd;
- Buffer<u8> reply(4);
- writeU8(&reply[0], PACKET_TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
- writeU16(&reply[2], peer_id_new);
- cmd.createPeer(peer_id_new,reply);
- putCommand(std::move(cmd));
+ {
+ Buffer<u8> reply(4);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+ writeU16(&reply[2], peer_id_new);
+ putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
+ }
// Create peer addition event
- ConnectionEvent e;
- e.peerAdded(peer_id_new, sender);
- putEvent(e);
+ putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
// We're now talking to a valid peer_id
return peer_id_new;
}
-void Connection::PrintInfo(std::ostream &out)
-{
- m_info_mutex.lock();
- out<<getDesc()<<": ";
- m_info_mutex.unlock();
-}
-
const std::string Connection::getDesc()
{
+ MutexAutoLock _(m_info_mutex);
return std::string("con(")+
itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
}
void Connection::DisconnectPeer(session_t peer_id)
{
- ConnectionCommand discon;
- discon.disconnect_peer(peer_id);
- putCommand(discon);
+ putCommand(ConnectionCommand::disconnect_peer(peer_id));
}
void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
@@ -1557,14 +1628,12 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
" channel: " << (channelnum & 0xFF) <<
" seqnum: " << seqnum << std::endl);
- ConnectionCommand c;
SharedBuffer<u8> ack(4);
writeU8(&ack[0], PACKET_TYPE_CONTROL);
writeU8(&ack[1], CONTROLTYPE_ACK);
writeU16(&ack[2], seqnum);
- c.ack(peer_id, channelnum, ack);
- putCommand(std::move(c));
+ putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
m_sendThread->Trigger();
}
diff --git a/src/network/connection.h b/src/network/connection.h
index 49bb65c3e..1afb4ae84 100644
--- a/src/network/connection.h
+++ b/src/network/connection.h
@@ -32,6 +32,95 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <vector>
#include <map>
+#define MAX_UDP_PEERS 65535
+
+/*
+=== NOTES ===
+
+A packet is sent through a channel to a peer with a basic header:
+ Header (7 bytes):
+ [0] u32 protocol_id
+ [4] session_t sender_peer_id
+ [6] u8 channel
+sender_peer_id:
+ Unique to each peer.
+ value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
+ value 1 (PEER_ID_SERVER) is reserved for server
+ these constants are defined in constants.h
+channel:
+ Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
+*/
+#define BASE_HEADER_SIZE 7
+#define CHANNEL_COUNT 3
+
+/*
+Packet types:
+
+CONTROL: This is a packet used by the protocol.
+- When this is processed, nothing is handed to the user.
+ Header (2 byte):
+ [0] u8 type
+ [1] u8 controltype
+controltype and data description:
+ CONTROLTYPE_ACK
+ [2] u16 seqnum
+ CONTROLTYPE_SET_PEER_ID
+ [2] session_t peer_id_new
+ CONTROLTYPE_PING
+ - There is no actual reply, but this can be sent in a reliable
+ packet to get a reply
+ CONTROLTYPE_DISCO
+*/
+enum ControlType : u8 {
+ CONTROLTYPE_ACK = 0,
+ CONTROLTYPE_SET_PEER_ID = 1,
+ CONTROLTYPE_PING = 2,
+ CONTROLTYPE_DISCO = 3,
+};
+
+/*
+ORIGINAL: This is a plain packet with no control and no error
+checking at all.
+- When this is processed, it is directly handed to the user.
+ Header (1 byte):
+ [0] u8 type
+*/
+//#define TYPE_ORIGINAL 1
+#define ORIGINAL_HEADER_SIZE 1
+
+/*
+SPLIT: These are sequences of packets forming one bigger piece of
+data.
+- When processed and all the packet_nums 0...packet_count-1 are
+ present (this should be buffered), the resulting data shall be
+ directly handed to the user.
+- If the data fails to come up in a reasonable time, the buffer shall
+ be silently discarded.
+- These can be sent as-is or atop of a RELIABLE packet stream.
+ Header (7 bytes):
+ [0] u8 type
+ [1] u16 seqnum
+ [3] u16 chunk_count
+ [5] u16 chunk_num
+*/
+//#define TYPE_SPLIT 2
+
+/*
+RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
+and they shall be delivered in the same order as sent. This is done
+with a buffer in the receiving and transmitting end.
+- When this is processed, the contents of each packet is recursively
+ processed as packets.
+ Header (3 bytes):
+ [0] u8 type
+ [1] u16 seqnum
+
+*/
+//#define TYPE_RELIABLE 3
+#define RELIABLE_HEADER_SIZE 3
+#define SEQNUM_INITIAL 65500
+#define SEQNUM_MAX 65535
+
class NetworkPacket;
namespace con
@@ -46,9 +135,13 @@ typedef enum MTProtocols {
MTP_MINETEST_RELIABLE_UDP
} MTProtocols;
-#define MAX_UDP_PEERS 65535
-
-#define SEQNUM_MAX 65535
+enum PacketType : u8 {
+ PACKET_TYPE_CONTROL = 0,
+ PACKET_TYPE_ORIGINAL = 1,
+ PACKET_TYPE_SPLIT = 2,
+ PACKET_TYPE_RELIABLE = 3,
+ PACKET_TYPE_MAX
+};
inline bool seqnum_higher(u16 totest, u16 base)
{
@@ -85,24 +178,40 @@ static inline float CALC_DTIME(u64 lasttime, u64 curtime)
return MYMAX(MYMIN(value,0.1),0.0);
}
-struct BufferedPacket
-{
- BufferedPacket(u8 *a_data, u32 a_size):
- data(a_data, a_size)
- {}
- BufferedPacket(u32 a_size):
- data(a_size)
- {}
- Buffer<u8> data; // Data of the packet, including headers
+/*
+ Struct for all kinds of packets. Includes following data:
+ BASE_HEADER
+ u8[] packet data (usually copied from SharedBuffer<u8>)
+*/
+struct BufferedPacket {
+ BufferedPacket(u32 a_size)
+ {
+ m_data.resize(a_size);
+ data = &m_data[0];
+ }
+
+ DISABLE_CLASS_COPY(BufferedPacket)
+
+ u16 getSeqnum() const;
+
+ inline const size_t size() const { return m_data.size(); }
+
+ u8 *data; // Direct memory access
float time = 0.0f; // Seconds from buffering the packet or re-sending
float totaltime = 0.0f; // Seconds from buffering the packet
u64 absolute_send_time = -1;
Address address; // Sender or destination
unsigned int resend_count = 0;
+
+private:
+ std::vector<u8> m_data; // Data of the packet, including headers
};
+typedef std::shared_ptr<BufferedPacket> BufferedPacketPtr;
+
+
// This adds the base headers to the data and makes a packet out of it
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel);
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
@@ -137,100 +246,11 @@ private:
};
/*
-=== NOTES ===
-
-A packet is sent through a channel to a peer with a basic header:
- Header (7 bytes):
- [0] u32 protocol_id
- [4] session_t sender_peer_id
- [6] u8 channel
-sender_peer_id:
- Unique to each peer.
- value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
- value 1 (PEER_ID_SERVER) is reserved for server
- these constants are defined in constants.h
-channel:
- Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
-*/
-#define BASE_HEADER_SIZE 7
-#define CHANNEL_COUNT 3
-/*
-Packet types:
-
-CONTROL: This is a packet used by the protocol.
-- When this is processed, nothing is handed to the user.
- Header (2 byte):
- [0] u8 type
- [1] u8 controltype
-controltype and data description:
- CONTROLTYPE_ACK
- [2] u16 seqnum
- CONTROLTYPE_SET_PEER_ID
- [2] session_t peer_id_new
- CONTROLTYPE_PING
- - There is no actual reply, but this can be sent in a reliable
- packet to get a reply
- CONTROLTYPE_DISCO
-*/
-//#define TYPE_CONTROL 0
-#define CONTROLTYPE_ACK 0
-#define CONTROLTYPE_SET_PEER_ID 1
-#define CONTROLTYPE_PING 2
-#define CONTROLTYPE_DISCO 3
-
-/*
-ORIGINAL: This is a plain packet with no control and no error
-checking at all.
-- When this is processed, it is directly handed to the user.
- Header (1 byte):
- [0] u8 type
-*/
-//#define TYPE_ORIGINAL 1
-#define ORIGINAL_HEADER_SIZE 1
-/*
-SPLIT: These are sequences of packets forming one bigger piece of
-data.
-- When processed and all the packet_nums 0...packet_count-1 are
- present (this should be buffered), the resulting data shall be
- directly handed to the user.
-- If the data fails to come up in a reasonable time, the buffer shall
- be silently discarded.
-- These can be sent as-is or atop of a RELIABLE packet stream.
- Header (7 bytes):
- [0] u8 type
- [1] u16 seqnum
- [3] u16 chunk_count
- [5] u16 chunk_num
-*/
-//#define TYPE_SPLIT 2
-/*
-RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
-and they shall be delivered in the same order as sent. This is done
-with a buffer in the receiving and transmitting end.
-- When this is processed, the contents of each packet is recursively
- processed as packets.
- Header (3 bytes):
- [0] u8 type
- [1] u16 seqnum
-
-*/
-//#define TYPE_RELIABLE 3
-#define RELIABLE_HEADER_SIZE 3
-#define SEQNUM_INITIAL 65500
-
-enum PacketType: u8 {
- PACKET_TYPE_CONTROL = 0,
- PACKET_TYPE_ORIGINAL = 1,
- PACKET_TYPE_SPLIT = 2,
- PACKET_TYPE_RELIABLE = 3,
- PACKET_TYPE_MAX
-};
-/*
A buffer which stores reliable packets and sorts them internally
for fast access to the smallest one.
*/
-typedef std::list<BufferedPacket>::iterator RPBSearchResult;
+typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
class ReliablePacketBuffer
{
@@ -239,12 +259,12 @@ public:
bool getFirstSeqnum(u16& result);
- BufferedPacket popFirst();
- BufferedPacket popSeqnum(u16 seqnum);
- void insert(const BufferedPacket &p, u16 next_expected);
+ BufferedPacketPtr popFirst();
+ BufferedPacketPtr popSeqnum(u16 seqnum);
+ void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
void incrementTimeouts(float dtime);
- std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
+ std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
void print();
bool empty();
@@ -252,10 +272,9 @@ public:
private:
- RPBSearchResult findPacket(u16 seqnum); // does not perform locking
- inline RPBSearchResult notFound() { return m_list.end(); }
+ RPBSearchResult findPacketNoLock(u16 seqnum);
- std::list<BufferedPacket> m_list;
+ std::list<BufferedPacketPtr> m_list;
u16 m_oldest_non_answered_ack;
@@ -274,7 +293,7 @@ public:
Returns a reference counted buffer of length != 0 when a full split
packet is constructed. If not, returns one of length 0.
*/
- SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
+ SharedBuffer<u8> insert(BufferedPacketPtr &p_ptr, bool reliable);
void removeUnreliableTimedOuts(float dtime, float timeout);
@@ -285,25 +304,6 @@ private:
std::mutex m_map_mutex;
};
-struct OutgoingPacket
-{
- session_t peer_id;
- u8 channelnum;
- SharedBuffer<u8> data;
- bool reliable;
- bool ack;
-
- OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
- bool reliable_,bool ack_=false):
- peer_id(peer_id_),
- channelnum(channelnum_),
- data(data_),
- reliable(reliable_),
- ack(ack_)
- {
- }
-};
-
enum ConnectionCommandType{
CONNCMD_NONE,
CONNCMD_SERVE,
@@ -316,9 +316,13 @@ enum ConnectionCommandType{
CONCMD_CREATE_PEER
};
+struct ConnectionCommand;
+typedef std::shared_ptr<ConnectionCommand> ConnectionCommandPtr;
+
+// This is very similar to ConnectionEvent
struct ConnectionCommand
{
- enum ConnectionCommandType type = CONNCMD_NONE;
+ const ConnectionCommandType type;
Address address;
session_t peer_id = PEER_ID_INEXISTENT;
u8 channelnum = 0;
@@ -326,48 +330,21 @@ struct ConnectionCommand
bool reliable = false;
bool raw = false;
- ConnectionCommand() = default;
+ DISABLE_CLASS_COPY(ConnectionCommand);
- void serve(Address address_)
- {
- type = CONNCMD_SERVE;
- address = address_;
- }
- void connect(Address address_)
- {
- type = CONNCMD_CONNECT;
- address = address_;
- }
- void disconnect()
- {
- type = CONNCMD_DISCONNECT;
- }
- void disconnect_peer(session_t peer_id_)
- {
- type = CONNCMD_DISCONNECT_PEER;
- peer_id = peer_id_;
- }
-
- void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
+ static ConnectionCommandPtr serve(Address address);
+ static ConnectionCommandPtr connect(Address address);
+ static ConnectionCommandPtr disconnect();
+ static ConnectionCommandPtr disconnect_peer(session_t peer_id);
+ static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
+ static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
+ static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
- void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
- {
- type = CONCMD_ACK;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = data_;
- reliable = false;
- }
+private:
+ ConnectionCommand(ConnectionCommandType type_) :
+ type(type_) {}
- void createPeer(session_t peer_id_, const Buffer<u8> &data_)
- {
- type = CONCMD_CREATE_PEER;
- peer_id = peer_id_;
- data = data_;
- channelnum = 0;
- reliable = true;
- raw = true;
- }
+ static ConnectionCommandPtr create(ConnectionCommandType type);
};
/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
@@ -402,10 +379,10 @@ public:
ReliablePacketBuffer outgoing_reliables_sent;
//queued reliable packets
- std::queue<BufferedPacket> queued_reliables;
+ std::queue<BufferedPacketPtr> queued_reliables;
//queue commands prior splitting to packets
- std::deque<ConnectionCommand> queued_commands;
+ std::deque<ConnectionCommandPtr> queued_commands;
IncomingSplitBuffer incoming_splits;
@@ -420,34 +397,38 @@ public:
void UpdateTimers(float dtime);
- const float getCurrentDownloadRateKB()
+ float getCurrentDownloadRateKB()
{ MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
- const float getMaxDownloadRateKB()
+ float getMaxDownloadRateKB()
{ MutexAutoLock lock(m_internal_mutex); return max_kbps; };
- const float getCurrentLossRateKB()
+ float getCurrentLossRateKB()
{ MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
- const float getMaxLossRateKB()
+ float getMaxLossRateKB()
{ MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
- const float getCurrentIncomingRateKB()
+ float getCurrentIncomingRateKB()
{ MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
- const float getMaxIncomingRateKB()
+ float getMaxIncomingRateKB()
{ MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
- const float getAvgDownloadRateKB()
+ float getAvgDownloadRateKB()
{ MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
- const float getAvgLossRateKB()
+ float getAvgLossRateKB()
{ MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
- const float getAvgIncomingRateKB()
+ float getAvgIncomingRateKB()
{ MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
- const unsigned int getWindowSize() const { return window_size; };
+ u16 getWindowSize() const { return m_window_size; };
+
+ void setWindowSize(long size)
+ {
+ m_window_size = (u16)rangelim(size, MIN_RELIABLE_WINDOW_SIZE, MAX_RELIABLE_WINDOW_SIZE);
+ }
- void setWindowSize(unsigned int size) { window_size = size; };
private:
std::mutex m_internal_mutex;
- int window_size = MIN_RELIABLE_WINDOW_SIZE;
+ u16 m_window_size = MIN_RELIABLE_WINDOW_SIZE;
u16 next_incoming_seqnum = SEQNUM_INITIAL;
@@ -510,7 +491,7 @@ class Peer {
public:
friend class PeerHelper;
- Peer(Address address_,u16 id_,Connection* connection) :
+ Peer(Address address_,session_t id_,Connection* connection) :
id(id_),
m_connection(connection),
address(address_),
@@ -524,11 +505,11 @@ class Peer {
};
// Unique id of the peer
- u16 id;
+ const session_t id;
void Drop();
- virtual void PutReliableSendCommand(ConnectionCommand &c,
+ virtual void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size) {};
virtual bool getAddress(MTProtocols type, Address& toset) = 0;
@@ -545,7 +526,7 @@ class Peer {
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
- virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ virtual SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
errorstream << "Peer::addSplitPacket called,"
@@ -582,7 +563,7 @@ class Peer {
bool IncUseCount();
void DecUseCount();
- std::mutex m_exclusive_access_mutex;
+ mutable std::mutex m_exclusive_access_mutex;
bool m_pending_deletion = false;
@@ -630,7 +611,7 @@ public:
UDPPeer(u16 a_id, Address a_address, Connection* connection);
virtual ~UDPPeer() = default;
- void PutReliableSendCommand(ConnectionCommand &c,
+ void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size);
bool getAddress(MTProtocols type, Address& toset);
@@ -638,7 +619,7 @@ public:
u16 getNextSplitSequenceNumber(u8 channel);
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
- SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+ SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable);
protected:
@@ -667,7 +648,7 @@ private:
float resend_timeout = 0.5;
bool processReliableSendCommand(
- ConnectionCommand &c,
+ ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size);
};
@@ -675,7 +656,7 @@ private:
Connection
*/
-enum ConnectionEventType{
+enum ConnectionEventType {
CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED,
CONNEVENT_PEER_ADDED,
@@ -683,56 +664,32 @@ enum ConnectionEventType{
CONNEVENT_BIND_FAILED,
};
+struct ConnectionEvent;
+typedef std::shared_ptr<ConnectionEvent> ConnectionEventPtr;
+
+// This is very similar to ConnectionCommand
struct ConnectionEvent
{
- enum ConnectionEventType type = CONNEVENT_NONE;
+ const ConnectionEventType type;
session_t peer_id = 0;
Buffer<u8> data;
bool timeout = false;
Address address;
- ConnectionEvent() = default;
+ // We don't want to copy "data"
+ DISABLE_CLASS_COPY(ConnectionEvent);
- const char *describe() const
- {
- switch(type) {
- case CONNEVENT_NONE:
- return "CONNEVENT_NONE";
- case CONNEVENT_DATA_RECEIVED:
- return "CONNEVENT_DATA_RECEIVED";
- case CONNEVENT_PEER_ADDED:
- return "CONNEVENT_PEER_ADDED";
- case CONNEVENT_PEER_REMOVED:
- return "CONNEVENT_PEER_REMOVED";
- case CONNEVENT_BIND_FAILED:
- return "CONNEVENT_BIND_FAILED";
- }
- return "Invalid ConnectionEvent";
- }
+ static ConnectionEventPtr create(ConnectionEventType type);
+ static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer<u8> &data);
+ static ConnectionEventPtr peerAdded(session_t peer_id, Address address);
+ static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address);
+ static ConnectionEventPtr bindFailed();
- void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
- {
- type = CONNEVENT_DATA_RECEIVED;
- peer_id = peer_id_;
- data = data_;
- }
- void peerAdded(session_t peer_id_, Address address_)
- {
- type = CONNEVENT_PEER_ADDED;
- peer_id = peer_id_;
- address = address_;
- }
- void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
- {
- type = CONNEVENT_PEER_REMOVED;
- peer_id = peer_id_;
- timeout = timeout_;
- address = address_;
- }
- void bindFailed()
- {
- type = CONNEVENT_BIND_FAILED;
- }
+ const char *describe() const;
+
+private:
+ ConnectionEvent(ConnectionEventType type_) :
+ type(type_) {}
};
class PeerHandler;
@@ -748,10 +705,9 @@ public:
~Connection();
/* Interface */
- ConnectionEvent waitEvent(u32 timeout_ms);
- // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
- void putCommand(const ConnectionCommand &c);
- void putCommand(ConnectionCommand &&c);
+ ConnectionEventPtr waitEvent(u32 timeout_ms);
+
+ void putCommand(ConnectionCommandPtr c);
void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr);
@@ -765,7 +721,7 @@ public:
Address GetPeerAddress(session_t peer_id);
float getPeerStat(session_t peer_id, rtt_stat_type type);
float getLocalStat(rate_stat_type type);
- const u32 GetProtocolID() const { return m_protocol_id; };
+ u32 GetProtocolID() const { return m_protocol_id; };
const std::string getDesc();
void DisconnectPeer(session_t peer_id);
@@ -781,8 +737,6 @@ protected:
void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
- void PrintInfo(std::ostream &out);
-
std::vector<session_t> getPeerIDs()
{
MutexAutoLock peerlock(m_peers_mutex);
@@ -791,13 +745,11 @@ protected:
UDPSocket m_udpSocket;
// Command queue: user -> SendThread
- MutexedQueue<ConnectionCommand> m_command_queue;
+ MutexedQueue<ConnectionCommandPtr> m_command_queue;
bool Receive(NetworkPacket *pkt, u32 timeout);
- // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
- void putEvent(const ConnectionEvent &e);
- void putEvent(ConnectionEvent &&e);
+ void putEvent(ConnectionEventPtr e);
void TriggerSend();
@@ -807,7 +759,7 @@ protected:
}
private:
// Event queue: ReceiveThread -> user
- MutexedQueue<ConnectionEvent> m_event_queue;
+ MutexedQueue<ConnectionEventPtr> m_event_queue;
session_t m_peer_id = 0;
u32 m_protocol_id;
@@ -819,7 +771,7 @@ private:
std::unique_ptr<ConnectionSendThread> m_sendThread;
std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
- std::mutex m_info_mutex;
+ mutable std::mutex m_info_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;
diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp
index a306ced9b..dca065ae1 100644
--- a/src/network/connectionthreads.cpp
+++ b/src/network/connectionthreads.cpp
@@ -50,11 +50,11 @@ std::mutex log_conthread_mutex;
#define WINDOW_SIZE 5
-static session_t readPeerId(u8 *packetdata)
+static session_t readPeerId(const u8 *packetdata)
{
return readU16(&packetdata[4]);
}
-static u8 readChannel(u8 *packetdata)
+static u8 readChannel(const u8 *packetdata)
{
return readU8(&packetdata[6]);
}
@@ -114,9 +114,9 @@ void *ConnectionSendThread::run()
}
/* translate commands to packets */
- ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
- while (c.type != CONNCMD_NONE) {
- if (c.reliable)
+ auto c = m_connection->m_command_queue.pop_frontNoEx(0);
+ while (c && c->type != CONNCMD_NONE) {
+ if (c->reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
@@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size();
for (const auto &k : timed_outs) {
- u8 channelnum = readChannel(*k.data);
- u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
+ u8 channelnum = readChannel(k->data);
+ u16 seqnum = k->getSeqnum();
- channel.UpdateBytesLost(k.data.getSize());
+ channel.UpdateBytesLost(k->size());
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
- << k.address.serializeString()
+ << k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
- << "count=" << k.resend_count
+ << "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
- rawSend(k);
+ rawSend(k.get());
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
@@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime)
}
}
-void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+void ConnectionSendThread::rawSend(const BufferedPacket *p)
{
try {
- m_connection->m_udpSocket.Send(packet.address, *packet.data,
- packet.data.getSize());
+ m_connection->m_udpSocket.Send(p->address, p->data, p->size());
LOG(dout_con << m_connection->getDesc()
- << " rawSend: " << packet.data.getSize()
+ << " rawSend: " << p->size()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
- << packet.address.serializeString() << std::endl);
+ << p->address.serializeString() << std::endl);
}
}
-void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
{
try {
- p.absolute_send_time = porting::getTimeMs();
+ p->absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
@@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
}
// Send the packet
- rawSend(p);
+ rawSend(p.get());
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
@@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
- bool have_sequence_number_for_raw_packet = true;
- u16 seqnum =
- channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+ bool have_seqnum = false;
+ const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
- if (!have_sequence_number_for_raw_packet)
+ if (!have_seqnum)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
@@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, reliable,
+ BufferedPacketPtr p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
- if (channel->outgoing_reliables_sent.size()
- < channel->getWindowSize()) {
+ if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << (u32)channelnum
@@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push(std::move(p));
+ channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(peer_address, data,
+ BufferedPacketPtr p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
- rawSend(p);
+ rawSend(p.get());
return true;
}
@@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
return false;
}
-void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
{
- assert(c.reliable); // Pre-condition
+ assert(c->reliable); // Pre-condition
- switch (c.type) {
+ switch (c->type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
@@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+ if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
@@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
- << " Invalid reliable command type: " << c.type << std::endl);
+ << " Invalid reliable command type: " << c->type << std::endl);
}
}
-void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
{
+ const ConnectionCommand &c = *c_ptr;
assert(!c.reliable); // Pre-condition
switch (c.type) {
@@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address)
}
catch (SocketException &e) {
// Create event
- ConnectionEvent ce;
- ce.bindFailed();
- m_connection->putEvent(ce);
+ m_connection->putEvent(ConnectionEvent::bindFailed());
}
}
@@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address)
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
- ConnectionEvent e;
- e.peerAdded(peer->id, peer->address);
- m_connection->putEvent(e);
+ m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
Address bind_addr;
@@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
}
}
-void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
{
- PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+ PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
if (!peer)
return;
@@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data
}
}
-void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
{
std::vector<session_t> peerids = m_connection->getPeerIDs();
@@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime)
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
- u16 next_to_ack = 0;
+ // Reduces logging verbosity
+ if (channel.queued_reliables.empty())
+ continue;
+
+ u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
@@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime)
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
- BufferedPacket p = std::move(channel.queued_reliables.front());
+ BufferedPacketPtr p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
- << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+ << ", seqnum: " << p->getSeqnum()
<< std::endl);
sendAsPacketReliable(p, &channel);
@@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
try {
// First, see if there any buffered packets we can process now
if (packet_queued) {
- bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
- while (data_left) {
+ while (true) {
try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
- }
+ if (!getFromBuffers(peer_id, resultdata))
+ break;
+
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
@@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return;
if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
@@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(std::move(e));
+ m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
}
@@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
if (!peer)
continue;
- if (dynamic_cast<UDPPeer *>(&peer) == 0)
+ UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
+ if (!p)
continue;
- for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+ for (Channel &channel : p->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
@@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
- if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
- if (firstseqnum == channel->readNextIncomingSeqNum()) {
- BufferedPacket p = channel->incoming_reliables.popFirst();
- peer_id = readPeerId(*p.data);
- u8 channelnum = readChannel(*p.data);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
+ return false;
- LOG(dout_con << m_connection->getDesc()
- << "UNBUFFERING TYPE_RELIABLE"
- << " seqnum=" << seqnum
- << " peer_id=" << peer_id
- << " channel=" << ((int) channelnum & 0xff)
- << std::endl);
+ if (firstseqnum != channel->readNextIncomingSeqNum())
+ return false;
- channel->incNextIncomingSeqNum();
+ BufferedPacketPtr p = channel->incoming_reliables.popFirst();
- u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
- // Get out the inside packet and re-process it
- SharedBuffer<u8> payload(p.data.getSize() - headers_size);
- memcpy(*payload, &p.data[headers_size], payload.getSize());
+ peer_id = readPeerId(p->data); // Carried over to caller function
+ u8 channelnum = readChannel(p->data);
+ u16 seqnum = p->getSeqnum();
- dst = processPacket(channel, payload, peer_id, channelnum, true);
- return true;
- }
- }
- return false;
+ LOG(dout_con << m_connection->getDesc()
+ << "UNBUFFERING TYPE_RELIABLE"
+ << " seqnum=" << seqnum
+ << " peer_id=" << peer_id
+ << " channel=" << ((int) channelnum & 0xff)
+ << std::endl);
+
+ channel->incNextIncomingSeqNum();
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p->size() - headers_size);
+ memcpy(*payload, &p->data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
@@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
- u8 controltype = readU8(&(packetdata[1]));
+ ControlType controltype = (ControlType)readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
@@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
<< seqnum << " ]" << std::endl);
try {
- BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+ BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// the rtt calculation will be a bit off for re-sent packets but that's okay
{
@@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
- if (current_time > p.absolute_send_time) {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
+ if (current_time > p->absolute_send_time) {
+ float rtt = (current_time - p->absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
- } else if (p.totaltime > 0) {
- float rtt = p.totaltime;
+ } else if (p->totaltime > 0) {
+ float rtt = p->totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
@@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
}
// put bytes for max bandwidth calculation
- channel->UpdateBytesSent(p.data.getSize(), 1);
+ channel->UpdateBytesSent(p->size(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
@@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
- << "INVALID TYPE_CONTROL: invalid controltype="
+ << "INVALID controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
@@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
- BufferedPacket packet = makePacket(peer_address,
+ BufferedPacketPtr packet = con::makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
@@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
- u16 seqnum = readU16(&packetdata[1]);
+ const u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
@@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
- BufferedPacket packet = con::makePacket(
+ BufferedPacketPtr packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
@@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
- ConnectionCommand discon;
- discon.disconnect_peer(peer->id);
- m_connection->putCommand(discon);
+ m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
@@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
- BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+ BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}
diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h
index 612407c3b..c2e2dae12 100644
--- a/src/network/connectionthreads.h
+++ b/src/network/connectionthreads.h
@@ -29,6 +29,25 @@ namespace con
class Connection;
+struct OutgoingPacket
+{
+ session_t peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+ bool ack;
+
+ OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
+ bool reliable_,bool ack_=false):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_),
+ ack(ack_)
+ {
+ }
+};
+
class ConnectionSendThread : public Thread
{
@@ -51,27 +70,27 @@ public:
private:
void runTimeouts(float dtime);
- void rawSend(const BufferedPacket &packet);
+ void rawSend(const BufferedPacket *p);
bool rawSendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool reliable);
- void processReliableCommand(ConnectionCommand &c);
- void processNonReliableCommand(ConnectionCommand &c);
+ void processReliableCommand(ConnectionCommandPtr &c);
+ void processNonReliableCommand(ConnectionCommandPtr &c);
void serve(Address bind_address);
void connect(Address address);
void disconnect();
void disconnect_peer(session_t peer_id);
void send(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data);
- void sendReliable(ConnectionCommand &c);
+ void sendReliable(ConnectionCommandPtr &c);
void sendToAll(u8 channelnum, const SharedBuffer<u8> &data);
- void sendToAllReliable(ConnectionCommand &c);
+ void sendToAllReliable(ConnectionCommandPtr &c);
void sendPackets(float dtime);
void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data,
bool ack = false);
- void sendAsPacketReliable(BufferedPacket &p, Channel *channel);
+ void sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel);
bool packetsQueued();
diff --git a/src/network/networkpacket.h b/src/network/networkpacket.h
index b1c44f055..b9c39f332 100644
--- a/src/network/networkpacket.h
+++ b/src/network/networkpacket.h
@@ -41,7 +41,7 @@ public:
u32 getSize() const { return m_datasize; }
session_t getPeerId() const { return m_peer_id; }
u16 getCommand() { return m_command; }
- const u32 getRemainingBytes() const { return m_datasize - m_read_offset; }
+ u32 getRemainingBytes() const { return m_datasize - m_read_offset; }
const char *getRemainingString() { return getString(m_read_offset); }
// Returns a c-string without copying.
diff --git a/src/network/networkprotocol.h b/src/network/networkprotocol.h
index 838bf0b2c..a5ff53216 100644
--- a/src/network/networkprotocol.h
+++ b/src/network/networkprotocol.h
@@ -205,9 +205,11 @@ with this program; if not, write to the Free Software Foundation, Inc.,
Updated set_sky packet
Adds new sun, moon and stars packets
Minimap modes
+ PROTOCOL VERSION 40:
+ TOCLIENT_MEDIA_PUSH changed, TOSERVER_HAVE_MEDIA added
*/
-#define LATEST_PROTOCOL_VERSION 39
+#define LATEST_PROTOCOL_VERSION 40
#define LATEST_PROTOCOL_VERSION_STRING TOSTRING(LATEST_PROTOCOL_VERSION)
// Server's supported network protocol range
@@ -227,7 +229,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
// base64-encoded SHA-1 (27+\0).
// See also: Formspec Version History in doc/lua_api.txt
-#define FORMSPEC_API_VERSION 4
+#define FORMSPEC_API_VERSION 5
#define TEXTURENAME_ALLOWED_CHARS "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.-"
@@ -315,9 +317,8 @@ enum ToClientCommand
/*
std::string raw_hash
std::string filename
+ u32 callback_token
bool should_be_cached
- u32 len
- char filedata[len]
*/
// (oops, there is some gap here)
@@ -936,7 +937,13 @@ enum ToServerCommand
}
*/
- TOSERVER_RECEIVED_MEDIA = 0x41, // Obsolete
+ TOSERVER_HAVE_MEDIA = 0x41,
+ /*
+ u8 number of callback tokens
+ for each:
+ u32 token
+ */
+
TOSERVER_BREATH = 0x42, // Obsolete
TOSERVER_CLIENT_READY = 0x43,
diff --git a/src/network/serveropcodes.cpp b/src/network/serveropcodes.cpp
index aea5d7174..44b65e8da 100644
--- a/src/network/serveropcodes.cpp
+++ b/src/network/serveropcodes.cpp
@@ -89,7 +89,7 @@ const ToServerCommandHandler toServerCommandTable[TOSERVER_NUM_MSG_TYPES] =
null_command_handler, // 0x3e
null_command_handler, // 0x3f
{ "TOSERVER_REQUEST_MEDIA", TOSERVER_STATE_STARTUP, &Server::handleCommand_RequestMedia }, // 0x40
- null_command_handler, // 0x41
+ { "TOSERVER_HAVE_MEDIA", TOSERVER_STATE_INGAME, &Server::handleCommand_HaveMedia }, // 0x41
null_command_handler, // 0x42
{ "TOSERVER_CLIENT_READY", TOSERVER_STATE_STARTUP, &Server::handleCommand_ClientReady }, // 0x43
null_command_handler, // 0x44
@@ -167,7 +167,7 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
{ "TOCLIENT_TIME_OF_DAY", 0, true }, // 0x29
{ "TOCLIENT_CSM_RESTRICTION_FLAGS", 0, true }, // 0x2A
{ "TOCLIENT_PLAYER_SPEED", 0, true }, // 0x2B
- { "TOCLIENT_MEDIA_PUSH", 0, true }, // 0x2C (sent over channel 1 too)
+ { "TOCLIENT_MEDIA_PUSH", 0, true }, // 0x2C (sent over channel 1 too if legacy)
null_command_factory, // 0x2D
null_command_factory, // 0x2E
{ "TOCLIENT_CHAT_MESSAGE", 0, true }, // 0x2F
diff --git a/src/network/serverpackethandler.cpp b/src/network/serverpackethandler.cpp
index fd5aed9d1..12dc24460 100644
--- a/src/network/serverpackethandler.cpp
+++ b/src/network/serverpackethandler.cpp
@@ -86,7 +86,7 @@ void Server::handleCommand_Init(NetworkPacket* pkt)
// Do not allow multiple players in simple singleplayer mode.
// This isn't a perfect way to do it, but will suffice for now
- if (m_simple_singleplayer_mode && m_clients.getClientIDs().size() > 1) {
+ if (m_simple_singleplayer_mode && !m_clients.getClientIDs().empty()) {
infostream << "Server: Not allowing another client (" << addr_s <<
") to connect in simple singleplayer mode" << std::endl;
DenyAccess(peer_id, SERVER_ACCESSDENIED_SINGLEPLAYER);
@@ -174,6 +174,16 @@ void Server::handleCommand_Init(NetworkPacket* pkt)
return;
}
+ RemotePlayer *player = m_env->getPlayer(playername);
+
+ // If player is already connected, cancel
+ if (player && player->getPeerId() != PEER_ID_INEXISTENT) {
+ actionstream << "Server: Player with name \"" << playername <<
+ "\" tried to connect, but player with same name is already connected" << std::endl;
+ DenyAccess(peer_id, SERVER_ACCESSDENIED_ALREADY_CONNECTED);
+ return;
+ }
+
m_clients.setPlayerName(peer_id, playername);
//TODO (later) case insensitivity
@@ -352,16 +362,15 @@ void Server::handleCommand_RequestMedia(NetworkPacket* pkt)
session_t peer_id = pkt->getPeerId();
infostream << "Sending " << numfiles << " files to " <<
getPlayerName(peer_id) << std::endl;
- verbosestream << "TOSERVER_REQUEST_MEDIA: " << std::endl;
+ verbosestream << "TOSERVER_REQUEST_MEDIA: requested file(s)" << std::endl;
for (u16 i = 0; i < numfiles; i++) {
std::string name;
*pkt >> name;
- tosend.push_back(name);
- verbosestream << "TOSERVER_REQUEST_MEDIA: requested file "
- << name << std::endl;
+ tosend.emplace_back(name);
+ verbosestream << " " << name << std::endl;
}
sendRequestedMedia(peer_id, tosend);
@@ -473,7 +482,6 @@ void Server::process_PlayerPos(RemotePlayer *player, PlayerSAO *playersao,
f32 yaw = (f32)f32yaw / 100.0f;
u32 keyPressed = 0;
- // default behavior (in case an old client doesn't send these)
f32 fov = 0;
u8 wanted_range = 0;
@@ -499,17 +507,7 @@ void Server::process_PlayerPos(RemotePlayer *player, PlayerSAO *playersao,
playersao->setFov(fov);
playersao->setWantedRange(wanted_range);
- player->keyPressed = keyPressed;
- player->control.up = (keyPressed & (0x1 << 0));
- player->control.down = (keyPressed & (0x1 << 1));
- player->control.left = (keyPressed & (0x1 << 2));
- player->control.right = (keyPressed & (0x1 << 3));
- player->control.jump = (keyPressed & (0x1 << 4));
- player->control.aux1 = (keyPressed & (0x1 << 5));
- player->control.sneak = (keyPressed & (0x1 << 6));
- player->control.dig = (keyPressed & (0x1 << 7));
- player->control.place = (keyPressed & (0x1 << 8));
- player->control.zoom = (keyPressed & (0x1 << 9));
+ player->control.unpackKeysPressed(keyPressed);
if (playersao->checkMovementCheat()) {
// Call callbacks
@@ -821,8 +819,7 @@ void Server::handleCommand_Damage(NetworkPacket* pkt)
<< std::endl;
PlayerHPChangeReason reason(PlayerHPChangeReason::FALL);
- playersao->setHP((s32)playersao->getHP() - (s32)damage, reason);
- SendPlayerHPOrDie(playersao, reason);
+ playersao->setHP((s32)playersao->getHP() - (s32)damage, reason, true);
}
}
@@ -916,6 +913,13 @@ bool Server::checkInteractDistance(RemotePlayer *player, const f32 d, const std:
return true;
}
+// Tiny helper to retrieve the selected item into an Optional
+static inline void getWieldedItem(const PlayerSAO *playersao, Optional<ItemStack> &ret)
+{
+ ret = ItemStack();
+ playersao->getWieldedItem(&(*ret));
+}
+
void Server::handleCommand_Interact(NetworkPacket *pkt)
{
/*
@@ -1107,11 +1111,8 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
float time_from_last_punch =
playersao->resetTimeFromLastPunch();
- u16 src_original_hp = pointed_object->getHP();
- u16 dst_origin_hp = playersao->getHP();
-
- u16 wear = pointed_object->punch(dir, &toolcap, playersao,
- time_from_last_punch);
+ u32 wear = pointed_object->punch(dir, &toolcap, playersao,
+ time_from_last_punch, tool_item.wear);
// Callback may have changed item, so get it again
playersao->getWieldedItem(&selected_item);
@@ -1119,18 +1120,6 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
if (changed)
playersao->setWieldedItem(selected_item);
- // If the object is a player and its HP changed
- if (src_original_hp != pointed_object->getHP() &&
- pointed_object->getType() == ACTIVEOBJECT_TYPE_PLAYER) {
- SendPlayerHPOrDie((PlayerSAO *)pointed_object,
- PlayerHPChangeReason(PlayerHPChangeReason::PLAYER_PUNCH, playersao));
- }
-
- // If the puncher is a player and its HP changed
- if (dst_origin_hp != playersao->getHP())
- SendPlayerHPOrDie(playersao,
- PlayerHPChangeReason(PlayerHPChangeReason::PLAYER_PUNCH, pointed_object));
-
return;
} // action == INTERACT_START_DIGGING
@@ -1176,7 +1165,8 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
// Get diggability and expected digging time
DigParams params = getDigParams(m_nodedef->get(n).groups,
- &selected_item.getToolCapabilities(m_itemdef));
+ &selected_item.getToolCapabilities(m_itemdef),
+ selected_item.wear);
// If can't dig, try hand
if (!params.diggable) {
params = getDigParams(m_nodedef->get(n).groups,
@@ -1238,14 +1228,17 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
// Place block or right-click object
case INTERACT_PLACE: {
- ItemStack selected_item;
- playersao->getWieldedItem(&selected_item, nullptr);
+ Optional<ItemStack> selected_item;
+ getWieldedItem(playersao, selected_item);
// Reset build time counter
if (pointed.type == POINTEDTHING_NODE &&
- selected_item.getDefinition(m_itemdef).type == ITEM_NODE)
+ selected_item->getDefinition(m_itemdef).type == ITEM_NODE)
getClient(peer_id)->m_time_from_building = 0.0;
+ const bool had_prediction = !selected_item->getDefinition(m_itemdef).
+ node_placement_prediction.empty();
+
if (pointed.type == POINTEDTHING_OBJECT) {
// Right click object
@@ -1258,11 +1251,9 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
<< pointed_object->getDescription() << std::endl;
// Do stuff
- if (m_script->item_OnSecondaryUse(
- selected_item, playersao, pointed)) {
- if (playersao->setWieldedItem(selected_item)) {
+ if (m_script->item_OnSecondaryUse(selected_item, playersao, pointed)) {
+ if (selected_item.has_value() && playersao->setWieldedItem(*selected_item))
SendInventory(playersao, true);
- }
}
pointed_object->rightClick(playersao);
@@ -1270,7 +1261,7 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
// Placement was handled in lua
// Apply returned ItemStack
- if (playersao->setWieldedItem(selected_item))
+ if (selected_item.has_value() && playersao->setWieldedItem(*selected_item))
SendInventory(playersao, true);
}
@@ -1282,8 +1273,7 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
RemoteClient *client = getClient(peer_id);
v3s16 blockpos = getNodeBlockPos(pointed.node_abovesurface);
v3s16 blockpos2 = getNodeBlockPos(pointed.node_undersurface);
- if (!selected_item.getDefinition(m_itemdef
- ).node_placement_prediction.empty()) {
+ if (had_prediction) {
client->SetBlockNotSent(blockpos);
if (blockpos2 != blockpos)
client->SetBlockNotSent(blockpos2);
@@ -1297,15 +1287,15 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
} // action == INTERACT_PLACE
case INTERACT_USE: {
- ItemStack selected_item;
- playersao->getWieldedItem(&selected_item, nullptr);
+ Optional<ItemStack> selected_item;
+ getWieldedItem(playersao, selected_item);
- actionstream << player->getName() << " uses " << selected_item.name
+ actionstream << player->getName() << " uses " << selected_item->name
<< ", pointing at " << pointed.dump() << std::endl;
if (m_script->item_OnUse(selected_item, playersao, pointed)) {
// Apply returned ItemStack
- if (playersao->setWieldedItem(selected_item))
+ if (selected_item.has_value() && playersao->setWieldedItem(*selected_item))
SendInventory(playersao, true);
}
@@ -1314,16 +1304,17 @@ void Server::handleCommand_Interact(NetworkPacket *pkt)
// Rightclick air
case INTERACT_ACTIVATE: {
- ItemStack selected_item;
- playersao->getWieldedItem(&selected_item, nullptr);
+ Optional<ItemStack> selected_item;
+ getWieldedItem(playersao, selected_item);
actionstream << player->getName() << " activates "
- << selected_item.name << std::endl;
+ << selected_item->name << std::endl;
pointed.type = POINTEDTHING_NOTHING; // can only ever be NOTHING
if (m_script->item_OnSecondaryUse(selected_item, playersao, pointed)) {
- if (playersao->setWieldedItem(selected_item))
+ // Apply returned ItemStack
+ if (selected_item.has_value() && playersao->setWieldedItem(*selected_item))
SendInventory(playersao, true);
}
@@ -1811,3 +1802,30 @@ void Server::handleCommand_ModChannelMsg(NetworkPacket *pkt)
broadcastModChannelMessage(channel_name, channel_msg, peer_id);
}
+
+void Server::handleCommand_HaveMedia(NetworkPacket *pkt)
+{
+ std::vector<u32> tokens;
+ u8 numtokens;
+
+ *pkt >> numtokens;
+ for (u16 i = 0; i < numtokens; i++) {
+ u32 n;
+ *pkt >> n;
+ tokens.emplace_back(n);
+ }
+
+ const session_t peer_id = pkt->getPeerId();
+ auto player = m_env->getPlayer(peer_id);
+
+ for (const u32 token : tokens) {
+ auto it = m_pending_dyn_media.find(token);
+ if (it == m_pending_dyn_media.end())
+ continue;
+ if (it->second.waiting_players.count(peer_id)) {
+ it->second.waiting_players.erase(peer_id);
+ if (player)
+ getScriptIface()->on_dynamic_media_added(token, player->getName());
+ }
+ }
+}
diff --git a/src/network/socket.cpp b/src/network/socket.cpp
index 94a9f4180..0bb7ea234 100644
--- a/src/network/socket.cpp
+++ b/src/network/socket.cpp
@@ -23,14 +23,11 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <iostream>
#include <cstdlib>
#include <cstring>
-#include <cerrno>
-#include <sstream>
#include <iomanip>
#include "util/string.h"
#include "util/numeric.h"
#include "constants.h"
#include "debug.h"
-#include "settings.h"
#include "log.h"
#ifdef _WIN32
@@ -42,9 +39,10 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <winsock2.h>
#include <ws2tcpip.h>
#define LAST_SOCKET_ERR() WSAGetLastError()
-typedef SOCKET socket_t;
+#define SOCKET_ERR_STR(e) itos(e)
typedef int socklen_t;
#else
+#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -53,7 +51,7 @@ typedef int socklen_t;
#include <unistd.h>
#include <arpa/inet.h>
#define LAST_SOCKET_ERR() (errno)
-typedef int socket_t;
+#define SOCKET_ERR_STR(e) strerror(e)
#endif
// Set to true to enable verbose debug output
@@ -113,7 +111,7 @@ bool UDPSocket::init(bool ipv6, bool noExceptions)
}
throw SocketException(std::string("Failed to create socket: error ") +
- itos(LAST_SOCKET_ERR()));
+ SOCKET_ERR_STR(LAST_SOCKET_ERR()));
}
setTimeoutMs(0);
@@ -153,40 +151,40 @@ void UDPSocket::Bind(Address addr)
}
if (addr.getFamily() != m_addr_family) {
- static const char *errmsg =
+ const char *errmsg =
"Socket and bind address families do not match";
errorstream << "Bind failed: " << errmsg << std::endl;
throw SocketException(errmsg);
}
+ int ret = 0;
+
if (m_addr_family == AF_INET6) {
struct sockaddr_in6 address;
memset(&address, 0, sizeof(address));
- address = addr.getAddress6();
address.sin6_family = AF_INET6;
+ address.sin6_addr = addr.getAddress6();
address.sin6_port = htons(addr.getPort());
- if (bind(m_handle, (const struct sockaddr *)&address,
- sizeof(struct sockaddr_in6)) < 0) {
- dstream << (int)m_handle << ": Bind failed: " << strerror(errno)
- << std::endl;
- throw SocketException("Failed to bind socket");
- }
+ ret = bind(m_handle, (const struct sockaddr *) &address,
+ sizeof(struct sockaddr_in6));
} else {
struct sockaddr_in address;
memset(&address, 0, sizeof(address));
- address = addr.getAddress();
address.sin_family = AF_INET;
+ address.sin_addr = addr.getAddress();
address.sin_port = htons(addr.getPort());
- if (bind(m_handle, (const struct sockaddr *)&address,
- sizeof(struct sockaddr_in)) < 0) {
- dstream << (int)m_handle << ": Bind failed: " << strerror(errno)
- << std::endl;
- throw SocketException("Failed to bind socket");
- }
+ ret = bind(m_handle, (const struct sockaddr *) &address,
+ sizeof(struct sockaddr_in));
+ }
+
+ if (ret < 0) {
+ dstream << (int)m_handle << ": Bind failed: "
+ << SOCKET_ERR_STR(LAST_SOCKET_ERR()) << std::endl;
+ throw SocketException("Failed to bind socket");
}
}
@@ -233,13 +231,19 @@ void UDPSocket::Send(const Address &destination, const void *data, int size)
int sent;
if (m_addr_family == AF_INET6) {
- struct sockaddr_in6 address = destination.getAddress6();
+ struct sockaddr_in6 address = {0};
+ address.sin6_family = AF_INET6;
+ address.sin6_addr = destination.getAddress6();
address.sin6_port = htons(destination.getPort());
+
sent = sendto(m_handle, (const char *)data, size, 0,
(struct sockaddr *)&address, sizeof(struct sockaddr_in6));
} else {
- struct sockaddr_in address = destination.getAddress();
+ struct sockaddr_in address = {0};
+ address.sin_family = AF_INET;
+ address.sin_addr = destination.getAddress();
address.sin_port = htons(destination.getPort());
+
sent = sendto(m_handle, (const char *)data, size, 0,
(struct sockaddr *)&address, sizeof(struct sockaddr_in));
}
@@ -267,9 +271,9 @@ int UDPSocket::Receive(Address &sender, void *data, int size)
return -1;
u16 address_port = ntohs(address.sin6_port);
- IPv6AddressBytes bytes;
- memcpy(bytes.bytes, address.sin6_addr.s6_addr, 16);
- sender = Address(&bytes, address_port);
+ const auto *bytes = reinterpret_cast<IPv6AddressBytes*>
+ (address.sin6_addr.s6_addr);
+ sender = Address(bytes, address_port);
} else {
struct sockaddr_in address;
memset(&address, 0, sizeof(address));
@@ -341,7 +345,12 @@ bool UDPSocket::WaitData(int timeout_ms)
if (result == 0)
return false;
- if (result < 0 && (errno == EINTR || errno == EBADF)) {
+ int e = LAST_SOCKET_ERR();
+#ifdef _WIN32
+ if (result < 0 && (e == WSAEINTR || e == WSAEBADF)) {
+#else
+ if (result < 0 && (e == EINTR || e == EBADF)) {
+#endif
// N.B. select() fails when sockets are destroyed on Connection's dtor
// with EBADF. Instead of doing tricky synchronization, allow this
// thread to exit but don't throw an exception.
@@ -349,18 +358,9 @@ bool UDPSocket::WaitData(int timeout_ms)
}
if (result < 0) {
- dstream << m_handle << ": Select failed: " << strerror(errno)
+ dstream << (int)m_handle << ": Select failed: " << SOCKET_ERR_STR(e)
<< std::endl;
-#ifdef _WIN32
- int e = WSAGetLastError();
- dstream << (int)m_handle << ": WSAGetLastError()=" << e << std::endl;
- if (e == 10004 /* WSAEINTR */ || e == 10009 /* WSAEBADF */) {
- infostream << "Ignoring WSAEINTR/WSAEBADF." << std::endl;
- return false;
- }
-#endif
-
throw SocketException("Select failed");
} else if (!FD_ISSET(m_handle, &readset)) {
// No data
diff --git a/src/network/socket.h b/src/network/socket.h
index e0e76f4c2..d34186b44 100644
--- a/src/network/socket.h
+++ b/src/network/socket.h
@@ -19,18 +19,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#pragma once
-#ifdef _WIN32
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#include <windows.h>
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#else
-#include <sys/socket.h>
-#include <netinet/in.h>
-#endif
-
#include <ostream>
#include <cstring>
#include "address.h"
@@ -53,8 +41,6 @@ public:
bool init(bool ipv6, bool noExceptions = false);
- // void Close();
- // bool IsOpen();
void Send(const Address &destination, const void *data, int size);
// Returns -1 if there is no data
int Receive(Address &sender, void *data, int size);