1 /*
2  *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
12 #define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
13 
14 #include <stddef.h>
15 #include <stdint.h>
16 
17 #include <list>
18 #include <map>
19 #include <memory>
20 #include <queue>
21 #include <set>
22 
23 #include "absl/types/optional.h"
24 #include "api/transport/webrtc_key_value_config.h"
25 #include "api/units/data_size.h"
26 #include "api/units/time_delta.h"
27 #include "api/units/timestamp.h"
28 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
29 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
30 #include "system_wrappers/include/clock.h"
31 
32 namespace webrtc {
33 
34 class RoundRobinPacketQueue {
35  public:
36   RoundRobinPacketQueue(Timestamp start_time,
37                         const WebRtcKeyValueConfig* field_trials);
38   ~RoundRobinPacketQueue();
39 
40   void Push(int priority,
41             Timestamp enqueue_time,
42             uint64_t enqueue_order,
43             std::unique_ptr<RtpPacketToSend> packet);
44   std::unique_ptr<RtpPacketToSend> Pop();
45 
46   bool Empty() const;
47   size_t SizeInPackets() const;
48   DataSize Size() const;
49   // If the next packet, that would be returned by Pop() if called
50   // now, is an audio packet this method returns the enqueue time
51   // of that packet. If queue is empty or top packet is not audio,
52   // returns nullopt.
53   absl::optional<Timestamp> LeadingAudioPacketEnqueueTime() const;
54 
55   Timestamp OldestEnqueueTime() const;
56   TimeDelta AverageQueueTime() const;
57   void UpdateQueueTime(Timestamp now);
58   void SetPauseState(bool paused, Timestamp now);
59   void SetIncludeOverhead();
60   void SetTransportOverhead(DataSize overhead_per_packet);
61 
62  private:
63   struct QueuedPacket {
64    public:
65     QueuedPacket(int priority,
66                  Timestamp enqueue_time,
67                  uint64_t enqueue_order,
68                  std::multiset<Timestamp>::iterator enqueue_time_it,
69                  std::unique_ptr<RtpPacketToSend> packet);
70     QueuedPacket(const QueuedPacket& rhs);
71     ~QueuedPacket();
72 
73     bool operator<(const QueuedPacket& other) const;
74 
75     int Priority() const;
76     RtpPacketMediaType Type() const;
77     uint32_t Ssrc() const;
78     Timestamp EnqueueTime() const;
79     bool IsRetransmission() const;
80     uint64_t EnqueueOrder() const;
81     RtpPacketToSend* RtpPacket() const;
82 
83     std::multiset<Timestamp>::iterator EnqueueTimeIterator() const;
84     void UpdateEnqueueTimeIterator(std::multiset<Timestamp>::iterator it);
85     void SubtractPauseTime(TimeDelta pause_time_sum);
86 
87    private:
88     int priority_;
89     Timestamp enqueue_time_;  // Absolute time of pacer queue entry.
90     uint64_t enqueue_order_;
91     bool is_retransmission_;  // Cached for performance.
92     std::multiset<Timestamp>::iterator enqueue_time_it_;
93     // Raw pointer since priority_queue doesn't allow for moving
94     // out of the container.
95     RtpPacketToSend* owned_packet_;
96   };
97 
98   class PriorityPacketQueue : public std::priority_queue<QueuedPacket> {
99    public:
100     using const_iterator = container_type::const_iterator;
101     const_iterator begin() const;
102     const_iterator end() const;
103   };
104 
105   struct StreamPrioKey {
StreamPrioKeyStreamPrioKey106     StreamPrioKey(int priority, DataSize size)
107         : priority(priority), size(size) {}
108 
109     bool operator<(const StreamPrioKey& other) const {
110       if (priority != other.priority)
111         return priority < other.priority;
112       return size < other.size;
113     }
114 
115     const int priority;
116     const DataSize size;
117   };
118 
119   struct Stream {
120     Stream();
121     Stream(const Stream&);
122 
123     virtual ~Stream();
124 
125     DataSize size;
126     uint32_t ssrc;
127 
128     PriorityPacketQueue packet_queue;
129 
130     // Whenever a packet is inserted for this stream we check if |priority_it|
131     // points to an element in |stream_priorities_|, and if it does it means
132     // this stream has already been scheduled, and if the scheduled priority is
133     // lower than the priority of the incoming packet we reschedule this stream
134     // with the higher priority.
135     std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
136   };
137 
138   void Push(QueuedPacket packet);
139 
140   DataSize PacketSize(const QueuedPacket& packet) const;
141   void MaybePromoteSinglePacketToNormalQueue();
142 
143   Stream* GetHighestPriorityStream();
144 
145   // Just used to verify correctness.
146   bool IsSsrcScheduled(uint32_t ssrc) const;
147 
148   DataSize transport_overhead_per_packet_;
149 
150   Timestamp time_last_updated_;
151 
152   bool paused_;
153   size_t size_packets_;
154   DataSize size_;
155   DataSize max_size_;
156   TimeDelta queue_time_sum_;
157   TimeDelta pause_time_sum_;
158 
159   // A map of streams used to prioritize from which stream to send next. We use
160   // a multimap instead of a priority_queue since the priority of a stream can
161   // change as a new packet is inserted, and a multimap allows us to remove and
162   // then reinsert a StreamPrioKey if the priority has increased.
163   std::multimap<StreamPrioKey, uint32_t> stream_priorities_;
164 
165   // A map of SSRCs to Streams.
166   std::map<uint32_t, Stream> streams_;
167 
168   // The enqueue time of every packet currently in the queue. Used to figure out
169   // the age of the oldest packet in the queue.
170   std::multiset<Timestamp> enqueue_times_;
171 
172   absl::optional<QueuedPacket> single_packet_queue_;
173 
174   bool include_overhead_;
175 };
176 }  // namespace webrtc
177 
178 #endif  // MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
179