1 /** @file
2 
3   Http2Stream.cc
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 #include "Http2Stream.h"
25 
26 #include "HTTP2.h"
27 #include "Http2ClientSession.h"
28 #include "../http/HttpSM.h"
29 
30 #include <numeric>
31 
32 #define REMEMBER(e, r)                                    \
33   {                                                       \
34     this->_history.push_back(MakeSourceLocation(), e, r); \
35   }
36 
37 #define Http2StreamDebug(fmt, ...) \
38   SsnDebug(_proxy_ssn, "http2_stream", "[%" PRId64 "] [%u] " fmt, _proxy_ssn->connection_id(), this->get_id(), ##__VA_ARGS__);
39 
40 ClassAllocator<Http2Stream, true> http2StreamAllocator("http2StreamAllocator");
41 
Http2Stream(ProxySession * session,Http2StreamId sid,ssize_t initial_rwnd)42 Http2Stream::Http2Stream(ProxySession *session, Http2StreamId sid, ssize_t initial_rwnd)
43   : super(session), _id(sid), _client_rwnd(initial_rwnd)
44 {
45   SET_HANDLER(&Http2Stream::main_event_handler);
46 
47   this->mark_milestone(Http2StreamMilestone::OPEN);
48 
49   this->_sm          = nullptr;
50   this->_id          = sid;
51   this->_thread      = this_ethread();
52   this->_client_rwnd = initial_rwnd;
53   this->_server_rwnd = Http2::initial_window_size;
54 
55   this->_reader = this->_request_buffer.alloc_reader();
56 
57   _req_header.create(HTTP_TYPE_REQUEST);
58   response_header.create(HTTP_TYPE_RESPONSE);
59   // TODO: init _req_header instead of response_header if this Http2Stream is outgoing
60   http2_init_pseudo_headers(response_header);
61 
62   http_parser_init(&http_parser);
63 }
64 
~Http2Stream()65 Http2Stream::~Http2Stream()
66 {
67   REMEMBER(NO_EVENT, this->reentrancy_count);
68   Http2StreamDebug("Destroy stream, sent %" PRIu64 " bytes", this->bytes_sent);
69   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
70   // Clean up after yourself if this was an EOS
71   ink_release_assert(this->closed);
72   ink_release_assert(reentrancy_count == 0);
73 
74   uint64_t cid = 0;
75 
76   // Safe to initiate SSN_CLOSE if this is the last stream
77   if (_proxy_ssn) {
78     cid = _proxy_ssn->connection_id();
79 
80     Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(_proxy_ssn);
81     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
82     // Make sure the stream is removed from the stream list and priority tree
83     // In many cases, this has been called earlier, so this call is a no-op
84     h2_proxy_ssn->connection_state.delete_stream(this);
85 
86     h2_proxy_ssn->connection_state.decrement_stream_count();
87 
88     // Update session's stream counts, so it accurately goes into keep-alive state
89     h2_proxy_ssn->connection_state.release_stream();
90 
91     // Do not access `_proxy_ssn` in below. It might be freed by `release_stream`.
92   }
93 
94   // Clean up the write VIO in case of inactivity timeout
95   this->do_io_write(nullptr, 0, nullptr);
96 
97   this->_milestones.mark(Http2StreamMilestone::CLOSE);
98 
99   ink_hrtime total_time = this->_milestones.elapsed(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE);
100   HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, this->_thread, total_time);
101 
102   // Slow Log
103   if (Http2::stream_slow_log_threshold != 0 && ink_hrtime_from_msec(Http2::stream_slow_log_threshold) < total_time) {
104     Error("[%" PRIu64 "] [%" PRIu32 "] [%" PRId64 "] Slow H2 Stream: "
105           "open: %" PRIu64 " "
106           "dec_hdrs: %.3f "
107           "txn: %.3f "
108           "enc_hdrs: %.3f "
109           "tx_hdrs: %.3f "
110           "tx_data: %.3f "
111           "close: %.3f",
112           cid, static_cast<uint32_t>(this->_id), this->_http_sm_id,
113           ink_hrtime_to_msec(this->_milestones[Http2StreamMilestone::OPEN]),
114           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_DECODE_HEADERS),
115           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TXN),
116           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_ENCODE_HEADERS),
117           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_HEADERS_FRAMES),
118           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_DATA_FRAMES),
119           this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE));
120   }
121 
122   _req_header.destroy();
123   response_header.destroy();
124 
125   // Drop references to all buffer data
126   this->_request_buffer.clear();
127 
128   // Free the mutexes in the VIO
129   read_vio.mutex.clear();
130   write_vio.mutex.clear();
131 
132   if (header_blocks) {
133     ats_free(header_blocks);
134   }
135   _clear_timers();
136   clear_io_events();
137   http_parser_clear(&http_parser);
138 }
139 
140 int
main_event_handler(int event,void * edata)141 Http2Stream::main_event_handler(int event, void *edata)
142 {
143   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
144   REMEMBER(event, this->reentrancy_count);
145 
146   if (!this->_switch_thread_if_not_on_right_thread(event, edata)) {
147     // Not on the right thread
148     return 0;
149   }
150   ink_release_assert(this->_thread == this_ethread());
151 
152   Event *e = static_cast<Event *>(edata);
153   reentrancy_count++;
154   if (e == _read_vio_event) {
155     _read_vio_event = nullptr;
156     this->signal_read_event(e->callback_event);
157     reentrancy_count--;
158     return 0;
159   } else if (e == _write_vio_event) {
160     _write_vio_event = nullptr;
161     this->signal_write_event(e->callback_event);
162     reentrancy_count--;
163     return 0;
164   } else if (e == cross_thread_event) {
165     cross_thread_event = nullptr;
166   } else if (e == read_event) {
167     read_event = nullptr;
168   } else if (e == write_event) {
169     write_event = nullptr;
170   } else if (e == buffer_full_write_event) {
171     buffer_full_write_event = nullptr;
172   }
173 
174   switch (event) {
175   case VC_EVENT_ACTIVE_TIMEOUT:
176   case VC_EVENT_INACTIVITY_TIMEOUT:
177     if (_sm && read_vio.ntodo() > 0) {
178       this->signal_read_event(event);
179     } else if (_sm && write_vio.ntodo() > 0) {
180       this->signal_write_event(event);
181     }
182     break;
183   case VC_EVENT_WRITE_READY:
184   case VC_EVENT_WRITE_COMPLETE:
185     _timeout.update_inactivity();
186     if (e->cookie == &write_vio) {
187       if (write_vio.mutex && write_vio.cont && this->_sm) {
188         this->signal_write_event(event);
189       }
190     } else {
191       update_write_request(true);
192     }
193     break;
194   case VC_EVENT_READ_COMPLETE:
195   case VC_EVENT_READ_READY:
196     _timeout.update_inactivity();
197     if (e->cookie == &read_vio) {
198       if (read_vio.mutex && read_vio.cont && this->_sm) {
199         signal_read_event(event);
200       }
201     } else {
202       this->update_read_request(true);
203     }
204     break;
205   case VC_EVENT_EOS:
206     if (e->cookie == &read_vio) {
207       SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
208       read_vio.cont->handleEvent(VC_EVENT_EOS, &read_vio);
209     } else if (e->cookie == &write_vio) {
210       SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
211       write_vio.cont->handleEvent(VC_EVENT_EOS, &write_vio);
212     }
213     break;
214   }
215   reentrancy_count--;
216   // Clean stream up if the terminate flag is set and we are at the bottom of the handler stack
217   terminate_if_possible();
218 
219   return 0;
220 }
221 
222 Http2ErrorCode
decode_header_blocks(HpackHandle & hpack_handle,uint32_t maximum_table_size)223 Http2Stream::decode_header_blocks(HpackHandle &hpack_handle, uint32_t maximum_table_size)
224 {
225   return http2_decode_header_blocks(&_req_header, (const uint8_t *)header_blocks, header_blocks_length, nullptr, hpack_handle,
226                                     trailing_header, maximum_table_size);
227 }
228 
229 void
send_request(Http2ConnectionState & cstate)230 Http2Stream::send_request(Http2ConnectionState &cstate)
231 {
232   ink_release_assert(this->_sm != nullptr);
233   this->_http_sm_id = this->_sm->sm_id;
234 
235   // Convert header to HTTP/1.1 format
236   http2_convert_header_from_2_to_1_1(&_req_header);
237 
238   // Write header to a buffer.  Borrowing logic from HttpSM::write_header_into_buffer.
239   // Seems like a function like this ought to be in HTTPHdr directly
240   int bufindex;
241   int dumpoffset = 0;
242   int done, tmp;
243   do {
244     bufindex             = 0;
245     tmp                  = dumpoffset;
246     IOBufferBlock *block = this->_request_buffer.get_current_block();
247     if (!block) {
248       this->_request_buffer.add_block();
249       block = this->_request_buffer.get_current_block();
250     }
251     done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
252     dumpoffset += bufindex;
253     this->_request_buffer.fill(bufindex);
254     if (!done) {
255       this->_request_buffer.add_block();
256     }
257   } while (!done);
258 
259   if (bufindex == 0) {
260     // No data to signal read event
261     return;
262   }
263 
264   // Is the _sm ready to process the header?
265   if (this->read_vio.nbytes > 0) {
266     if (this->recv_end_stream) {
267       this->read_vio.nbytes = bufindex;
268       this->signal_read_event(VC_EVENT_READ_COMPLETE);
269     } else {
270       // End of header but not end of stream, must have some body frames coming
271       this->has_body = true;
272       this->signal_read_event(VC_EVENT_READ_READY);
273     }
274   }
275 }
276 
277 bool
change_state(uint8_t type,uint8_t flags)278 Http2Stream::change_state(uint8_t type, uint8_t flags)
279 {
280   switch (_state) {
281   case Http2StreamState::HTTP2_STREAM_STATE_IDLE:
282     if (type == HTTP2_FRAME_TYPE_HEADERS) {
283       if (recv_end_stream) {
284         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
285       } else if (send_end_stream) {
286         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
287       } else {
288         _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
289       }
290     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
291       if (recv_end_stream) {
292         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
293       } else if (send_end_stream) {
294         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
295       } else {
296         _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
297       }
298     } else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) {
299       _state = Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL;
300     } else {
301       return false;
302     }
303     break;
304 
305   case Http2StreamState::HTTP2_STREAM_STATE_OPEN:
306     if (type == HTTP2_FRAME_TYPE_RST_STREAM) {
307       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
308     } else if (type == HTTP2_FRAME_TYPE_DATA) {
309       if (recv_end_stream) {
310         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
311       } else if (send_end_stream) {
312         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
313       } else {
314         // Do not change state
315       }
316     } else {
317       // A stream in the "open" state may be used by both peers to send frames of any type.
318       return true;
319     }
320     break;
321 
322   case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL:
323     if (type == HTTP2_FRAME_TYPE_HEADERS) {
324       if (flags & HTTP2_FLAGS_HEADERS_END_HEADERS) {
325         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
326       }
327     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
328       if (flags & HTTP2_FLAGS_CONTINUATION_END_HEADERS) {
329         _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
330       }
331     } else {
332       return false;
333     }
334     break;
335 
336   case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_REMOTE:
337     // Currently ATS supports only HTTP/2 server features
338     return false;
339 
340   case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL:
341     if (type == HTTP2_FRAME_TYPE_RST_STREAM || recv_end_stream) {
342       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
343     } else {
344       // Error, set state closed
345       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
346       return false;
347     }
348     break;
349 
350   case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE:
351     if (type == HTTP2_FRAME_TYPE_RST_STREAM || send_end_stream) {
352       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
353     } else if (type == HTTP2_FRAME_TYPE_HEADERS) { // w/o END_STREAM flag
354       // No state change here. Expect a following DATA frame with END_STREAM flag.
355       return true;
356     } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) { // w/o END_STREAM flag
357       // No state change here. Expect a following DATA frame with END_STREAM flag.
358       return true;
359     } else {
360       // Error, set state closed
361       _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
362       return false;
363     }
364     break;
365 
366   case Http2StreamState::HTTP2_STREAM_STATE_CLOSED:
367     // No state changing
368     return true;
369 
370   default:
371     return false;
372   }
373 
374   Http2StreamDebug("%s", Http2DebugNames::get_state_name(_state));
375 
376   return true;
377 }
378 
379 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)380 Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
381 {
382   if (buf) {
383     read_vio.buffer.writer_for(buf);
384   } else {
385     read_vio.buffer.clear();
386   }
387 
388   read_vio.mutex     = c ? c->mutex : this->mutex;
389   read_vio.cont      = c;
390   read_vio.nbytes    = nbytes;
391   read_vio.ndone     = 0;
392   read_vio.vc_server = this;
393   read_vio.op        = VIO::READ;
394 
395   // TODO: re-enable read_vio
396 
397   return &read_vio;
398 }
399 
400 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * abuffer,bool owner)401 Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
402 {
403   if (abuffer) {
404     write_vio.buffer.reader_for(abuffer);
405   } else {
406     write_vio.buffer.clear();
407   }
408   write_vio.mutex     = c ? c->mutex : this->mutex;
409   write_vio.cont      = c;
410   write_vio.nbytes    = nbytes;
411   write_vio.ndone     = 0;
412   write_vio.vc_server = this;
413   write_vio.op        = VIO::WRITE;
414 
415   if (c != nullptr && nbytes > 0 && this->is_client_state_writeable()) {
416     update_write_request(false);
417   } else if (!this->is_client_state_writeable()) {
418     // Cannot start a write on a closed stream
419     return nullptr;
420   }
421   return &write_vio;
422 }
423 
424 // Initiated from SM
425 void
do_io_close(int)426 Http2Stream::do_io_close(int /* flags */)
427 {
428   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
429 
430   if (!closed) {
431     REMEMBER(NO_EVENT, this->reentrancy_count);
432     Http2StreamDebug("do_io_close");
433 
434     // When we get here, the SM has initiated the shutdown.  Either it received a WRITE_COMPLETE, or it is shutting down.  Any
435     // remaining IO operations back to client should be abandoned.  The SM-side buffers backing these operations will be deleted
436     // by the time this is called from transaction_done.
437     closed = true;
438 
439     if (_proxy_ssn && this->is_client_state_writeable()) {
440       // Make sure any trailing end of stream frames are sent
441       // We will be removed at send_data_frames or closing connection phase
442       Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
443       SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
444       h2_proxy_ssn->connection_state.send_data_frames(this);
445     }
446 
447     _clear_timers();
448     clear_io_events();
449 
450     // Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed
451   }
452 }
453 
454 /*
455  *  HttpSM has called TXN_close hooks.
456  */
457 void
transaction_done()458 Http2Stream::transaction_done()
459 {
460   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
461   super::transaction_done();
462   if (cross_thread_event) {
463     cross_thread_event->cancel();
464     cross_thread_event = nullptr;
465   }
466 
467   if (!closed) {
468     do_io_close(); // Make sure we've been closed.  If we didn't close the _proxy_ssn session better still be open
469   }
470   ink_release_assert(closed || !static_cast<Http2ClientSession *>(_proxy_ssn)->connection_state.is_state_closed());
471   _sm = nullptr;
472 
473   if (closed) {
474     // Safe to initiate SSN_CLOSE if this is the last stream
475     ink_assert(cross_thread_event == nullptr);
476     // Schedule the destroy to occur after we unwind here.  IF we call directly, may delete with reference on the stack.
477     terminate_stream = true;
478     terminate_if_possible();
479   }
480 }
481 
482 void
terminate_if_possible()483 Http2Stream::terminate_if_possible()
484 {
485   if (terminate_stream && reentrancy_count == 0) {
486     REMEMBER(NO_EVENT, this->reentrancy_count);
487 
488     Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
489     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
490     THREAD_FREE(this, http2StreamAllocator, this_ethread());
491   }
492 }
493 
494 // Initiated from the Http2 side
495 void
initiating_close()496 Http2Stream::initiating_close()
497 {
498   if (!closed) {
499     SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
500     REMEMBER(NO_EVENT, this->reentrancy_count);
501     Http2StreamDebug("initiating_close");
502 
503     // Set the state of the connection to closed
504     // TODO - these states should be combined
505     closed = true;
506     _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
507 
508     // leaving the reference to the SM, so we can detach from the SM when we actually destroy
509     // _sm = NULL;
510     // Leaving reference to client session as well, so we can signal once the
511     // TXN_CLOSE has been sent
512     // _proxy_ssn = NULL;
513 
514     _clear_timers();
515     clear_io_events();
516 
517     // This should result in do_io_close or release being called.  That will schedule the final
518     // kill yourself signal
519     // We are sending signals rather than calling the handlers directly to avoid the case where
520     // the HttpTunnel handler causes the HttpSM to be deleted on the stack.
521     bool sent_write_complete = false;
522     if (_sm) {
523       // Push out any last IO events
524       if (write_vio.cont) {
525         SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
526         // Are we done?
527         if (write_vio.nbytes > 0 && write_vio.nbytes == write_vio.ndone) {
528           Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_WRITE_COMPLETE);
529           write_event = send_tracked_event(write_event, VC_EVENT_WRITE_COMPLETE, &write_vio);
530         } else {
531           write_event = send_tracked_event(write_event, VC_EVENT_EOS, &write_vio);
532           Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_EOS);
533         }
534         sent_write_complete = true;
535       }
536     }
537     // Send EOS to let SM know that we aren't sticking around
538     if (_sm && read_vio.cont) {
539       // Only bother with the EOS if we haven't sent the write complete
540       if (!sent_write_complete) {
541         SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
542         Http2StreamDebug("send EOS to read cont");
543         read_event = send_tracked_event(read_event, VC_EVENT_EOS, &read_vio);
544       }
545     } else if (!sent_write_complete) {
546       // Transaction is already gone or not started. Kill yourself
547       do_io_close();
548       terminate_stream = true;
549       terminate_if_possible();
550     }
551   }
552 }
553 
554 /* Replace existing event only if the new event is different than the inprogress event */
555 Event *
send_tracked_event(Event * event,int send_event,VIO * vio)556 Http2Stream::send_tracked_event(Event *event, int send_event, VIO *vio)
557 {
558   if (event != nullptr) {
559     if (event->callback_event != send_event) {
560       event->cancel();
561       event = nullptr;
562     }
563   }
564 
565   if (event == nullptr) {
566     REMEMBER(send_event, this->reentrancy_count);
567     event = this_ethread()->schedule_imm(this, send_event, vio);
568   }
569 
570   return event;
571 }
572 
573 void
update_read_request(bool call_update)574 Http2Stream::update_read_request(bool call_update)
575 {
576   if (closed || _proxy_ssn == nullptr || _sm == nullptr || read_vio.mutex == nullptr) {
577     return;
578   }
579 
580   if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_READ_READY, nullptr)) {
581     // Not on the right thread
582     return;
583   }
584   ink_release_assert(this->_thread == this_ethread());
585 
586   SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
587   if (read_vio.nbytes == 0) {
588     return;
589   }
590 
591   // Try to be smart and only signal if there was additional data
592   int send_event = VC_EVENT_READ_READY;
593   if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes != INT64_MAX)) {
594     send_event = VC_EVENT_READ_COMPLETE;
595   }
596 
597   int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail();
598   if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) {
599     if (call_update) { // Safe to call vio handler directly
600       _timeout.update_inactivity();
601       if (read_vio.cont && this->_sm) {
602         read_vio.cont->handleEvent(send_event, &read_vio);
603       }
604     } else { // Called from do_io_read.  Still setting things up.  Send event
605       // to handle this after the dust settles
606       read_event = send_tracked_event(read_event, send_event, &read_vio);
607     }
608   }
609 }
610 
611 void
restart_sending()612 Http2Stream::restart_sending()
613 {
614   if (!this->response_header_done) {
615     return;
616   }
617 
618   IOBufferReader *reader = this->response_get_data_reader();
619   if (reader && !reader->is_read_avail_more_than(0)) {
620     return;
621   }
622 
623   if (this->write_vio.mutex && this->write_vio.ntodo() == 0) {
624     return;
625   }
626 
627   this->send_response_body(true);
628 }
629 
630 void
update_write_request(bool call_update)631 Http2Stream::update_write_request(bool call_update)
632 {
633   if (!this->is_client_state_writeable() || closed || _proxy_ssn == nullptr || write_vio.mutex == nullptr ||
634       write_vio.get_reader() == nullptr) {
635     return;
636   }
637 
638   if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_WRITE_READY, nullptr)) {
639     // Not on the right thread
640     return;
641   }
642   ink_release_assert(this->_thread == this_ethread());
643 
644   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
645 
646   SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
647 
648   IOBufferReader *vio_reader = write_vio.get_reader();
649   if (write_vio.ntodo() == 0 || !vio_reader->is_read_avail_more_than(0)) {
650     return;
651   }
652 
653   // Process the new data
654   if (!this->response_header_done) {
655     // Still parsing the response_header
656     int bytes_used = 0;
657     int state      = this->response_header.parse_resp(&http_parser, vio_reader, &bytes_used, false);
658     // HTTPHdr::parse_resp() consumed the vio_reader in above (consumed size is `bytes_used`)
659     write_vio.ndone += bytes_used;
660 
661     switch (state) {
662     case PARSE_RESULT_DONE: {
663       this->response_header_done = true;
664 
665       // Schedule session shutdown if response header has "Connection: close"
666       MIMEField *field = this->response_header.field_find(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION);
667       if (field) {
668         int len;
669         const char *value = field->value_get(&len);
670         if (memcmp(HTTP_VALUE_CLOSE, value, HTTP_LEN_CLOSE) == 0) {
671           SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
672           if (h2_proxy_ssn->connection_state.get_shutdown_state() == HTTP2_SHUTDOWN_NONE) {
673             h2_proxy_ssn->connection_state.set_shutdown_state(HTTP2_SHUTDOWN_NOT_INITIATED, Http2ErrorCode::HTTP2_ERROR_NO_ERROR);
674           }
675         }
676       }
677 
678       {
679         SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
680         // Send the response header back
681         h2_proxy_ssn->connection_state.send_headers_frame(this);
682       }
683 
684       // Roll back states of response header to read final response
685       if (this->response_header.expect_final_response()) {
686         this->response_header_done = false;
687         response_header.destroy();
688         response_header.create(HTTP_TYPE_RESPONSE);
689         http2_init_pseudo_headers(response_header);
690         http_parser_clear(&http_parser);
691         http_parser_init(&http_parser);
692       }
693 
694       this->signal_write_event(call_update);
695 
696       if (vio_reader->is_read_avail_more_than(0)) {
697         this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
698         this->send_response_body(call_update);
699       }
700       break;
701     }
702     case PARSE_RESULT_CONT:
703       // Let it ride for next time
704       break;
705     default:
706       break;
707     }
708   } else {
709     this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
710     this->send_response_body(call_update);
711   }
712 
713   return;
714 }
715 
716 void
signal_read_event(int event)717 Http2Stream::signal_read_event(int event)
718 {
719   if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr || this->read_vio.op == VIO::NONE) {
720     return;
721   }
722 
723   MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread());
724   if (lock.is_locked()) {
725     _timeout.update_inactivity();
726     this->read_vio.cont->handleEvent(event, &this->read_vio);
727   } else {
728     if (this->_read_vio_event) {
729       this->_read_vio_event->cancel();
730     }
731     this->_read_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &read_vio);
732   }
733 }
734 
735 void
signal_write_event(int event)736 Http2Stream::signal_write_event(int event)
737 {
738   // Don't signal a write event if in fact nothing was written
739   if (this->write_vio.cont == nullptr || this->write_vio.cont->mutex == nullptr || this->write_vio.op == VIO::NONE ||
740       this->write_vio.nbytes == 0) {
741     return;
742   }
743 
744   MUTEX_TRY_LOCK(lock, write_vio.cont->mutex, this_ethread());
745   if (lock.is_locked()) {
746     _timeout.update_inactivity();
747     this->write_vio.cont->handleEvent(event, &this->write_vio);
748   } else {
749     if (this->_write_vio_event) {
750       this->_write_vio_event->cancel();
751     }
752     this->_write_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &write_vio);
753   }
754 }
755 
756 void
signal_write_event(bool call_update)757 Http2Stream::signal_write_event(bool call_update)
758 {
759   if (this->write_vio.cont == nullptr || this->write_vio.op == VIO::NONE) {
760     return;
761   }
762 
763   if (this->write_vio.get_writer()->write_avail() == 0) {
764     return;
765   }
766 
767   int send_event = this->write_vio.ntodo() == 0 ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
768 
769   if (call_update) {
770     // Coming from reenable.  Safe to call the handler directly
771     if (write_vio.cont && this->_sm) {
772       write_vio.cont->handleEvent(send_event, &write_vio);
773     }
774   } else {
775     // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
776     write_event = send_tracked_event(write_event, send_event, &write_vio);
777   }
778 }
779 
780 bool
push_promise(URL & url,const MIMEField * accept_encoding)781 Http2Stream::push_promise(URL &url, const MIMEField *accept_encoding)
782 {
783   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
784   SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
785   return h2_proxy_ssn->connection_state.send_push_promise_frame(this, url, accept_encoding);
786 }
787 
788 void
send_response_body(bool call_update)789 Http2Stream::send_response_body(bool call_update)
790 {
791   Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
792   _timeout.update_inactivity();
793 
794   if (Http2::stream_priority_enabled) {
795     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
796     h2_proxy_ssn->connection_state.schedule_stream(this);
797     // signal_write_event() will be called from `Http2ConnectionState::send_data_frames_depends_on_priority()`
798     // when write_vio is consumed
799   } else {
800     SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
801     h2_proxy_ssn->connection_state.send_data_frames(this);
802     this->signal_write_event(call_update);
803     // XXX The call to signal_write_event can destroy/free the Http2Stream.
804     // Don't modify the Http2Stream after calling this method.
805   }
806 }
807 
808 void
reenable(VIO * vio)809 Http2Stream::reenable(VIO *vio)
810 {
811   if (this->_proxy_ssn) {
812     if (vio->op == VIO::WRITE) {
813       SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
814       update_write_request(true);
815     } else if (vio->op == VIO::READ) {
816       Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
817       {
818         SCOPED_MUTEX_LOCK(ssn_lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
819         h2_proxy_ssn->connection_state.restart_receiving(this);
820       }
821 
822       SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
823       update_read_request(true);
824     }
825   }
826 }
827 
828 IOBufferReader *
response_get_data_reader() const829 Http2Stream::response_get_data_reader() const
830 {
831   return write_vio.get_reader();
832 }
833 
834 void
set_active_timeout(ink_hrtime timeout_in)835 Http2Stream::set_active_timeout(ink_hrtime timeout_in)
836 {
837   _timeout.set_active_timeout(timeout_in);
838 }
839 
840 void
set_inactivity_timeout(ink_hrtime timeout_in)841 Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
842 {
843   _timeout.set_inactive_timeout(timeout_in);
844 }
845 
846 void
cancel_active_timeout()847 Http2Stream::cancel_active_timeout()
848 {
849   _timeout.cancel_active_timeout();
850 }
851 
852 void
cancel_inactivity_timeout()853 Http2Stream::cancel_inactivity_timeout()
854 {
855   _timeout.cancel_inactive_timeout();
856 }
857 
858 bool
is_active_timeout_expired(ink_hrtime now)859 Http2Stream::is_active_timeout_expired(ink_hrtime now)
860 {
861   return _timeout.is_active_timeout_expired(now);
862 }
863 
864 bool
is_inactive_timeout_expired(ink_hrtime now)865 Http2Stream::is_inactive_timeout_expired(ink_hrtime now)
866 {
867   return _timeout.is_inactive_timeout_expired(now);
868 }
869 
870 void
clear_io_events()871 Http2Stream::clear_io_events()
872 {
873   if (read_event) {
874     read_event->cancel();
875     read_event = nullptr;
876   }
877 
878   if (write_event) {
879     write_event->cancel();
880     write_event = nullptr;
881   }
882 
883   if (buffer_full_write_event) {
884     buffer_full_write_event->cancel();
885     buffer_full_write_event = nullptr;
886   }
887 
888   if (this->_read_vio_event) {
889     this->_read_vio_event->cancel();
890     this->_read_vio_event = nullptr;
891   }
892 
893   if (this->_write_vio_event) {
894     this->_write_vio_event->cancel();
895     this->_write_vio_event = nullptr;
896   }
897 }
898 
899 //  release and do_io_close are the same for the HTTP/2 protocol
900 void
release(IOBufferReader * r)901 Http2Stream::release(IOBufferReader *r)
902 {
903   this->do_io_close();
904 }
905 
906 void
increment_client_transactions_stat()907 Http2Stream::increment_client_transactions_stat()
908 {
909   HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
910   HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_CLIENT_STREAM_COUNT, _thread);
911 }
912 
913 void
decrement_client_transactions_stat()914 Http2Stream::decrement_client_transactions_stat()
915 {
916   HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
917 }
918 
919 ssize_t
client_rwnd() const920 Http2Stream::client_rwnd() const
921 {
922   return this->_client_rwnd;
923 }
924 
925 Http2ErrorCode
increment_client_rwnd(size_t amount)926 Http2Stream::increment_client_rwnd(size_t amount)
927 {
928   this->_client_rwnd += amount;
929 
930   this->_recent_rwnd_increment[this->_recent_rwnd_increment_index] = amount;
931   ++this->_recent_rwnd_increment_index;
932   this->_recent_rwnd_increment_index %= this->_recent_rwnd_increment.size();
933   double sum = std::accumulate(this->_recent_rwnd_increment.begin(), this->_recent_rwnd_increment.end(), 0.0);
934   double avg = sum / this->_recent_rwnd_increment.size();
935   if (avg < Http2::min_avg_window_update) {
936     return Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM;
937   }
938   return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
939 }
940 
941 Http2ErrorCode
decrement_client_rwnd(size_t amount)942 Http2Stream::decrement_client_rwnd(size_t amount)
943 {
944   this->_client_rwnd -= amount;
945   if (this->_client_rwnd < 0) {
946     return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
947   } else {
948     return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
949   }
950 }
951 
952 ssize_t
server_rwnd() const953 Http2Stream::server_rwnd() const
954 {
955   return this->_server_rwnd;
956 }
957 
958 Http2ErrorCode
increment_server_rwnd(size_t amount)959 Http2Stream::increment_server_rwnd(size_t amount)
960 {
961   this->_server_rwnd += amount;
962   return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
963 }
964 
965 Http2ErrorCode
decrement_server_rwnd(size_t amount)966 Http2Stream::decrement_server_rwnd(size_t amount)
967 {
968   this->_server_rwnd -= amount;
969   if (this->_server_rwnd < 0) {
970     return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
971   } else {
972     return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
973   }
974 }
975 
976 bool
_switch_thread_if_not_on_right_thread(int event,void * edata)977 Http2Stream::_switch_thread_if_not_on_right_thread(int event, void *edata)
978 {
979   if (this->_thread != this_ethread()) {
980     SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
981     if (cross_thread_event == nullptr) {
982       // Send to the right thread
983       cross_thread_event = this->_thread->schedule_imm(this, event, edata);
984     }
985     return false;
986   }
987   return true;
988 }
989 
990 int
get_transaction_priority_weight() const991 Http2Stream::get_transaction_priority_weight() const
992 {
993   return priority_node ? priority_node->weight : 0;
994 }
995 
996 int
get_transaction_priority_dependence() const997 Http2Stream::get_transaction_priority_dependence() const
998 {
999   if (!priority_node) {
1000     return -1;
1001   } else {
1002     return priority_node->parent ? priority_node->parent->id : 0;
1003   }
1004 }
1005 
1006 int64_t
read_vio_read_avail()1007 Http2Stream::read_vio_read_avail()
1008 {
1009   MIOBuffer *writer = this->read_vio.get_writer();
1010   if (writer) {
1011     return writer->max_read_avail();
1012   }
1013 
1014   return 0;
1015 }
1016 
1017 bool
has_request_body(int64_t content_length,bool is_chunked_set) const1018 Http2Stream::has_request_body(int64_t content_length, bool is_chunked_set) const
1019 {
1020   return has_body;
1021 }
1022