1 /*
2  *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
12 
13 #include <stdlib.h>
14 
15 #include <algorithm>
16 #include <cmath>
17 #include <utility>
18 
19 #include "absl/algorithm/container.h"
20 #include "api/units/timestamp.h"
21 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
22 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
23 #include "rtc_base/checks.h"
24 #include "rtc_base/logging.h"
25 #include "system_wrappers/include/field_trial.h"
26 
27 namespace webrtc {
28 
29 constexpr TimeDelta kSendTimeHistoryWindow = TimeDelta::Seconds(60);
30 
AddInFlightPacketBytes(const PacketFeedback & packet)31 void InFlightBytesTracker::AddInFlightPacketBytes(
32     const PacketFeedback& packet) {
33   RTC_DCHECK(packet.sent.send_time.IsFinite());
34   auto it = in_flight_data_.find(packet.network_route);
35   if (it != in_flight_data_.end()) {
36     it->second += packet.sent.size;
37   } else {
38     in_flight_data_.insert({packet.network_route, packet.sent.size});
39   }
40 }
41 
RemoveInFlightPacketBytes(const PacketFeedback & packet)42 void InFlightBytesTracker::RemoveInFlightPacketBytes(
43     const PacketFeedback& packet) {
44   if (packet.sent.send_time.IsInfinite())
45     return;
46   auto it = in_flight_data_.find(packet.network_route);
47   if (it != in_flight_data_.end()) {
48     RTC_DCHECK_GE(it->second, packet.sent.size);
49     it->second -= packet.sent.size;
50     if (it->second.IsZero())
51       in_flight_data_.erase(it);
52   }
53 }
54 
GetOutstandingData(const rtc::NetworkRoute & network_route) const55 DataSize InFlightBytesTracker::GetOutstandingData(
56     const rtc::NetworkRoute& network_route) const {
57   auto it = in_flight_data_.find(network_route);
58   if (it != in_flight_data_.end()) {
59     return it->second;
60   } else {
61     return DataSize::Zero();
62   }
63 }
64 
65 // Comparator for consistent map with NetworkRoute as key.
operator ()(const rtc::NetworkRoute & a,const rtc::NetworkRoute & b) const66 bool InFlightBytesTracker::NetworkRouteComparator::operator()(
67     const rtc::NetworkRoute& a,
68     const rtc::NetworkRoute& b) const {
69   if (a.local.network_id() != b.local.network_id())
70     return a.local.network_id() < b.local.network_id();
71   if (a.remote.network_id() != b.remote.network_id())
72     return a.remote.network_id() < b.remote.network_id();
73 
74   if (a.local.adapter_id() != b.local.adapter_id())
75     return a.local.adapter_id() < b.local.adapter_id();
76   if (a.remote.adapter_id() != b.remote.adapter_id())
77     return a.remote.adapter_id() < b.remote.adapter_id();
78 
79   if (a.local.uses_turn() != b.local.uses_turn())
80     return a.local.uses_turn() < b.local.uses_turn();
81   if (a.remote.uses_turn() != b.remote.uses_turn())
82     return a.remote.uses_turn() < b.remote.uses_turn();
83 
84   return a.connected < b.connected;
85 }
86 
87 TransportFeedbackAdapter::TransportFeedbackAdapter() = default;
88 
89 
AddPacket(const RtpPacketSendInfo & packet_info,size_t overhead_bytes,Timestamp creation_time)90 void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
91                                          size_t overhead_bytes,
92                                          Timestamp creation_time) {
93   PacketFeedback packet;
94   packet.creation_time = creation_time;
95   packet.sent.sequence_number =
96       seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number);
97   packet.sent.size = DataSize::Bytes(packet_info.length + overhead_bytes);
98   packet.sent.audio = packet_info.packet_type == RtpPacketMediaType::kAudio;
99   packet.network_route = network_route_;
100   packet.sent.pacing_info = packet_info.pacing_info;
101 
102   while (!history_.empty() &&
103          creation_time - history_.begin()->second.creation_time >
104              kSendTimeHistoryWindow) {
105     // TODO(sprang): Warn if erasing (too many) old items?
106     if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_)
107       in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
108     history_.erase(history_.begin());
109   }
110   history_.insert(std::make_pair(packet.sent.sequence_number, packet));
111 }
112 
ProcessSentPacket(const rtc::SentPacket & sent_packet)113 absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
114     const rtc::SentPacket& sent_packet) {
115   auto send_time = Timestamp::Millis(sent_packet.send_time_ms);
116   // TODO(srte): Only use one way to indicate that packet feedback is used.
117   if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
118     int64_t unwrapped_seq_num =
119         seq_num_unwrapper_.Unwrap(sent_packet.packet_id);
120     auto it = history_.find(unwrapped_seq_num);
121     if (it != history_.end()) {
122       bool packet_retransmit = it->second.sent.send_time.IsFinite();
123       it->second.sent.send_time = send_time;
124       last_send_time_ = std::max(last_send_time_, send_time);
125       // TODO(srte): Don't do this on retransmit.
126       if (!pending_untracked_size_.IsZero()) {
127         if (send_time < last_untracked_send_time_)
128           RTC_LOG(LS_WARNING)
129               << "appending acknowledged data for out of order packet. (Diff: "
130               << ToString(last_untracked_send_time_ - send_time) << " ms.)";
131         it->second.sent.prior_unacked_data += pending_untracked_size_;
132         pending_untracked_size_ = DataSize::Zero();
133       }
134       if (!packet_retransmit) {
135         if (it->second.sent.sequence_number > last_ack_seq_num_)
136           in_flight_.AddInFlightPacketBytes(it->second);
137         it->second.sent.data_in_flight = GetOutstandingData();
138         return it->second.sent;
139       }
140     }
141   } else if (sent_packet.info.included_in_allocation) {
142     if (send_time < last_send_time_) {
143       RTC_LOG(LS_WARNING) << "ignoring untracked data for out of order packet.";
144     }
145     pending_untracked_size_ +=
146         DataSize::Bytes(sent_packet.info.packet_size_bytes);
147     last_untracked_send_time_ = std::max(last_untracked_send_time_, send_time);
148   }
149   return absl::nullopt;
150 }
151 
152 absl::optional<TransportPacketsFeedback>
ProcessTransportFeedback(const rtcp::TransportFeedback & feedback,Timestamp feedback_receive_time)153 TransportFeedbackAdapter::ProcessTransportFeedback(
154     const rtcp::TransportFeedback& feedback,
155     Timestamp feedback_receive_time) {
156   if (feedback.GetPacketStatusCount() == 0) {
157     RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
158     return absl::nullopt;
159   }
160 
161   TransportPacketsFeedback msg;
162   msg.feedback_time = feedback_receive_time;
163 
164   msg.prior_in_flight = in_flight_.GetOutstandingData(network_route_);
165   msg.packet_feedbacks =
166       ProcessTransportFeedbackInner(feedback, feedback_receive_time);
167   if (msg.packet_feedbacks.empty())
168     return absl::nullopt;
169 
170   auto it = history_.find(last_ack_seq_num_);
171   if (it != history_.end()) {
172     msg.first_unacked_send_time = it->second.sent.send_time;
173   }
174   msg.data_in_flight = in_flight_.GetOutstandingData(network_route_);
175 
176   return msg;
177 }
178 
SetNetworkRoute(const rtc::NetworkRoute & network_route)179 void TransportFeedbackAdapter::SetNetworkRoute(
180     const rtc::NetworkRoute& network_route) {
181   network_route_ = network_route;
182 }
183 
GetOutstandingData() const184 DataSize TransportFeedbackAdapter::GetOutstandingData() const {
185   return in_flight_.GetOutstandingData(network_route_);
186 }
187 
188 std::vector<PacketResult>
ProcessTransportFeedbackInner(const rtcp::TransportFeedback & feedback,Timestamp feedback_receive_time)189 TransportFeedbackAdapter::ProcessTransportFeedbackInner(
190     const rtcp::TransportFeedback& feedback,
191     Timestamp feedback_receive_time) {
192   // Add timestamp deltas to a local time base selected on first packet arrival.
193   // This won't be the true time base, but makes it easier to manually inspect
194   // time stamps.
195   if (last_timestamp_.IsInfinite()) {
196     current_offset_ = feedback_receive_time;
197   } else {
198     // TODO(srte): We shouldn't need to do rounding here.
199     const TimeDelta delta = feedback.GetBaseDelta(last_timestamp_)
200                                 .RoundDownTo(TimeDelta::Millis(1));
201     // Protect against assigning current_offset_ negative value.
202     if (delta < Timestamp::Zero() - current_offset_) {
203       RTC_LOG(LS_WARNING) << "Unexpected feedback timestamp received.";
204       current_offset_ = feedback_receive_time;
205     } else {
206       current_offset_ += delta;
207     }
208   }
209   last_timestamp_ = feedback.GetBaseTime();
210 
211   std::vector<PacketResult> packet_result_vector;
212   packet_result_vector.reserve(feedback.GetPacketStatusCount());
213 
214   size_t failed_lookups = 0;
215   size_t ignored = 0;
216   TimeDelta packet_offset = TimeDelta::Zero();
217   for (const auto& packet : feedback.GetAllPackets()) {
218     int64_t seq_num = seq_num_unwrapper_.Unwrap(packet.sequence_number());
219 
220     if (seq_num > last_ack_seq_num_) {
221       // Starts at history_.begin() if last_ack_seq_num_ < 0, since any valid
222       // sequence number is >= 0.
223       for (auto it = history_.upper_bound(last_ack_seq_num_);
224            it != history_.upper_bound(seq_num); ++it) {
225         in_flight_.RemoveInFlightPacketBytes(it->second);
226       }
227       last_ack_seq_num_ = seq_num;
228     }
229 
230     auto it = history_.find(seq_num);
231     if (it == history_.end()) {
232       ++failed_lookups;
233       continue;
234     }
235 
236     if (it->second.sent.send_time.IsInfinite()) {
237       // TODO(srte): Fix the tests that makes this happen and make this a
238       // DCHECK.
239       RTC_DLOG(LS_ERROR)
240           << "Received feedback before packet was indicated as sent";
241       continue;
242     }
243 
244     PacketFeedback packet_feedback = it->second;
245     if (packet.received()) {
246       packet_offset += packet.delta();
247       packet_feedback.receive_time =
248           current_offset_ + packet_offset.RoundDownTo(TimeDelta::Millis(1));
249       // Note: Lost packets are not removed from history because they might be
250       // reported as received by a later feedback.
251       history_.erase(it);
252     }
253     if (packet_feedback.network_route == network_route_) {
254       PacketResult result;
255       result.sent_packet = packet_feedback.sent;
256       result.receive_time = packet_feedback.receive_time;
257       packet_result_vector.push_back(result);
258     } else {
259       ++ignored;
260     }
261   }
262 
263   if (failed_lookups > 0) {
264     RTC_LOG(LS_WARNING) << "Failed to lookup send time for " << failed_lookups
265                         << " packet" << (failed_lookups > 1 ? "s" : "")
266                         << ". Send time history too small?";
267   }
268   if (ignored > 0) {
269     RTC_LOG(LS_INFO) << "Ignoring " << ignored
270                      << " packets because they were sent on a different route.";
271   }
272 
273   return packet_result_vector;
274 }
275 
276 }  // namespace webrtc
277