From c03d7dc8a7c35708a39f9c14e2df243e212b283b Mon Sep 17 00:00:00 2001 From: sapier Date: Sat, 26 Apr 2014 01:15:46 +0200 Subject: Add download rate to media progress bar (non http mode only!) Minor coding style fixes --- src/connection.cpp | 328 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 211 insertions(+), 117 deletions(-) (limited to 'src/connection.cpp') diff --git a/src/connection.cpp b/src/connection.cpp index a51cc9df4..e6b763206 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -190,9 +190,6 @@ SharedBuffer makeReliablePacket( SharedBuffer data, u16 seqnum) { - /*dstream<<"BEGIN SharedBuffer makeReliablePacket()"< makeReliablePacket()"<data[BASE_HEADER_SIZE+1])),i->data.getSize(), i->address.serializeString().c_str()); + readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(), + i->address.serializeString().c_str()); fprintf(stderr, "New: seqnum: %05d size: %04d, address: %s\n", - readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(), p.address.serializeString().c_str()); + readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(), + p.address.serializeString().c_str()); throw IncomingDataCorruption("duplicated packet isn't same as original one"); } @@ -562,14 +563,19 @@ Channel::Channel() : current_packet_too_late(0), packet_loss_counter(0), current_bytes_transfered(0), + current_bytes_received(0), current_bytes_lost(0), max_kbps(0.0), cur_kbps(0.0), avg_kbps(0.0), + max_incoming_kbps(0.0), + cur_incoming_kbps(0.0), + avg_incoming_kbps(0.0), max_kbps_lost(0.0), cur_kbps_lost(0.0), avg_kbps_lost(0.0), - bpm_counter(0.0) + bpm_counter(0.0), + rate_samples(0) { } @@ -665,6 +671,11 @@ void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets) current_packet_successfull += packets; } +void Channel::UpdateBytesReceived(unsigned int bytes) { + JMutexAutoLock internal(m_internal_mutex); + current_bytes_received += bytes; +} + void Channel::UpdateBytesLost(unsigned int bytes) { JMutexAutoLock internal(m_internal_mutex); @@ -684,7 +695,7 @@ void Channel::UpdatePacketTooLateCounter() current_packet_too_late++; } -void Channel::UpdateTimers(float dtime) +void Channel::UpdateTimers(float dtime,bool legacy_peer) { bpm_counter += dtime; packet_loss_counter += dtime; @@ -714,53 +725,56 @@ void Channel::UpdateTimers(float dtime) current_packet_successfull = 0; } - float successfull_to_lost_ratio = 0.0; - bool done = false; + /* dynamic window size is only available for non legacy peers */ + if (!legacy_peer) { + float successfull_to_lost_ratio = 0.0; + bool done = false; - if (packets_successfull > 0) { - successfull_to_lost_ratio = packet_loss/packets_successfull; - } - else if (packet_loss > 0) - { - window_size = MYMAX( - (window_size - 10), - MIN_RELIABLE_WINDOW_SIZE); - done = true; - } - - if (!done) - { - if ((successfull_to_lost_ratio < 0.01) && - (window_size < MAX_RELIABLE_WINDOW_SIZE)) - { - /* don't even think about increasing if we didn't even - * use major parts of our window */ - if (reasonable_amount_of_data_transmitted) - window_size = MYMIN( - (window_size + 100), - MAX_RELIABLE_WINDOW_SIZE); - } - else if ((successfull_to_lost_ratio < 0.05) && - (window_size < MAX_RELIABLE_WINDOW_SIZE)) - { - /* don't even think about increasing if we didn't even - * use major parts of our window */ - if (reasonable_amount_of_data_transmitted) - window_size = MYMIN( - (window_size + 50), - MAX_RELIABLE_WINDOW_SIZE); + if (packets_successfull > 0) { + successfull_to_lost_ratio = packet_loss/packets_successfull; } - else if (successfull_to_lost_ratio > 0.15) + else if (packet_loss > 0) { window_size = MYMAX( - (window_size - 100), - MIN_RELIABLE_WINDOW_SIZE); + (window_size - 10), + MIN_RELIABLE_WINDOW_SIZE); + done = true; } - else if (successfull_to_lost_ratio > 0.1) + + if (!done) { - window_size = MYMAX( - (window_size - 50), - MIN_RELIABLE_WINDOW_SIZE); + if ((successfull_to_lost_ratio < 0.01) && + (window_size < MAX_RELIABLE_WINDOW_SIZE)) + { + /* don't even think about increasing if we didn't even + * use major parts of our window */ + if (reasonable_amount_of_data_transmitted) + window_size = MYMIN( + (window_size + 100), + MAX_RELIABLE_WINDOW_SIZE); + } + else if ((successfull_to_lost_ratio < 0.05) && + (window_size < MAX_RELIABLE_WINDOW_SIZE)) + { + /* don't even think about increasing if we didn't even + * use major parts of our window */ + if (reasonable_amount_of_data_transmitted) + window_size = MYMIN( + (window_size + 50), + MAX_RELIABLE_WINDOW_SIZE); + } + else if (successfull_to_lost_ratio > 0.15) + { + window_size = MYMAX( + (window_size - 100), + MIN_RELIABLE_WINDOW_SIZE); + } + else if (successfull_to_lost_ratio > 0.1) + { + window_size = MYMAX( + (window_size - 50), + MIN_RELIABLE_WINDOW_SIZE); + } } } } @@ -769,11 +783,16 @@ void Channel::UpdateTimers(float dtime) { { JMutexAutoLock internal(m_internal_mutex); - cur_kbps = (current_bytes_transfered/bpm_counter)/1024; + cur_kbps = + (((float) current_bytes_transfered)/bpm_counter)/1024.0; current_bytes_transfered = 0; - cur_kbps_lost = (current_bytes_lost/bpm_counter)/1024; - current_bytes_lost = 0; - bpm_counter = 0; + cur_kbps_lost = + (((float) current_bytes_lost)/bpm_counter)/1024.0; + current_bytes_lost = 0; + cur_incoming_kbps = + (((float) current_bytes_received)/bpm_counter)/1024.0; + current_bytes_received = 0; + bpm_counter = 0; } if (cur_kbps > max_kbps) @@ -786,8 +805,18 @@ void Channel::UpdateTimers(float dtime) max_kbps_lost = cur_kbps_lost; } - avg_kbps = avg_kbps * 0.9 + cur_kbps * 0.1; - avg_kbps_lost = avg_kbps_lost * 0.9 + cur_kbps_lost * 0.1; + if (cur_incoming_kbps > max_incoming_kbps) { + max_incoming_kbps = cur_incoming_kbps; + } + + rate_samples = MYMIN(rate_samples+1,10); + float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples); + avg_kbps = avg_kbps * old_fraction + + cur_kbps * (1.0 - old_fraction); + avg_kbps_lost = avg_kbps_lost * old_fraction + + cur_kbps_lost * (1.0 - old_fraction); + avg_incoming_kbps = avg_incoming_kbps * old_fraction + + cur_incoming_kbps * (1.0 - old_fraction); } } @@ -798,8 +827,7 @@ void Channel::UpdateTimers(float dtime) PeerHelper::PeerHelper() : m_peer(0) -{ -} +{} PeerHelper::PeerHelper(Peer* peer) : m_peer(peer) @@ -879,9 +907,8 @@ void Peer::DecUseCount() delete this; } -void Peer::RTTStatistics(float rtt, - std::string profiler_id, - unsigned int num_samples) { +void Peer::RTTStatistics(float rtt, std::string profiler_id, + unsigned int num_samples) { if (m_last_rtt > 0) { /* set min max values */ @@ -952,10 +979,12 @@ void Peer::Drop() } PROFILE(std::stringstream peerIdentifier1); - PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() << ";" << id << ";RELIABLE]"); + PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() + << ";" << id << ";RELIABLE]"); PROFILE(g_profiler->remove(peerIdentifier1.str())); PROFILE(std::stringstream peerIdentifier2); - PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() << ";" << id << ";RELIABLE]"); + PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() + << ";" << id << ";RELIABLE]"); PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG)); delete this; @@ -1136,10 +1165,14 @@ bool UDPPeer::processReliableSendCommand( assert(successfully_put_back_sequence_number); } LOG(dout_con<getDesc() - << " Windowsize exceeded on reliable sending " << c.data.getSize() << " bytes" - << std::endl << "\t\tinitial_sequence_number: " << initial_sequence_number - << std::endl << "\t\tgot at most : " << packets_available << " packets" - << std::endl << "\t\tpackets queued : " << channels[c.channelnum].outgoing_reliables_sent.size() + << " Windowsize exceeded on reliable sending " + << c.data.getSize() << " bytes" + << std::endl << "\t\tinitial_sequence_number: " + << initial_sequence_number + << std::endl << "\t\tgot at most : " + << packets_available << " packets" + << std::endl << "\t\tpackets queued : " + << channels[c.channelnum].outgoing_reliables_sent.size() << std::endl); return false; } @@ -1166,7 +1199,8 @@ void UDPPeer::RunCommandQueues( if (!processReliableSendCommand(c,max_packet_size)) { LOG(dout_con<getDesc() << " Failed to queue packets for peer_id: " << c.peer_id - << ", delaying sending of " << c.data.getSize() << " bytes" << std::endl); + << ", delaying sending of " << c.data.getSize() + << " bytes" << std::endl); channels[i].queued_commands.push_front(c); } } @@ -1327,7 +1361,8 @@ void ConnectionSendThread::runTimeouts(float dtime) continue; PROFILE(std::stringstream peerIdentifier); - PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() << ";" << *j << ";RELIABLE]"); + PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() + << ";" << *j << ";RELIABLE]"); PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG)); SharedBuffer data(2); // data for sending ping, required here because of goto @@ -1377,32 +1412,30 @@ void ConnectionSendThread::runTimeouts(float dtime) m_iteration_packets_avaialble -= timed_outs.size(); - for(std::list::iterator j = timed_outs.begin(); - j != timed_outs.end(); ++j) + for(std::list::iterator k = timed_outs.begin(); + k != timed_outs.end(); ++k) { - u16 peer_id = readPeerId(*(j->data)); - u8 channelnum = readChannel(*(j->data)); - u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); + u16 peer_id = readPeerId(*(k->data)); + u8 channelnum = readChannel(*(k->data)); + u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE+1])); - channel->UpdateBytesLost(j->data.getSize()); + channel->UpdateBytesLost(k->data.getSize()); LOG(derr_con<getDesc() <<"RE-SENDING timed-out RELIABLE to " - << j->address.serializeString() + << k->address.serializeString() << "(t/o="<getDesc() <<"Connection::rawSend(): SendFailedException: " @@ -1452,7 +1487,8 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket& p, Channel* chan p.absolute_send_time = porting::getTimeMs(); // Buffer the packet channel->outgoing_reliables_sent.insert(p, - (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE) % (MAX_RELIABLE_WINDOW_SIZE+1)); + (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE) + % (MAX_RELIABLE_WINDOW_SIZE+1)); } catch(AlreadyExistsException &e) { @@ -1472,7 +1508,8 @@ bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum, PeerHelper peer = m_connection->getPeerNoEx(peer_id); if(!peer) { LOG(dout_con<getDesc() - <<" INFO: dropped packet for non existent peer_id: " << peer_id << std::endl); + <<" INFO: dropped packet for non existent peer_id: " + << peer_id << std::endl); assert(reliable && "trying to send raw packet reliable but no peer found!"); return false; } @@ -1481,7 +1518,8 @@ bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum, if(reliable) { bool have_sequence_number_for_raw_packet = true; - u16 seqnum = channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet); + u16 seqnum = + channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet); if (!have_sequence_number_for_raw_packet) return false; @@ -1547,21 +1585,25 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) switch(c.type){ case CONNCMD_NONE: - LOG(dout_con<getDesc()<<"UDP processing reliable CONNCMD_NONE"<getDesc() + <<"UDP processing reliable CONNCMD_NONE"<getDesc()<<"UDP processing reliable CONNCMD_SEND"<getDesc() + <<"UDP processing reliable CONNCMD_SEND"<getDesc()<<"UDP processing CONNCMD_SEND_TO_ALL"<getDesc() + <<"UDP processing CONNCMD_SEND_TO_ALL"<getDesc()<<"UDP processing reliable CONCMD_CREATE_PEER"<getDesc() + <<"UDP processing reliable CONCMD_CREATE_PEER"<getDesc()<<"UDP processing reliable CONCMD_DISABLE_LEGACY"<getDesc() + <<"UDP processing reliable CONCMD_DISABLE_LEGACY"<getDesc()<<" Invalid reliable command type: " << c.type <getDesc() + <<" Invalid reliable command type: " << c.type <getDesc()<<" UDP processing CONNCMD_NONE"<getDesc() + <<" UDP processing CONNCMD_NONE"<getDesc()<<" UDP processing CONNCMD_SERVE port=" + LOG(dout_con<getDesc() + <<" UDP processing CONNCMD_SERVE port=" <getDesc()<<" UDP processing CONNCMD_CONNECT"<getDesc() + <<" UDP processing CONNCMD_CONNECT"<getDesc()<<" UDP processing CONNCMD_DISCONNECT"<getDesc() + <<" UDP processing CONNCMD_DISCONNECT"<getDesc()<<" UDP processing CONNCMD_DISCONNECT_PEER"<getDesc() + <<" UDP processing CONNCMD_DISCONNECT_PEER"<getDesc()<<" UDP processing CONNCMD_SEND"<getDesc() + <<" UDP processing CONNCMD_SEND"<getDesc()<<" UDP processing CONNCMD_SEND_TO_ALL"<getDesc() + <<" UDP processing CONNCMD_SEND_TO_ALL"<getDesc()<<" UDP processing CONCMD_ACK"<getDesc() + <<" UDP processing CONCMD_ACK"<getDesc()<<" Invalid command type: " << c.type <getDesc() + <<" Invalid command type: " << c.type <(&peer)->channels[channelnum]); } + if (channel != 0) { + channel->UpdateBytesReceived(received_size); + } + // Throw the received packet to channel->processPacket() // Make a new SharedBuffer from the data without the base headers @@ -2223,8 +2280,8 @@ bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer &dst return false; } -bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, u16 &peer_id, - SharedBuffer &dst) +bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, + u16 &peer_id, SharedBuffer &dst) { u16 firstseqnum = 0; if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) @@ -2258,8 +2315,7 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, u16 &peer_i } SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, - SharedBuffer packetdata, u16 peer_id, - u8 channelnum, bool reliable) + SharedBuffer packetdata, u16 peer_id, u8 channelnum, bool reliable) { PeerHelper peer = m_connection->getPeer(peer_id); @@ -2578,7 +2634,7 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, */ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, - bool ipv6): + bool ipv6) : m_udpSocket(ipv6), m_command_queue(), m_event_queue(), @@ -2599,7 +2655,7 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, } Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, - bool ipv6, PeerHandler *peerhandler): + bool ipv6, PeerHandler *peerhandler) : m_udpSocket(ipv6), m_command_queue(), m_event_queue(), @@ -2885,6 +2941,43 @@ float Connection::getPeerStat(u16 peer_id, rtt_stat_type type) return peer->getStat(type); } +float Connection::getLocalStat(rate_stat_type type) +{ + PeerHelper peer = getPeerNoEx(PEER_ID_SERVER); + + if (!peer) { + assert("Connection::getLocalStat we couldn't get our own peer? are you serious???" == 0); + } + + float retval = 0.0; + + for (u16 j=0; j(&peer)->channels[j].getCurrentDownloadRateKB(); + break; + case AVG_DL_RATE: + retval += dynamic_cast(&peer)->channels[j].getAvgDownloadRateKB(); + break; + case CUR_INC_RATE: + retval += dynamic_cast(&peer)->channels[j].getCurrentIncomingRateKB(); + break; + case AVG_INC_RATE: + retval += dynamic_cast(&peer)->channels[j].getAvgIncomingRateKB(); + break; + case AVG_LOSS_RATE: + retval += dynamic_cast(&peer)->channels[j].getAvgLossRateKB(); + break; + case CUR_LOSS_RATE: + retval += dynamic_cast(&peer)->channels[j].getCurrentLossRateKB(); + break; + default: + assert("Connection::getLocalStat Invalid stat type" == 0); + } + } + return retval; +} + u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) { // Somebody wants to make a new connection @@ -2959,7 +3052,8 @@ void Connection::PrintInfo() const std::string Connection::getDesc() { - return std::string("con(")+itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")"; + return std::string("con(")+ + itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")"; } void Connection::DisconnectPeer(u16 peer_id) @@ -2969,8 +3063,8 @@ void Connection::DisconnectPeer(u16 peer_id) putCommand(discon); } -void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum) { - +void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum) +{ assert(channelnum < CHANNEL_COUNT); LOG(dout_con<