1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/spdy/spdy_write_queue.h"
6 
7 #include <cstddef>
8 #include <utility>
9 #include <vector>
10 
11 #include "base/check_op.h"
12 #include "base/containers/circular_deque.h"
13 #include "base/trace_event/memory_usage_estimator.h"
14 #include "net/spdy/spdy_buffer.h"
15 #include "net/spdy/spdy_buffer_producer.h"
16 #include "net/spdy/spdy_stream.h"
17 
18 namespace net {
19 
IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type)20 bool IsSpdyFrameTypeWriteCapped(spdy::SpdyFrameType frame_type) {
21   return frame_type == spdy::SpdyFrameType::RST_STREAM ||
22          frame_type == spdy::SpdyFrameType::SETTINGS ||
23          frame_type == spdy::SpdyFrameType::WINDOW_UPDATE ||
24          frame_type == spdy::SpdyFrameType::PING ||
25          frame_type == spdy::SpdyFrameType::GOAWAY;
26 }
27 
28 SpdyWriteQueue::PendingWrite::PendingWrite() = default;
29 
PendingWrite(spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const MutableNetworkTrafficAnnotationTag & traffic_annotation)30 SpdyWriteQueue::PendingWrite::PendingWrite(
31     spdy::SpdyFrameType frame_type,
32     std::unique_ptr<SpdyBufferProducer> frame_producer,
33     const base::WeakPtr<SpdyStream>& stream,
34     const MutableNetworkTrafficAnnotationTag& traffic_annotation)
35     : frame_type(frame_type),
36       frame_producer(std::move(frame_producer)),
37       stream(stream),
38       traffic_annotation(traffic_annotation),
39       has_stream(stream.get() != nullptr) {}
40 
41 SpdyWriteQueue::PendingWrite::~PendingWrite() = default;
42 
43 SpdyWriteQueue::PendingWrite::PendingWrite(PendingWrite&& other) = default;
44 SpdyWriteQueue::PendingWrite& SpdyWriteQueue::PendingWrite::operator=(
45     PendingWrite&& other) = default;
46 
EstimateMemoryUsage() const47 size_t SpdyWriteQueue::PendingWrite::EstimateMemoryUsage() const {
48   return base::trace_event::EstimateMemoryUsage(frame_producer);
49 }
50 
SpdyWriteQueue()51 SpdyWriteQueue::SpdyWriteQueue() : removing_writes_(false) {}
52 
~SpdyWriteQueue()53 SpdyWriteQueue::~SpdyWriteQueue() {
54   DCHECK_GE(num_queued_capped_frames_, 0);
55   Clear();
56 }
57 
IsEmpty() const58 bool SpdyWriteQueue::IsEmpty() const {
59   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; i++) {
60     if (!queue_[i].empty())
61       return false;
62   }
63   return true;
64 }
65 
Enqueue(RequestPriority priority,spdy::SpdyFrameType frame_type,std::unique_ptr<SpdyBufferProducer> frame_producer,const base::WeakPtr<SpdyStream> & stream,const NetworkTrafficAnnotationTag & traffic_annotation)66 void SpdyWriteQueue::Enqueue(
67     RequestPriority priority,
68     spdy::SpdyFrameType frame_type,
69     std::unique_ptr<SpdyBufferProducer> frame_producer,
70     const base::WeakPtr<SpdyStream>& stream,
71     const NetworkTrafficAnnotationTag& traffic_annotation) {
72   CHECK(!removing_writes_);
73   CHECK_GE(priority, MINIMUM_PRIORITY);
74   CHECK_LE(priority, MAXIMUM_PRIORITY);
75   if (stream.get())
76     DCHECK_EQ(stream->priority(), priority);
77   queue_[priority].push_back(
78       {frame_type, std::move(frame_producer), stream,
79        MutableNetworkTrafficAnnotationTag(traffic_annotation)});
80   if (IsSpdyFrameTypeWriteCapped(frame_type)) {
81     DCHECK_GE(num_queued_capped_frames_, 0);
82     num_queued_capped_frames_++;
83   }
84 }
85 
Dequeue(spdy::SpdyFrameType * frame_type,std::unique_ptr<SpdyBufferProducer> * frame_producer,base::WeakPtr<SpdyStream> * stream,MutableNetworkTrafficAnnotationTag * traffic_annotation)86 bool SpdyWriteQueue::Dequeue(
87     spdy::SpdyFrameType* frame_type,
88     std::unique_ptr<SpdyBufferProducer>* frame_producer,
89     base::WeakPtr<SpdyStream>* stream,
90     MutableNetworkTrafficAnnotationTag* traffic_annotation) {
91   CHECK(!removing_writes_);
92   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
93     if (!queue_[i].empty()) {
94       PendingWrite pending_write = std::move(queue_[i].front());
95       queue_[i].pop_front();
96       *frame_type = pending_write.frame_type;
97       *frame_producer = std::move(pending_write.frame_producer);
98       *stream = pending_write.stream;
99       *traffic_annotation = pending_write.traffic_annotation;
100       if (pending_write.has_stream)
101         DCHECK(stream->get());
102       if (IsSpdyFrameTypeWriteCapped(*frame_type)) {
103         num_queued_capped_frames_--;
104         DCHECK_GE(num_queued_capped_frames_, 0);
105       }
106       return true;
107     }
108   }
109   return false;
110 }
111 
RemovePendingWritesForStream(SpdyStream * stream)112 void SpdyWriteQueue::RemovePendingWritesForStream(SpdyStream* stream) {
113   CHECK(!removing_writes_);
114   removing_writes_ = true;
115   RequestPriority priority = stream->priority();
116   CHECK_GE(priority, MINIMUM_PRIORITY);
117   CHECK_LE(priority, MAXIMUM_PRIORITY);
118 
119 #if DCHECK_IS_ON()
120   // |stream| should not have pending writes in a queue not matching
121   // its priority.
122   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
123     if (priority == i)
124       continue;
125     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
126       DCHECK_NE(it->stream.get(), stream);
127   }
128 #endif
129 
130   // Defer deletion until queue iteration is complete, as
131   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
132   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
133   base::circular_deque<PendingWrite>& queue = queue_[priority];
134   for (auto it = queue.begin(); it != queue.end();) {
135     if (it->stream.get() == stream) {
136       if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
137         num_queued_capped_frames_--;
138         DCHECK_GE(num_queued_capped_frames_, 0);
139       }
140       erased_buffer_producers.push_back(std::move(it->frame_producer));
141       it = queue.erase(it);
142     } else {
143       ++it;
144     }
145   }
146   removing_writes_ = false;
147 
148   // Iteration on |queue| is completed.  Now |erased_buffer_producers| goes out
149   // of scope, SpdyBufferProducers are destroyed.
150 }
151 
RemovePendingWritesForStreamsAfter(spdy::SpdyStreamId last_good_stream_id)152 void SpdyWriteQueue::RemovePendingWritesForStreamsAfter(
153     spdy::SpdyStreamId last_good_stream_id) {
154   CHECK(!removing_writes_);
155   removing_writes_ = true;
156 
157   // Defer deletion until queue iteration is complete, as
158   // SpdyBuffer::~SpdyBuffer() can result in callbacks into SpdyWriteQueue.
159   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
160   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
161     base::circular_deque<PendingWrite>& queue = queue_[i];
162     for (auto it = queue.begin(); it != queue.end();) {
163       if (it->stream.get() && (it->stream->stream_id() > last_good_stream_id ||
164                                it->stream->stream_id() == 0)) {
165         if (IsSpdyFrameTypeWriteCapped(it->frame_type)) {
166           num_queued_capped_frames_--;
167           DCHECK_GE(num_queued_capped_frames_, 0);
168         }
169         erased_buffer_producers.push_back(std::move(it->frame_producer));
170         it = queue.erase(it);
171       } else {
172         ++it;
173       }
174     }
175   }
176   removing_writes_ = false;
177 
178   // Iteration on each |queue| is completed.  Now |erased_buffer_producers| goes
179   // out of scope, SpdyBufferProducers are destroyed.
180 }
181 
ChangePriorityOfWritesForStream(SpdyStream * stream,RequestPriority old_priority,RequestPriority new_priority)182 void SpdyWriteQueue::ChangePriorityOfWritesForStream(
183     SpdyStream* stream,
184     RequestPriority old_priority,
185     RequestPriority new_priority) {
186   CHECK(!removing_writes_);
187   DCHECK(stream);
188 
189 #if DCHECK_IS_ON()
190   // |stream| should not have pending writes in a queue not matching
191   // |old_priority|.
192   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
193     if (i == old_priority)
194       continue;
195     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it)
196       DCHECK_NE(it->stream.get(), stream);
197   }
198 #endif
199 
200   base::circular_deque<PendingWrite>& old_queue = queue_[old_priority];
201   base::circular_deque<PendingWrite>& new_queue = queue_[new_priority];
202   for (auto it = old_queue.begin(); it != old_queue.end();) {
203     if (it->stream.get() == stream) {
204       new_queue.push_back(std::move(*it));
205       it = old_queue.erase(it);
206     } else {
207       ++it;
208     }
209   }
210 }
211 
Clear()212 void SpdyWriteQueue::Clear() {
213   CHECK(!removing_writes_);
214   removing_writes_ = true;
215   std::vector<std::unique_ptr<SpdyBufferProducer>> erased_buffer_producers;
216 
217   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
218     for (auto it = queue_[i].begin(); it != queue_[i].end(); ++it) {
219       erased_buffer_producers.push_back(std::move(it->frame_producer));
220     }
221     queue_[i].clear();
222   }
223   removing_writes_ = false;
224   num_queued_capped_frames_ = 0;
225 }
226 
EstimateMemoryUsage() const227 size_t SpdyWriteQueue::EstimateMemoryUsage() const {
228   return base::trace_event::EstimateMemoryUsage(queue_);
229 }
230 
231 }  // namespace net
232