/* Minetest Copyright (C) 2013 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "connection.h" #include "main.h" #include "serialization.h" #include "log.h" #include "porting.h" #include "util/serialize.h" #include "util/numeric.h" #include "util/string.h" #include "settings.h" namespace con { static u16 readPeerId(u8 *packetdata) { return readU16(&packetdata[4]); } static u8 readChannel(u8 *packetdata) { return readU8(&packetdata[6]); } BufferedPacket makePacket(Address &address, u8 *data, u32 datasize, u32 protocol_id, u16 sender_peer_id, u8 channel) { u32 packet_size = datasize + BASE_HEADER_SIZE; BufferedPacket p(packet_size); p.address = address; writeU32(&p.data[0], protocol_id); writeU16(&p.data[4], sender_peer_id); writeU8(&p.data[6], channel); memcpy(&p.data[BASE_HEADER_SIZE], data, datasize); return p; } BufferedPacket makePacket(Address &address, SharedBuffer &data, u32 protocol_id, u16 sender_peer_id, u8 channel) { return makePacket(address, *data, data.getSize(), protocol_id, sender_peer_id, channel); } SharedBuffer makeOriginalPacket( SharedBuffer data) { u32 header_size = 1; u32 packet_size = data.getSize() + header_size; SharedBuffer b(packet_size); writeU8(&b[0], TYPE_ORIGINAL); memcpy(&b[header_size], *data, data.getSize()); return b; } std::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum) { // Chunk packets, containing the TYPE_SPLIT header std::list > chunks; u32 chunk_header_size = 7; u32 maximum_data_size = chunksize_max - chunk_header_size; u32 start = 0; u32 end = 0; u32 chunk_num = 0; u16 chunk_count = 0; do{ end = start + maximum_data_size - 1; if(end > data.getSize() - 1) end = data.getSize() - 1; u32 payload_size = end - start + 1; u32 packet_size = chunk_header_size + payload_size; SharedBuffer chunk(packet_size); writeU8(&chunk[0], TYPE_SPLIT); writeU16(&chunk[1], seqnum); // [3] u16 chunk_count is written at next stage writeU16(&chunk[5], chunk_num); memcpy(&chunk[chunk_header_size], &data[start], payload_size); chunks.push_back(chunk); chunk_count++; start = end + 1; chunk_num++; } while(end != data.getSize() - 1); for(std::list >::iterator i = chunks.begin(); i != chunks.end(); ++i) { // Write chunk_count writeU16(&((*i)[3]), chunk_count); } return chunks; } std::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum) { u32 original_header_size = 1; std::list > list; if(data.getSize() + original_header_size > chunksize_max) { list = makeSplitPacket(data, chunksize_max, split_seqnum); split_seqnum++; return list; } else { list.push_back(makeOriginalPacket(data)); } return list; } SharedBuffer makeReliablePacket( SharedBuffer data, u16 seqnum) { /*dstream<<"BEGIN SharedBuffer makeReliablePacket()"< makeReliablePacket()"<::iterator i = m_list.begin(); i != m_list.end(); ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); dout_con<::iterator i = m_list.begin(); for(; i != m_list.end(); ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /*dout_con<<"findPacket(): finding seqnum="<= BASE_HEADER_SIZE+3); u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); assert(type == TYPE_RELIABLE); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); ++m_list_size; // Find the right place for the packet and insert it there // If list is empty, just add it if(m_list.empty()) { m_list.push_back(p); // Done. return; } // Otherwise find the right place std::list::iterator i = m_list.begin(); // Find the first packet in the list which has a higher seqnum for(; i != m_list.end(); ++i){ u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if(s == seqnum){ --m_list_size; throw AlreadyExistsException("Same seqnum in list"); } if(seqnum_higher(s, seqnum)){ break; } } // If we're at the end of the list, add the packet to the // end of the list if(i == m_list.end()) { m_list.push_back(p); // Done. return; } // Insert before i m_list.insert(i, p); } void ReliablePacketBuffer::incrementTimeouts(float dtime) { for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { i->time += dtime; i->totaltime += dtime; } } void ReliablePacketBuffer::resetTimedOuts(float timeout) { for(std::list::iterator i = m_list.begin(); i != m_l/* Minetest Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "test.h" #include <sstream> #include "gamedef.h" #include "inventory.h" class TestInventory : public TestBase { public: TestInventory() { TestManager::registerTestModule(this); } const char *getName() { return "TestInventory"; } void runTests(IGameDef *gamedef); void testSerializeDeserialize(IItemDefManager *idef); static const char *serialized_inventory_in; static const char *serialized_inventory_out; static const char *serialized_inventory_inc; }; static TestInventory g_test_instance; void TestInventory::runTests(IGameDef *gamedef) { TEST(testSerializeDeserialize, gamedef->getItemDefManager()); } //////////////////////////////////////////////////////////////////////////////// void TestInventory::testSerializeDeserialize(IItemDefManager *idef) { Inventory inv(idef); std::istringstream is(serialized_inventory_in, std::ios::binary); inv.deSerialize(is); UASSERT(inv.getList("0")); UASSERT(!inv.getList("main")); inv.getList("0")->setName("main"); UASSERT(!inv.getList("0")); UASSERT(inv.getList("main")); UASSERTEQ(u32, inv.getList("main")->getWidth(), 3); inv.getList("main")->setWidth(5); std::ostringstream inv_os(std::ios::binary); inv.serialize(inv_os, false); UASSERTEQ(std::string, inv_os.str(), serialized_inventory_out); inv.setModified(false); inv_os.str(""); inv_os.clear(); inv.serialize(inv_os, true); UASSERTEQ(std::string, inv_os.str(), serialized_inventory_inc); ItemStack leftover = inv.getList("main")->takeItem(7, 99 - 12); ItemStack wanted = ItemStack("default:dirt", 99 - 12, 0, idef); UASSERT(leftover == wanted); leftover = inv.getList("main")->getItem(7); wanted.count = 12; UASSERT(leftover == wanted); } const char *TestInventory::serialized_inventory_in = "List 0 10\n" "Width 3\n" "Empty\n" "Empty\n" "Item default:cobble 61\n" "Empty\n" "Empty\n" "Item default:dirt 71\n" "Empty\n" "Item default:dirt 99\n" "Item default:cobble 38\n" "Empty\n" "EndInventoryList\n" "List abc 1\n" "Item default:stick 3\n" "Width 0\n" "EndInventoryList\n" "EndInventory\n"; const char *TestInventory::serialized_inventory_out = "List main 10\n" "Width 5\n" "Empty\n" "Empty\n" "Item default:cobble 61\n" "Empty\n" "Empty\n" "Item default:dirt 71\n" "Empty\n" "Item default:dirt 99\n" "Item default:cobble 38\n" "Empty\n" "EndInventoryList\n" "List abc 1\n" "Width 0\n" "Item default:stick 3\n" "EndInventoryList\n" "EndInventory\n"; const char *TestInventory::serialized_inventory_inc = "KeepList main\n" "KeepList abc\n" "EndInventory\n"; ed_outs; Channel *channel = &peer->channels[i]; // Remove timed out incomplete unreliable split packets channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); // Increment reliable packet times channel->outgoing_reliables.incrementTimeouts(dtime); // Check reliable packet total times, remove peer if // over timeout. if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) { PrintInfo(derr_con); derr_con<<"RunTimeouts(): Peer "<id <<" has timed out." <<" (source=reliable packet totaltime)" <id); goto nextpeer; } // Re-send timed out outgoing reliables timed_outs = channel-> outgoing_reliables.getTimedOuts(resend_timeout); channel->outgoing_reliables.resetTimedOuts(resend_timeout); for(std::list::iterator j = timed_outs.begin(); j != timed_outs.end(); ++j) { u16 peer_id = readPeerId(*(j->data)); u8 channel = readChannel(*(j->data)); u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); PrintInfo(derr_con); derr_con<<"RE-SENDING timed-out RELIABLE to "; j->address.print(&derr_con); derr_con<<"(t/o="<::iterator node = m_peers.find(PEER_ID_SERVER); if(node != m_peers.end()){ throw ConnectionException("Already connected to a server"); } Peer *peer = new Peer(PEER_ID_SERVER, address); m_peers[peer->id] = peer; // Create event ConnectionEvent e; e.peerAdded(peer->id, peer->address); putEvent(e); m_socket.Bind(0); // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT m_peer_id = PEER_ID_INEXISTENT; SharedBuffer data(0); Send(PEER_ID_SERVER, 0, data, true); } void Connection::disconnect() { dout_con< data(2); writeU8(&data[0], TYPE_CONTROL); writeU8(&data[1], CONTROLTYPE_DISCO); // Send to all for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; rawSendAsPacket(peer->id, 0, data, false); } } void Connection::sendToAll(u8 channelnum, SharedBuffer data, bool reliable) { for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; send(peer->id, channelnum, data, reliable); } } void Connection::send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { dout_con<address, data, m_protocol_id, m_peer_id, channelnum); // Send the packet rawSend(p); } } void Connection::rawSend(const BufferedPacket &packet) { try{ m_socket.Send(packet.address, *packet.data, packet.data.getSize()); } catch(SendFailedException &e){ derr_con<<"Connection::rawSend(): SendFailedException: " <::iterator node = m_peers.find(peer_id); if(node == m_peers.end()){ throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)"); } // Error checking assert(node->second->id == peer_id); return node->second; } Peer* Connection::getPeerNoEx(u16 peer_id) { std::map::iterator node = m_peers.find(peer_id); if(node == m_peers.end()){ return NULL; } // Error checking assert(node->second->id == peer_id); return node->second; } std::list Connection::getPeers() { std::list list; for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; list.push_back(peer); } return list; } bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer &dst) { for(std::map::iterator j = m_peers.begin(); j != m_peers.end(); ++j) { Peer *peer = j->second; for(u16 i=0; ichannels[i]; SharedBuffer resultdata; bool got = checkIncomingBuffers(channel, peer_id, resultdata); if(got){ dst = resultdata; return true; } } } return false; } bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id, SharedBuffer &dst) { u16 firstseqnum = 0; // Clear old packets from start of buffer try{ for(;;){ firstseqnum = channel->incoming_reliables.getFirstSeqnum(); if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum)) channel->incoming_reliables.popFirst(); else break; } // This happens if all packets are old }catch(con::NotFoundException) {} if(channel->incoming_reliables.empty() == false) { if(firstseqnum == channel->next_incoming_seqnum) { BufferedPacket p = channel->incoming_reliables.popFirst(); peer_id = readPeerId(*p.data); u8 channelnum = readChannel(*p.data); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); PrintInfo(); dout_con<<"UNBUFFERING TYPE_RELIABLE" <<" seqnum="<outgoing_reliables.print(); dout_con< payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); return payload; } else if(type == TYPE_SPLIT) { // We have to create a packet again for buffering // This isn't actually too bad an idea. BufferedPacket packet = makePacket( getPeer(peer_id)->address, packetdata, GetProtocolID(), peer_id, channelnum); // Buffer the packet SharedBuffer data = channel->incoming_splits.insert(packet, reliable); if(data.getSize() != 0) { PrintInfo(); dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " <<"size="<next_incoming_seqnum); bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum); PrintInfo(); if(is_future_packet) dout_con<<"BUFFERING"; else if(is_old_packet) dout_con<<"OLD"; else dout_con<<"RECUR"; dout_con<<" TYPE_RELIABLE seqnum="<incoming_reliables.size() < 100); // Send a CONTROLTYPE_ACK SharedBuffer reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_ACK); writeU16(&reply[2], seqnum); rawSendAsPacket(peer_id, channelnum, reply, false); //if(seqnum_higher(seqnum, channel->next_incoming_seqnum)) if(is_future_packet) { /*PrintInfo(); dout_con<<"Buffering reliable packet (seqnum=" <address, packetdata, GetProtocolID(), peer_id, channelnum); try{ channel->incoming_reliables.insert(packet); /*PrintInfo(); dout_con<<"INCOMING: "; channel->incoming_reliables.print(); dout_con<next_incoming_seqnum, seqnum)) else if(is_old_packet) { // An old packet, dump it throw InvalidIncomingDataException("Got an old reliable packet"); } channel->next_incoming_seqnum++; // Get out the inside packet and re-process it SharedBuffer payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); return processPacket(channel, payload, peer_id, channelnum, true); } else { PrintInfo(derr_con); derr_con<<"Got invalid type="<<((int)type&0xff)<address); putEvent(e); delete m_peers[peer_id]; m_peers.erase(peer_id); return true; } /* Interface */ ConnectionEvent Connection::getEvent() { if(m_event_queue.empty()){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } return m_event_queue.pop_front(); } ConnectionEvent Connection::waitEvent(u32 timeout_ms) { try{ return m_event_queue.pop_front(timeout_ms); } catch(ItemNotFoundException &ex){ ConnectionEvent e; e.type = CONNEVENT_NONE; return e; } } void Connection::putCommand(ConnectionCommand &c) { m_command_queue.push_back(c); } void Connection::Serve(unsigned short port) { ConnectionCommand c; c.serve(port); putCommand(c); } void Connection::Connect(Address address) { ConnectionCommand c; c.connect(address); putCommand(c); } bool Connection::Connected() { JMutexAutoLock peerlock(m_peers_mutex); if(m_peers.size() != 1) return false; std::map::iterator node = m_peers.find(PEER_ID_SERVER); if(node == m_peers.end()) return false; if(m_peer_id == PEER_ID_INEXISTENT) return false; return true; } void Connection::Disconnect() { ConnectionCommand c; c.disconnect(); putCommand(c); } u32 Connection::Receive(u16 &peer_id, SharedBuffer &data) { for(;;){ ConnectionEvent e = waitEvent(m_bc_receive_timeout); if(e.type != CONNEVENT_NONE) dout_con<(e.data); return e.data.getSize(); case CONNEVENT_PEER_ADDED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->peerAdded(&tmp); continue; } case CONNEVENT_PEER_REMOVED: { Peer tmp(e.peer_id, e.address); if(m_bc_peerhandler) m_bc_peerhandler->deletingPeer(&tmp, e.timeout); continue; } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " "(port already in use?)"); } } throw NoIncomingDataException("No incoming data"); } void Connection::SendToAll(u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.sendToAll(channelnum, data, reliable); putCommand(c); } void Connection::Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable) { assert(channelnum < CHANNEL_COUNT); ConnectionCommand c; c.send(peer_id, channelnum, data, reliable); putCommand(c); } void Connection::RunTimeouts(float dtime) { // No-op } Address Connection::GetPeerAddress(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return getPeer(peer_id)->address; } float Connection::GetPeerAvgRTT(u16 peer_id) { JMutexAutoLock peerlock(m_peers_mutex); return getPeer(peer_id)->avg_rtt; } void Connection::DeletePeer(u16 peer_id) { ConnectionCommand c; c.deletePeer(peer_id); putCommand(c); } void Connection::PrintInfo(std::ostream &out) { out<