1 /** @file
2
3 Http2Stream.cc
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "Http2Stream.h"
25
26 #include "HTTP2.h"
27 #include "Http2ClientSession.h"
28 #include "../http/HttpSM.h"
29
30 #include <numeric>
31
32 #define REMEMBER(e, r) \
33 { \
34 this->_history.push_back(MakeSourceLocation(), e, r); \
35 }
36
37 #define Http2StreamDebug(fmt, ...) \
38 SsnDebug(_proxy_ssn, "http2_stream", "[%" PRId64 "] [%u] " fmt, _proxy_ssn->connection_id(), this->get_id(), ##__VA_ARGS__);
39
40 ClassAllocator<Http2Stream, true> http2StreamAllocator("http2StreamAllocator");
41
Http2Stream(ProxySession * session,Http2StreamId sid,ssize_t initial_rwnd)42 Http2Stream::Http2Stream(ProxySession *session, Http2StreamId sid, ssize_t initial_rwnd)
43 : super(session), _id(sid), _client_rwnd(initial_rwnd)
44 {
45 SET_HANDLER(&Http2Stream::main_event_handler);
46
47 this->mark_milestone(Http2StreamMilestone::OPEN);
48
49 this->_sm = nullptr;
50 this->_id = sid;
51 this->_thread = this_ethread();
52 this->_client_rwnd = initial_rwnd;
53 this->_server_rwnd = Http2::initial_window_size;
54
55 this->_reader = this->_request_buffer.alloc_reader();
56
57 _req_header.create(HTTP_TYPE_REQUEST);
58 response_header.create(HTTP_TYPE_RESPONSE);
59 // TODO: init _req_header instead of response_header if this Http2Stream is outgoing
60 http2_init_pseudo_headers(response_header);
61
62 http_parser_init(&http_parser);
63 }
64
~Http2Stream()65 Http2Stream::~Http2Stream()
66 {
67 REMEMBER(NO_EVENT, this->reentrancy_count);
68 Http2StreamDebug("Destroy stream, sent %" PRIu64 " bytes", this->bytes_sent);
69 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
70 // Clean up after yourself if this was an EOS
71 ink_release_assert(this->closed);
72 ink_release_assert(reentrancy_count == 0);
73
74 uint64_t cid = 0;
75
76 // Safe to initiate SSN_CLOSE if this is the last stream
77 if (_proxy_ssn) {
78 cid = _proxy_ssn->connection_id();
79
80 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(_proxy_ssn);
81 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
82 // Make sure the stream is removed from the stream list and priority tree
83 // In many cases, this has been called earlier, so this call is a no-op
84 h2_proxy_ssn->connection_state.delete_stream(this);
85
86 h2_proxy_ssn->connection_state.decrement_stream_count();
87
88 // Update session's stream counts, so it accurately goes into keep-alive state
89 h2_proxy_ssn->connection_state.release_stream();
90
91 // Do not access `_proxy_ssn` in below. It might be freed by `release_stream`.
92 }
93
94 // Clean up the write VIO in case of inactivity timeout
95 this->do_io_write(nullptr, 0, nullptr);
96
97 this->_milestones.mark(Http2StreamMilestone::CLOSE);
98
99 ink_hrtime total_time = this->_milestones.elapsed(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE);
100 HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, this->_thread, total_time);
101
102 // Slow Log
103 if (Http2::stream_slow_log_threshold != 0 && ink_hrtime_from_msec(Http2::stream_slow_log_threshold) < total_time) {
104 Error("[%" PRIu64 "] [%" PRIu32 "] [%" PRId64 "] Slow H2 Stream: "
105 "open: %" PRIu64 " "
106 "dec_hdrs: %.3f "
107 "txn: %.3f "
108 "enc_hdrs: %.3f "
109 "tx_hdrs: %.3f "
110 "tx_data: %.3f "
111 "close: %.3f",
112 cid, static_cast<uint32_t>(this->_id), this->_http_sm_id,
113 ink_hrtime_to_msec(this->_milestones[Http2StreamMilestone::OPEN]),
114 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_DECODE_HEADERS),
115 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TXN),
116 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_ENCODE_HEADERS),
117 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_HEADERS_FRAMES),
118 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::START_TX_DATA_FRAMES),
119 this->_milestones.difference_sec(Http2StreamMilestone::OPEN, Http2StreamMilestone::CLOSE));
120 }
121
122 _req_header.destroy();
123 response_header.destroy();
124
125 // Drop references to all buffer data
126 this->_request_buffer.clear();
127
128 // Free the mutexes in the VIO
129 read_vio.mutex.clear();
130 write_vio.mutex.clear();
131
132 if (header_blocks) {
133 ats_free(header_blocks);
134 }
135 _clear_timers();
136 clear_io_events();
137 http_parser_clear(&http_parser);
138 }
139
140 int
main_event_handler(int event,void * edata)141 Http2Stream::main_event_handler(int event, void *edata)
142 {
143 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
144 REMEMBER(event, this->reentrancy_count);
145
146 if (!this->_switch_thread_if_not_on_right_thread(event, edata)) {
147 // Not on the right thread
148 return 0;
149 }
150 ink_release_assert(this->_thread == this_ethread());
151
152 Event *e = static_cast<Event *>(edata);
153 reentrancy_count++;
154 if (e == _read_vio_event) {
155 _read_vio_event = nullptr;
156 this->signal_read_event(e->callback_event);
157 reentrancy_count--;
158 return 0;
159 } else if (e == _write_vio_event) {
160 _write_vio_event = nullptr;
161 this->signal_write_event(e->callback_event);
162 reentrancy_count--;
163 return 0;
164 } else if (e == cross_thread_event) {
165 cross_thread_event = nullptr;
166 } else if (e == read_event) {
167 read_event = nullptr;
168 } else if (e == write_event) {
169 write_event = nullptr;
170 } else if (e == buffer_full_write_event) {
171 buffer_full_write_event = nullptr;
172 }
173
174 switch (event) {
175 case VC_EVENT_ACTIVE_TIMEOUT:
176 case VC_EVENT_INACTIVITY_TIMEOUT:
177 if (_sm && read_vio.ntodo() > 0) {
178 this->signal_read_event(event);
179 } else if (_sm && write_vio.ntodo() > 0) {
180 this->signal_write_event(event);
181 }
182 break;
183 case VC_EVENT_WRITE_READY:
184 case VC_EVENT_WRITE_COMPLETE:
185 _timeout.update_inactivity();
186 if (e->cookie == &write_vio) {
187 if (write_vio.mutex && write_vio.cont && this->_sm) {
188 this->signal_write_event(event);
189 }
190 } else {
191 update_write_request(true);
192 }
193 break;
194 case VC_EVENT_READ_COMPLETE:
195 case VC_EVENT_READ_READY:
196 _timeout.update_inactivity();
197 if (e->cookie == &read_vio) {
198 if (read_vio.mutex && read_vio.cont && this->_sm) {
199 signal_read_event(event);
200 }
201 } else {
202 this->update_read_request(true);
203 }
204 break;
205 case VC_EVENT_EOS:
206 if (e->cookie == &read_vio) {
207 SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
208 read_vio.cont->handleEvent(VC_EVENT_EOS, &read_vio);
209 } else if (e->cookie == &write_vio) {
210 SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
211 write_vio.cont->handleEvent(VC_EVENT_EOS, &write_vio);
212 }
213 break;
214 }
215 reentrancy_count--;
216 // Clean stream up if the terminate flag is set and we are at the bottom of the handler stack
217 terminate_if_possible();
218
219 return 0;
220 }
221
222 Http2ErrorCode
decode_header_blocks(HpackHandle & hpack_handle,uint32_t maximum_table_size)223 Http2Stream::decode_header_blocks(HpackHandle &hpack_handle, uint32_t maximum_table_size)
224 {
225 return http2_decode_header_blocks(&_req_header, (const uint8_t *)header_blocks, header_blocks_length, nullptr, hpack_handle,
226 trailing_header, maximum_table_size);
227 }
228
229 void
send_request(Http2ConnectionState & cstate)230 Http2Stream::send_request(Http2ConnectionState &cstate)
231 {
232 ink_release_assert(this->_sm != nullptr);
233 this->_http_sm_id = this->_sm->sm_id;
234
235 // Convert header to HTTP/1.1 format
236 http2_convert_header_from_2_to_1_1(&_req_header);
237
238 // Write header to a buffer. Borrowing logic from HttpSM::write_header_into_buffer.
239 // Seems like a function like this ought to be in HTTPHdr directly
240 int bufindex;
241 int dumpoffset = 0;
242 int done, tmp;
243 do {
244 bufindex = 0;
245 tmp = dumpoffset;
246 IOBufferBlock *block = this->_request_buffer.get_current_block();
247 if (!block) {
248 this->_request_buffer.add_block();
249 block = this->_request_buffer.get_current_block();
250 }
251 done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
252 dumpoffset += bufindex;
253 this->_request_buffer.fill(bufindex);
254 if (!done) {
255 this->_request_buffer.add_block();
256 }
257 } while (!done);
258
259 if (bufindex == 0) {
260 // No data to signal read event
261 return;
262 }
263
264 // Is the _sm ready to process the header?
265 if (this->read_vio.nbytes > 0) {
266 if (this->recv_end_stream) {
267 this->read_vio.nbytes = bufindex;
268 this->signal_read_event(VC_EVENT_READ_COMPLETE);
269 } else {
270 // End of header but not end of stream, must have some body frames coming
271 this->has_body = true;
272 this->signal_read_event(VC_EVENT_READ_READY);
273 }
274 }
275 }
276
277 bool
change_state(uint8_t type,uint8_t flags)278 Http2Stream::change_state(uint8_t type, uint8_t flags)
279 {
280 switch (_state) {
281 case Http2StreamState::HTTP2_STREAM_STATE_IDLE:
282 if (type == HTTP2_FRAME_TYPE_HEADERS) {
283 if (recv_end_stream) {
284 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
285 } else if (send_end_stream) {
286 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
287 } else {
288 _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
289 }
290 } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
291 if (recv_end_stream) {
292 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
293 } else if (send_end_stream) {
294 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
295 } else {
296 _state = Http2StreamState::HTTP2_STREAM_STATE_OPEN;
297 }
298 } else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) {
299 _state = Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL;
300 } else {
301 return false;
302 }
303 break;
304
305 case Http2StreamState::HTTP2_STREAM_STATE_OPEN:
306 if (type == HTTP2_FRAME_TYPE_RST_STREAM) {
307 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
308 } else if (type == HTTP2_FRAME_TYPE_DATA) {
309 if (recv_end_stream) {
310 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
311 } else if (send_end_stream) {
312 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL;
313 } else {
314 // Do not change state
315 }
316 } else {
317 // A stream in the "open" state may be used by both peers to send frames of any type.
318 return true;
319 }
320 break;
321
322 case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_LOCAL:
323 if (type == HTTP2_FRAME_TYPE_HEADERS) {
324 if (flags & HTTP2_FLAGS_HEADERS_END_HEADERS) {
325 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
326 }
327 } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) {
328 if (flags & HTTP2_FLAGS_CONTINUATION_END_HEADERS) {
329 _state = Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE;
330 }
331 } else {
332 return false;
333 }
334 break;
335
336 case Http2StreamState::HTTP2_STREAM_STATE_RESERVED_REMOTE:
337 // Currently ATS supports only HTTP/2 server features
338 return false;
339
340 case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL:
341 if (type == HTTP2_FRAME_TYPE_RST_STREAM || recv_end_stream) {
342 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
343 } else {
344 // Error, set state closed
345 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
346 return false;
347 }
348 break;
349
350 case Http2StreamState::HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE:
351 if (type == HTTP2_FRAME_TYPE_RST_STREAM || send_end_stream) {
352 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
353 } else if (type == HTTP2_FRAME_TYPE_HEADERS) { // w/o END_STREAM flag
354 // No state change here. Expect a following DATA frame with END_STREAM flag.
355 return true;
356 } else if (type == HTTP2_FRAME_TYPE_CONTINUATION) { // w/o END_STREAM flag
357 // No state change here. Expect a following DATA frame with END_STREAM flag.
358 return true;
359 } else {
360 // Error, set state closed
361 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
362 return false;
363 }
364 break;
365
366 case Http2StreamState::HTTP2_STREAM_STATE_CLOSED:
367 // No state changing
368 return true;
369
370 default:
371 return false;
372 }
373
374 Http2StreamDebug("%s", Http2DebugNames::get_state_name(_state));
375
376 return true;
377 }
378
379 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)380 Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
381 {
382 if (buf) {
383 read_vio.buffer.writer_for(buf);
384 } else {
385 read_vio.buffer.clear();
386 }
387
388 read_vio.mutex = c ? c->mutex : this->mutex;
389 read_vio.cont = c;
390 read_vio.nbytes = nbytes;
391 read_vio.ndone = 0;
392 read_vio.vc_server = this;
393 read_vio.op = VIO::READ;
394
395 // TODO: re-enable read_vio
396
397 return &read_vio;
398 }
399
400 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * abuffer,bool owner)401 Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
402 {
403 if (abuffer) {
404 write_vio.buffer.reader_for(abuffer);
405 } else {
406 write_vio.buffer.clear();
407 }
408 write_vio.mutex = c ? c->mutex : this->mutex;
409 write_vio.cont = c;
410 write_vio.nbytes = nbytes;
411 write_vio.ndone = 0;
412 write_vio.vc_server = this;
413 write_vio.op = VIO::WRITE;
414
415 if (c != nullptr && nbytes > 0 && this->is_client_state_writeable()) {
416 update_write_request(false);
417 } else if (!this->is_client_state_writeable()) {
418 // Cannot start a write on a closed stream
419 return nullptr;
420 }
421 return &write_vio;
422 }
423
424 // Initiated from SM
425 void
do_io_close(int)426 Http2Stream::do_io_close(int /* flags */)
427 {
428 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
429
430 if (!closed) {
431 REMEMBER(NO_EVENT, this->reentrancy_count);
432 Http2StreamDebug("do_io_close");
433
434 // When we get here, the SM has initiated the shutdown. Either it received a WRITE_COMPLETE, or it is shutting down. Any
435 // remaining IO operations back to client should be abandoned. The SM-side buffers backing these operations will be deleted
436 // by the time this is called from transaction_done.
437 closed = true;
438
439 if (_proxy_ssn && this->is_client_state_writeable()) {
440 // Make sure any trailing end of stream frames are sent
441 // We will be removed at send_data_frames or closing connection phase
442 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
443 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
444 h2_proxy_ssn->connection_state.send_data_frames(this);
445 }
446
447 _clear_timers();
448 clear_io_events();
449
450 // Wait until transaction_done is called from HttpSM to signal that the TXN_CLOSE hook has been executed
451 }
452 }
453
454 /*
455 * HttpSM has called TXN_close hooks.
456 */
457 void
transaction_done()458 Http2Stream::transaction_done()
459 {
460 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
461 super::transaction_done();
462 if (cross_thread_event) {
463 cross_thread_event->cancel();
464 cross_thread_event = nullptr;
465 }
466
467 if (!closed) {
468 do_io_close(); // Make sure we've been closed. If we didn't close the _proxy_ssn session better still be open
469 }
470 ink_release_assert(closed || !static_cast<Http2ClientSession *>(_proxy_ssn)->connection_state.is_state_closed());
471 _sm = nullptr;
472
473 if (closed) {
474 // Safe to initiate SSN_CLOSE if this is the last stream
475 ink_assert(cross_thread_event == nullptr);
476 // Schedule the destroy to occur after we unwind here. IF we call directly, may delete with reference on the stack.
477 terminate_stream = true;
478 terminate_if_possible();
479 }
480 }
481
482 void
terminate_if_possible()483 Http2Stream::terminate_if_possible()
484 {
485 if (terminate_stream && reentrancy_count == 0) {
486 REMEMBER(NO_EVENT, this->reentrancy_count);
487
488 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
489 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
490 THREAD_FREE(this, http2StreamAllocator, this_ethread());
491 }
492 }
493
494 // Initiated from the Http2 side
495 void
initiating_close()496 Http2Stream::initiating_close()
497 {
498 if (!closed) {
499 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
500 REMEMBER(NO_EVENT, this->reentrancy_count);
501 Http2StreamDebug("initiating_close");
502
503 // Set the state of the connection to closed
504 // TODO - these states should be combined
505 closed = true;
506 _state = Http2StreamState::HTTP2_STREAM_STATE_CLOSED;
507
508 // leaving the reference to the SM, so we can detach from the SM when we actually destroy
509 // _sm = NULL;
510 // Leaving reference to client session as well, so we can signal once the
511 // TXN_CLOSE has been sent
512 // _proxy_ssn = NULL;
513
514 _clear_timers();
515 clear_io_events();
516
517 // This should result in do_io_close or release being called. That will schedule the final
518 // kill yourself signal
519 // We are sending signals rather than calling the handlers directly to avoid the case where
520 // the HttpTunnel handler causes the HttpSM to be deleted on the stack.
521 bool sent_write_complete = false;
522 if (_sm) {
523 // Push out any last IO events
524 if (write_vio.cont) {
525 SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
526 // Are we done?
527 if (write_vio.nbytes > 0 && write_vio.nbytes == write_vio.ndone) {
528 Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_WRITE_COMPLETE);
529 write_event = send_tracked_event(write_event, VC_EVENT_WRITE_COMPLETE, &write_vio);
530 } else {
531 write_event = send_tracked_event(write_event, VC_EVENT_EOS, &write_vio);
532 Http2StreamDebug("handle write from destroy (event=%d)", VC_EVENT_EOS);
533 }
534 sent_write_complete = true;
535 }
536 }
537 // Send EOS to let SM know that we aren't sticking around
538 if (_sm && read_vio.cont) {
539 // Only bother with the EOS if we haven't sent the write complete
540 if (!sent_write_complete) {
541 SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
542 Http2StreamDebug("send EOS to read cont");
543 read_event = send_tracked_event(read_event, VC_EVENT_EOS, &read_vio);
544 }
545 } else if (!sent_write_complete) {
546 // Transaction is already gone or not started. Kill yourself
547 do_io_close();
548 terminate_stream = true;
549 terminate_if_possible();
550 }
551 }
552 }
553
554 /* Replace existing event only if the new event is different than the inprogress event */
555 Event *
send_tracked_event(Event * event,int send_event,VIO * vio)556 Http2Stream::send_tracked_event(Event *event, int send_event, VIO *vio)
557 {
558 if (event != nullptr) {
559 if (event->callback_event != send_event) {
560 event->cancel();
561 event = nullptr;
562 }
563 }
564
565 if (event == nullptr) {
566 REMEMBER(send_event, this->reentrancy_count);
567 event = this_ethread()->schedule_imm(this, send_event, vio);
568 }
569
570 return event;
571 }
572
573 void
update_read_request(bool call_update)574 Http2Stream::update_read_request(bool call_update)
575 {
576 if (closed || _proxy_ssn == nullptr || _sm == nullptr || read_vio.mutex == nullptr) {
577 return;
578 }
579
580 if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_READ_READY, nullptr)) {
581 // Not on the right thread
582 return;
583 }
584 ink_release_assert(this->_thread == this_ethread());
585
586 SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
587 if (read_vio.nbytes == 0) {
588 return;
589 }
590
591 // Try to be smart and only signal if there was additional data
592 int send_event = VC_EVENT_READ_READY;
593 if (read_vio.ntodo() == 0 || (this->recv_end_stream && this->read_vio.nbytes != INT64_MAX)) {
594 send_event = VC_EVENT_READ_COMPLETE;
595 }
596
597 int64_t read_avail = this->read_vio.buffer.writer()->max_read_avail();
598 if (read_avail > 0 || send_event == VC_EVENT_READ_COMPLETE) {
599 if (call_update) { // Safe to call vio handler directly
600 _timeout.update_inactivity();
601 if (read_vio.cont && this->_sm) {
602 read_vio.cont->handleEvent(send_event, &read_vio);
603 }
604 } else { // Called from do_io_read. Still setting things up. Send event
605 // to handle this after the dust settles
606 read_event = send_tracked_event(read_event, send_event, &read_vio);
607 }
608 }
609 }
610
611 void
restart_sending()612 Http2Stream::restart_sending()
613 {
614 if (!this->response_header_done) {
615 return;
616 }
617
618 IOBufferReader *reader = this->response_get_data_reader();
619 if (reader && !reader->is_read_avail_more_than(0)) {
620 return;
621 }
622
623 if (this->write_vio.mutex && this->write_vio.ntodo() == 0) {
624 return;
625 }
626
627 this->send_response_body(true);
628 }
629
630 void
update_write_request(bool call_update)631 Http2Stream::update_write_request(bool call_update)
632 {
633 if (!this->is_client_state_writeable() || closed || _proxy_ssn == nullptr || write_vio.mutex == nullptr ||
634 write_vio.get_reader() == nullptr) {
635 return;
636 }
637
638 if (!this->_switch_thread_if_not_on_right_thread(VC_EVENT_WRITE_READY, nullptr)) {
639 // Not on the right thread
640 return;
641 }
642 ink_release_assert(this->_thread == this_ethread());
643
644 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
645
646 SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
647
648 IOBufferReader *vio_reader = write_vio.get_reader();
649 if (write_vio.ntodo() == 0 || !vio_reader->is_read_avail_more_than(0)) {
650 return;
651 }
652
653 // Process the new data
654 if (!this->response_header_done) {
655 // Still parsing the response_header
656 int bytes_used = 0;
657 int state = this->response_header.parse_resp(&http_parser, vio_reader, &bytes_used, false);
658 // HTTPHdr::parse_resp() consumed the vio_reader in above (consumed size is `bytes_used`)
659 write_vio.ndone += bytes_used;
660
661 switch (state) {
662 case PARSE_RESULT_DONE: {
663 this->response_header_done = true;
664
665 // Schedule session shutdown if response header has "Connection: close"
666 MIMEField *field = this->response_header.field_find(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION);
667 if (field) {
668 int len;
669 const char *value = field->value_get(&len);
670 if (memcmp(HTTP_VALUE_CLOSE, value, HTTP_LEN_CLOSE) == 0) {
671 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
672 if (h2_proxy_ssn->connection_state.get_shutdown_state() == HTTP2_SHUTDOWN_NONE) {
673 h2_proxy_ssn->connection_state.set_shutdown_state(HTTP2_SHUTDOWN_NOT_INITIATED, Http2ErrorCode::HTTP2_ERROR_NO_ERROR);
674 }
675 }
676 }
677
678 {
679 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
680 // Send the response header back
681 h2_proxy_ssn->connection_state.send_headers_frame(this);
682 }
683
684 // Roll back states of response header to read final response
685 if (this->response_header.expect_final_response()) {
686 this->response_header_done = false;
687 response_header.destroy();
688 response_header.create(HTTP_TYPE_RESPONSE);
689 http2_init_pseudo_headers(response_header);
690 http_parser_clear(&http_parser);
691 http_parser_init(&http_parser);
692 }
693
694 this->signal_write_event(call_update);
695
696 if (vio_reader->is_read_avail_more_than(0)) {
697 this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
698 this->send_response_body(call_update);
699 }
700 break;
701 }
702 case PARSE_RESULT_CONT:
703 // Let it ride for next time
704 break;
705 default:
706 break;
707 }
708 } else {
709 this->_milestones.mark(Http2StreamMilestone::START_TX_DATA_FRAMES);
710 this->send_response_body(call_update);
711 }
712
713 return;
714 }
715
716 void
signal_read_event(int event)717 Http2Stream::signal_read_event(int event)
718 {
719 if (this->read_vio.cont == nullptr || this->read_vio.cont->mutex == nullptr || this->read_vio.op == VIO::NONE) {
720 return;
721 }
722
723 MUTEX_TRY_LOCK(lock, read_vio.cont->mutex, this_ethread());
724 if (lock.is_locked()) {
725 _timeout.update_inactivity();
726 this->read_vio.cont->handleEvent(event, &this->read_vio);
727 } else {
728 if (this->_read_vio_event) {
729 this->_read_vio_event->cancel();
730 }
731 this->_read_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &read_vio);
732 }
733 }
734
735 void
signal_write_event(int event)736 Http2Stream::signal_write_event(int event)
737 {
738 // Don't signal a write event if in fact nothing was written
739 if (this->write_vio.cont == nullptr || this->write_vio.cont->mutex == nullptr || this->write_vio.op == VIO::NONE ||
740 this->write_vio.nbytes == 0) {
741 return;
742 }
743
744 MUTEX_TRY_LOCK(lock, write_vio.cont->mutex, this_ethread());
745 if (lock.is_locked()) {
746 _timeout.update_inactivity();
747 this->write_vio.cont->handleEvent(event, &this->write_vio);
748 } else {
749 if (this->_write_vio_event) {
750 this->_write_vio_event->cancel();
751 }
752 this->_write_vio_event = this_ethread()->schedule_in(this, retry_delay, event, &write_vio);
753 }
754 }
755
756 void
signal_write_event(bool call_update)757 Http2Stream::signal_write_event(bool call_update)
758 {
759 if (this->write_vio.cont == nullptr || this->write_vio.op == VIO::NONE) {
760 return;
761 }
762
763 if (this->write_vio.get_writer()->write_avail() == 0) {
764 return;
765 }
766
767 int send_event = this->write_vio.ntodo() == 0 ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
768
769 if (call_update) {
770 // Coming from reenable. Safe to call the handler directly
771 if (write_vio.cont && this->_sm) {
772 write_vio.cont->handleEvent(send_event, &write_vio);
773 }
774 } else {
775 // Called from do_io_write. Might still be setting up state. Send an event to let the dust settle
776 write_event = send_tracked_event(write_event, send_event, &write_vio);
777 }
778 }
779
780 bool
push_promise(URL & url,const MIMEField * accept_encoding)781 Http2Stream::push_promise(URL &url, const MIMEField *accept_encoding)
782 {
783 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
784 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
785 return h2_proxy_ssn->connection_state.send_push_promise_frame(this, url, accept_encoding);
786 }
787
788 void
send_response_body(bool call_update)789 Http2Stream::send_response_body(bool call_update)
790 {
791 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
792 _timeout.update_inactivity();
793
794 if (Http2::stream_priority_enabled) {
795 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
796 h2_proxy_ssn->connection_state.schedule_stream(this);
797 // signal_write_event() will be called from `Http2ConnectionState::send_data_frames_depends_on_priority()`
798 // when write_vio is consumed
799 } else {
800 SCOPED_MUTEX_LOCK(lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
801 h2_proxy_ssn->connection_state.send_data_frames(this);
802 this->signal_write_event(call_update);
803 // XXX The call to signal_write_event can destroy/free the Http2Stream.
804 // Don't modify the Http2Stream after calling this method.
805 }
806 }
807
808 void
reenable(VIO * vio)809 Http2Stream::reenable(VIO *vio)
810 {
811 if (this->_proxy_ssn) {
812 if (vio->op == VIO::WRITE) {
813 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
814 update_write_request(true);
815 } else if (vio->op == VIO::READ) {
816 Http2ClientSession *h2_proxy_ssn = static_cast<Http2ClientSession *>(this->_proxy_ssn);
817 {
818 SCOPED_MUTEX_LOCK(ssn_lock, h2_proxy_ssn->connection_state.mutex, this_ethread());
819 h2_proxy_ssn->connection_state.restart_receiving(this);
820 }
821
822 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
823 update_read_request(true);
824 }
825 }
826 }
827
828 IOBufferReader *
response_get_data_reader() const829 Http2Stream::response_get_data_reader() const
830 {
831 return write_vio.get_reader();
832 }
833
834 void
set_active_timeout(ink_hrtime timeout_in)835 Http2Stream::set_active_timeout(ink_hrtime timeout_in)
836 {
837 _timeout.set_active_timeout(timeout_in);
838 }
839
840 void
set_inactivity_timeout(ink_hrtime timeout_in)841 Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
842 {
843 _timeout.set_inactive_timeout(timeout_in);
844 }
845
846 void
cancel_active_timeout()847 Http2Stream::cancel_active_timeout()
848 {
849 _timeout.cancel_active_timeout();
850 }
851
852 void
cancel_inactivity_timeout()853 Http2Stream::cancel_inactivity_timeout()
854 {
855 _timeout.cancel_inactive_timeout();
856 }
857
858 bool
is_active_timeout_expired(ink_hrtime now)859 Http2Stream::is_active_timeout_expired(ink_hrtime now)
860 {
861 return _timeout.is_active_timeout_expired(now);
862 }
863
864 bool
is_inactive_timeout_expired(ink_hrtime now)865 Http2Stream::is_inactive_timeout_expired(ink_hrtime now)
866 {
867 return _timeout.is_inactive_timeout_expired(now);
868 }
869
870 void
clear_io_events()871 Http2Stream::clear_io_events()
872 {
873 if (read_event) {
874 read_event->cancel();
875 read_event = nullptr;
876 }
877
878 if (write_event) {
879 write_event->cancel();
880 write_event = nullptr;
881 }
882
883 if (buffer_full_write_event) {
884 buffer_full_write_event->cancel();
885 buffer_full_write_event = nullptr;
886 }
887
888 if (this->_read_vio_event) {
889 this->_read_vio_event->cancel();
890 this->_read_vio_event = nullptr;
891 }
892
893 if (this->_write_vio_event) {
894 this->_write_vio_event->cancel();
895 this->_write_vio_event = nullptr;
896 }
897 }
898
899 // release and do_io_close are the same for the HTTP/2 protocol
900 void
release(IOBufferReader * r)901 Http2Stream::release(IOBufferReader *r)
902 {
903 this->do_io_close();
904 }
905
906 void
increment_client_transactions_stat()907 Http2Stream::increment_client_transactions_stat()
908 {
909 HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
910 HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_CLIENT_STREAM_COUNT, _thread);
911 }
912
913 void
decrement_client_transactions_stat()914 Http2Stream::decrement_client_transactions_stat()
915 {
916 HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
917 }
918
919 ssize_t
client_rwnd() const920 Http2Stream::client_rwnd() const
921 {
922 return this->_client_rwnd;
923 }
924
925 Http2ErrorCode
increment_client_rwnd(size_t amount)926 Http2Stream::increment_client_rwnd(size_t amount)
927 {
928 this->_client_rwnd += amount;
929
930 this->_recent_rwnd_increment[this->_recent_rwnd_increment_index] = amount;
931 ++this->_recent_rwnd_increment_index;
932 this->_recent_rwnd_increment_index %= this->_recent_rwnd_increment.size();
933 double sum = std::accumulate(this->_recent_rwnd_increment.begin(), this->_recent_rwnd_increment.end(), 0.0);
934 double avg = sum / this->_recent_rwnd_increment.size();
935 if (avg < Http2::min_avg_window_update) {
936 return Http2ErrorCode::HTTP2_ERROR_ENHANCE_YOUR_CALM;
937 }
938 return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
939 }
940
941 Http2ErrorCode
decrement_client_rwnd(size_t amount)942 Http2Stream::decrement_client_rwnd(size_t amount)
943 {
944 this->_client_rwnd -= amount;
945 if (this->_client_rwnd < 0) {
946 return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
947 } else {
948 return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
949 }
950 }
951
952 ssize_t
server_rwnd() const953 Http2Stream::server_rwnd() const
954 {
955 return this->_server_rwnd;
956 }
957
958 Http2ErrorCode
increment_server_rwnd(size_t amount)959 Http2Stream::increment_server_rwnd(size_t amount)
960 {
961 this->_server_rwnd += amount;
962 return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
963 }
964
965 Http2ErrorCode
decrement_server_rwnd(size_t amount)966 Http2Stream::decrement_server_rwnd(size_t amount)
967 {
968 this->_server_rwnd -= amount;
969 if (this->_server_rwnd < 0) {
970 return Http2ErrorCode::HTTP2_ERROR_PROTOCOL_ERROR;
971 } else {
972 return Http2ErrorCode::HTTP2_ERROR_NO_ERROR;
973 }
974 }
975
976 bool
_switch_thread_if_not_on_right_thread(int event,void * edata)977 Http2Stream::_switch_thread_if_not_on_right_thread(int event, void *edata)
978 {
979 if (this->_thread != this_ethread()) {
980 SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
981 if (cross_thread_event == nullptr) {
982 // Send to the right thread
983 cross_thread_event = this->_thread->schedule_imm(this, event, edata);
984 }
985 return false;
986 }
987 return true;
988 }
989
990 int
get_transaction_priority_weight() const991 Http2Stream::get_transaction_priority_weight() const
992 {
993 return priority_node ? priority_node->weight : 0;
994 }
995
996 int
get_transaction_priority_dependence() const997 Http2Stream::get_transaction_priority_dependence() const
998 {
999 if (!priority_node) {
1000 return -1;
1001 } else {
1002 return priority_node->parent ? priority_node->parent->id : 0;
1003 }
1004 }
1005
1006 int64_t
read_vio_read_avail()1007 Http2Stream::read_vio_read_avail()
1008 {
1009 MIOBuffer *writer = this->read_vio.get_writer();
1010 if (writer) {
1011 return writer->max_read_avail();
1012 }
1013
1014 return 0;
1015 }
1016
1017 bool
has_request_body(int64_t content_length,bool is_chunked_set) const1018 Http2Stream::has_request_body(int64_t content_length, bool is_chunked_set) const
1019 {
1020 return has_body;
1021 }
1022