1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/paced_sender.h"
12 
13 #include <algorithm>
14 #include <map>
15 #include <queue>
16 #include <set>
17 #include <vector>
18 #include <utility>
19 
20 #include "modules/include/module_common_types.h"
21 #include "modules/pacing/alr_detector.h"
22 #include "modules/pacing/bitrate_prober.h"
23 #include "modules/pacing/interval_budget.h"
24 #include "modules/utility/include/process_thread.h"
25 #include "rtc_base/checks.h"
26 #include "rtc_base/logging.h"
27 #include "rtc_base/ptr_util.h"
28 #include "system_wrappers/include/clock.h"
29 #include "system_wrappers/include/field_trial.h"
30 
31 namespace {
32 // Time limit in milliseconds between packet bursts.
33 const int64_t kMinPacketLimitMs = 5;
34 const int64_t kPausedPacketIntervalMs = 500;
35 
36 // Upper cap on process interval, in case process has not been called in a long
37 // time.
38 const int64_t kMaxIntervalTimeMs = 30;
39 
40 }  // namespace
41 
42 namespace webrtc {
43 
44 const int64_t PacedSender::kMaxQueueLengthMs = 2000;
45 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
46 
PacedSender(const Clock * clock,PacketSender * packet_sender,RtcEventLog * event_log)47 PacedSender::PacedSender(const Clock* clock,
48                          PacketSender* packet_sender,
49                          RtcEventLog* event_log) :
50     PacedSender(clock, packet_sender, event_log,
51                 webrtc::field_trial::IsEnabled("WebRTC-RoundRobinPacing")
52                     ? rtc::MakeUnique<PacketQueue2>(clock)
53                     : rtc::MakeUnique<PacketQueue>(clock)) {}
54 
PacedSender(const Clock * clock,PacketSender * packet_sender,RtcEventLog * event_log,std::unique_ptr<PacketQueue> packets)55 PacedSender::PacedSender(const Clock* clock,
56                          PacketSender* packet_sender,
57                          RtcEventLog* event_log,
58                          std::unique_ptr<PacketQueue> packets)
59     : clock_(clock),
60       packet_sender_(packet_sender),
61       alr_detector_(rtc::MakeUnique<AlrDetector>()),
62       paused_(false),
63       media_budget_(rtc::MakeUnique<IntervalBudget>(0)),
64       padding_budget_(rtc::MakeUnique<IntervalBudget>(0)),
65       prober_(rtc::MakeUnique<BitrateProber>(event_log)),
66       probing_send_failure_(false),
67       estimated_bitrate_bps_(0),
68       min_send_bitrate_kbps_(0u),
69       max_padding_bitrate_kbps_(0u),
70       pacing_bitrate_kbps_(0),
71       time_last_update_us_(clock->TimeInMicroseconds()),
72       first_sent_packet_ms_(-1),
73       packets_(std::move(packets)),
74       packet_counter_(0),
75       pacing_factor_(kDefaultPaceMultiplier),
76       queue_time_limit(kMaxQueueLengthMs),
77       account_for_audio_(false) {
78   UpdateBudgetWithElapsedTime(kMinPacketLimitMs);
79 }
80 
~PacedSender()81 PacedSender::~PacedSender() {}
82 
CreateProbeCluster(int bitrate_bps)83 void PacedSender::CreateProbeCluster(int bitrate_bps) {
84   rtc::CritScope cs(&critsect_);
85   prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds());
86 }
87 
Pause()88 void PacedSender::Pause() {
89   {
90     rtc::CritScope cs(&critsect_);
91     if (!paused_)
92       RTC_LOG(LS_INFO) << "PacedSender paused.";
93     paused_ = true;
94     packets_->SetPauseState(true, clock_->TimeInMilliseconds());
95   }
96   // Tell the process thread to call our TimeUntilNextProcess() method to get
97   // a new (longer) estimate for when to call Process().
98   if (process_thread_)
99     process_thread_->WakeUp(this);
100 }
101 
Resume()102 void PacedSender::Resume() {
103   {
104     rtc::CritScope cs(&critsect_);
105     if (paused_)
106       RTC_LOG(LS_INFO) << "PacedSender resumed.";
107     paused_ = false;
108     packets_->SetPauseState(false, clock_->TimeInMilliseconds());
109   }
110   // Tell the process thread to call our TimeUntilNextProcess() method to
111   // refresh the estimate for when to call Process().
112   if (process_thread_)
113     process_thread_->WakeUp(this);
114 }
115 
SetProbingEnabled(bool enabled)116 void PacedSender::SetProbingEnabled(bool enabled) {
117   rtc::CritScope cs(&critsect_);
118   RTC_CHECK_EQ(0, packet_counter_);
119   prober_->SetEnabled(enabled);
120 }
121 
SetEstimatedBitrate(uint32_t bitrate_bps)122 void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) {
123   if (bitrate_bps == 0)
124     RTC_LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate.";
125   rtc::CritScope cs(&critsect_);
126   estimated_bitrate_bps_ = bitrate_bps;
127   padding_budget_->set_target_rate_kbps(
128       std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
129   pacing_bitrate_kbps_ =
130       std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
131       pacing_factor_;
132   alr_detector_->SetEstimatedBitrate(bitrate_bps);
133 }
134 
SetSendBitrateLimits(int min_send_bitrate_bps,int padding_bitrate)135 void PacedSender::SetSendBitrateLimits(int min_send_bitrate_bps,
136                                        int padding_bitrate) {
137   rtc::CritScope cs(&critsect_);
138   min_send_bitrate_kbps_ = min_send_bitrate_bps / 1000;
139   pacing_bitrate_kbps_ =
140       std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
141       pacing_factor_;
142   max_padding_bitrate_kbps_ = padding_bitrate / 1000;
143   padding_budget_->set_target_rate_kbps(
144       std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
145 }
146 
InsertPacket(RtpPacketSender::Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,size_t bytes,bool retransmission)147 void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
148                                uint32_t ssrc,
149                                uint16_t sequence_number,
150                                int64_t capture_time_ms,
151                                size_t bytes,
152                                bool retransmission) {
153   rtc::CritScope cs(&critsect_);
154   RTC_DCHECK(estimated_bitrate_bps_ > 0)
155         << "SetEstimatedBitrate must be called before InsertPacket.";
156 
157   int64_t now_ms = clock_->TimeInMilliseconds();
158   prober_->OnIncomingPacket(bytes);
159 
160   if (capture_time_ms < 0)
161     capture_time_ms = now_ms;
162 
163   packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number,
164                                      capture_time_ms, now_ms, bytes,
165                                      retransmission, packet_counter_++));
166 }
167 
SetAccountForAudioPackets(bool account_for_audio)168 void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
169   rtc::CritScope cs(&critsect_);
170   account_for_audio_ = account_for_audio;
171 }
172 
ExpectedQueueTimeMs() const173 int64_t PacedSender::ExpectedQueueTimeMs() const {
174   rtc::CritScope cs(&critsect_);
175   RTC_DCHECK_GT(pacing_bitrate_kbps_, 0);
176   return static_cast<int64_t>(packets_->SizeInBytes() * 8 /
177                               pacing_bitrate_kbps_);
178 }
179 
GetApplicationLimitedRegionStartTime() const180 rtc::Optional<int64_t> PacedSender::GetApplicationLimitedRegionStartTime()
181     const {
182   rtc::CritScope cs(&critsect_);
183   return alr_detector_->GetApplicationLimitedRegionStartTime();
184 }
185 
QueueSizePackets() const186 size_t PacedSender::QueueSizePackets() const {
187   rtc::CritScope cs(&critsect_);
188   return packets_->SizeInPackets();
189 }
190 
FirstSentPacketTimeMs() const191 int64_t PacedSender::FirstSentPacketTimeMs() const {
192   rtc::CritScope cs(&critsect_);
193   return first_sent_packet_ms_;
194 }
195 
QueueInMs() const196 int64_t PacedSender::QueueInMs() const {
197   rtc::CritScope cs(&critsect_);
198 
199   int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
200   if (oldest_packet == 0)
201     return 0;
202 
203   return clock_->TimeInMilliseconds() - oldest_packet;
204 }
205 
TimeUntilNextProcess()206 int64_t PacedSender::TimeUntilNextProcess() {
207   rtc::CritScope cs(&critsect_);
208   int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
209   int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
210   // When paused we wake up every 500 ms to send a padding packet to ensure
211   // we won't get stuck in the paused state due to no feedback being received.
212   if (paused_)
213     return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
214 
215   if (prober_->IsProbing()) {
216     int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
217     if (ret > 0 || (ret == 0 && !probing_send_failure_))
218       return ret;
219   }
220   return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
221 }
222 
Process()223 void PacedSender::Process() {
224   int64_t now_us = clock_->TimeInMicroseconds();
225   rtc::CritScope cs(&critsect_);
226   int64_t elapsed_time_ms = std::min(
227       kMaxIntervalTimeMs, (now_us - time_last_update_us_ + 500) / 1000);
228   int target_bitrate_kbps = pacing_bitrate_kbps_;
229 
230   if (paused_) {
231     PacedPacketInfo pacing_info;
232     time_last_update_us_ = now_us;
233     // We can not send padding unless a normal packet has first been sent. If we
234     // do, timestamps get messed up.
235     if (packet_counter_ == 0)
236       return;
237     size_t bytes_sent = SendPadding(1, pacing_info);
238     alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
239     return;
240   }
241 
242   if (elapsed_time_ms > 0) {
243     size_t queue_size_bytes = packets_->SizeInBytes();
244     if (queue_size_bytes > 0) {
245       // Assuming equal size packets and input/output rate, the average packet
246       // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
247       // time constraint shall be met. Determine bitrate needed for that.
248       packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
249       int64_t avg_time_left_ms = std::max<int64_t>(
250           1, queue_time_limit - packets_->AverageQueueTimeMs());
251       int min_bitrate_needed_kbps =
252           static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
253       if (min_bitrate_needed_kbps > target_bitrate_kbps)
254         target_bitrate_kbps = min_bitrate_needed_kbps;
255     }
256 
257     media_budget_->set_target_rate_kbps(target_bitrate_kbps);
258     UpdateBudgetWithElapsedTime(elapsed_time_ms);
259   }
260 
261   time_last_update_us_ = now_us;
262 
263   bool is_probing = prober_->IsProbing();
264   PacedPacketInfo pacing_info;
265   size_t bytes_sent = 0;
266   size_t recommended_probe_size = 0;
267   if (is_probing) {
268     pacing_info = prober_->CurrentCluster();
269     recommended_probe_size = prober_->RecommendedMinProbeSize();
270   }
271   // We need to check paused_ here because the critical section protecting
272   // it is released during the call to SendPacket. This has been fixed in
273   // a similar way upstream, so these changes can be dropped the next time
274   // we update.
275   while (!paused_ && !packets_->Empty()) {
276     // Since we need to release the lock in order to send, we first pop the
277     // element from the priority queue but keep it in storage, so that we can
278     // reinsert it if send fails.
279     const PacketQueue::Packet& packet = packets_->BeginPop();
280 
281     if (SendPacket(packet, pacing_info)) {
282       // Send succeeded, remove it from the queue.
283       if (first_sent_packet_ms_ == -1)
284         first_sent_packet_ms_ = clock_->TimeInMilliseconds();
285       bytes_sent += packet.bytes;
286       packets_->FinalizePop(packet);
287       if (is_probing && bytes_sent > recommended_probe_size)
288         break;
289     } else {
290       // Send failed, put it back into the queue.
291       packets_->CancelPop(packet);
292       break;
293     }
294   }
295 
296   if (packets_->Empty()) {
297     // We can not send padding unless a normal packet has first been sent. If we
298     // do, timestamps get messed up.
299     if (packet_counter_ > 0) {
300       int padding_needed =
301           static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
302                                       : padding_budget_->bytes_remaining());
303       if (padding_needed > 0)
304         bytes_sent += SendPadding(padding_needed, pacing_info);
305     }
306   }
307   if (is_probing) {
308     probing_send_failure_ = bytes_sent == 0;
309     if (!probing_send_failure_)
310       prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
311   }
312   alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
313 }
314 
ProcessThreadAttached(ProcessThread * process_thread)315 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
316   RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread;
317   process_thread_ = process_thread;
318 }
319 
SendPacket(const PacketQueue::Packet & packet,const PacedPacketInfo & pacing_info)320 bool PacedSender::SendPacket(const PacketQueue::Packet& packet,
321                              const PacedPacketInfo& pacing_info) {
322   RTC_DCHECK(!paused_);
323   if (media_budget_->bytes_remaining() == 0 &&
324       pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
325     return false;
326   }
327 
328   critsect_.Leave();
329   const bool success = packet_sender_->TimeToSendPacket(
330       packet.ssrc, packet.sequence_number, packet.capture_time_ms,
331       packet.retransmission, pacing_info);
332   critsect_.Enter();
333 
334   if (success) {
335     if (packet.priority != kHighPriority || account_for_audio_) {
336       // Update media bytes sent.
337       // TODO(eladalon): TimeToSendPacket() can also return |true| in some
338       // situations where nothing actually ended up being sent to the network,
339       // and we probably don't want to update the budget in such cases.
340       // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
341       UpdateBudgetWithBytesSent(packet.bytes);
342     }
343   }
344 
345   return success;
346 }
347 
SendPadding(size_t padding_needed,const PacedPacketInfo & pacing_info)348 size_t PacedSender::SendPadding(size_t padding_needed,
349                                 const PacedPacketInfo& pacing_info) {
350   RTC_DCHECK_GT(packet_counter_, 0);
351   critsect_.Leave();
352   size_t bytes_sent =
353       packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
354   critsect_.Enter();
355 
356   if (bytes_sent > 0) {
357     UpdateBudgetWithBytesSent(bytes_sent);
358   }
359   return bytes_sent;
360 }
361 
UpdateBudgetWithElapsedTime(int64_t delta_time_ms)362 void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
363   media_budget_->IncreaseBudget(delta_time_ms);
364   padding_budget_->IncreaseBudget(delta_time_ms);
365 }
366 
UpdateBudgetWithBytesSent(size_t bytes_sent)367 void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
368   media_budget_->UseBudget(bytes_sent);
369   padding_budget_->UseBudget(bytes_sent);
370 }
371 
SetPacingFactor(float pacing_factor)372 void PacedSender::SetPacingFactor(float pacing_factor) {
373   rtc::CritScope cs(&critsect_);
374   pacing_factor_ = pacing_factor;
375   // Make sure new padding factor is applied immediately, otherwise we need to
376   // wait for the send bitrate estimate to be updated before this takes effect.
377   SetEstimatedBitrate(estimated_bitrate_bps_);
378 }
379 
GetPacingFactor() const380 float PacedSender::GetPacingFactor() const {
381   rtc::CritScope cs(&critsect_);
382   return pacing_factor_;
383 }
384 
SetQueueTimeLimit(int limit_ms)385 void PacedSender::SetQueueTimeLimit(int limit_ms) {
386   rtc::CritScope cs(&critsect_);
387   queue_time_limit = limit_ms;
388 }
389 
390 }  // namespace webrtc
391