1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/paced_sender.h"
12
13 #include <algorithm>
14 #include <map>
15 #include <queue>
16 #include <set>
17 #include <vector>
18 #include <utility>
19
20 #include "modules/include/module_common_types.h"
21 #include "modules/pacing/alr_detector.h"
22 #include "modules/pacing/bitrate_prober.h"
23 #include "modules/pacing/interval_budget.h"
24 #include "modules/utility/include/process_thread.h"
25 #include "rtc_base/checks.h"
26 #include "rtc_base/logging.h"
27 #include "rtc_base/ptr_util.h"
28 #include "system_wrappers/include/clock.h"
29 #include "system_wrappers/include/field_trial.h"
30
namespace {

// Time limit in milliseconds between packet bursts.
constexpr int64_t kMinPacketLimitMs = 5;

// Interval in milliseconds at which the pacer wakes up while paused.
constexpr int64_t kPausedPacketIntervalMs = 500;

// Upper cap on process interval, in case process has not been called in a long
// time.
constexpr int64_t kMaxIntervalTimeMs = 30;

}  // namespace
41
42 namespace webrtc {
43
44 const int64_t PacedSender::kMaxQueueLengthMs = 2000;
45 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
46
PacedSender(const Clock * clock,PacketSender * packet_sender,RtcEventLog * event_log)47 PacedSender::PacedSender(const Clock* clock,
48 PacketSender* packet_sender,
49 RtcEventLog* event_log) :
50 PacedSender(clock, packet_sender, event_log,
51 webrtc::field_trial::IsEnabled("WebRTC-RoundRobinPacing")
52 ? rtc::MakeUnique<PacketQueue2>(clock)
53 : rtc::MakeUnique<PacketQueue>(clock)) {}
54
PacedSender(const Clock * clock,PacketSender * packet_sender,RtcEventLog * event_log,std::unique_ptr<PacketQueue> packets)55 PacedSender::PacedSender(const Clock* clock,
56 PacketSender* packet_sender,
57 RtcEventLog* event_log,
58 std::unique_ptr<PacketQueue> packets)
59 : clock_(clock),
60 packet_sender_(packet_sender),
61 alr_detector_(rtc::MakeUnique<AlrDetector>()),
62 paused_(false),
63 media_budget_(rtc::MakeUnique<IntervalBudget>(0)),
64 padding_budget_(rtc::MakeUnique<IntervalBudget>(0)),
65 prober_(rtc::MakeUnique<BitrateProber>(event_log)),
66 probing_send_failure_(false),
67 estimated_bitrate_bps_(0),
68 min_send_bitrate_kbps_(0u),
69 max_padding_bitrate_kbps_(0u),
70 pacing_bitrate_kbps_(0),
71 time_last_update_us_(clock->TimeInMicroseconds()),
72 first_sent_packet_ms_(-1),
73 packets_(std::move(packets)),
74 packet_counter_(0),
75 pacing_factor_(kDefaultPaceMultiplier),
76 queue_time_limit(kMaxQueueLengthMs),
77 account_for_audio_(false) {
78 UpdateBudgetWithElapsedTime(kMinPacketLimitMs);
79 }
80
~PacedSender()81 PacedSender::~PacedSender() {}
82
CreateProbeCluster(int bitrate_bps)83 void PacedSender::CreateProbeCluster(int bitrate_bps) {
84 rtc::CritScope cs(&critsect_);
85 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds());
86 }
87
Pause()88 void PacedSender::Pause() {
89 {
90 rtc::CritScope cs(&critsect_);
91 if (!paused_)
92 RTC_LOG(LS_INFO) << "PacedSender paused.";
93 paused_ = true;
94 packets_->SetPauseState(true, clock_->TimeInMilliseconds());
95 }
96 // Tell the process thread to call our TimeUntilNextProcess() method to get
97 // a new (longer) estimate for when to call Process().
98 if (process_thread_)
99 process_thread_->WakeUp(this);
100 }
101
Resume()102 void PacedSender::Resume() {
103 {
104 rtc::CritScope cs(&critsect_);
105 if (paused_)
106 RTC_LOG(LS_INFO) << "PacedSender resumed.";
107 paused_ = false;
108 packets_->SetPauseState(false, clock_->TimeInMilliseconds());
109 }
110 // Tell the process thread to call our TimeUntilNextProcess() method to
111 // refresh the estimate for when to call Process().
112 if (process_thread_)
113 process_thread_->WakeUp(this);
114 }
115
SetProbingEnabled(bool enabled)116 void PacedSender::SetProbingEnabled(bool enabled) {
117 rtc::CritScope cs(&critsect_);
118 RTC_CHECK_EQ(0, packet_counter_);
119 prober_->SetEnabled(enabled);
120 }
121
SetEstimatedBitrate(uint32_t bitrate_bps)122 void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) {
123 if (bitrate_bps == 0)
124 RTC_LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate.";
125 rtc::CritScope cs(&critsect_);
126 estimated_bitrate_bps_ = bitrate_bps;
127 padding_budget_->set_target_rate_kbps(
128 std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
129 pacing_bitrate_kbps_ =
130 std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
131 pacing_factor_;
132 alr_detector_->SetEstimatedBitrate(bitrate_bps);
133 }
134
SetSendBitrateLimits(int min_send_bitrate_bps,int padding_bitrate)135 void PacedSender::SetSendBitrateLimits(int min_send_bitrate_bps,
136 int padding_bitrate) {
137 rtc::CritScope cs(&critsect_);
138 min_send_bitrate_kbps_ = min_send_bitrate_bps / 1000;
139 pacing_bitrate_kbps_ =
140 std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) *
141 pacing_factor_;
142 max_padding_bitrate_kbps_ = padding_bitrate / 1000;
143 padding_budget_->set_target_rate_kbps(
144 std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_));
145 }
146
InsertPacket(RtpPacketSender::Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,size_t bytes,bool retransmission)147 void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
148 uint32_t ssrc,
149 uint16_t sequence_number,
150 int64_t capture_time_ms,
151 size_t bytes,
152 bool retransmission) {
153 rtc::CritScope cs(&critsect_);
154 RTC_DCHECK(estimated_bitrate_bps_ > 0)
155 << "SetEstimatedBitrate must be called before InsertPacket.";
156
157 int64_t now_ms = clock_->TimeInMilliseconds();
158 prober_->OnIncomingPacket(bytes);
159
160 if (capture_time_ms < 0)
161 capture_time_ms = now_ms;
162
163 packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number,
164 capture_time_ms, now_ms, bytes,
165 retransmission, packet_counter_++));
166 }
167
SetAccountForAudioPackets(bool account_for_audio)168 void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
169 rtc::CritScope cs(&critsect_);
170 account_for_audio_ = account_for_audio;
171 }
172
ExpectedQueueTimeMs() const173 int64_t PacedSender::ExpectedQueueTimeMs() const {
174 rtc::CritScope cs(&critsect_);
175 RTC_DCHECK_GT(pacing_bitrate_kbps_, 0);
176 return static_cast<int64_t>(packets_->SizeInBytes() * 8 /
177 pacing_bitrate_kbps_);
178 }
179
GetApplicationLimitedRegionStartTime() const180 rtc::Optional<int64_t> PacedSender::GetApplicationLimitedRegionStartTime()
181 const {
182 rtc::CritScope cs(&critsect_);
183 return alr_detector_->GetApplicationLimitedRegionStartTime();
184 }
185
QueueSizePackets() const186 size_t PacedSender::QueueSizePackets() const {
187 rtc::CritScope cs(&critsect_);
188 return packets_->SizeInPackets();
189 }
190
FirstSentPacketTimeMs() const191 int64_t PacedSender::FirstSentPacketTimeMs() const {
192 rtc::CritScope cs(&critsect_);
193 return first_sent_packet_ms_;
194 }
195
QueueInMs() const196 int64_t PacedSender::QueueInMs() const {
197 rtc::CritScope cs(&critsect_);
198
199 int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
200 if (oldest_packet == 0)
201 return 0;
202
203 return clock_->TimeInMilliseconds() - oldest_packet;
204 }
205
TimeUntilNextProcess()206 int64_t PacedSender::TimeUntilNextProcess() {
207 rtc::CritScope cs(&critsect_);
208 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
209 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
210 // When paused we wake up every 500 ms to send a padding packet to ensure
211 // we won't get stuck in the paused state due to no feedback being received.
212 if (paused_)
213 return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
214
215 if (prober_->IsProbing()) {
216 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
217 if (ret > 0 || (ret == 0 && !probing_send_failure_))
218 return ret;
219 }
220 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
221 }
222
Process()223 void PacedSender::Process() {
224 int64_t now_us = clock_->TimeInMicroseconds();
225 rtc::CritScope cs(&critsect_);
226 int64_t elapsed_time_ms = std::min(
227 kMaxIntervalTimeMs, (now_us - time_last_update_us_ + 500) / 1000);
228 int target_bitrate_kbps = pacing_bitrate_kbps_;
229
230 if (paused_) {
231 PacedPacketInfo pacing_info;
232 time_last_update_us_ = now_us;
233 // We can not send padding unless a normal packet has first been sent. If we
234 // do, timestamps get messed up.
235 if (packet_counter_ == 0)
236 return;
237 size_t bytes_sent = SendPadding(1, pacing_info);
238 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
239 return;
240 }
241
242 if (elapsed_time_ms > 0) {
243 size_t queue_size_bytes = packets_->SizeInBytes();
244 if (queue_size_bytes > 0) {
245 // Assuming equal size packets and input/output rate, the average packet
246 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
247 // time constraint shall be met. Determine bitrate needed for that.
248 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
249 int64_t avg_time_left_ms = std::max<int64_t>(
250 1, queue_time_limit - packets_->AverageQueueTimeMs());
251 int min_bitrate_needed_kbps =
252 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
253 if (min_bitrate_needed_kbps > target_bitrate_kbps)
254 target_bitrate_kbps = min_bitrate_needed_kbps;
255 }
256
257 media_budget_->set_target_rate_kbps(target_bitrate_kbps);
258 UpdateBudgetWithElapsedTime(elapsed_time_ms);
259 }
260
261 time_last_update_us_ = now_us;
262
263 bool is_probing = prober_->IsProbing();
264 PacedPacketInfo pacing_info;
265 size_t bytes_sent = 0;
266 size_t recommended_probe_size = 0;
267 if (is_probing) {
268 pacing_info = prober_->CurrentCluster();
269 recommended_probe_size = prober_->RecommendedMinProbeSize();
270 }
271 // We need to check paused_ here because the critical section protecting
272 // it is released during the call to SendPacket. This has been fixed in
273 // a similar way upstream, so these changes can be dropped the next time
274 // we update.
275 while (!paused_ && !packets_->Empty()) {
276 // Since we need to release the lock in order to send, we first pop the
277 // element from the priority queue but keep it in storage, so that we can
278 // reinsert it if send fails.
279 const PacketQueue::Packet& packet = packets_->BeginPop();
280
281 if (SendPacket(packet, pacing_info)) {
282 // Send succeeded, remove it from the queue.
283 if (first_sent_packet_ms_ == -1)
284 first_sent_packet_ms_ = clock_->TimeInMilliseconds();
285 bytes_sent += packet.bytes;
286 packets_->FinalizePop(packet);
287 if (is_probing && bytes_sent > recommended_probe_size)
288 break;
289 } else {
290 // Send failed, put it back into the queue.
291 packets_->CancelPop(packet);
292 break;
293 }
294 }
295
296 if (packets_->Empty()) {
297 // We can not send padding unless a normal packet has first been sent. If we
298 // do, timestamps get messed up.
299 if (packet_counter_ > 0) {
300 int padding_needed =
301 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
302 : padding_budget_->bytes_remaining());
303 if (padding_needed > 0)
304 bytes_sent += SendPadding(padding_needed, pacing_info);
305 }
306 }
307 if (is_probing) {
308 probing_send_failure_ = bytes_sent == 0;
309 if (!probing_send_failure_)
310 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
311 }
312 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
313 }
314
ProcessThreadAttached(ProcessThread * process_thread)315 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
316 RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread;
317 process_thread_ = process_thread;
318 }
319
SendPacket(const PacketQueue::Packet & packet,const PacedPacketInfo & pacing_info)320 bool PacedSender::SendPacket(const PacketQueue::Packet& packet,
321 const PacedPacketInfo& pacing_info) {
322 RTC_DCHECK(!paused_);
323 if (media_budget_->bytes_remaining() == 0 &&
324 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
325 return false;
326 }
327
328 critsect_.Leave();
329 const bool success = packet_sender_->TimeToSendPacket(
330 packet.ssrc, packet.sequence_number, packet.capture_time_ms,
331 packet.retransmission, pacing_info);
332 critsect_.Enter();
333
334 if (success) {
335 if (packet.priority != kHighPriority || account_for_audio_) {
336 // Update media bytes sent.
337 // TODO(eladalon): TimeToSendPacket() can also return |true| in some
338 // situations where nothing actually ended up being sent to the network,
339 // and we probably don't want to update the budget in such cases.
340 // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
341 UpdateBudgetWithBytesSent(packet.bytes);
342 }
343 }
344
345 return success;
346 }
347
SendPadding(size_t padding_needed,const PacedPacketInfo & pacing_info)348 size_t PacedSender::SendPadding(size_t padding_needed,
349 const PacedPacketInfo& pacing_info) {
350 RTC_DCHECK_GT(packet_counter_, 0);
351 critsect_.Leave();
352 size_t bytes_sent =
353 packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
354 critsect_.Enter();
355
356 if (bytes_sent > 0) {
357 UpdateBudgetWithBytesSent(bytes_sent);
358 }
359 return bytes_sent;
360 }
361
UpdateBudgetWithElapsedTime(int64_t delta_time_ms)362 void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {
363 media_budget_->IncreaseBudget(delta_time_ms);
364 padding_budget_->IncreaseBudget(delta_time_ms);
365 }
366
UpdateBudgetWithBytesSent(size_t bytes_sent)367 void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) {
368 media_budget_->UseBudget(bytes_sent);
369 padding_budget_->UseBudget(bytes_sent);
370 }
371
SetPacingFactor(float pacing_factor)372 void PacedSender::SetPacingFactor(float pacing_factor) {
373 rtc::CritScope cs(&critsect_);
374 pacing_factor_ = pacing_factor;
375 // Make sure new padding factor is applied immediately, otherwise we need to
376 // wait for the send bitrate estimate to be updated before this takes effect.
377 SetEstimatedBitrate(estimated_bitrate_bps_);
378 }
379
GetPacingFactor() const380 float PacedSender::GetPacingFactor() const {
381 rtc::CritScope cs(&critsect_);
382 return pacing_factor_;
383 }
384
SetQueueTimeLimit(int limit_ms)385 void PacedSender::SetQueueTimeLimit(int limit_ms) {
386 rtc::CritScope cs(&critsect_);
387 queue_time_limit = limit_ms;
388 }
389
390 } // namespace webrtc
391