1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26    HttpTunnel.cc
27 
28    Description:
29 
30 
31 ****************************************************************************/
32 
33 #include "tscore/ink_config.h"
34 #include "HttpConfig.h"
35 #include "HttpTunnel.h"
36 #include "HttpSM.h"
37 #include "HttpDebugNames.h"
38 #include "tscore/ParseRules.h"
39 #include "tscore/ink_memory.h"
40 
41 static const int min_block_transfer_bytes = 256;
42 static const char *const CHUNK_HEADER_FMT = "%" PRIx64 "\r\n";
43 // This should be as small as possible because it will only hold the
44 // header and trailer per chunk - the chunk body will be a reference to
45 // a block in the input stream.
46 static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
47 
ChunkedHandler()48 ChunkedHandler::ChunkedHandler() : max_chunk_size(DEFAULT_MAX_CHUNK_SIZE) {}
49 
50 void
init(IOBufferReader * buffer_in,HttpTunnelProducer * p)51 ChunkedHandler::init(IOBufferReader *buffer_in, HttpTunnelProducer *p)
52 {
53   if (p->do_chunking) {
54     init_by_action(buffer_in, ACTION_DOCHUNK);
55   } else if (p->do_dechunking) {
56     init_by_action(buffer_in, ACTION_DECHUNK);
57   } else {
58     init_by_action(buffer_in, ACTION_PASSTHRU);
59   }
60   return;
61 }
62 
63 void
init_by_action(IOBufferReader * buffer_in,Action action)64 ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
65 {
66   running_sum    = 0;
67   num_digits     = 0;
68   cur_chunk_size = 0;
69   bytes_left     = 0;
70   truncation     = false;
71   this->action   = action;
72 
73   switch (action) {
74   case ACTION_DOCHUNK:
75     dechunked_reader                   = buffer_in->mbuf->clone_reader(buffer_in);
76     dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
77     chunked_buffer                     = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
78     chunked_size                       = 0;
79     break;
80   case ACTION_DECHUNK:
81     chunked_reader   = buffer_in->mbuf->clone_reader(buffer_in);
82     dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
83     dechunked_size   = 0;
84     break;
85   case ACTION_PASSTHRU:
86     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
87     break;
88   default:
89     ink_release_assert(!"Unknown action");
90   }
91 
92   return;
93 }
94 
95 void
clear()96 ChunkedHandler::clear()
97 {
98   switch (action) {
99   case ACTION_DOCHUNK:
100     free_MIOBuffer(chunked_buffer);
101     break;
102   case ACTION_DECHUNK:
103     free_MIOBuffer(dechunked_buffer);
104     break;
105   case ACTION_PASSTHRU:
106   default:
107     break;
108   }
109 
110   return;
111 }
112 
113 void
set_max_chunk_size(int64_t size)114 ChunkedHandler::set_max_chunk_size(int64_t size)
115 {
116   max_chunk_size       = size ? size : DEFAULT_MAX_CHUNK_SIZE;
117   max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
118 }
119 
120 void
read_size()121 ChunkedHandler::read_size()
122 {
123   int64_t bytes_used;
124   bool done = false;
125 
126   while (chunked_reader->read_avail() > 0 && !done) {
127     const char *tmp   = chunked_reader->start();
128     int64_t data_size = chunked_reader->block_read_avail();
129 
130     ink_assert(data_size > 0);
131     bytes_used = 0;
132 
133     while (data_size > 0) {
134       bytes_used++;
135       if (state == CHUNK_READ_SIZE) {
136         // The http spec says the chunked size is always in hex
137         if (ParseRules::is_hex(*tmp)) {
138           // Make sure we will not overflow running_sum with our shift.
139           if (!can_safely_shift_left(running_sum, 4)) {
140             // We have no more space in our variable for the shift.
141             state = CHUNK_READ_ERROR;
142             done  = true;
143             break;
144           }
145           num_digits++;
146           // Shift over one hex value.
147           running_sum <<= 4;
148 
149           if (ParseRules::is_digit(*tmp)) {
150             running_sum += *tmp - '0';
151           } else {
152             running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
153           }
154         } else {
155           // We are done parsing size
156           if (num_digits == 0 || running_sum < 0) {
157             // Bogus chunk size
158             state = CHUNK_READ_ERROR;
159             done  = true;
160             break;
161           } else {
162             state = CHUNK_READ_SIZE_CRLF; // now look for CRLF
163           }
164         }
165       } else if (state == CHUNK_READ_SIZE_CRLF) { // Scan for a linefeed
166         if (ParseRules::is_lf(*tmp)) {
167           Debug("http_chunk", "read chunk size of %d bytes", running_sum);
168           bytes_left = (cur_chunk_size = running_sum);
169           state      = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
170           done       = true;
171           break;
172         }
173       } else if (state == CHUNK_READ_SIZE_START) {
174         if (ParseRules::is_lf(*tmp)) {
175           running_sum = 0;
176           num_digits  = 0;
177           state       = CHUNK_READ_SIZE;
178         }
179       }
180       tmp++;
181       data_size--;
182     }
183     chunked_reader->consume(bytes_used);
184   }
185 }
186 
187 // int ChunkedHandler::transfer_bytes()
188 //
189 //   Transfer bytes from chunked_reader to dechunked buffer
190 //   Use block reference method when there is a sufficient
191 //   size to move.  Otherwise, uses memcpy method
192 //
193 int64_t
transfer_bytes()194 ChunkedHandler::transfer_bytes()
195 {
196   int64_t block_read_avail, moved, to_move, total_moved = 0;
197 
198   // Handle the case where we are doing chunked passthrough.
199   if (!dechunked_buffer) {
200     moved = std::min(bytes_left, chunked_reader->read_avail());
201     chunked_reader->consume(moved);
202     bytes_left = bytes_left - moved;
203     return moved;
204   }
205 
206   while (bytes_left > 0) {
207     block_read_avail = chunked_reader->block_read_avail();
208 
209     to_move = std::min(bytes_left, block_read_avail);
210     if (to_move <= 0) {
211       break;
212     }
213 
214     if (to_move >= min_block_transfer_bytes) {
215       moved = dechunked_buffer->write(chunked_reader, bytes_left);
216     } else {
217       // Small amount of data available.  We want to copy the
218       // data rather than block reference to prevent the buildup
219       // of too many small blocks which leads to stack overflow
220       // on deallocation
221       moved = dechunked_buffer->write(chunked_reader->start(), to_move);
222     }
223 
224     if (moved > 0) {
225       chunked_reader->consume(moved);
226       bytes_left = bytes_left - moved;
227       dechunked_size += moved;
228       total_moved += moved;
229     } else {
230       break;
231     }
232   }
233   return total_moved;
234 }
235 
236 void
read_chunk()237 ChunkedHandler::read_chunk()
238 {
239   int64_t b = transfer_bytes();
240 
241   ink_assert(bytes_left >= 0);
242   if (bytes_left == 0) {
243     Debug("http_chunk", "completed read of chunk of %" PRId64 " bytes", cur_chunk_size);
244 
245     state = CHUNK_READ_SIZE_START;
246   } else if (bytes_left > 0) {
247     Debug("http_chunk", "read %" PRId64 " bytes of an %" PRId64 " chunk", b, cur_chunk_size);
248   }
249 }
250 
251 void
read_trailer()252 ChunkedHandler::read_trailer()
253 {
254   int64_t bytes_used;
255   bool done = false;
256 
257   while (chunked_reader->is_read_avail_more_than(0) && !done) {
258     const char *tmp   = chunked_reader->start();
259     int64_t data_size = chunked_reader->block_read_avail();
260 
261     ink_assert(data_size > 0);
262     for (bytes_used = 0; data_size > 0; data_size--) {
263       bytes_used++;
264 
265       if (ParseRules::is_cr(*tmp)) {
266         // For a CR to signal we are almost done, the preceding
267         //  part of the line must be blank and next character
268         //  must a LF
269         state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
270       } else if (ParseRules::is_lf(*tmp)) {
271         // For a LF to signal we are done reading the
272         //   trailer, the line must have either been blank
273         //   or must have have only had a CR on it
274         if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
275           state = CHUNK_READ_DONE;
276           Debug("http_chunk", "completed read of trailers");
277           done = true;
278           break;
279         } else {
280           // A LF that does not terminate the trailer
281           //  indicates a new line
282           state = CHUNK_READ_TRAILER_BLANK;
283         }
284       } else {
285         // A character that is not a CR or LF indicates
286         //  the we are parsing a line of the trailer
287         state = CHUNK_READ_TRAILER_LINE;
288       }
289       tmp++;
290     }
291     chunked_reader->consume(bytes_used);
292   }
293 }
294 
295 bool
process_chunked_content()296 ChunkedHandler::process_chunked_content()
297 {
298   while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
299     switch (state) {
300     case CHUNK_READ_SIZE:
301     case CHUNK_READ_SIZE_CRLF:
302     case CHUNK_READ_SIZE_START:
303       read_size();
304       break;
305     case CHUNK_READ_CHUNK:
306       read_chunk();
307       break;
308     case CHUNK_READ_TRAILER_BLANK:
309     case CHUNK_READ_TRAILER_CR:
310     case CHUNK_READ_TRAILER_LINE:
311       read_trailer();
312       break;
313     case CHUNK_FLOW_CONTROL:
314       return false;
315     default:
316       ink_release_assert(0);
317       break;
318     }
319   }
320   return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
321 }
322 
323 bool
generate_chunked_content()324 ChunkedHandler::generate_chunked_content()
325 {
326   char tmp[16];
327   bool server_done = false;
328   int64_t r_avail;
329 
330   ink_assert(max_chunk_header_len);
331 
332   switch (last_server_event) {
333   case VC_EVENT_EOS:
334   case VC_EVENT_READ_COMPLETE:
335   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
336     server_done = true;
337     break;
338   }
339 
340   while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
341     int64_t write_val = std::min(max_chunk_size, r_avail);
342 
343     state = CHUNK_WRITE_CHUNK;
344     Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
345 
346     // Output the chunk size.
347     if (write_val != max_chunk_size) {
348       int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
349       chunked_buffer->write(tmp, len);
350       chunked_size += len;
351     } else {
352       chunked_buffer->write(max_chunk_header, max_chunk_header_len);
353       chunked_size += max_chunk_header_len;
354     }
355 
356     // Output the chunk itself.
357     //
358     // BZ# 54395 Note - we really should only do a
359     //   block transfer if there is sizable amount of
360     //   data (like we do for the case where we are
361     //   removing chunked encoding in ChunkedHandler::transfer_bytes()
362     //   However, I want to do this fix with as small a risk
363     //   as possible so I'm leaving this issue alone for
364     //   now
365     //
366     chunked_buffer->write(dechunked_reader, write_val);
367     chunked_size += write_val;
368     dechunked_reader->consume(write_val);
369 
370     // Output the trailing CRLF.
371     chunked_buffer->write("\r\n", 2);
372     chunked_size += 2;
373   }
374 
375   if (server_done) {
376     state = CHUNK_WRITE_DONE;
377 
378     // Add the chunked transfer coding trailer.
379     chunked_buffer->write("0\r\n\r\n", 5);
380     chunked_size += 5;
381     return true;
382   }
383   return false;
384 }
385 
HttpTunnelProducer()386 HttpTunnelProducer::HttpTunnelProducer() : consumer_list() {}
387 
388 uint64_t
backlog(uint64_t limit)389 HttpTunnelProducer::backlog(uint64_t limit)
390 {
391   uint64_t zret = 0;
392   // Calculate the total backlog, the # of bytes inside ATS for this producer.
393   // We go all the way through each chain to the ending sink and take the maximum
394   // over those paths. Do need to be careful about loops which can occur.
395   for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
396     if (c->alive && c->write_vio) {
397       uint64_t n = 0;
398       if (HT_TRANSFORM == c->vc_type) {
399         n += static_cast<TransformVCChain *>(c->vc)->backlog(limit);
400       } else {
401         IOBufferReader *r = c->write_vio->get_reader();
402         if (r) {
403           n += static_cast<uint64_t>(r->read_avail());
404         }
405       }
406       if (n >= limit) {
407         return n;
408       }
409 
410       if (!c->is_sink()) {
411         HttpTunnelProducer *dsp = c->self_producer;
412         if (dsp) {
413           n += dsp->backlog();
414         }
415       }
416       if (n >= limit) {
417         return n;
418       }
419       if (n > zret) {
420         zret = n;
421       }
422     }
423   }
424 
425   if (chunked_handler.chunked_reader) {
426     zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
427   }
428 
429   return zret;
430 }
431 
432 /*  We set the producers in a flow chain specifically rather than
433     using a tunnel level variable in order to handle bi-directional
434     tunnels correctly. In such a case the flow control on producers is
435     not related so a single value for the tunnel won't work.
436 */
437 void
set_throttle_src(HttpTunnelProducer * srcp)438 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer *srcp)
439 {
440   HttpTunnelProducer *p  = this;
441   p->flow_control_source = srcp;
442   for (HttpTunnelConsumer *c = consumer_list.head; c; c = c->link.next) {
443     if (!c->is_sink()) {
444       p = c->self_producer;
445       if (p) {
446         p->set_throttle_src(srcp);
447       }
448     }
449   }
450 }
451 
HttpTunnelConsumer()452 HttpTunnelConsumer::HttpTunnelConsumer() : link() {}
453 
HttpTunnel()454 HttpTunnel::HttpTunnel() : Continuation(nullptr) {}
455 
456 void
init(HttpSM * sm_arg,Ptr<ProxyMutex> & amutex)457 HttpTunnel::init(HttpSM *sm_arg, Ptr<ProxyMutex> &amutex)
458 {
459   HttpConfigParams *params = sm_arg->t_state.http_config_param;
460   sm                       = sm_arg;
461   active                   = false;
462   mutex                    = amutex;
463   ink_release_assert(reentrancy_count == 0);
464   SET_HANDLER(&HttpTunnel::main_handler);
465   flow_state.enabled_p = params->oride.flow_control_enabled;
466   if (params->oride.flow_low_water_mark > 0) {
467     flow_state.low_water = params->oride.flow_low_water_mark;
468   }
469   if (params->oride.flow_high_water_mark > 0) {
470     flow_state.high_water = params->oride.flow_high_water_mark;
471   }
472   // This should always be true, we handled default cases back in HttpConfig::reconfigure()
473   ink_assert(flow_state.low_water <= flow_state.high_water);
474 }
475 
476 void
reset()477 HttpTunnel::reset()
478 {
479   ink_assert(active == false);
480 #ifdef DEBUG
481   for (auto &producer : producers) {
482     ink_assert(producer.alive == false);
483   }
484   for (auto &consumer : consumers) {
485     ink_assert(consumer.alive == false);
486   }
487 #endif
488 
489   call_sm       = false;
490   num_producers = 0;
491   num_consumers = 0;
492   ink_zero(consumers);
493   ink_zero(producers);
494 }
495 
496 void
kill_tunnel()497 HttpTunnel::kill_tunnel()
498 {
499   for (auto &producer : producers) {
500     if (producer.vc != nullptr) {
501       chain_abort_all(&producer);
502     }
503     ink_assert(producer.alive == false);
504   }
505   active = false;
506   this->deallocate_buffers();
507   this->reset();
508 }
509 
510 HttpTunnelProducer *
alloc_producer()511 HttpTunnel::alloc_producer()
512 {
513   for (int i = 0; i < MAX_PRODUCERS; ++i) {
514     if (producers[i].vc == nullptr) {
515       num_producers++;
516       ink_assert(num_producers <= MAX_PRODUCERS);
517       return producers + i;
518     }
519   }
520   ink_release_assert(0);
521   return nullptr;
522 }
523 
524 HttpTunnelConsumer *
alloc_consumer()525 HttpTunnel::alloc_consumer()
526 {
527   for (int i = 0; i < MAX_CONSUMERS; i++) {
528     if (consumers[i].vc == nullptr) {
529       num_consumers++;
530       ink_assert(num_consumers <= MAX_CONSUMERS);
531       return consumers + i;
532     }
533   }
534   ink_release_assert(0);
535   return nullptr;
536 }
537 
538 int
deallocate_buffers()539 HttpTunnel::deallocate_buffers()
540 {
541   int num = 0;
542   ink_release_assert(active == false);
543   for (auto &producer : producers) {
544     if (producer.read_buffer != nullptr) {
545       ink_assert(producer.vc != nullptr);
546       free_MIOBuffer(producer.read_buffer);
547       producer.read_buffer  = nullptr;
548       producer.buffer_start = nullptr;
549       num++;
550     }
551 
552     if (producer.chunked_handler.dechunked_buffer != nullptr) {
553       ink_assert(producer.vc != nullptr);
554       free_MIOBuffer(producer.chunked_handler.dechunked_buffer);
555       producer.chunked_handler.dechunked_buffer = nullptr;
556       num++;
557     }
558 
559     if (producer.chunked_handler.chunked_buffer != nullptr) {
560       ink_assert(producer.vc != nullptr);
561       free_MIOBuffer(producer.chunked_handler.chunked_buffer);
562       producer.chunked_handler.chunked_buffer = nullptr;
563       num++;
564     }
565     producer.chunked_handler.max_chunk_header_len = 0;
566   }
567   return num;
568 }
569 
570 void
set_producer_chunking_action(HttpTunnelProducer * p,int64_t skip_bytes,TunnelChunkingAction_t action)571 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer *p, int64_t skip_bytes, TunnelChunkingAction_t action)
572 {
573   p->chunked_handler.skip_bytes = skip_bytes;
574   p->chunking_action            = action;
575 
576   switch (action) {
577   case TCA_CHUNK_CONTENT:
578     p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
579     break;
580   case TCA_DECHUNK_CONTENT:
581   case TCA_PASSTHRU_CHUNKED_CONTENT:
582     p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
583     break;
584   default:
585     break;
586   };
587 }
588 
589 void
set_producer_chunking_size(HttpTunnelProducer * p,int64_t size)590 HttpTunnel::set_producer_chunking_size(HttpTunnelProducer *p, int64_t size)
591 {
592   p->chunked_handler.set_max_chunk_size(size);
593 }
594 
595 // HttpTunnelProducer* HttpTunnel::add_producer
596 //
597 //   Adds a new producer to the tunnel
598 //
599 HttpTunnelProducer *
add_producer(VConnection * vc,int64_t nbytes_arg,IOBufferReader * reader_start,HttpProducerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg)600 HttpTunnel::add_producer(VConnection *vc, int64_t nbytes_arg, IOBufferReader *reader_start, HttpProducerHandler sm_handler,
601                          HttpTunnelType_t vc_type, const char *name_arg)
602 {
603   HttpTunnelProducer *p;
604 
605   Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
606 
607   ink_assert(reader_start->mbuf);
608   if ((p = alloc_producer()) != nullptr) {
609     p->vc              = vc;
610     p->nbytes          = nbytes_arg;
611     p->buffer_start    = reader_start;
612     p->read_buffer     = reader_start->mbuf;
613     p->vc_handler      = sm_handler;
614     p->vc_type         = vc_type;
615     p->name            = name_arg;
616     p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
617 
618     p->do_chunking         = false;
619     p->do_dechunking       = false;
620     p->do_chunked_passthru = false;
621 
622     p->init_bytes_done = reader_start->read_avail();
623     if (p->nbytes < 0) {
624       p->ntodo = p->nbytes;
625     } else { // The byte count given us includes bytes
626       //  that already may be in the buffer.
627       //  ntodo represents the number of bytes
628       //  the tunneling mechanism needs to read
629       //  for the producer
630       p->ntodo = p->nbytes - p->init_bytes_done;
631       ink_assert(p->ntodo >= 0);
632     }
633 
634     // We are static, the producer is never "alive"
635     //   It just has data in the buffer
636     if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
637       ink_assert(p->ntodo == 0);
638       p->alive        = false;
639       p->read_success = true;
640     } else {
641       p->alive = true;
642     }
643   }
644   return p;
645 }
646 
647 // void HttpTunnel::add_consumer
648 //
649 //    Adds a new consumer to the tunnel.  The producer must
650 //    be specified and already added to the tunnel.  Attaches
651 //    the new consumer to the entry for the existing producer
652 //
653 //    Returns true if the consumer successfully added.  Returns
654 //    false if the consumer was not added because the source failed
655 //
656 HttpTunnelConsumer *
add_consumer(VConnection * vc,VConnection * producer,HttpConsumerHandler sm_handler,HttpTunnelType_t vc_type,const char * name_arg,int64_t skip_bytes)657 HttpTunnel::add_consumer(VConnection *vc, VConnection *producer, HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type,
658                          const char *name_arg, int64_t skip_bytes)
659 {
660   Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
661 
662   // Find the producer entry
663   HttpTunnelProducer *p = get_producer(producer);
664   ink_release_assert(p);
665 
666   // Check to see if the producer terminated
667   //  without sending all of its data
668   if (p->alive == false && p->read_success == false) {
669     Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
670     return nullptr;
671   }
672   // Initialize the consumer structure
673   HttpTunnelConsumer *c = alloc_consumer();
674   c->producer           = p;
675   c->vc                 = vc;
676   c->alive              = true;
677   c->skip_bytes         = skip_bytes;
678   c->vc_handler         = sm_handler;
679   c->vc_type            = vc_type;
680   c->name               = name_arg;
681 
682   // Register the consumer with the producer
683   p->consumer_list.push(c);
684   p->num_consumers++;
685 
686   return c;
687 }
688 
689 void
chain(HttpTunnelConsumer * c,HttpTunnelProducer * p)690 HttpTunnel::chain(HttpTunnelConsumer *c, HttpTunnelProducer *p)
691 {
692   p->self_consumer = c;
693   c->self_producer = p;
694   // If the flow is already throttled update the chained producer.
695   if (c->producer->is_throttled()) {
696     p->set_throttle_src(c->producer->flow_control_source);
697   }
698 }
699 
700 // void HttpTunnel::tunnel_run()
701 //
702 //    Makes the tunnel go
703 //
704 void
tunnel_run(HttpTunnelProducer * p_arg)705 HttpTunnel::tunnel_run(HttpTunnelProducer *p_arg)
706 {
707   Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
708   if (p_arg) {
709     producer_run(p_arg);
710   } else {
711     HttpTunnelProducer *p;
712 
713     ink_assert(active == false);
714 
715     for (int i = 0; i < MAX_PRODUCERS; ++i) {
716       p = producers + i;
717       if (p->vc != nullptr && (p->alive || (p->vc_type == HT_STATIC && p->buffer_start != nullptr))) {
718         producer_run(p);
719       }
720     }
721   }
722 
723   // It is possible that there was nothing to do
724   //   due to a all transfers being zero length
725   //   If that is the case, call the state machine
726   //   back to say we are done
727   if (!is_tunnel_alive()) {
728     active = false;
729     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
730   }
731 }
732 
733 void
producer_run(HttpTunnelProducer * p)734 HttpTunnel::producer_run(HttpTunnelProducer *p)
735 {
736   // Determine whether the producer has a cache-write consumer,
737   // since all chunked content read by the producer gets dechunked
738   // prior to being written into the cache.
739   HttpTunnelConsumer *c, *cache_write_consumer = nullptr;
740   bool transform_consumer = false;
741 
742   for (c = p->consumer_list.head; c; c = c->link.next) {
743     if (c->vc_type == HT_CACHE_WRITE) {
744       cache_write_consumer = c;
745       break;
746     }
747   }
748 
749   // bz57413
750   for (c = p->consumer_list.head; c; c = c->link.next) {
751     if (c->vc_type == HT_TRANSFORM) {
752       transform_consumer = true;
753       break;
754     }
755   }
756 
757   // Determine whether the producer is to perform chunking,
758   // dechunking, or chunked-passthough of the incoming response.
759   TunnelChunkingAction_t action = p->chunking_action;
760 
761   // [bug 2579251] static producers won't have handler set
762   if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
763     if (action == TCA_CHUNK_CONTENT) {
764       p->do_chunking = true;
765     } else if (action == TCA_DECHUNK_CONTENT) {
766       p->do_dechunking = true;
767     } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
768       p->do_chunked_passthru = true;
769 
770       // Dechunk the chunked content into the cache.
771       if (cache_write_consumer != nullptr) {
772         p->do_dechunking = true;
773       }
774     }
775   }
776 
777   int64_t consumer_n;
778   int64_t producer_n;
779 
780   ink_assert(p->vc != nullptr);
781   active = true;
782 
783   IOBufferReader *chunked_buffer_start = nullptr, *dechunked_buffer_start = nullptr;
784   if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
785     p->chunked_handler.init(p->buffer_start, p);
786 
787     // Copy the header into the chunked/dechunked buffers.
788     if (p->do_chunking) {
789       // initialize a reader to chunked buffer start before writing to keep ref count
790       chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
791       p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
792     }
793     if (p->do_dechunking) {
794       // bz57413
795       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
796             p->chunked_handler.chunked_reader->read_avail());
797 
798       // initialize a reader to dechunked buffer start before writing to keep ref count
799       dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();
800 
801       // If there is no transformation then add the header to the buffer, else the
802       // client already has got the header from us, no need for it in the buffer.
803       if (!transform_consumer) {
804         p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
805 
806         Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64 "", p->chunked_handler.skip_bytes);
807       }
808     }
809   }
810 
811   int64_t read_start_pos = 0;
812   if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
813     ink_assert(sm->t_state.num_range_fields == 1); // we current just support only one range entry
814     read_start_pos = sm->t_state.ranges[0]._start;
815     producer_n     = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start) + 1;
816     consumer_n     = (producer_n + sm->client_response_hdr_bytes);
817   } else if (p->nbytes >= 0) {
818     consumer_n = p->nbytes;
819     producer_n = p->ntodo;
820   } else {
821     consumer_n = (producer_n = INT64_MAX);
822   }
823 
824   // At least set up the consumer readers first so the data
825   // doesn't disappear out from under the tunnel
826   for (c = p->consumer_list.head; c; c = c->link.next) {
827     // Create a reader for each consumer.  The reader allows
828     // us to implement skip bytes
829     if (c->vc_type == HT_CACHE_WRITE) {
830       switch (action) {
831       case TCA_CHUNK_CONTENT:
832       case TCA_PASSTHRU_DECHUNKED_CONTENT:
833         c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
834         break;
835       case TCA_DECHUNK_CONTENT:
836       case TCA_PASSTHRU_CHUNKED_CONTENT:
837         c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
838         break;
839       default:
840         break;
841       }
842     }
843     // Non-cache consumers.
844     else if (action == TCA_CHUNK_CONTENT) {
845       c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
846     } else if (action == TCA_DECHUNK_CONTENT) {
847       c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
848     } else {
849       c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
850     }
851 
852     // Consume bytes of the reader if we skipping bytes
853     if (c->skip_bytes > 0) {
854       ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
855       c->buffer_reader->consume(c->skip_bytes);
856     }
857   }
858 
859   // YTS Team, yamsat Plugin
860   // Allocate and copy partial POST data to buffers. Check for the various parameters
861   // including the maximum configured post data size
862   if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
863       (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && p->vc_type == HT_HTTP_CLIENT)) {
864     Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64 " max size: %" PRId64 "",
865           p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
866 
867     // (note that since we are not dechunking POST, this is the chunked size if chunked)
868     if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
869       Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64 " limit=%" PRId64 "",
870               p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
871       sm->disable_redirect();
872       if (p->vc_type == HT_BUFFER_READ) {
873         producer_handler(VC_EVENT_ERROR, p);
874         return;
875       }
876     } else {
877       sm->postbuf_copy_partial_data();
878     }
879   } // end of added logic for partial POST
880 
881   if (p->do_chunking) {
882     // remove the chunked reader marker so that it doesn't act like a buffer guard
883     p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
884     p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
885 
886     // If there is data to process in the buffer, do it now
887     producer_handler(VC_EVENT_READ_READY, p);
888   } else if (p->do_dechunking || p->do_chunked_passthru) {
889     // remove the dechunked reader marker so that it doesn't act like a buffer guard
890     if (p->do_dechunking && dechunked_buffer_start) {
891       p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
892     }
893 
894     // bz57413
895     // If there is no transformation plugin, then we didn't add the header, hence no need to consume it
896     Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64 "",
897           p->chunked_handler.chunked_reader->read_avail());
898     if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
899       p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
900       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64 "",
901             p->chunked_handler.skip_bytes);
902     }
903     // if(p->chunked_handler.chunked_reader->read_avail() > 0)
904     // p->chunked_handler.chunked_reader->consume(
905     // p->chunked_handler.skip_bytes);
906 
907     producer_handler(VC_EVENT_READ_READY, p);
908     if (sm->get_postbuf_done() && p->vc_type == HT_HTTP_CLIENT) { // read_avail() == 0
909       // [bug 2579251]
910       // Ugh, this is horrible but in the redirect case they are running a the tunnel again with the
911       // now closed/empty producer to trigger PRECOMPLETE.  If the POST was chunked, producer_n is set
912       // (incorrectly) to INT64_MAX.  It needs to be set to 0 to prevent triggering another read.
913       producer_n = 0;
914     }
915   }
916   for (c = p->consumer_list.head; c; c = c->link.next) {
917     int64_t c_write = consumer_n;
918 
919     // Don't bother to set up the consumer if it is dead
920     if (!c->alive) {
921       continue;
922     }
923 
924     if (!p->alive) {
925       // Adjust the amount of chunked data to write if the only data was in the initial read
926       // The amount to write in some cases is dependent on the type of the consumer, so this
927       // value must be computed for each consumer
928       c_write = final_consumer_bytes_to_write(p, c);
929     } else {
930       // INKqa05109 - if we don't know the length leave it at
931       //  INT64_MAX or else the cache may bounce the write
932       //  because it thinks the document is too big.  INT64_MAX
933       //  is a special case for the max document size code
934       //  in the cache
935       if (c_write != INT64_MAX) {
936         c_write -= c->skip_bytes;
937       }
938       // Fix for problems with not chunked content being chunked and
939       // not sending the entire data.  The content length grows when
940       // it is being chunked.
941       if (p->do_chunking == true) {
942         c_write = INT64_MAX;
943       }
944     }
945 
946     if (c_write == 0) {
947       // Nothing to do, call back the cleanup handlers
948       c->write_vio = nullptr;
949       consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
950     } else {
951       // In the client half close case, all the data that will be sent
952       // from the client is already in the buffer.  Go ahead and set
953       // the amount to read since we know it.  We will forward the FIN
954       // to the server on VC_EVENT_WRITE_COMPLETE.
955       if (p->vc_type == HT_HTTP_CLIENT) {
956         ProxyTransaction *ua_vc = static_cast<ProxyTransaction *>(p->vc);
957         if (ua_vc->get_half_close_flag()) {
958           int tmp = c->buffer_reader->read_avail();
959           if (tmp < c_write) {
960             c_write = tmp;
961           }
962           p->alive         = false;
963           p->handler_state = HTTP_SM_POST_SUCCESS;
964         }
965       }
966       // Start the writes now that we know we will consume all the initial data
967       c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
968       ink_assert(c_write > 0);
969       if (c->write_vio == nullptr) {
970         consumer_handler(VC_EVENT_ERROR, c);
971       }
972     }
973   }
974   if (p->alive) {
975     ink_assert(producer_n >= 0);
976 
977     if (producer_n == 0) {
978       // Everything is already in the buffer so mark the producer as done.  We need to notify
979       // state machine that everything is done.  We use a special event to say the producers is
980       // done but we didn't do anything
981       p->alive         = false;
982       p->read_success  = true;
983       p->handler_state = HTTP_SM_POST_SUCCESS;
984       Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
985       producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
986     } else {
987       if (read_start_pos > 0) {
988         p->read_vio = ((CacheVC *)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
989       } else {
990         p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
991       }
992     }
993   }
994 
995   // Now that the tunnel has started, we must remove producer's reader so
996   // that it doesn't act like a buffer guard
997   if (p->read_buffer && p->buffer_start) {
998     p->read_buffer->dealloc_reader(p->buffer_start);
999   }
1000   p->buffer_start = nullptr;
1001 }
1002 
1003 int
producer_handler_dechunked(int event,HttpTunnelProducer * p)1004 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer *p)
1005 {
1006   ink_assert(p->do_chunking);
1007 
1008   Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name,
1009         HttpDebugNames::get_event_name(event));
1010 
1011   // We only interested in translating certain events
1012   switch (event) {
1013   case VC_EVENT_READ_COMPLETE:
1014   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1015   case VC_EVENT_EOS:
1016     p->alive = false; // Update the producer state for final_consumer_bytes_to_write
1017     /* fallthrough */
1018   case VC_EVENT_READ_READY:
1019     p->last_event = p->chunked_handler.last_server_event = event;
1020     if (p->chunked_handler.generate_chunked_content()) { // We are done, make sure the consumer is activated
1021       HttpTunnelConsumer *c;
1022       for (c = p->consumer_list.head; c; c = c->link.next) {
1023         if (c->alive) {
1024           c->write_vio->nbytes = final_consumer_bytes_to_write(p, c);
1025           // consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
1026         }
1027       }
1028     }
1029     break;
1030   };
1031   // Since we will consume all the data if the server is actually finished
1032   //   we don't have to translate events like we do in the
1033   //   case producer_handler_chunked()
1034   return event;
1035 }
1036 
1037 // int HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer* p)
1038 //
1039 //   Handles events from chunked producers.  It calls the chunking handlers
1040 //    if appropriate and then translates the event we got into a suitable
1041 //    event to represent the unchunked state, and does chunked bookkeeping
1042 //
1043 int
producer_handler_chunked(int event,HttpTunnelProducer * p)1044 HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer *p)
1045 {
1046   ink_assert(p->do_dechunking || p->do_chunked_passthru);
1047 
1048   Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
1049 
1050   // We only interested in translating certain events
1051   switch (event) {
1052   case VC_EVENT_READ_READY:
1053   case VC_EVENT_READ_COMPLETE:
1054   case VC_EVENT_INACTIVITY_TIMEOUT:
1055   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1056   case VC_EVENT_EOS:
1057     break;
1058   default:
1059     return event;
1060   }
1061 
1062   p->last_event = p->chunked_handler.last_server_event = event;
1063   bool done                                            = p->chunked_handler.process_chunked_content();
1064 
1065   // If we couldn't understand the encoding, return
1066   //   an error
1067   if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
1068     Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
1069     p->chunked_handler.truncation = true;
1070     // FIX ME: we return EOS here since it will cause the
1071     //  the client to be reenabled.  ERROR makes more
1072     //  sense but no reenables follow
1073     return VC_EVENT_EOS;
1074   }
1075 
1076   switch (event) {
1077   case VC_EVENT_READ_READY:
1078     if (done) {
1079       return VC_EVENT_READ_COMPLETE;
1080     }
1081     break;
1082   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1083   case VC_EVENT_EOS:
1084   case VC_EVENT_READ_COMPLETE:
1085   case VC_EVENT_INACTIVITY_TIMEOUT:
1086     if (!done) {
1087       p->chunked_handler.truncation = true;
1088     }
1089     break;
1090   }
1091 
1092   return event;
1093 }
1094 
1095 //
1096 // bool HttpTunnel::producer_handler(int event, HttpTunnelProducer* p)
1097 //
1098 //   Handles events from producers.
1099 //
1100 //   If the event is interesting only to the tunnel, this
1101 //    handler takes all necessary actions and returns false
1102 //    If the event is interesting to the state_machine,
1103 //    it calls back the state machine and returns true
1104 //
1105 //
1106 bool
producer_handler(int event,HttpTunnelProducer * p)1107 HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
1108 {
1109   HttpTunnelConsumer *c;
1110   HttpProducerHandler jump_point;
1111   bool sm_callback = false;
1112 
1113   Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
1114 
1115   // Handle chunking/dechunking/chunked-passthrough if necessary.
1116   if (p->do_chunking) {
1117     event = producer_handler_dechunked(event, p);
1118 
1119     // If we were in PRECOMPLETE when this function was called
1120     // and we are doing chunking, then we just wrote the last
1121     // chunk in the the function call above.  We are done with the
1122     // tunnel.
1123     if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
1124       event = VC_EVENT_EOS;
1125     }
1126   } else if (p->do_dechunking || p->do_chunked_passthru) {
1127     event = producer_handler_chunked(event, p);
1128   } else {
1129     p->last_event = event;
1130   }
1131 
1132   // YTS Team, yamsat Plugin
1133   // Copy partial POST data to buffers. Check for the various parameters including
1134   // the maximum configured post data size
1135   if ((p->vc_type == HT_BUFFER_READ && sm->is_postbuf_valid()) ||
1136       (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
1137        (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && p->vc_type == HT_HTTP_CLIENT)) {
1138     Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));
1139 
1140     if ((sm->postbuf_buffer_avail() + sm->postbuf_reader_avail()) > HttpConfig::m_master.post_copy_size) {
1141       Warning("http_redirect, [HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64
1142               " reader_avail=%" PRId64 " limit=%" PRId64 "",
1143               sm->postbuf_buffer_avail(), sm->postbuf_reader_avail(), HttpConfig::m_master.post_copy_size);
1144       sm->disable_redirect();
1145       if (p->vc_type == HT_BUFFER_READ) {
1146         event = VC_EVENT_ERROR;
1147       }
1148     } else {
1149       sm->postbuf_copy_partial_data();
1150       if (event == VC_EVENT_READ_COMPLETE || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS) {
1151         sm->set_postbuf_done(true);
1152       }
1153     }
1154   } // end of added logic for partial copy of POST
1155 
1156   Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d, state: %d", p->alive == true,
1157         sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event, p->chunked_handler.state);
1158 
1159   switch (event) {
1160   case VC_EVENT_READ_READY:
1161     // Data read from producer, reenable consumers
1162     for (c = p->consumer_list.head; c; c = c->link.next) {
1163       if (c->alive && c->write_vio) {
1164         c->write_vio->reenable();
1165       }
1166     }
1167     break;
1168 
1169   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
1170     // If the write completes on the stack (as it can for http2), then
1171     // consumer could have called back by this point.  Must treat this as
1172     // a regular read complete (falling through to the following cases).
1173 
1174   case VC_EVENT_READ_COMPLETE:
1175   case VC_EVENT_EOS:
1176     // The producer completed
1177     p->alive = false;
1178     if (p->read_vio) {
1179       p->bytes_read = p->read_vio->ndone;
1180     } else {
1181       // If we are chunked, we can receive the whole document
1182       //   along with the header without knowing it (due to
1183       //   the message length being a property of the encoding)
1184       //   In that case, we won't have done a do_io so there
1185       //   will not be vio
1186       p->bytes_read = 0;
1187     }
1188 
1189     // callback the SM to notify of completion
1190     //  Note: we need to callback the SM before
1191     //  reenabling the consumers as the reenable may
1192     //  make the data visible to the consumer and
1193     //  initiate async I/O operation.  The SM needs to
1194     //  set how much I/O to do before async I/O is
1195     //  initiated
1196     jump_point = p->vc_handler;
1197     (sm->*jump_point)(event, p);
1198     sm_callback = true;
1199     p->update_state_if_not_set(HTTP_SM_POST_SUCCESS);
1200 
1201     // Data read from producer, reenable consumers
1202     for (c = p->consumer_list.head; c; c = c->link.next) {
1203       if (c->alive && c->write_vio) {
1204         c->write_vio->reenable();
1205       }
1206     }
1207     break;
1208 
1209   case VC_EVENT_ERROR:
1210   case VC_EVENT_ACTIVE_TIMEOUT:
1211   case VC_EVENT_INACTIVITY_TIMEOUT:
1212   case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
1213     if (p->alive) {
1214       p->alive = false;
1215       if (p->read_vio) {
1216         p->bytes_read = p->read_vio->ndone;
1217       } else {
1218         p->bytes_read = 0;
1219       }
1220       // Clear any outstanding reads so they don't
1221       // collide with future tunnel IO's
1222       p->vc->do_io_read(nullptr, 0, nullptr);
1223       // Interesting tunnel event, call SM
1224       jump_point = p->vc_handler;
1225       (sm->*jump_point)(event, p);
1226       sm_callback = true;
1227       // Failure case anyway
1228       p->update_state_if_not_set(HTTP_SM_POST_UA_FAIL);
1229     }
1230     break;
1231 
1232   case VC_EVENT_WRITE_READY:
1233   case VC_EVENT_WRITE_COMPLETE:
1234   default:
1235     // Producers should not get these events
1236     ink_release_assert(0);
1237     break;
1238   }
1239 
1240   return sm_callback;
1241 }
1242 
1243 void
consumer_reenable(HttpTunnelConsumer * c)1244 HttpTunnel::consumer_reenable(HttpTunnelConsumer *c)
1245 {
1246   HttpTunnelProducer *p = c->producer;
1247 
1248   if (p && p->alive) {
1249     // Only do flow control if enabled and the producer is an external
1250     // source.  Otherwise disable by making the backlog zero. Because
1251     // the backlog short cuts quit when the value is equal (or
1252     // greater) to the target, we use strict comparison only for
1253     // checking low water, otherwise the flow control can stall out.
1254     uint64_t backlog         = (flow_state.enabled_p && p->is_source()) ? p->backlog(flow_state.high_water) : 0;
1255     HttpTunnelProducer *srcp = p->flow_control_source;
1256 
1257     if (backlog >= flow_state.high_water) {
1258       if (is_debug_tag_set("http_tunnel")) {
1259         Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
1260       }
1261       p->throttle(); // p becomes srcp for future calls to this method
1262     } else {
1263       if (srcp && srcp->alive && c->is_sink()) {
1264         // Check if backlog is below low water - note we need to check
1265         // against the source producer, not necessarily the producer
1266         // for this consumer. We don't have to recompute the backlog
1267         // if they are the same because we know low water <= high
1268         // water so the value is sufficiently accurate.
1269         if (srcp != p) {
1270           backlog = srcp->backlog(flow_state.low_water);
1271         }
1272         if (backlog < flow_state.low_water) {
1273           if (is_debug_tag_set("http_tunnel")) {
1274             Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
1275           }
1276           srcp->unthrottle();
1277           if (srcp->read_vio) {
1278             srcp->read_vio->reenable();
1279           }
1280           // Kick source producer to get flow ... well, flowing.
1281           this->producer_handler(VC_EVENT_READ_READY, srcp);
1282         } else {
1283           // We can stall for small thresholds on network sinks because this event happens
1284           // before the actual socket write. So we trap for the buffer becoming empty to
1285           // make sure we get an event to unthrottle after the write.
1286           if (HT_HTTP_CLIENT == c->vc_type) {
1287             NetVConnection *netvc = dynamic_cast<NetVConnection *>(c->write_vio->vc_server);
1288             if (netvc) { // really, this should always be true.
1289               netvc->trapWriteBufferEmpty();
1290             }
1291           }
1292         }
1293       }
1294       if (p->read_vio) {
1295         p->read_vio->reenable();
1296       }
1297     }
1298   }
1299 }
1300 
1301 //
1302 // bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
1303 //
1304 //   Handles events from consumers.
1305 //
1306 //   If the event is interesting only to the tunnel, this
1307 //    handler takes all necessary actions and returns false
1308 //    If the event is interesting to the state_machine,
1309 //    it calls back the state machine and returns true
1310 //
1311 //
1312 bool
consumer_handler(int event,HttpTunnelConsumer * c)1313 HttpTunnel::consumer_handler(int event, HttpTunnelConsumer *c)
1314 {
1315   bool sm_callback = false;
1316   HttpConsumerHandler jump_point;
1317   HttpTunnelProducer *p = c->producer;
1318 
1319   Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));
1320 
1321   ink_assert(c->alive == true);
1322 
1323   switch (event) {
1324   case VC_EVENT_WRITE_READY:
1325     this->consumer_reenable(c);
1326     // Once we get a write ready from the origin, we can assume the connect to some degree succeeded
1327     if (c->vc_type == HT_HTTP_SERVER) {
1328       sm->t_state.current.server->clear_connect_fail();
1329     }
1330     break;
1331 
1332   case VC_EVENT_WRITE_COMPLETE:
1333   case VC_EVENT_EOS:
1334   case VC_EVENT_ERROR:
1335   case VC_EVENT_ACTIVE_TIMEOUT:
1336   case VC_EVENT_INACTIVITY_TIMEOUT:
1337     ink_assert(c->alive);
1338     ink_assert(c->buffer_reader);
1339     c->alive = false;
1340 
1341     c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;
1342 
1343     // Interesting tunnel event, call SM
1344     jump_point = c->vc_handler;
1345     (sm->*jump_point)(event, c);
1346     // Make sure the handler_state is set
1347     // Necessary for post tunnel end processing
1348     if (c->producer && c->producer->handler_state == 0) {
1349       if (event == VC_EVENT_WRITE_COMPLETE) {
1350         c->producer->handler_state = HTTP_SM_POST_SUCCESS;
1351         // If the consumer completed, presumably the producer successfully read
1352         c->producer->read_success = true;
1353         // Go ahead and clean up the producer side
1354         if (p->alive) {
1355           producer_handler(VC_EVENT_READ_COMPLETE, p);
1356         }
1357       } else if (c->vc_type == HT_HTTP_SERVER) {
1358         c->producer->handler_state = HTTP_SM_POST_UA_FAIL;
1359       } else if (c->vc_type == HT_HTTP_CLIENT) {
1360         c->producer->handler_state = HTTP_SM_POST_SERVER_FAIL;
1361       }
1362     }
1363     sm_callback = true;
1364 
1365     // Deallocate the reader after calling back the sm
1366     //  because buffer problems are easier to debug
1367     //  in the sm when the reader is still valid
1368     if (c->buffer_reader) {
1369       c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
1370       c->buffer_reader = nullptr;
1371     }
1372 
1373     // Since we removed a consumer, it may now be
1374     //   possible to put more stuff in the buffer
1375     // Note: we reenable only after calling back
1376     //    the SM since the reenabling has the side effect
1377     //    updating the buffer state for the VConnection
1378     //    that is being reenabled
1379     if (p->alive && p->read_vio) {
1380       if (p->is_throttled()) {
1381         this->consumer_reenable(c);
1382       } else {
1383         p->read_vio->reenable();
1384       }
1385     }
1386     // [amc] I don't think this happens but we'll leave a debug trap
1387     // here just in case.
1388     if (p->is_throttled()) {
1389       Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
1390     }
1391     break;
1392 
1393   case VC_EVENT_READ_READY:
1394   case VC_EVENT_READ_COMPLETE:
1395   default:
1396     // Consumers should not get these events
1397     ink_release_assert(0);
1398     break;
1399   }
1400 
1401   return sm_callback;
1402 }
1403 
1404 // void HttpTunnel::chain_abort_all(HttpTunnelProducer* p)
1405 //
1406 //    Abort the producer and everyone still alive
1407 //     downstream of the producer
1408 //
1409 void
chain_abort_all(HttpTunnelProducer * p)1410 HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
1411 {
1412   HttpTunnelConsumer *c = p->consumer_list.head;
1413 
1414   while (c) {
1415     if (c->alive) {
1416       c->alive     = false;
1417       c->write_vio = nullptr;
1418       c->vc->do_io_close(EHTTP_ERROR);
1419       update_stats_after_abort(c->vc_type);
1420     }
1421 
1422     if (c->self_producer) {
1423       // Must snip the link before recursively
1424       // freeing to avoid looks introduced by
1425       // blind tunneling
1426       HttpTunnelProducer *selfp = c->self_producer;
1427       c->self_producer          = nullptr;
1428       chain_abort_all(selfp);
1429     }
1430 
1431     c = c->link.next;
1432   }
1433 
1434   if (p->alive) {
1435     p->alive = false;
1436     if (p->read_vio) {
1437       p->bytes_read = p->read_vio->ndone;
1438     }
1439     if (p->self_consumer) {
1440       p->self_consumer->alive = false;
1441     }
1442     p->read_vio = nullptr;
1443     p->vc->do_io_close(EHTTP_ERROR);
1444     HTTP_INCREMENT_DYN_STAT(http_origin_shutdown_tunnel_abort);
1445     update_stats_after_abort(p->vc_type);
1446   }
1447 }
1448 
1449 //
1450 // Determine the number of bytes a consumer should read from a producer
1451 //
1452 int64_t
final_consumer_bytes_to_write(HttpTunnelProducer * p,HttpTunnelConsumer * c)1453 HttpTunnel::final_consumer_bytes_to_write(HttpTunnelProducer *p, HttpTunnelConsumer *c)
1454 {
1455   int64_t total_bytes = 0;
1456   int64_t consumer_n  = 0;
1457   if (p->alive) {
1458     consumer_n = INT64_MAX;
1459   } else {
1460     TunnelChunkingAction_t action = p->chunking_action;
1461     if (c->alive) {
1462       if (c->vc_type == HT_CACHE_WRITE) {
1463         switch (action) {
1464         case TCA_CHUNK_CONTENT:
1465         case TCA_PASSTHRU_DECHUNKED_CONTENT:
1466           total_bytes = p->bytes_read + p->init_bytes_done;
1467           break;
1468         case TCA_DECHUNK_CONTENT:
1469         case TCA_PASSTHRU_CHUNKED_CONTENT:
1470           total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1471           break;
1472         default:
1473           break;
1474         }
1475       } else if (action == TCA_CHUNK_CONTENT) {
1476         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
1477       } else if (action == TCA_DECHUNK_CONTENT) {
1478         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
1479       } else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1480         total_bytes = p->bytes_read + p->init_bytes_done;
1481       } else {
1482         total_bytes = p->bytes_read + p->init_bytes_done;
1483       }
1484       consumer_n = total_bytes - c->skip_bytes;
1485     }
1486   }
1487   return consumer_n;
1488 }
1489 
1490 //
1491 // void HttpTunnel::finish_all_internal(HttpTunnelProducer* p)
1492 //
1493 //    Internal function for finishing all consumers.  Takes
1494 //       chain argument about where to finish just immediate
1495 //       consumer or all those downstream
1496 //
1497 void
finish_all_internal(HttpTunnelProducer * p,bool chain)1498 HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
1499 {
1500   ink_assert(p->alive == false);
1501   HttpTunnelConsumer *c         = p->consumer_list.head;
1502   int64_t total_bytes           = 0;
1503   TunnelChunkingAction_t action = p->chunking_action;
1504 
1505   if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
1506     // if the only chunked data was in the initial read, make sure we don't consume too much
1507     if (p->bytes_read == 0 && p->buffer_start != nullptr) {
1508       int num_read = p->buffer_start->read_avail() - p->chunked_handler.chunked_reader->read_avail();
1509       if (num_read < p->init_bytes_done) {
1510         p->init_bytes_done = num_read;
1511       }
1512     }
1513   }
1514 
1515   while (c) {
1516     if (c->alive) {
1517       if (c->write_vio) {
1518         // Adjust the number of bytes to write in the case of
1519         // a completed unlimited producer
1520         c->write_vio->nbytes = final_consumer_bytes_to_write(p, c);
1521         ink_assert(c->write_vio->nbytes >= 0);
1522 
1523         if (c->write_vio->nbytes < 0) {
1524           Error("Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n", total_bytes - c->skip_bytes);
1525         }
1526       }
1527 
1528       if (chain == true && c->self_producer) {
1529         chain_finish_all(c->self_producer);
1530       }
1531       // The IO Core will not call us back if there
1532       //   is nothing to do.  Check to see if there is
1533       //   nothing to do and take the appropriate
1534       //   action
1535       if (c->write_vio && c->alive && c->write_vio->nbytes == c->write_vio->ndone) {
1536         consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
1537       }
1538     }
1539 
1540     c = c->link.next;
1541   }
1542 }
1543 
1544 // void HttpTunnel::chain_abort_cache_write(HttpProducer* p)
1545 //
1546 //    Terminates all cache writes.  Used to prevent truncated
1547 //     documents from being stored in the cache
1548 //
1549 void
chain_abort_cache_write(HttpTunnelProducer * p)1550 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer *p)
1551 {
1552   HttpTunnelConsumer *c = p->consumer_list.head;
1553 
1554   while (c) {
1555     if (c->alive) {
1556       if (c->vc_type == HT_CACHE_WRITE) {
1557         ink_assert(c->self_producer == nullptr);
1558         c->write_vio = nullptr;
1559         c->vc->do_io_close(EHTTP_ERROR);
1560         c->alive = false;
1561         HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1562       } else if (c->self_producer) {
1563         chain_abort_cache_write(c->self_producer);
1564       }
1565     }
1566     c = c->link.next;
1567   }
1568 }
1569 
1570 // void HttpTunnel::close_vc(HttpTunnelProducer* p)
1571 //
1572 //    Closes the vc associated with the producer and
1573 //      updates the state of the self_consumer
1574 //
1575 void
close_vc(HttpTunnelProducer * p)1576 HttpTunnel::close_vc(HttpTunnelProducer *p)
1577 {
1578   ink_assert(p->alive == false);
1579   HttpTunnelConsumer *c = p->self_consumer;
1580 
1581   if (c && c->alive) {
1582     c->alive = false;
1583     if (c->write_vio) {
1584       c->bytes_written = c->write_vio->ndone;
1585     }
1586   }
1587 
1588   p->vc->do_io_close();
1589 }
1590 
1591 // void HttpTunnel::close_vc(HttpTunnelConsumer* c)
1592 //
1593 //    Closes the vc associated with the consumer and
1594 //      updates the state of the self_producer
1595 //
1596 void
close_vc(HttpTunnelConsumer * c)1597 HttpTunnel::close_vc(HttpTunnelConsumer *c)
1598 {
1599   ink_assert(c->alive == false);
1600   HttpTunnelProducer *p = c->self_producer;
1601 
1602   if (p && p->alive) {
1603     p->alive = false;
1604     if (p->read_vio) {
1605       p->bytes_read = p->read_vio->ndone;
1606     }
1607   }
1608 
1609   c->vc->do_io_close();
1610 }
1611 
1612 // int HttpTunnel::main_handler(int event, void* data)
1613 //
1614 //   Main handler for the tunnel.  Vectors events
1615 //   based on whether they are from consumers or
1616 //   producers
1617 //
1618 int
main_handler(int event,void * data)1619 HttpTunnel::main_handler(int event, void *data)
1620 {
1621   HttpTunnelProducer *p = nullptr;
1622   HttpTunnelConsumer *c = nullptr;
1623   bool sm_callback      = false;
1624 
1625   ++reentrancy_count;
1626 
1627   ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);
1628 
1629   // Find the appropriate entry
1630   if ((p = get_producer(static_cast<VIO *>(data))) != nullptr) {
1631     sm_callback = producer_handler(event, p);
1632   } else {
1633     if ((c = get_consumer(static_cast<VIO *>(data))) != nullptr) {
1634       ink_assert(c->write_vio == (VIO *)data || c->vc == ((VIO *)data)->vc_server);
1635       sm_callback = consumer_handler(event, c);
1636     } else {
1637       // Presumably a delayed event we can ignore now
1638       internal_error(); // do nothing
1639     }
1640   }
1641 
1642   // We called a vc handler, the tunnel might be
1643   //  finished.  Check to see if there are any remaining
1644   //  VConnections alive.  If not, notify the state machine
1645   //
1646   // Don't call out if we are nested
1647   if (call_sm || (sm_callback && !is_tunnel_alive())) {
1648     if (reentrancy_count == 1) {
1649       reentrancy_count = 0;
1650       active           = false;
1651       sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
1652       return EVENT_DONE;
1653     } else {
1654       call_sm = true;
1655     }
1656   }
1657   --reentrancy_count;
1658   return EVENT_CONT;
1659 }
1660 
1661 void
update_stats_after_abort(HttpTunnelType_t t)1662 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
1663 {
1664   switch (t) {
1665   case HT_CACHE_READ:
1666   case HT_CACHE_WRITE:
1667     HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
1668     break;
1669   default:
1670     // Handled here:
1671     // HT_HTTP_SERVER, HT_HTTP_CLIENT,
1672     // HT_TRANSFORM, HT_STATIC
1673     break;
1674   };
1675 }
1676 
1677 void
internal_error()1678 HttpTunnel::internal_error()
1679 {
1680 }
1681