/** @file

  Implements callin functions for plugins

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
23 
#include "tscore/ink_config.h"
#include "FetchSM.h"
#include <cstdio>
#include <cstring>
#include "HTTP.h"
#include "PluginVC.h"
#include "ts/ts.h" // Ugly, but we need a bunch of the public APIs here ... :-/
30 
31 #define DEBUG_TAG "FetchSM"
32 #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
33 
// Allocator pool for FetchSM instances; objects are returned to this pool
// via FetchSMAllocator.free() in FetchSM::cleanUp().
ClassAllocator<FetchSM> FetchSMAllocator("FetchSMAllocator");
void
FetchSM::cleanUp()
{
  Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);

  // Atomic compare-and-swap guards against double cleanup: only the first
  // caller flips 'destroyed' and proceeds to release resources.
  if (!ink_atomic_cas(&destroyed, false, true)) {
    Debug(DEBUG_TAG, "Error: Double delete on FetchSM, this:%p", this);
    return;
  }

  // The chunked handler only holds a dechunk buffer when the response was
  // chunked AND dechunking was requested (see check_chunked()).
  if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
    chunked_handler.clear();
  }

  free_MIOBuffer(req_buffer);
  free_MIOBuffer(resp_buffer);
  mutex.clear();
  http_parser_clear(&http_parser);
  client_response_hdr.destroy();
  ats_free(client_response);
  cont_mutex.clear();
  http_vc->do_io_close();
  // Returns 'this' to the allocator pool — no member access past this point.
  FetchSMAllocator.free(this);
}
59 
void
FetchSM::httpConnect()
{
  // If the continuation identifies itself (PluginIdentity), tag the internal
  // connection with the plugin's tag/id for observability; otherwise use a
  // generic "fetchSM" tag.
  PluginIdentity *pi = dynamic_cast<PluginIdentity *>(contp);
  const char *tag    = pi ? pi->getPluginTag() : "fetchSM";
  int64_t id         = pi ? pi->getPluginId() : 0;

  Debug(DEBUG_TAG, "[%s] calling httpconnect write pi=%p tag=%s id=%" PRId64, __FUNCTION__, pi, tag, id);
  http_vc = reinterpret_cast<PluginVC *>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));

  /*
   * TS-2906: We need a way to unset internal request when using FetchSM, the use case for this
   * is H2 when it creates outgoing requests it uses FetchSM and the outgoing requests
   * are spawned via H2 SYN packets which are definitely not internal requests.
   */
  if (!is_internal_request) {
    PluginVC *other_side = reinterpret_cast<PluginVC *>(http_vc)->get_other_side();
    if (other_side != nullptr) {
      other_side->set_is_internal_request(false);
    }
  }

  // Start the IO: read the response without bound; write exactly the request
  // head plus the declared body length.
  read_vio  = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
  write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
}
85 
char *
FetchSM::resp_get(int *length)
{
  // Returns the flattened response (headers + body) accumulated by
  // get_info_from_buffer(); *length receives the byte count.
  *length = client_bytes;
  return client_response;
}
92 
int
FetchSM::InvokePlugin(int event, void *data)
{
  // Synchronously deliver 'event' to the plugin continuation while holding
  // the continuation's own mutex (blocking take, not a try-lock).
  EThread *mythread = this_ethread();

  MUTEX_TAKE_LOCK(contp->mutex, mythread);

  int ret = contp->handleEvent(event, data);

  MUTEX_UNTAKE_LOCK(contp->mutex, mythread);

  return ret;
}
106 
107 bool
has_body()108 FetchSM::has_body()
109 {
110   int status_code;
111   HTTPHdr *hdr;
112 
113   if (!header_done) {
114     return false;
115   }
116 
117   if (is_method_head) {
118     return false;
119   }
120   //
121   // The following code comply with HTTP/1.1:
122   // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
123   //
124 
125   hdr = &client_response_hdr;
126 
127   status_code = hdr->status_get();
128   if (status_code < 200 || status_code == 204 || status_code == 304) {
129     return false;
130   }
131 
132   if (check_chunked()) {
133     return true;
134   }
135 
136   resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
137   if (!resp_content_length) {
138     if (check_connection_close()) {
139       return true;
140     } else {
141       return false;
142     }
143   }
144 
145   return true;
146 }
147 
148 bool
check_body_done()149 FetchSM::check_body_done()
150 {
151   if (!check_chunked()) {
152     if (resp_content_length == resp_received_body_len + resp_reader->read_avail()) {
153       return true;
154     }
155 
156     return false;
157   }
158 
159   //
160   // TODO: check whether the chunked body is done
161   //
162   return true;
163 }
164 
165 bool
check_for_field_value(const char * name,size_t name_len,char const * value,size_t value_len)166 FetchSM::check_for_field_value(const char *name, size_t name_len, char const *value, size_t value_len)
167 {
168   bool zret = false; // not found.
169   StrList slist;
170   HTTPHdr *hdr = &client_response_hdr;
171   int ret      = hdr->value_get_comma_list(name, name_len, &slist);
172 
173   ink_release_assert(header_done);
174 
175   if (ret) {
176     for (Str *f = slist.head; f != nullptr; f = f->next) {
177       if (f->len == value_len && 0 == strncasecmp(f->str, value, value_len)) {
178         Debug(DEBUG_TAG, "[%s] field '%.*s', value '%.*s'", __FUNCTION__, static_cast<int>(name_len), name,
179               static_cast<int>(value_len), value);
180         zret = true;
181         break;
182       }
183     }
184   }
185   return zret;
186 }
187 
bool
FetchSM::check_chunked()
{
  static const char CHUNKED_TEXT[] = "chunked";
  static size_t const CHUNKED_LEN  = sizeof(CHUNKED_TEXT) - 1;

  // resp_is_chunked is tri-state: -1 = not yet checked, 0 = not chunked,
  // 1 = chunked. The first call inspects Transfer-Encoding and caches it.
  if (resp_is_chunked < 0) {
    resp_is_chunked = static_cast<int>(
      this->check_for_field_value(MIME_FIELD_TRANSFER_ENCODING, MIME_LEN_TRANSFER_ENCODING, CHUNKED_TEXT, CHUNKED_LEN));

    // When dechunking is requested, set up the chunked handler now. It takes
    // over resp_reader (via init_by_action), so the original reader is
    // deallocated at the end — order matters here.
    if (resp_is_chunked && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
      ChunkedHandler *ch = &chunked_handler;
      ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
      ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
      ch->state            = ChunkedHandler::CHUNK_READ_SIZE;
      resp_reader->dealloc();
    }
  }
  return resp_is_chunked > 0;
}
208 
209 bool
check_connection_close()210 FetchSM::check_connection_close()
211 {
212   static const char CLOSE_TEXT[] = "close";
213   static size_t const CLOSE_LEN  = sizeof(CLOSE_TEXT) - 1;
214 
215   if (resp_received_close < 0) {
216     resp_received_close =
217       static_cast<int>(this->check_for_field_value(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION, CLOSE_TEXT, CLOSE_LEN));
218   }
219   return resp_received_close > 0;
220 }
221 
222 int
dechunk_body()223 FetchSM::dechunk_body()
224 {
225   ink_assert(resp_is_chunked > 0);
226   //
227   // Return Value:
228   //  - 0: need to read more data.
229   //  - TS_FETCH_EVENT_EXT_BODY_READY.
230   //  - TS_FETCH_EVENT_EXT_BODY_DONE.
231   //
232   if (chunked_handler.process_chunked_content()) {
233     return TS_FETCH_EVENT_EXT_BODY_DONE;
234   }
235 
236   if (chunked_handler.dechunked_reader->read_avail()) {
237     return TS_FETCH_EVENT_EXT_BODY_READY;
238   }
239 
240   return 0;
241 }
242 
void
FetchSM::InvokePluginExt(int fetch_event)
{
  int event;
  EThread *mythread        = this_ethread();
  bool read_complete_event = (fetch_event == TS_EVENT_VCONN_READ_COMPLETE) || (fetch_event == TS_EVENT_VCONN_EOS);

  //
  // Increasing *recursion* to prevent
  // FetchSM being deleted by callback.
  //
  recursion++;

  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
    MUTEX_TAKE_LOCK(cont_mutex, mythread);
  }

  // The plugin may already have called TSFetchDestroy (contp == nullptr).
  if (!contp) {
    goto out;
  }

  // Non-read events (e.g. errors) are forwarded to the plugin verbatim.
  if (fetch_event && !read_complete_event) {
    contp->handleEvent(fetch_event, this);
    goto out;
  }

  // Deliver HEAD_DONE exactly once, before any body events. An EOS that
  // arrives before headers completed is passed through as-is.
  if (!has_sent_header) {
    if (fetch_event != TS_EVENT_VCONN_EOS) {
      contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
      has_sent_header = true;
    } else {
      contp->handleEvent(fetch_event, this);
      goto out;
    }
  }

  // TS-3112: always check 'contp' after handleEvent()
  // since handleEvent effectively calls the plugin (or H2 layer)
  // which may call TSFetchDestroy in error conditions.
  // TSFetchDestroy sets contp to NULL, but, doesn't destroy FetchSM yet,
  // since, it's in a tight loop protected by 'recursion' counter.
  // When handleEvent returns, 'recursion' is decremented and contp is
  // already null, so, FetchSM gets destroyed.
  if (!contp) {
    goto out;
  }

  if (!has_body()) {
    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
    goto out;
  }

  Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", received_len: %" PRId64 ", avail: %" PRId64 "", __FUNCTION__,
        resp_is_chunked, resp_content_length, resp_received_body_len,
        resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());

  // No data available yet: only READ_COMPLETE/EOS with an empty buffer means
  // the body is done; otherwise just wait for more data to arrive.
  if (resp_is_chunked > 0) {
    if (!chunked_handler.chunked_reader->read_avail()) {
      if (read_complete_event) {
        contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
      }
      goto out;
    }
  } else if (!resp_reader->read_avail()) {
    if (read_complete_event) {
      contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
    }
    goto out;
  }

  if (!check_chunked()) {
    // Plain (non-chunked) body: READY while more is expected, DONE otherwise.
    if (!check_body_done() && !read_complete_event) {
      contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
    } else {
      contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
    }
  } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
    // Dechunk in a loop, resuming the handler each time it pauses for flow
    // control, until it stops asking or we run out of input.
    do {
      if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
        chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
      }

      event = dechunk_body();
      if (!event) {
        // Need more bytes from the wire before progress is possible.
        read_vio->reenable();
        goto out;
      }

      contp->handleEvent(event, this);

      // contp may be null after handleEvent
      if (!contp) {
        goto out;
      }

    } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
  } else if (check_body_done()) {
    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
  } else {
    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
  }

out:
  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
    MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
  }
  recursion--;

  // If the plugin destroyed us (contp cleared) and this is the outermost
  // frame, finish the teardown that TSFetchDestroy deferred.
  if (!contp && !recursion) {
    cleanUp();
  }

  return;
}
357 
void
FetchSM::get_info_from_buffer(IOBufferReader *reader)
{
  // Flattens everything available in 'reader' — headers first, then body,
  // dechunking when required — into the NUL-terminated 'client_response'
  // buffer, updating 'client_bytes'. Used by the non-streaming (TSFetchUrl)
  // callback path.
  char *buf, *info;
  IOBufferBlock *blk;
  int64_t read_avail, read_done;

  if (!reader) {
    client_bytes = 0;
    return;
  }

  /* Read the data out of the reader */
  if (reader->block != NULL)
    reader->skip_empty_blocks();

  read_avail = reader->read_avail();
  Debug(DEBUG_TAG, "[%s] total avail %" PRId64, __FUNCTION__, read_avail);
  if (!read_avail) {
    client_bytes = 0;
    return;
  }

  // +1 for the trailing NUL appended below.
  info            = (char *)ats_malloc(sizeof(char) * (read_avail + 1));
  client_response = info;

  blk = reader->block.get();

  // This is the equivalent of TSIOBufferBlockReadStart()
  buf       = blk->start() + reader->start_offset;
  read_done = blk->read_avail() - reader->start_offset;

  // Parse (and copy) the response headers once. parse_resp() advances the
  // reader past the header bytes, but 'buf' still points at them, so they
  // can be copied into the output verbatim.
  if (header_done == 0 && read_done > 0) {
    int bytes_used = 0;
    header_done    = true;
    if (client_response_hdr.parse_resp(&http_parser, reader, &bytes_used, 0) == PARSE_RESULT_DONE) {
      if ((bytes_used > 0) && (bytes_used <= read_avail)) {
        // NOTE(review): this assumes the full header fits in the first block
        // ('buf' only spans one block) — verify against the buffer sizing.
        memcpy(info, buf, bytes_used);
        info += bytes_used;
        client_bytes += bytes_used;
      }
    } else {
      Error("Failed to parse headers in FetchSM buffer");
    }
    // adjust the read_avail
    read_avail -= bytes_used;
  }

  // Send the body without dechunk when neither streaming nor dechunk flag is set
  // Or when the body is not chunked
  if (!((fetch_flags & TS_FETCH_FLAGS_STREAM) || (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) || !check_chunked()) {
    /* Read the data out of the reader */
    // NOTE(review): if a block ever yields read_done <= 0 or
    // read_done > read_avail, nothing is consumed and this loop would spin;
    // it relies on IOBufferReader invariants — verify before refactoring.
    while (read_avail > 0) {
      if (reader->block) {
        reader->skip_empty_blocks();
      }

      blk = reader->block.get();

      // This is the equivalent of TSIOBufferBlockReadStart()
      buf       = blk->start() + reader->start_offset;
      read_done = blk->read_avail() - reader->start_offset;

      if ((read_done > 0) && ((read_done <= read_avail))) {
        memcpy(info, buf, read_done);
        reader->consume(read_done);
        read_avail -= read_done;
        info += read_done;
        client_bytes += read_done;
      }
    }
    client_response[client_bytes] = '\0';
    return;
  }

  // Chunked body with dechunking enabled: switch to the dechunked reader and
  // drain it, resuming the handler whenever it pauses for flow control.
  reader = chunked_handler.dechunked_reader;
  do {
    if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
      chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
    }

    if (!dechunk_body()) {
      break;
    }

    /* Read the data out of the reader */
    read_avail = reader->read_avail();
    while (read_avail > 0) {
      if (reader->block) {
        reader->skip_empty_blocks();
      }

      IOBufferBlock *blk = reader->block.get();

      // This is the equivalent of TSIOBufferBlockReadStart()
      buf       = blk->start() + reader->start_offset;
      read_done = blk->read_avail() - reader->start_offset;

      if ((read_done > 0) && (read_done <= read_avail)) {
        memcpy(info, buf, read_done);
        reader->consume(read_done);
        read_avail -= read_done;
        info += read_done;
        client_bytes += read_done;
      }
    }
  } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);

  client_response[client_bytes] = '\0';
  return;
}
469 
void
FetchSM::process_fetch_read(int event)
{
  // Handles read-side VIO events: READ_READY accumulates/parses data;
  // READ_COMPLETE / EOS finish the fetch; anything else is an error.
  Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
  int64_t bytes;
  int bytes_used;
  int64_t total_bytes_copied = 0;

  switch (event) {
  case TS_EVENT_VCONN_READ_READY:
    // duplicate the bytes for backward compatibility with TSFetchUrl()
    if (!(fetch_flags & TS_FETCH_FLAGS_STREAM)) {
      bytes = resp_reader->read_avail();
      Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);

      while (total_bytes_copied < bytes) {
        int64_t actual_bytes_copied;
        actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
        Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
        if (actual_bytes_copied <= 0) {
          break;
        }
        total_bytes_copied += actual_bytes_copied;
      }
      Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
      resp_reader->consume(total_bytes_copied);
    }

    // Parse headers once. In streaming mode hand control to the plugin via
    // InvokePluginExt(); otherwise fire the classic after-header callback.
    if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
      if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, false) == PARSE_RESULT_DONE) {
        header_done = true;
        if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
          return InvokePluginExt();
        } else {
          InvokePlugin(callback_events.success_event_id, (void *)&client_response_hdr);
        }
      }
    } else {
      if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
        return InvokePluginExt();
      }
    }
    read_vio->reenable();
    break;
  case TS_EVENT_VCONN_READ_COMPLETE:
  case TS_EVENT_VCONN_EOS:
    if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
      return InvokePluginExt(event);
    }
    // Non-streaming: flatten the buffered response, deliver it, then tear
    // the state machine down.
    if (callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
      get_info_from_buffer(resp_reader);
      InvokePlugin(callback_events.success_event_id, (void *)this);
    }
    Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
    cleanUp();
    break;
  case TS_EVENT_ERROR:
  default:
    if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
      return InvokePluginExt(event);
    }
    InvokePlugin(callback_events.failure_event_id, nullptr);
    cleanUp();
    break;
  }
}
536 
537 void
process_fetch_write(int event)538 FetchSM::process_fetch_write(int event)
539 {
540   Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
541   switch (event) {
542   case TS_EVENT_VCONN_WRITE_COMPLETE:
543     req_finished = true;
544     break;
545   case TS_EVENT_VCONN_WRITE_READY:
546     // data is processed in chunks of 32k; if there is more than 32k
547     // of input data, we have to continue reenabling until all data is
548     // read (we have already written all the data to the buffer)
549     if (req_reader->read_avail() > 0) {
550       ((PluginVC *)http_vc)->reenable(write_vio);
551     }
552     break;
553   case TS_EVENT_ERROR:
554     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
555       return InvokePluginExt(event);
556     }
557     InvokePlugin(callback_events.failure_event_id, nullptr);
558     cleanUp();
559     break;
560   default:
561     break;
562   }
563 }
564 
565 int
fetch_handler(int event,void * edata)566 FetchSM::fetch_handler(int event, void *edata)
567 {
568   Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
569 
570   if (edata == read_vio) {
571     process_fetch_read(event);
572   } else if (edata == write_vio) {
573     process_fetch_write(event);
574   } else {
575     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
576       InvokePluginExt(event);
577       return 1;
578     }
579     InvokePlugin(callback_events.failure_event_id, nullptr);
580     cleanUp();
581   }
582   return 1;
583 }
584 
585 void
ext_init(Continuation * cont,const char * method,const char * url,const char * version,const sockaddr * client_addr,int flags)586 FetchSM::ext_init(Continuation *cont, const char *method, const char *url, const char *version, const sockaddr *client_addr,
587                   int flags)
588 {
589   init_comm();
590 
591   if (flags & TS_FETCH_FLAGS_NEWLOCK) {
592     mutex      = new_ProxyMutex();
593     cont_mutex = cont->mutex;
594   } else {
595     mutex = cont->mutex;
596   }
597 
598   contp = cont;
599   _addr.assign(client_addr);
600 
601   //
602   // Enable stream IO automatically.
603   //
604   fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
605   if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
606     set_internal_request(false);
607   }
608 
609   //
610   // These options are not used when enable
611   // stream IO.
612   //
613   memset(&callback_options, 0, sizeof(callback_options));
614   memset(&callback_events, 0, sizeof(callback_events));
615 
616   int method_len = strlen(method);
617   req_buffer->write(method, method_len);
618   req_buffer->write(" ", 1);
619   req_buffer->write(url, strlen(url));
620   req_buffer->write(" ", 1);
621   req_buffer->write(version, strlen(version));
622   req_buffer->write("\r\n", 2);
623 
624   if ((method_len == HTTP_LEN_HEAD) && !memcmp(method, HTTP_METHOD_HEAD, HTTP_LEN_HEAD)) {
625     is_method_head = true;
626   }
627 }
628 
629 void
ext_add_header(const char * name,int name_len,const char * value,int value_len)630 FetchSM::ext_add_header(const char *name, int name_len, const char *value, int value_len)
631 {
632   if (TS_MIME_LEN_CONTENT_LENGTH == name_len && !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
633     req_content_length = atoll(value);
634   }
635 
636   req_buffer->write(name, name_len);
637   req_buffer->write(": ", 2);
638   req_buffer->write(value, value_len);
639   req_buffer->write("\r\n", 2);
640 }
641 
void
FetchSM::ext_launch()
{
  // The blank line terminates the request header section; then kick off the
  // internal connection and its read/write IO.
  req_buffer->write("\r\n", 2);
  httpConnect();
}
648 
649 void
ext_write_data(const void * data,size_t len)650 FetchSM::ext_write_data(const void *data, size_t len)
651 {
652   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
653     MUTEX_TAKE_LOCK(mutex, this_ethread());
654   }
655   req_buffer->write(data, len);
656 
657   Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
658   write_vio->reenable();
659 
660   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
661     MUTEX_UNTAKE_LOCK(mutex, this_ethread());
662   }
663 }
664 
ssize_t
FetchSM::ext_read_data(char *buf, size_t len)
{
  // Copies up to 'len' bytes of (possibly dechunked) response body into
  // 'buf'. Returns the number of bytes copied; 0 if the lock is contended,
  // headers aren't parsed yet, or no data is available.
  const char *start;
  TSIOBufferReader reader;
  TSIOBufferBlock blk, next_blk;
  int64_t already, blk_len, need, wavail;

  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
    // Non-blocking: a contended lock just reports "no data for now".
    MUTEX_TRY_LOCK(lock, mutex, this_ethread());
    if (!lock.is_locked()) {
      return 0;
    }
  }

  if (!header_done) {
    return 0;
  }

  // Pick the source: the dechunked stream when dechunking is active,
  // otherwise the raw response reader.
  if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
    reader = (tsapi_bufferreader *)chunked_handler.dechunked_reader;
  } else {
    reader = (TSIOBufferReader)resp_reader;
  }

  already = 0;
  blk     = TSIOBufferReaderStart(reader);

  // Walk the block chain, copying min(block length, remaining space) from
  // each block until 'buf' is full or the chain ends.
  while (blk) {
    wavail = len - already;

    next_blk = TSIOBufferBlockNext(blk);
    start    = TSIOBufferBlockReadStart(blk, reader, &blk_len);

    need = blk_len > wavail ? wavail : blk_len;

    memcpy(&buf[already], start, need);
    already += need;

    if (already >= static_cast<int64_t>(len)) {
      break;
    }

    blk = next_blk;
  }

  resp_received_body_len += already;
  TSIOBufferReaderConsume(reader, already);

  // Buffer space was freed: let the read VIO pull in more data.
  read_vio->reenable();
  return already;
}
717 
void
FetchSM::ext_destroy()
{
  // Called via TSFetchDestroy(): detach the plugin continuation first so no
  // further events are delivered to it.
  contp = nullptr;

  // If we're inside InvokePluginExt() (recursion > 0), defer the teardown —
  // the outermost frame calls cleanUp() when it unwinds (see InvokePluginExt).
  if (recursion) {
    return;
  }

  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
    MUTEX_TRY_LOCK(lock, mutex, this_ethread());
    if (!lock.is_locked()) {
      // Couldn't take our own lock without blocking; retry shortly.
      eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
      return;
    }
  }

  cleanUp();
}
737 
void
FetchSM::ext_set_user_data(void *data)
{
  // Opaque per-fetch user pointer; FetchSM never dereferences it.
  user_data = data;
}
743 
void *
FetchSM::ext_get_user_data()
{
  // Returns the pointer previously stored via ext_set_user_data().
  return user_data;
}
749 
750 TSMBuffer
resp_hdr_bufp()751 FetchSM::resp_hdr_bufp()
752 {
753   HdrHeapSDKHandle *heap;
754   heap = (HdrHeapSDKHandle *)&client_response_hdr;
755 
756   return (TSMBuffer)heap;
757 }
758 
759 TSMLoc
resp_hdr_mloc()760 FetchSM::resp_hdr_mloc()
761 {
762   return (TSMLoc)client_response_hdr.m_http;
763 }
764