1 /** @file
2
3 A brief file description
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 /****************************************************************************
25
26 HttpTunnel.cc
27
28 Description:
29
30
31 ****************************************************************************/
32
33 #include "tscore/ink_config.h"
34 #include "HttpConfig.h"
35 #include "HttpTunnel.h"
36 #include "HttpSM.h"
37 #include "HttpDebugNames.h"
38 #include "tscore/ParseRules.h"
39 #include "tscore/ink_memory.h"
40
// Below this many bytes transfer_bytes() copies the data instead of
// block-referencing it, to avoid building up many tiny blocks.
static const int min_block_transfer_bytes = 256;
// printf-style format for one chunk-size line: hex length followed by CRLF.
static const char *const CHUNK_HEADER_FMT = "%" PRIx64 "\r\n";
// This should be as small as possible because it will only hold the
// header and trailer per chunk - the chunk body will be a reference to
// a block in the input stream.
static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
47
// Default-construct with the default maximum chunk size; the rest of the
// state is established later by init()/init_by_action().
ChunkedHandler::ChunkedHandler() : max_chunk_size(DEFAULT_MAX_CHUNK_SIZE) {}
49
50 void
init(IOBufferReader * buffer_in,HttpTunnelProducer * p)51 ChunkedHandler::init(IOBufferReader *buffer_in, HttpTunnelProducer *p)
52 {
53 if (p->do_chunking) {
54 init_by_action(buffer_in, ACTION_DOCHUNK);
55 } else if (p->do_dechunking) {
56 init_by_action(buffer_in, ACTION_DECHUNK);
57 } else {
58 init_by_action(buffer_in, ACTION_PASSTHRU);
59 }
60 return;
61 }
62
// Reset the parser state and set up the buffers/readers needed for the
// requested action.
//
// @param buffer_in reader over the producer's raw input stream.
// @param action    DOCHUNK (add chunking), DECHUNK (remove it) or PASSTHRU.
void
ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
{
  // Zero the chunk-size parser state for a fresh stream.
  running_sum = 0;
  num_digits = 0;
  cur_chunk_size = 0;
  bytes_left = 0;
  truncation = false;
  this->action = action;

  switch (action) {
  case ACTION_DOCHUNK:
    // Input is dechunked; clone a reader on it and allocate a small output
    // buffer that only ever holds chunk headers/trailers (bodies are
    // block-referenced from the input).
    dechunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
    dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
    chunked_buffer = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
    chunked_size = 0;
    break;
  case ACTION_DECHUNK:
    // Input is chunked; output buffer receives the dechunked payload.
    chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
    dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
    dechunked_size = 0;
    break;
  case ACTION_PASSTHRU:
    // Chunked bytes flow through untouched; we only track chunk boundaries.
    chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
    break;
  default:
    ink_release_assert(!"Unknown action");
  }

  return;
}
94
95 void
clear()96 ChunkedHandler::clear()
97 {
98 switch (action) {
99 case ACTION_DOCHUNK:
100 free_MIOBuffer(chunked_buffer);
101 break;
102 case ACTION_DECHUNK:
103 free_MIOBuffer(dechunked_buffer);
104 break;
105 case ACTION_PASSTHRU:
106 default:
107 break;
108 }
109
110 return;
111 }
112
113 void
set_max_chunk_size(int64_t size)114 ChunkedHandler::set_max_chunk_size(int64_t size)
115 {
116 max_chunk_size = size ? size : DEFAULT_MAX_CHUNK_SIZE;
117 max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
118 }
119
120 void
read_size()121 ChunkedHandler::read_size()
122 {
123 int64_t bytes_used;
124 bool done = false;
125
126 while (chunked_reader->read_avail() > 0 && !done) {
127 const char *tmp = chunked_reader->start();
128 int64_t data_size = chunked_reader->block_read_avail();
129
130 ink_assert(data_size > 0);
131 bytes_used = 0;
132
133 while (data_size > 0) {
134 bytes_used++;
135 if (state == CHUNK_READ_SIZE) {
136 // The http spec says the chunked size is always in hex
137 if (ParseRules::is_hex(*tmp)) {
138 // Make sure we will not overflow running_sum with our shift.
139 if (!can_safely_shift_left(running_sum, 4)) {
140 // We have no more space in our variable for the shift.
141 state = CHUNK_READ_ERROR;
142 done = true;
143 break;
144 }
145 num_digits++;
146 // Shift over one hex value.
147 running_sum <<= 4;
148
149 if (ParseRules::is_digit(*tmp)) {
150 running_sum += *tmp - '0';
151 } else {
152 running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
153 }
154 } else {
155 // We are done parsing size
156 if (num_digits == 0 || running_sum < 0) {
157 // Bogus chunk size
158 state = CHUNK_READ_ERROR;
159 done = true;
160 break;
161 } else {
162 state = CHUNK_READ_SIZE_CRLF; // now look for CRLF
163 }
164 }
165 } else if (state == CHUNK_READ_SIZE_CRLF) { // Scan for a linefeed
166 if (ParseRules::is_lf(*tmp)) {
167 Debug("http_chunk", "read chunk size of %d bytes", running_sum);
168 bytes_left = (cur_chunk_size = running_sum);
169 state = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
170 done = true;
171 break;
172 }
173 } else if (state == CHUNK_READ_SIZE_START) {
174 if (ParseRules::is_lf(*tmp)) {
175 running_sum = 0;
176 num_digits = 0;
177 state = CHUNK_READ_SIZE;
178 }
179 }
180 tmp++;
181 data_size--;
182 }
183 chunked_reader->consume(bytes_used);
184 }
185 }
186
187 // int ChunkedHandler::transfer_bytes()
188 //
189 // Transfer bytes from chunked_reader to dechunked buffer
190 // Use block reference method when there is a sufficient
191 // size to move. Otherwise, uses memcpy method
192 //
// Move up to bytes_left bytes of the current chunk from chunked_reader into
// dechunked_buffer (or just consume them for passthru). Large spans are
// moved by block reference; small spans are copied to avoid accumulating
// many tiny blocks. Returns the number of bytes moved.
int64_t
ChunkedHandler::transfer_bytes()
{
  int64_t block_read_avail, moved, to_move, total_moved = 0;

  // Handle the case where we are doing chunked passthrough.
  if (!dechunked_buffer) {
    moved = std::min(bytes_left, chunked_reader->read_avail());
    chunked_reader->consume(moved);
    bytes_left = bytes_left - moved;
    return moved;
  }

  while (bytes_left > 0) {
    block_read_avail = chunked_reader->block_read_avail();

    to_move = std::min(bytes_left, block_read_avail);
    if (to_move <= 0) {
      break;
    }

    if (to_move >= min_block_transfer_bytes) {
      // Block-reference transfer; deliberately bounded by bytes_left (not
      // to_move) so multiple blocks can be referenced in one call.
      moved = dechunked_buffer->write(chunked_reader, bytes_left);
    } else {
      // Small amount of data available. We want to copy the
      // data rather than block reference to prevent the buildup
      // of too many small blocks which leads to stack overflow
      // on deallocation
      moved = dechunked_buffer->write(chunked_reader->start(), to_move);
    }

    if (moved > 0) {
      chunked_reader->consume(moved);
      bytes_left = bytes_left - moved;
      dechunked_size += moved;
      total_moved += moved;
    } else {
      break;
    }
  }
  return total_moved;
}
235
236 void
read_chunk()237 ChunkedHandler::read_chunk()
238 {
239 int64_t b = transfer_bytes();
240
241 ink_assert(bytes_left >= 0);
242 if (bytes_left == 0) {
243 Debug("http_chunk", "completed read of chunk of %" PRId64 " bytes", cur_chunk_size);
244
245 state = CHUNK_READ_SIZE_START;
246 } else if (bytes_left > 0) {
247 Debug("http_chunk", "read %" PRId64 " bytes of an %" PRId64 " chunk", b, cur_chunk_size);
248 }
249 }
250
// Scan the chunked trailer (optional header lines after the 0-size chunk)
// until the terminating blank line, consuming the bytes examined. Sets
// state to CHUNK_READ_DONE when the trailer is complete.
void
ChunkedHandler::read_trailer()
{
  int64_t bytes_used;
  bool done = false;

  while (chunked_reader->is_read_avail_more_than(0) && !done) {
    const char *tmp = chunked_reader->start();
    int64_t data_size = chunked_reader->block_read_avail();

    ink_assert(data_size > 0);
    for (bytes_used = 0; data_size > 0; data_size--) {
      bytes_used++;

      if (ParseRules::is_cr(*tmp)) {
        // For a CR to signal we are almost done, the preceding
        // part of the line must be blank and next character
        // must a LF
        state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
      } else if (ParseRules::is_lf(*tmp)) {
        // For a LF to signal we are done reading the
        // trailer, the line must have either been blank
        // or must have have only had a CR on it
        if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
          state = CHUNK_READ_DONE;
          Debug("http_chunk", "completed read of trailers");
          done = true;
          break;
        } else {
          // A LF that does not terminate the trailer
          // indicates a new line
          state = CHUNK_READ_TRAILER_BLANK;
        }
      } else {
        // A character that is not a CR or LF indicates
        // the we are parsing a line of the trailer
        state = CHUNK_READ_TRAILER_LINE;
      }
      tmp++;
    }
    chunked_reader->consume(bytes_used);
  }
}
294
295 bool
process_chunked_content()296 ChunkedHandler::process_chunked_content()
297 {
298 while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
299 switch (state) {
300 case CHUNK_READ_SIZE:
301 case CHUNK_READ_SIZE_CRLF:
302 case CHUNK_READ_SIZE_START:
303 read_size();
304 break;
305 case CHUNK_READ_CHUNK:
306 read_chunk();
307 break;
308 case CHUNK_READ_TRAILER_BLANK:
309 case CHUNK_READ_TRAILER_CR:
310 case CHUNK_READ_TRAILER_LINE:
311 read_trailer();
312 break;
313 case CHUNK_FLOW_CONTROL:
314 return false;
315 default:
316 ink_release_assert(0);
317 break;
318 }
319 }
320 return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
321 }
322
// Convert all currently available dechunked input into chunked output:
// for each chunk, write the hex size line, block-reference the body, and
// append CRLF. When the server side has signalled completion, append the
// terminating "0\r\n\r\n" and return true.
bool
ChunkedHandler::generate_chunked_content()
{
  char tmp[16];
  bool server_done = false;
  int64_t r_avail;

  // set_max_chunk_size() must have been called to pre-render the header.
  ink_assert(max_chunk_header_len);

  // These events mean no more data will arrive from the server.
  switch (last_server_event) {
  case VC_EVENT_EOS:
  case VC_EVENT_READ_COMPLETE:
  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
    server_done = true;
    break;
  }

  while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
    int64_t write_val = std::min(max_chunk_size, r_avail);

    state = CHUNK_WRITE_CHUNK;
    Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);

    // Output the chunk size.
    if (write_val != max_chunk_size) {
      int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
      chunked_buffer->write(tmp, len);
      chunked_size += len;
    } else {
      // Common case: reuse the pre-rendered max-size header.
      chunked_buffer->write(max_chunk_header, max_chunk_header_len);
      chunked_size += max_chunk_header_len;
    }

    // Output the chunk itself.
    //
    // BZ# 54395 Note - we really should only do a
    // block transfer if there is sizable amount of
    // data (like we do for the case where we are
    // removing chunked encoding in ChunkedHandler::transfer_bytes()
    // However, I want to do this fix with as small a risk
    // as possible so I'm leaving this issue alone for
    // now
    //
    chunked_buffer->write(dechunked_reader, write_val);
    chunked_size += write_val;
    dechunked_reader->consume(write_val);

    // Output the trailing CRLF.
    chunked_buffer->write("\r\n", 2);
    chunked_size += 2;
  }

  if (server_done) {
    state = CHUNK_WRITE_DONE;

    // Add the chunked transfer coding trailer.
    chunked_buffer->write("0\r\n\r\n", 5);
    chunked_size += 5;
    return true;
  }
  return false;
}
385
// Default-construct with an empty consumer chain; fields are filled in by
// HttpTunnel::add_producer().
HttpTunnelProducer::HttpTunnelProducer() : consumer_list() {}
387
// Compute this producer's backlog: the number of bytes buffered inside ATS
// along any consumer chain rooted here, taking the maximum over chains.
// Returns early with the first value found that is >= @a limit, since the
// caller only needs to know whether the limit is exceeded.
uint64_t
HttpTunnelProducer::backlog(uint64_t limit)
{
  uint64_t zret = 0;
  // Calculate the total backlog, the # of bytes inside ATS for this producer.
  // We go all the way through each chain to the ending sink and take the maximum
  // over those paths. Do need to be careful about loops which can occur.
  for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
    if (c->alive && c->write_vio) {
      uint64_t n = 0;
      if (HT_TRANSFORM == c->vc_type) {
        // Transforms know their own internal backlog.
        n += static_cast<TransformVCChain *>(c->vc)->backlog(limit);
      } else {
        IOBufferReader *r = c->write_vio->get_reader();
        if (r) {
          n += static_cast<uint64_t>(r->read_avail());
        }
      }
      if (n >= limit) {
        return n;
      }

      // Recurse into the chained downstream producer, if any.
      if (!c->is_sink()) {
        HttpTunnelProducer *dsp = c->self_producer;
        if (dsp) {
          n += dsp->backlog();
        }
      }
      if (n >= limit) {
        return n;
      }
      if (n > zret) {
        zret = n;
      }
    }
  }

  // Bytes still held by the chunked passthru/dechunk reader count too.
  if (chunked_handler.chunked_reader) {
    zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
  }

  return zret;
}
431
432 /* We set the producers in a flow chain specifically rather than
433 using a tunnel level variable in order to handle bi-directional
434 tunnels correctly. In such a case the flow control on producers is
435 not related so a single value for the tunnel won't work.
436 */
437 void
set_throttle_src(HttpTunnelProducer * srcp)438 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer *srcp)
439 {
440 HttpTunnelProducer *p = this;
441 p->flow_control_source = srcp;
442 for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
443 if (!c->is_sink()) {
444 p = c->self_producer;
445 if (p) {
446 p->set_throttle_src(srcp);
447 }
448 }
449 }
450 }
451
// Default-construct with an empty chain link; fields are filled in by
// HttpTunnel::add_consumer().
HttpTunnelConsumer::HttpTunnelConsumer() : link() {}
453
// Construct with no mutex; the real mutex and state machine are supplied
// later via init().
HttpTunnel::HttpTunnel() : Continuation(nullptr) {}
455
// Bind this tunnel to its owning state machine and mutex and pull the
// flow-control watermarks from the (possibly overridden) configuration.
//
// @param sm_arg owning HttpSM; its config params are read here.
// @param amutex mutex shared with the state machine.
void
HttpTunnel::init(HttpSM *sm_arg, Ptr<ProxyMutex> &amutex)
{
  HttpConfigParams *params = sm_arg->t_state.http_config_param;
  sm = sm_arg;
  active = false;
  mutex = amutex;
  ink_release_assert(reentrancy_count == 0);
  SET_HANDLER(&HttpTunnel::main_handler);
  flow_state.enabled_p = params->oride.flow_control_enabled;
  // Non-positive configured watermarks keep the compiled-in defaults.
  if (params->oride.flow_low_water_mark > 0) {
    flow_state.low_water = params->oride.flow_low_water_mark;
  }
  if (params->oride.flow_high_water_mark > 0) {
    flow_state.high_water = params->oride.flow_high_water_mark;
  }
  // This should always be true, we handled default cases back in HttpConfig::reconfigure()
  ink_assert(flow_state.low_water <= flow_state.high_water);
}
475
476 void
reset()477 HttpTunnel::reset()
478 {
479 ink_assert(active == false);
480 #ifdef DEBUG
481 for (auto &producer : producers) {
482 ink_assert(producer.alive == false);
483 }
484 for (auto &consumer : consumers) {
485 ink_assert(consumer.alive == false);
486 }
487 #endif
488
489 call_sm = false;
490 num_producers = 0;
491 num_consumers = 0;
492 ink_zero(consumers);
493 ink_zero(producers);
494 }
495
496 void
kill_tunnel()497 HttpTunnel::kill_tunnel()
498 {
499 for (auto &producer : producers) {
500 if (producer.vc != nullptr) {
501 chain_abort_all(&producer);
502 }
503 ink_assert(producer.alive == false);
504 }
505 active = false;
506 this->deallocate_buffers();
507 this->reset();
508 }
509
510 HttpTunnelProducer *
alloc_producer()511 HttpTunnel::alloc_producer()
512 {
513 for (int i = 0; i < MAX_PRODUCERS; ++i) {
514 if (producers[i].vc == nullptr) {
515 num_producers++;
516 ink_assert(num_producers <= MAX_PRODUCERS);
517 return producers + i;
518 }
519 }
520 ink_release_assert(0);
521 return nullptr;
522 }
523
524 HttpTunnelConsumer *
alloc_consumer()525 HttpTunnel::alloc_consumer()
526 {
527 for (int i = 0; i < MAX_CONSUMERS; i++) {
528 if (consumers[i].vc == nullptr) {
529 num_consumers++;
530 ink_assert(num_consumers <= MAX_CONSUMERS);
531 return consumers + i;
532 }
533 }
534 ink_release_assert(0);
535 return nullptr;
536 }
537
538 int
deallocate_buffers()539 HttpTunnel::deallocate_buffers()
540 {
541 int num = 0;
542 ink_release_assert(active == false);
543 for (auto &producer : producers) {
544 if (producer.read_buffer != nullptr) {
545 ink_assert(producer.vc != nullptr);
546 free_MIOBuffer(producer.read_buffer);
547 producer.read_buffer = nullptr;
548 producer.buffer_start = nullptr;
549 num++;
550 }
551
552 if (producer.chunked_handler.dechunked_buffer != nullptr) {
553 ink_assert(producer.vc != nullptr);
554 free_MIOBuffer(producer.chunked_handler.dechunked_buffer);
555 producer.chunked_handler.dechunked_buffer = nullptr;
556 num++;
557 }
558
559 if (producer.chunked_handler.chunked_buffer != nullptr) {
560 ink_assert(producer.vc != nullptr);
561 free_MIOBuffer(producer.chunked_handler.chunked_buffer);
562 producer.chunked_handler.chunked_buffer = nullptr;
563 num++;
564 }
565 producer.chunked_handler.max_chunk_header_len = 0;
566 }
567 return num;
568 }
569
570 void
set_producer_chunking_action(HttpTunnelProducer * p,int64_t skip_bytes,TunnelChunkingAction_t action)571 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer *p, int64_t skip_bytes, TunnelChunkingAction_t action)
572 {
573 p->chunked_handler.skip_bytes = skip_bytes;
574 p->chunking_action = action;
575
576 switch (action) {
577 case TCA_CHUNK_CONTENT:
578 p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
579 break;
580 case TCA_DECHUNK_CONTENT:
581 case TCA_PASSTHRU_CHUNKED_CONTENT:
582 p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
583 break;
584 default:
585 break;
586 };
587 }
588
// Forward the configured maximum chunk size to the producer's chunked
// handler (0 selects the handler's default).
void
HttpTunnel::set_producer_chunking_size(HttpTunnelProducer *p, int64_t size)
{
  p->chunked_handler.set_max_chunk_size(size);
}
594
595 // HttpTunnelProducer* HttpTunnel::add_producer
596 //
597 // Adds a new producer to the tunnel
598 //
599 HttpTunnelProducer *
add_producer(VConnection * vc,int64_t nbytes_arg,IOBufferReader * reader_start,HttpProducerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg)600 HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *reader_start, HttpProducerHandler sm_handler,
601 HttpTunnelType_t vc_type, const char *name_arg)
602 {
603 HttpTunnelProducer *p;
604
605 Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
606
607 ink_assert(reader_start->mbuf);
608 if ((p = alloc_producer()) != nullptr) {
609 p->vc = vc;
610 p->nbytes = nbytes_arg;
611 p->buffer_start = reader_start;
612 p->read_buffer = reader_start->mbuf;
613 p->vc_handler = sm_handler;
614 p->vc_type = vc_type;
615 p->name = name_arg;
616 p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
617
618 p->do_chunking = false;
619 p->do_dechunking = false;
620 p->do_chunked_passthru = false;
621
622 p->init_bytes_done = reader_start->read_avail();
623 if (p->nbytes < 0) {
624 p->ntodo = p->nbytes;
625 } else { // The byte count given us includes bytes
626 // that already may be in the buffer.
627 // ntodo represents the number of bytes
628 // the tunneling mechanism needs to read
629 // for the producer
630 p->ntodo = p->nbytes - p->init_bytes_done;
631 ink_assert(p->ntodo >= 0);
632 }
633
634 // We are static, the producer is never "alive"
635 // It just has data in the buffer
636 if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
637 ink_assert(p->ntodo == 0);
638 p->alive = false;
639 p->read_success = true;
640 } else {
641 p->alive = true;
642 }
643 }
644 return p;
645 }
646
647 // void HttpTunnel::add_consumer
648 //
649 // Adds a new consumer to the tunnel. The producer must
650 // be specified and already added to the tunnel. Attaches
651 // the new consumer to the entry for the existing producer
652 //
653 // Returns true if the consumer successfully added. Returns
654 // false if the consumer was not added because the source failed
655 //
656 HttpTunnelConsumer *
add_consumer(VConnection * vc,VConnection * producer,HttpConsumerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg,int64_t skip_bytes)657 HttpTunnel::add_consumer(VConnection *vc, VConnection *producer, HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type,
658 const char *name_arg, int64_t skip_bytes)
659 {
660 Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
661
662 // Find the producer entry
663 HttpTunnelProducer *p = get_producer(producer);
664 ink_release_assert(p);
665
666 // Check to see if the producer terminated
667 // without sending all of its data
668 if (p->alive == false && p->read_success == false) {
669 Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
670 return nullptr;
671 }
672 // Initialize the consumer structure
673 HttpTunnelConsumer *c = alloc_consumer();
674 c->producer = p;
675 c->vc = vc;
676 c->alive = true;
677 c->skip_bytes = skip_bytes;
678 c->vc_handler = sm_handler;
679 c->vc_type = vc_type;
680 c->name = name_arg;
681
682 // Register the consumer with the producer
683 p->consumer_list.push(c);
684 p->num_consumers++;
685
686 return c;
687 }
688
689 void
chain(HttpTunnelConsumer * c,HttpTunnelProducer * p)690 HttpTunnel::chain(HttpTunnelConsumer *c, HttpTunnelProducer *p)
691 {
692 p->self_consumer = c;
693 c->self_producer = p;
694 // If the flow is already throttled update the chained producer.
695 if (c->producer->is_throttled()) {
696 p->set_throttle_src(c->producer->flow_control_source);
697 }
698 }
699
700 // void HttpTunnel::tunnel_run()
701 //
702 // Makes the tunnel go
703 //
704 void
tunnel_run(HttpTunnelProducer * p_arg)705 HttpTunnel::tunnel_run(HttpTunnelProducer *p_arg)
706 {
707 Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
708 if (p_arg) {
709 producer_run(p_arg);
710 } else {
711 HttpTunnelProducer *p;
712
713 ink_assert(active == false);
714
715 for (int i = 0; i < MAX_PRODUCERS; ++i) {
716 p = producers + i;
717 if (p->vc != nullptr && (p->alive || (p->vc_type == HT_STATIC && p->buffer_start != nullptr))) {
718 producer_run(p);
719 }
720 }
721 }
722
723 // It is possible that there was nothing to do
724 // due to a all transfers being zero length
725 // If that is the case, call the state machine
726 // back to say we are done
727 if (!is_tunnel_alive()) {
728 active = false;
729 sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
730 }
731 }
732
// Start a single producer: set up chunking state and buffers, create a
// reader per consumer (honoring skip_bytes), buffer partial POST data for
// redirect-following, process any already-buffered data, issue the consumer
// writes, and finally issue the producer read (or signal PRECOMPLETE if
// everything was already buffered).
void
HttpTunnel::producer_run(HttpTunnelProducer *p)
{
  // Determine whether the producer has a cache-write consumer,
  // since all chunked content read by the producer gets dechunked
  // prior to being written into the cache.
  HttpTunnelConsumer *c, *cache_write_consumer = nullptr;
  bool transform_consumer = false;

  for (c = p->consumer_list.head; c; c = c->link.next) {
    if (c->vc_type == HT_CACHE_WRITE) {
      cache_write_consumer = c;
      break;
    }
  }

  // bz57413
  for (c = p->consumer_list.head; c; c = c->link.next) {
    if (c->vc_type == HT_TRANSFORM) {
      transform_consumer = true;
      break;
    }
  }

  // Determine whether the producer is to perform chunking,
  // dechunking, or chunked-passthough of the incoming response.
  TunnelChunkingAction_t action = p->chunking_action;

  // [bug 2579251] static producers won't have handler set
  if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
    if (action == TCA_CHUNK_CONTENT) {
      p->do_chunking = true;
    } else if (action == TCA_DECHUNK_CONTENT) {
      p->do_dechunking = true;
    } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
      p->do_chunked_passthru = true;

      // Dechunk the chunked content into the cache.
      if (cache_write_consumer != nullptr) {
        p->do_dechunking = true;
      }
    }
  }

  int64_t consumer_n;
  int64_t producer_n;

  ink_assert(p->vc != nullptr);
  active = true;

  IOBufferReader *chunked_buffer_start = nullptr, *dechunked_buffer_start = nullptr;
  if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
    p->chunked_handler.init(p->buffer_start, p);

    // Copy the header into the chunked/dechunked buffers.
    if (p->do_chunking) {
      // initialize a reader to chunked buffer start before writing to keep ref count
      chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
      p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
    }
    if (p->do_dechunking) {
      // bz57413
      Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
            p->chunked_handler.chunked_reader->read_avail());

      // initialize a reader to dechunked buffer start before writing to keep ref count
      dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();

      // If there is no transformation then add the header to the buffer, else the
      // client already has got the header from us, no need for it in the buffer.
      if (!transform_consumer) {
        p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);

        Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64 "", p->chunked_handler.skip_bytes);
      }
    }
  }

  // Compute how many bytes the producer must read (producer_n) and how many
  // each consumer will write (consumer_n); a cache range read sizes both
  // from the single supported range entry.
  int64_t read_start_pos = 0;
  if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
    ink_assert(sm->t_state.num_range_fields == 1); // we current just support only one range entry
    read_start_pos = sm->t_state.ranges[0]._start;
    producer_n = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start) + 1;
    consumer_n = (producer_n + sm->client_response_hdr_bytes);
  } else if (p->nbytes >= 0) {
    consumer_n = p->nbytes;
    producer_n = p->ntodo;
  } else {
    consumer_n = (producer_n = INT64_MAX);
  }

  // At least set up the consumer readers first so the data
  // doesn't disappear out from under the tunnel
  for (c = p->consumer_list.head; c; c = c->link.next) {
    // Create a reader for each consumer. The reader allows
    // us to implement skip bytes
    if (c->vc_type == HT_CACHE_WRITE) {
      switch (action) {
      case TCA_CHUNK_CONTENT:
      case TCA_PASSTHRU_DECHUNKED_CONTENT:
        c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
        break;
      case TCA_DECHUNK_CONTENT:
      case TCA_PASSTHRU_CHUNKED_CONTENT:
        c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
        break;
      default:
        break;
      }
    }
    // Non-cache consumers.
    else if (action == TCA_CHUNK_CONTENT) {
      c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
    } else if (action == TCA_DECHUNK_CONTENT) {
      c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
    } else {
      c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
    }

    // Consume bytes of the reader if we skipping bytes
    if (c->skip_bytes > 0) {
      ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
      c->buffer_reader->consume(c->skip_bytes);
    }
  }

  // YTS Team, yamsat Plugin
  // Allocate and copy partial POST data to buffers. Check for the various parameters
  // including the maximum configured post data size
  if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
      (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && p->vc_type == HT_HTTP_CLIENT)) {
    Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64 " max size: %" PRId64 "",
          p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);

    // (note that since we are not dechunking POST, this is the chunked size if chunked)
    if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
      Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64 " limit=%" PRId64 "",
              p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
      sm->disable_redirect();
      if (p->vc_type == HT_BUFFER_READ) {
        producer_handler(VC_EVENT_ERROR, p);
        return;
      }
    } else {
      sm->postbuf_copy_partial_data();
    }
  } // end of added logic for partial POST

  if (p->do_chunking) {
    // remove the chunked reader marker so that it doesn't act like a buffer guard
    p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
    p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);

    // If there is data to process in the buffer, do it now
    producer_handler(VC_EVENT_READ_READY, p);
  } else if (p->do_dechunking || p->do_chunked_passthru) {
    // remove the dechunked reader marker so that it doesn't act like a buffer guard
    if (p->do_dechunking && dechunked_buffer_start) {
      p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
    }

    // bz57413
    // If there is no transformation plugin, then we didn't add the header, hence no need to consume it
    Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
          p->chunked_handler.chunked_reader->read_avail());
    if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
      p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
      Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64 "",
            p->chunked_handler.skip_bytes);
    }
    // if(p->chunked_handler.chunked_reader->read_avail() > 0)
    // p->chunked_handler.chunked_reader->consume(
    // p->chunked_handler.skip_bytes);

    producer_handler(VC_EVENT_READ_READY, p);
    if (sm->get_postbuf_done() && p->vc_type == HT_HTTP_CLIENT) { // read_avail() == 0
      // [bug 2579251]
      // Ugh, this is horrible but in the redirect case they are running a the tunnel again with the
      // now closed/empty producer to trigger PRECOMPLETE. If the POST was chunked, producer_n is set
      // (incorrectly) to INT64_MAX. It needs to be set to 0 to prevent triggering another read.
      producer_n = 0;
    }
  }
  // Issue the write VIO for each live consumer.
  for (c = p->consumer_list.head; c; c = c->link.next) {
    int64_t c_write = consumer_n;

    // Don't bother to set up the consumer if it is dead
    if (!c->alive) {
      continue;
    }

    if (!p->alive) {
      // Adjust the amount of chunked data to write if the only data was in the initial read
      // The amount to write in some cases is dependent on the type of the consumer, so this
      // value must be computed for each consumer
      c_write = final_consumer_bytes_to_write(p, c);
    } else {
      // INKqa05109 - if we don't know the length leave it at
      // INT64_MAX or else the cache may bounce the write
      // because it thinks the document is too big. INT64_MAX
      // is a special case for the max document size code
      // in the cache
      if (c_write != INT64_MAX) {
        c_write -= c->skip_bytes;
      }
      // Fix for problems with not chunked content being chunked and
      // not sending the entire data. The content length grows when
      // it is being chunked.
      if (p->do_chunking == true) {
        c_write = INT64_MAX;
      }
    }

    if (c_write == 0) {
      // Nothing to do, call back the cleanup handlers
      c->write_vio = nullptr;
      consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
    } else {
      // In the client half close case, all the data that will be sent
      // from the client is already in the buffer. Go ahead and set
      // the amount to read since we know it. We will forward the FIN
      // to the server on VC_EVENT_WRITE_COMPLETE.
      if (p->vc_type == HT_HTTP_CLIENT) {
        ProxyTransaction *ua_vc = static_cast<ProxyTransaction *>(p->vc);
        if (ua_vc->get_half_close_flag()) {
          // NOTE(review): read_avail() returns int64_t; storing it in an
          // int narrows the value - confirm the buffered amount cannot
          // exceed INT_MAX here.
          int tmp = c->buffer_reader->read_avail();
          if (tmp < c_write) {
            c_write = tmp;
          }
          p->alive = false;
          p->handler_state = HTTP_SM_POST_SUCCESS;
        }
      }
      // Start the writes now that we know we will consume all the initial data
      c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
      ink_assert(c_write > 0);
      if (c->write_vio == nullptr) {
        consumer_handler(VC_EVENT_ERROR, c);
      }
    }
  }
  if (p->alive) {
    ink_assert(producer_n >= 0);

    if (producer_n == 0) {
      // Everything is already in the buffer so mark the producer as done. We need to notify
      // state machine that everything is done. We use a special event to say the producers is
      // done but we didn't do anything
      p->alive = false;
      p->read_success = true;
      p->handler_state = HTTP_SM_POST_SUCCESS;
      Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
      producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
    } else {
      if (read_start_pos > 0) {
        // Range read: start at the requested offset within the cache object.
        p->read_vio = ((CacheVC *)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
      } else {
        p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
      }
    }
  }

  // Now that the tunnel has started, we must remove producer's reader so
  // that it doesn't act like a buffer guard
  if (p->read_buffer && p->buffer_start) {
    p->read_buffer->dealloc_reader(p->buffer_start);
  }
  p->buffer_start = nullptr;
}
1002
// int HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer* p)
//
// Handles producer events when the tunnel is *adding* chunking to the
// producer's data (p->do_chunking). It drives the chunk encoder and, once
// the encoder reports completion, fixes up the write VIO sizes of the
// still-alive consumers so the I/O core will complete their writes.
// NOTE(review): despite the "_dechunked" name, this path runs when the
// producer's input is unchunked and the output is being chunked.
int
HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer *p)
{
  ink_assert(p->do_chunking);

  Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name,
        HttpDebugNames::get_event_name(event));

  // We are only interested in translating certain events
  switch (event) {
  case VC_EVENT_READ_COMPLETE:
  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
  case VC_EVENT_EOS:
    // Terminal events: mark the producer dead *before* running the encoder
    // so final_consumer_bytes_to_write() computes a finite byte count.
    p->alive = false; // Update the producer state for final_consumer_bytes_to_write
    /* fallthrough */
  case VC_EVENT_READ_READY:
    p->last_event = p->chunked_handler.last_server_event = event;
    if (p->chunked_handler.generate_chunked_content()) { // We are done, make sure the consumer is activated
      HttpTunnelConsumer *c;
      for (c = p->consumer_list.head; c; c = c->link.next) {
        if (c->alive) {
          // Pin the consumer's write length now that the total chunked
          // size is known; the VIO completes once ndone reaches nbytes.
          c->write_vio->nbytes = final_consumer_bytes_to_write(p, c);
          // consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
        }
      }
    }
    break;
  };
  // Since we will consume all the data if the server is actually finished
  // we don't have to translate events like we do in the
  // case producer_handler_chunked()
  return event;
}
1036
// int HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer* p)
//
// Handles events from chunked producers. It calls the chunking handlers
// if appropriate and then translates the event we got into a suitable
// event to represent the unchunked state, and does chunked bookkeeping
//
int
HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer *p)
{
  // This path is for de-chunking or passing chunked data through
  // unmodified; adding chunking is producer_handler_dechunked().
  ink_assert(p->do_dechunking || p->do_chunked_passthru);

  Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));

  // We are only interested in translating certain events; anything else is
  // returned to the caller untouched.
  switch (event) {
  case VC_EVENT_READ_READY:
  case VC_EVENT_READ_COMPLETE:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
  case VC_EVENT_EOS:
    break;
  default:
    return event;
  }

  p->last_event = p->chunked_handler.last_server_event = event;
  // done == true means the chunked parser saw the terminating zero-length
  // chunk (the logical end of the message body).
  bool done = p->chunked_handler.process_chunked_content();

  // If we couldn't understand the encoding, return
  // an error
  if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
    Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
    p->chunked_handler.truncation = true;
    // FIX ME: we return EOS here since it will cause the
    // the client to be reenabled.  ERROR makes more
    // sense but no reenables follow
    return VC_EVENT_EOS;
  }

  switch (event) {
  case VC_EVENT_READ_READY:
    // The connection is still open but the chunked body is complete:
    // upgrade the event so the caller treats the producer as finished.
    if (done) {
      return VC_EVENT_READ_COMPLETE;
    }
    break;
  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
  case VC_EVENT_EOS:
  case VC_EVENT_READ_COMPLETE:
  case VC_EVENT_INACTIVITY_TIMEOUT:
    // The connection ended before the final chunk arrived: record the
    // truncation so the response is not treated as complete.
    if (!done) {
      p->chunked_handler.truncation = true;
    }
    break;
  }

  return event;
}
1094
//
// bool HttpTunnel::producer_handler(int event, HttpTunnelProducer* p)
//
// Handles events from producers.
//
// If the event is interesting only to the tunnel, this
// handler takes all necessary actions and returns false
// If the event is interesting to the state_machine,
// it calls back the state machine and returns true
//
//
bool
HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
{
  HttpTunnelConsumer *c;
  HttpProducerHandler jump_point;
  bool sm_callback = false; // return value: true iff we called back the SM

  Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));

  // Handle chunking/dechunking/chunked-passthrough if necessary.
  // The helpers may translate the raw VC event into one describing the
  // state of the logical (unchunked) data stream.
  if (p->do_chunking) {
    event = producer_handler_dechunked(event, p);

    // If we were in PRECOMPLETE when this function was called
    // and we are doing chunking, then we just wrote the last
    // chunk in the function call above. We are done with the
    // tunnel.
    if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
      event = VC_EVENT_EOS;
    }
  } else if (p->do_dechunking || p->do_chunked_passthru) {
    event = producer_handler_chunked(event, p);
  } else {
    p->last_event = event;
  }

  // YTS Team, yamsat Plugin
  // Copy partial POST data to buffers. Check for the various parameters including
  // the maximum configured post data size
  if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
      (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
       (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && p->vc_type == HT_HTTP_CLIENT)) {
    Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));

    if ((sm->postbuf_buffer_avail() + sm->postbuf_reader_avail()) > HttpConfig::m_master.post_copy_size) {
      // The POST body exceeds the configured buffering limit; give up on
      // redirect-following for this transaction rather than buffer more.
      Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64
              " reader_avail=%" PRId64 " limit=%" PRId64 "",
              sm->postbuf_buffer_avail(), sm->postbuf_reader_avail(), HttpConfig::m_master.post_copy_size);
      sm->disable_redirect();
      if (p->vc_type == HT_BUFFER_READ) {
        event = VC_EVENT_ERROR;
      }
    } else {
      sm->postbuf_copy_partial_data();
      if (event == VC_EVENT_READ_COMPLETE || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS) {
        sm->set_postbuf_done(true);
      }
    }
  } // end of added logic for partial copy of POST

  Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d, state: %d", p->alive == true,
        sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event, p->chunked_handler.state);

  switch (event) {
  case VC_EVENT_READ_READY:
    // Data read from producer, reenable consumers
    for (c = p->consumer_list.head; c; c = c->link.next) {
      if (c->alive && c->write_vio) {
        c->write_vio->reenable();
      }
    }
    break;

  case HTTP_TUNNEL_EVENT_PRECOMPLETE:
    // If the write completes on the stack (as it can for http2), then
    // consumer could have called back by this point.  Must treat this as
    // a regular read complete (falling through to the following cases).

  case VC_EVENT_READ_COMPLETE:
  case VC_EVENT_EOS:
    // The producer completed
    p->alive = false;
    if (p->read_vio) {
      p->bytes_read = p->read_vio->ndone;
    } else {
      // If we are chunked, we can receive the whole document
      // along with the header without knowing it (due to
      // the message length being a property of the encoding)
      // In that case, we won't have done a do_io so there
      // will not be vio
      p->bytes_read = 0;
    }

    // callback the SM to notify of completion
    //  Note: we need to callback the SM before
    //  reenabling the consumers as the reenable may
    //  make the data visible to the consumer and
    //  initiate async I/O operation. The SM needs to
    //  set how much I/O to do before async I/O is
    //  initiated
    jump_point = p->vc_handler;
    (sm->*jump_point)(event, p);
    sm_callback = true;
    p->update_state_if_not_set(HTTP_SM_POST_SUCCESS);

    // Data read from producer, reenable consumers
    for (c = p->consumer_list.head; c; c = c->link.next) {
      if (c->alive && c->write_vio) {
        c->write_vio->reenable();
      }
    }
    break;

  case VC_EVENT_ERROR:
  case VC_EVENT_ACTIVE_TIMEOUT:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
    if (p->alive) {
      p->alive = false;
      if (p->read_vio) {
        p->bytes_read = p->read_vio->ndone;
      } else {
        p->bytes_read = 0;
      }
      // Clear any outstanding reads so they don't
      // collide with future tunnel IO's
      p->vc->do_io_read(nullptr, 0, nullptr);
      // Interesting tunnel event, call SM
      jump_point = p->vc_handler;
      (sm->*jump_point)(event, p);
      sm_callback = true;
      // Failure case anyway
      p->update_state_if_not_set(HTTP_SM_POST_UA_FAIL);
    }
    break;

  case VC_EVENT_WRITE_READY:
  case VC_EVENT_WRITE_COMPLETE:
  default:
    // Producers should not get these events
    ink_release_assert(0);
    break;
  }

  return sm_callback;
}
1242
// void HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
//
// Reenables the producer feeding consumer @a c, applying tunnel flow
// control (throttle/unthrottle around the high/low water marks) when it
// is enabled and the producer is an external source.
void
HttpTunnel::consumer_reenable(HttpTunnelConsumer *c)
{
  HttpTunnelProducer *p = c->producer;

  if (p && p->alive) {
    // Only do flow control if enabled and the producer is an external
    // source.  Otherwise disable by making the backlog zero. Because
    // the backlog short cuts quit when the value is equal (or
    // greater) to the target, we use strict comparison only for
    // checking low water, otherwise the flow control can stall out.
    uint64_t backlog = (flow_state.enabled_p && p->is_source()) ? p->backlog(flow_state.high_water) : 0;
    HttpTunnelProducer *srcp = p->flow_control_source;

    if (backlog >= flow_state.high_water) {
      if (is_debug_tag_set("http_tunnel")) {
        Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
      }
      p->throttle(); // p becomes srcp for future calls to this method
    } else {
      if (srcp && srcp->alive && c->is_sink()) {
        // Check if backlog is below low water - note we need to check
        // against the source producer, not necessarily the producer
        // for this consumer. We don't have to recompute the backlog
        // if they are the same because we know low water <= high
        // water so the value is sufficiently accurate.
        if (srcp != p) {
          backlog = srcp->backlog(flow_state.low_water);
        }
        if (backlog < flow_state.low_water) {
          if (is_debug_tag_set("http_tunnel")) {
            Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
          }
          srcp->unthrottle();
          if (srcp->read_vio) {
            srcp->read_vio->reenable();
          }
          // Kick source producer to get flow ... well, flowing.
          this->producer_handler(VC_EVENT_READ_READY, srcp);
        } else {
          // We can stall for small thresholds on network sinks because this event happens
          // before the actual socket write. So we trap for the buffer becoming empty to
          // make sure we get an event to unthrottle after the write.
          if (HT_HTTP_CLIENT == c->vc_type) {
            NetVConnection *netvc = dynamic_cast<NetVConnection *>(c->write_vio->vc_server);
            if (netvc) { // really, this should always be true.
              netvc->trapWriteBufferEmpty();
            }
          }
        }
      }
      // Not throttled (or just unthrottled): let the producer read more.
      if (p->read_vio) {
        p->read_vio->reenable();
      }
    }
  }
}
1300
//
// bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
//
// Handles events from consumers.
//
// If the event is interesting only to the tunnel, this
// handler takes all necessary actions and returns false
// If the event is interesting to the state_machine,
// it calls back the state machine and returns true
//
//
bool
HttpTunnel::consumer_handler(int event, HttpTunnelConsumer *c)
{
  bool sm_callback = false; // return value: true iff we called back the SM
  HttpConsumerHandler jump_point;
  HttpTunnelProducer *p = c->producer;

  Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));

  ink_assert(c->alive == true);

  switch (event) {
  case VC_EVENT_WRITE_READY:
    this->consumer_reenable(c);
    // Once we get a write ready from the origin, we can assume the connect to some degree succeeded
    if (c->vc_type == HT_HTTP_SERVER) {
      sm->t_state.current.server->clear_connect_fail();
    }
    break;

  case VC_EVENT_WRITE_COMPLETE:
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_ACTIVE_TIMEOUT:
  case VC_EVENT_INACTIVITY_TIMEOUT:
    // Terminal events for this consumer: record final byte count, notify
    // the SM, then tear down the reader and reenable the producer.
    ink_assert(c->alive);
    ink_assert(c->buffer_reader);
    c->alive = false;

    c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;

    // Interesting tunnel event, call SM
    jump_point = c->vc_handler;
    (sm->*jump_point)(event, c);
    // Make sure the handler_state is set
    // Necessary for post tunnel end processing
    if (c->producer && c->producer->handler_state == 0) {
      if (event == VC_EVENT_WRITE_COMPLETE) {
        c->producer->handler_state = HTTP_SM_POST_SUCCESS;
        // If the consumer completed, presumably the producer successfully read
        c->producer->read_success = true;
        // Go ahead and clean up the producer side
        if (p->alive) {
          producer_handler(VC_EVENT_READ_COMPLETE, p);
        }
      } else if (c->vc_type == HT_HTTP_SERVER) {
        c->producer->handler_state = HTTP_SM_POST_UA_FAIL;
      } else if (c->vc_type == HT_HTTP_CLIENT) {
        c->producer->handler_state = HTTP_SM_POST_SERVER_FAIL;
      }
    }
    sm_callback = true;

    // Deallocate the reader after calling back the sm
    //  because buffer problems are easier to debug
    //  in the sm when the reader is still valid
    if (c->buffer_reader) {
      c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
      c->buffer_reader = nullptr;
    }

    // Since we removed a consumer, it may now be
    //   possible to put more stuff in the buffer
    // Note: we reenable only after calling back
    //   the SM since the reenabling has the side effect
    //   updating the buffer state for the VConnection
    //   that is being reenabled
    if (p->alive && p->read_vio) {
      if (p->is_throttled()) {
        this->consumer_reenable(c);
      } else {
        p->read_vio->reenable();
      }
    }
    // [amc] I don't think this happens but we'll leave a debug trap
    // here just in case.
    if (p->is_throttled()) {
      Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
    }
    break;

  case VC_EVENT_READ_READY:
  case VC_EVENT_READ_COMPLETE:
  default:
    // Consumers should not get these events
    ink_release_assert(0);
    break;
  }

  return sm_callback;
}
1403
1404 // void HttpTunnel::chain_abort_all(HttpTunnelProducer* p)
1405 //
1406 // Abort the producer and everyone still alive
1407 // downstream of the producer
1408 //
1409 void
chain_abort_all(HttpTunnelProducer * p)1410 HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
1411 {
1412 HttpTunnelConsumer *c = p->consumer_list.head;
1413
1414 while (c) {
1415 if (c->alive) {
1416 c->alive = false;
1417 c->write_vio = nullptr;
1418 c->vc->do_io_close(EHTTP_ERROR);
1419 update_stats_after_abort(c->vc_type);
1420 }
1421
1422 if (c->self_producer) {
1423 // Must snip the link before recursively
1424 // freeing to avoid looks introduced by
1425 // blind tunneling
1426 HttpTunnelProducer *selfp = c->self_producer;
1427 c->self_producer = nullptr;
1428 chain_abort_all(selfp);
1429 }
1430
1431 c = c->link.next;
1432 }
1433
1434 if (p->alive) {
1435 p->alive = false;
1436 if (p->read_vio) {
1437 p->bytes_read = p->read_vio->ndone;
1438 }
1439 if (p->self_consumer) {
1440 p->self_consumer->alive = false;
1441 }
1442 p->read_vio = nullptr;
1443 p->vc->do_io_close(EHTTP_ERROR);
1444 HTTP_INCREMENT_DYN_STAT(http_origin_shutdown_tunnel_abort);
1445 update_stats_after_abort(p->vc_type);
1446 }
1447 }
1448
1449 //
1450 // Determine the number of bytes a consumer should read from a producer
1451 //
1452 int64_t
final_consumer_bytes_to_write(HttpTunnelProducer * p,HttpTunnelConsumer * c)1453 HttpTunnel::final_consumer_bytes_to_write(HttpTunnelProducer *p, HttpTunnelConsumer *c)
1454 {
1455 int64_t total_bytes = 0;
1456 int64_t consumer_n = 0;
1457 if (p->alive) {
1458 consumer_n = INT64_MAX;
1459 } else {
1460 TunnelChunkingAction_t action = p->chunking_action;
1461 if (c->alive) {
1462 if (c->vc_type == HT_CACHE_WRITE) {
1463 switch (action) {
1464 case TCA_CHUNK_CONTENT:
1465 case TCA_PASSTHRU_DECHUNKED_CONTENT:
1466 total_bytes = p->bytes_read + p->init_bytes_done;
1467 break;
1468 case TCA_DECHUNK_CONTENT:
1469 case TCA_PASSTHRU_CHUNKED_CONTENT:
1470 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1471 break;
1472 default:
1473 break;
1474 }
1475 } else if (action == TCA_CHUNK_CONTENT) {
1476 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
1477 } else if (action == TCA_DECHUNK_CONTENT) {
1478 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1479 } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1480 total_bytes = p->bytes_read + p->init_bytes_done;
1481 } else {
1482 total_bytes = p->bytes_read + p->init_bytes_done;
1483 }
1484 consumer_n = total_bytes - c->skip_bytes;
1485 }
1486 }
1487 return consumer_n;
1488 }
1489
1490 //
1491 // void HttpTunnel::finish_all_internal(HttpTunnelProducer* p)
1492 //
1493 // Internal function for finishing all consumers. Takes
1494 // chain argument about where to finish just immediate
1495 // consumer or all those downstream
1496 //
1497 void
finish_all_internal(HttpTunnelProducer * p,bool chain)1498 HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
1499 {
1500 ink_assert(p->alive == false);
1501 HttpTunnelConsumer *c = p->consumer_list.head;
1502 int64_t total_bytes = 0;
1503 TunnelChunkingAction_t action = p->chunking_action;
1504
1505 if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1506 // if the only chunked data was in the initial read, make sure we don't consume too much
1507 if (p->bytes_read == 0 && p->buffer_start != nullptr) {
1508 int num_read = p->buffer_start->read_avail() - p->chunked_handler.chunked_reader->read_avail();
1509 if (num_read < p->init_bytes_done) {
1510 p->init_bytes_done = num_read;
1511 }
1512 }
1513 }
1514
1515 while (c) {
1516 if (c->alive) {
1517 if (c->write_vio) {
1518 // Adjust the number of bytes to write in the case of
1519 // a completed unlimited producer
1520 c->write_vio->nbytes = final_consumer_bytes_to_write(p, c);
1521 ink_assert(c->write_vio->nbytes >= 0);
1522
1523 if (c->write_vio->nbytes < 0) {
1524 Error("Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n", total_bytes - c->skip_bytes);
1525 }
1526 }
1527
1528 if (chain == true && c->self_producer) {
1529 chain_finish_all(c->self_producer);
1530 }
1531 // The IO Core will not call us back if there
1532 // is nothing to do. Check to see if there is
1533 // nothing to do and take the appropriate
1534 // action
1535 if (c->write_vio && c->alive && c->write_vio->nbytes == c->write_vio->ndone) {
1536 consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
1537 }
1538 }
1539
1540 c = c->link.next;
1541 }
1542 }
1543
1544 // void HttpTunnel::chain_abort_cache_write(HttpProducer* p)
1545 //
1546 // Terminates all cache writes. Used to prevent truncated
1547 // documents from being stored in the cache
1548 //
1549 void
chain_abort_cache_write(HttpTunnelProducer * p)1550 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer *p)
1551 {
1552 HttpTunnelConsumer *c = p->consumer_list.head;
1553
1554 while (c) {
1555 if (c->alive) {
1556 if (c->vc_type == HT_CACHE_WRITE) {
1557 ink_assert(c->self_producer == nullptr);
1558 c->write_vio = nullptr;
1559 c->vc->do_io_close(EHTTP_ERROR);
1560 c->alive = false;
1561 HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1562 } else if (c->self_producer) {
1563 chain_abort_cache_write(c->self_producer);
1564 }
1565 }
1566 c = c->link.next;
1567 }
1568 }
1569
1570 // void HttpTunnel::close_vc(HttpTunnelProducer* p)
1571 //
1572 // Closes the vc associated with the producer and
1573 // updates the state of the self_consumer
1574 //
1575 void
close_vc(HttpTunnelProducer * p)1576 HttpTunnel::close_vc(HttpTunnelProducer *p)
1577 {
1578 ink_assert(p->alive == false);
1579 HttpTunnelConsumer *c = p->self_consumer;
1580
1581 if (c && c->alive) {
1582 c->alive = false;
1583 if (c->write_vio) {
1584 c->bytes_written = c->write_vio->ndone;
1585 }
1586 }
1587
1588 p->vc->do_io_close();
1589 }
1590
1591 // void HttpTunnel::close_vc(HttpTunnelConsumer* c)
1592 //
1593 // Closes the vc associated with the consumer and
1594 // updates the state of the self_producer
1595 //
1596 void
close_vc(HttpTunnelConsumer * c)1597 HttpTunnel::close_vc(HttpTunnelConsumer *c)
1598 {
1599 ink_assert(c->alive == false);
1600 HttpTunnelProducer *p = c->self_producer;
1601
1602 if (p && p->alive) {
1603 p->alive = false;
1604 if (p->read_vio) {
1605 p->bytes_read = p->read_vio->ndone;
1606 }
1607 }
1608
1609 c->vc->do_io_close();
1610 }
1611
// int HttpTunnel::main_handler(int event, void* data)
//
// Main handler for the tunnel. Vectors events
// based on whether they are from consumers or
// producers
//
int
HttpTunnel::main_handler(int event, void *data)
{
  HttpTunnelProducer *p = nullptr;
  HttpTunnelConsumer *c = nullptr;
  bool sm_callback = false;

  // Track nesting: producer/consumer handlers can reenter this handler
  // synchronously, and only the outermost frame may notify the SM of
  // tunnel completion.
  ++reentrancy_count;

  ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);

  // Find the appropriate entry; 'data' is the VIO the event fired on.
  if ((p = get_producer(static_cast<VIO *>(data))) != nullptr) {
    sm_callback = producer_handler(event, p);
  } else {
    if ((c = get_consumer(static_cast<VIO *>(data))) != nullptr) {
      ink_assert(c->write_vio == (VIO *)data || c->vc == ((VIO *)data)->vc_server);
      sm_callback = consumer_handler(event, c);
    } else {
      // Presumably a delayed event we can ignore now
      internal_error(); // do nothing
    }
  }

  // We called a vc handler, the tunnel might be
  //  finished.  Check to see if there are any remaining
  //  VConnections alive.  If not, notify the state machine
  //
  // Don't call out if we are nested
  if (call_sm || (sm_callback && !is_tunnel_alive())) {
    if (reentrancy_count == 1) {
      reentrancy_count = 0;
      active = false;
      sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
      return EVENT_DONE;
    } else {
      // Nested call: defer HTTP_TUNNEL_EVENT_DONE to the outermost frame.
      call_sm = true;
    }
  }
  --reentrancy_count;
  return EVENT_CONT;
}
1660
1661 void
update_stats_after_abort(HttpTunnelType_t t)1662 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
1663 {
1664 switch (t) {
1665 case HT_CACHE_READ:
1666 case HT_CACHE_WRITE:
1667 HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1668 break;
1669 default:
1670 // Handled here:
1671 // HT_HTTP_SERVER, HT_HTTP_CLIENT,
1672 // HT_TRANSFORM, HT_STATIC
1673 break;
1674 };
1675 }
1676
// Intentionally empty. Called from main_handler() when an event arrives for
// a VIO that matches neither a producer nor a consumer (presumably a delayed
// event for an entry that no longer exists); the event is silently dropped.
void
HttpTunnel::internal_error()
{
}
1681