1 /** @file
2
3 Implements callin functions for plugins
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 #include "tscore/ink_config.h"
25 #include "FetchSM.h"
26 #include <cstdio>
27 #include "HTTP.h"
28 #include "PluginVC.h"
29 #include "ts/ts.h" // Ugly, but we need a bunch of the public APIs here ... :-/
30
31 #define DEBUG_TAG "FetchSM"
32 #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
33
34 ClassAllocator<FetchSM> FetchSMAllocator("FetchSMAllocator");
35 void
cleanUp()36 FetchSM::cleanUp()
37 {
38 Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
39
40 if (!ink_atomic_cas(&destroyed, false, true)) {
41 Debug(DEBUG_TAG, "Error: Double delete on FetchSM, this:%p", this);
42 return;
43 }
44
45 if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
46 chunked_handler.clear();
47 }
48
49 free_MIOBuffer(req_buffer);
50 free_MIOBuffer(resp_buffer);
51 mutex.clear();
52 http_parser_clear(&http_parser);
53 client_response_hdr.destroy();
54 ats_free(client_response);
55 cont_mutex.clear();
56 http_vc->do_io_close();
57 FetchSMAllocator.free(this);
58 }
59
60 void
httpConnect()61 FetchSM::httpConnect()
62 {
63 PluginIdentity *pi = dynamic_cast<PluginIdentity *>(contp);
64 const char *tag = pi ? pi->getPluginTag() : "fetchSM";
65 int64_t id = pi ? pi->getPluginId() : 0;
66
67 Debug(DEBUG_TAG, "[%s] calling httpconnect write pi=%p tag=%s id=%" PRId64, __FUNCTION__, pi, tag, id);
68 http_vc = reinterpret_cast<PluginVC *>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));
69
70 /*
71 * TS-2906: We need a way to unset internal request when using FetchSM, the use case for this
72 * is H2 when it creates outgoing requests it uses FetchSM and the outgoing requests
73 * are spawned via H2 SYN packets which are definitely not internal requests.
74 */
75 if (!is_internal_request) {
76 PluginVC *other_side = reinterpret_cast<PluginVC *>(http_vc)->get_other_side();
77 if (other_side != nullptr) {
78 other_side->set_is_internal_request(false);
79 }
80 }
81
82 read_vio = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
83 write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
84 }
85
86 char *
resp_get(int * length)87 FetchSM::resp_get(int *length)
88 {
89 *length = client_bytes;
90 return client_response;
91 }
92
93 int
InvokePlugin(int event,void * data)94 FetchSM::InvokePlugin(int event, void *data)
95 {
96 EThread *mythread = this_ethread();
97
98 MUTEX_TAKE_LOCK(contp->mutex, mythread);
99
100 int ret = contp->handleEvent(event, data);
101
102 MUTEX_UNTAKE_LOCK(contp->mutex, mythread);
103
104 return ret;
105 }
106
107 bool
has_body()108 FetchSM::has_body()
109 {
110 int status_code;
111 HTTPHdr *hdr;
112
113 if (!header_done) {
114 return false;
115 }
116
117 if (is_method_head) {
118 return false;
119 }
120 //
121 // The following code comply with HTTP/1.1:
122 // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
123 //
124
125 hdr = &client_response_hdr;
126
127 status_code = hdr->status_get();
128 if (status_code < 200 || status_code == 204 || status_code == 304) {
129 return false;
130 }
131
132 if (check_chunked()) {
133 return true;
134 }
135
136 resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
137 if (!resp_content_length) {
138 if (check_connection_close()) {
139 return true;
140 } else {
141 return false;
142 }
143 }
144
145 return true;
146 }
147
148 bool
check_body_done()149 FetchSM::check_body_done()
150 {
151 if (!check_chunked()) {
152 if (resp_content_length == resp_received_body_len + resp_reader->read_avail()) {
153 return true;
154 }
155
156 return false;
157 }
158
159 //
160 // TODO: check whether the chunked body is done
161 //
162 return true;
163 }
164
165 bool
check_for_field_value(const char * name,size_t name_len,char const * value,size_t value_len)166 FetchSM::check_for_field_value(const char *name, size_t name_len, char const *value, size_t value_len)
167 {
168 bool zret = false; // not found.
169 StrList slist;
170 HTTPHdr *hdr = &client_response_hdr;
171 int ret = hdr->value_get_comma_list(name, name_len, &slist);
172
173 ink_release_assert(header_done);
174
175 if (ret) {
176 for (Str *f = slist.head; f != nullptr; f = f->next) {
177 if (f->len == value_len && 0 == strncasecmp(f->str, value, value_len)) {
178 Debug(DEBUG_TAG, "[%s] field '%.*s', value '%.*s'", __FUNCTION__, static_cast<int>(name_len), name,
179 static_cast<int>(value_len), value);
180 zret = true;
181 break;
182 }
183 }
184 }
185 return zret;
186 }
187
188 bool
check_chunked()189 FetchSM::check_chunked()
190 {
191 static const char CHUNKED_TEXT[] = "chunked";
192 static size_t const CHUNKED_LEN = sizeof(CHUNKED_TEXT) - 1;
193
194 if (resp_is_chunked < 0) {
195 resp_is_chunked = static_cast<int>(
196 this->check_for_field_value(MIME_FIELD_TRANSFER_ENCODING, MIME_LEN_TRANSFER_ENCODING, CHUNKED_TEXT, CHUNKED_LEN));
197
198 if (resp_is_chunked && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
199 ChunkedHandler *ch = &chunked_handler;
200 ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
201 ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
202 ch->state = ChunkedHandler::CHUNK_READ_SIZE;
203 resp_reader->dealloc();
204 }
205 }
206 return resp_is_chunked > 0;
207 }
208
209 bool
check_connection_close()210 FetchSM::check_connection_close()
211 {
212 static const char CLOSE_TEXT[] = "close";
213 static size_t const CLOSE_LEN = sizeof(CLOSE_TEXT) - 1;
214
215 if (resp_received_close < 0) {
216 resp_received_close =
217 static_cast<int>(this->check_for_field_value(MIME_FIELD_CONNECTION, MIME_LEN_CONNECTION, CLOSE_TEXT, CLOSE_LEN));
218 }
219 return resp_received_close > 0;
220 }
221
222 int
dechunk_body()223 FetchSM::dechunk_body()
224 {
225 ink_assert(resp_is_chunked > 0);
226 //
227 // Return Value:
228 // - 0: need to read more data.
229 // - TS_FETCH_EVENT_EXT_BODY_READY.
230 // - TS_FETCH_EVENT_EXT_BODY_DONE.
231 //
232 if (chunked_handler.process_chunked_content()) {
233 return TS_FETCH_EVENT_EXT_BODY_DONE;
234 }
235
236 if (chunked_handler.dechunked_reader->read_avail()) {
237 return TS_FETCH_EVENT_EXT_BODY_READY;
238 }
239
240 return 0;
241 }
242
243 void
InvokePluginExt(int fetch_event)244 FetchSM::InvokePluginExt(int fetch_event)
245 {
246 int event;
247 EThread *mythread = this_ethread();
248 bool read_complete_event = (fetch_event == TS_EVENT_VCONN_READ_COMPLETE) || (fetch_event == TS_EVENT_VCONN_EOS);
249
250 //
251 // Increasing *recursion* to prevent
252 // FetchSM being deleted by callback.
253 //
254 recursion++;
255
256 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
257 MUTEX_TAKE_LOCK(cont_mutex, mythread);
258 }
259
260 if (!contp) {
261 goto out;
262 }
263
264 if (fetch_event && !read_complete_event) {
265 contp->handleEvent(fetch_event, this);
266 goto out;
267 }
268
269 if (!has_sent_header) {
270 if (fetch_event != TS_EVENT_VCONN_EOS) {
271 contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
272 has_sent_header = true;
273 } else {
274 contp->handleEvent(fetch_event, this);
275 goto out;
276 }
277 }
278
279 // TS-3112: always check 'contp' after handleEvent()
280 // since handleEvent effectively calls the plugin (or H2 layer)
281 // which may call TSFetchDestroy in error conditions.
282 // TSFetchDestroy sets contp to NULL, but, doesn't destroy FetchSM yet,
283 // since, it¹s in a tight loop protected by 'recursion' counter.
284 // When handleEvent returns, 'recursion' is decremented and contp is
285 // already null, so, FetchSM gets destroyed.
286 if (!contp) {
287 goto out;
288 }
289
290 if (!has_body()) {
291 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
292 goto out;
293 }
294
295 Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", received_len: %" PRId64 ", avail: %" PRId64 "", __FUNCTION__,
296 resp_is_chunked, resp_content_length, resp_received_body_len,
297 resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());
298
299 if (resp_is_chunked > 0) {
300 if (!chunked_handler.chunked_reader->read_avail()) {
301 if (read_complete_event) {
302 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
303 }
304 goto out;
305 }
306 } else if (!resp_reader->read_avail()) {
307 if (read_complete_event) {
308 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
309 }
310 goto out;
311 }
312
313 if (!check_chunked()) {
314 if (!check_body_done() && !read_complete_event) {
315 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
316 } else {
317 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
318 }
319 } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
320 do {
321 if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
322 chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
323 }
324
325 event = dechunk_body();
326 if (!event) {
327 read_vio->reenable();
328 goto out;
329 }
330
331 contp->handleEvent(event, this);
332
333 // contp may be null after handleEvent
334 if (!contp) {
335 goto out;
336 }
337
338 } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
339 } else if (check_body_done()) {
340 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
341 } else {
342 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
343 }
344
345 out:
346 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
347 MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
348 }
349 recursion--;
350
351 if (!contp && !recursion) {
352 cleanUp();
353 }
354
355 return;
356 }
357
358 void
get_info_from_buffer(IOBufferReader * reader)359 FetchSM::get_info_from_buffer(IOBufferReader *reader)
360 {
361 char *buf, *info;
362 IOBufferBlock *blk;
363 int64_t read_avail, read_done;
364
365 if (!reader) {
366 client_bytes = 0;
367 return;
368 }
369
370 /* Read the data out of the reader */
371 if (reader->block != NULL)
372 reader->skip_empty_blocks();
373
374 read_avail = reader->read_avail();
375 Debug(DEBUG_TAG, "[%s] total avail %" PRId64, __FUNCTION__, read_avail);
376 if (!read_avail) {
377 client_bytes = 0;
378 return;
379 }
380
381 info = (char *)ats_malloc(sizeof(char) * (read_avail + 1));
382 client_response = info;
383
384 blk = reader->block.get();
385
386 // This is the equivalent of TSIOBufferBlockReadStart()
387 buf = blk->start() + reader->start_offset;
388 read_done = blk->read_avail() - reader->start_offset;
389
390 if (header_done == 0 && read_done > 0) {
391 int bytes_used = 0;
392 header_done = true;
393 if (client_response_hdr.parse_resp(&http_parser, reader, &bytes_used, 0) == PARSE_RESULT_DONE) {
394 if ((bytes_used > 0) && (bytes_used <= read_avail)) {
395 memcpy(info, buf, bytes_used);
396 info += bytes_used;
397 client_bytes += bytes_used;
398 }
399 } else {
400 Error("Failed to parse headers in FetchSM buffer");
401 }
402 // adjust the read_avail
403 read_avail -= bytes_used;
404 }
405
406 // Send the body without dechunk when neither streaming nor dechunk flag is set
407 // Or when the body is not chunked
408 if (!((fetch_flags & TS_FETCH_FLAGS_STREAM) || (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) || !check_chunked()) {
409 /* Read the data out of the reader */
410 while (read_avail > 0) {
411 if (reader->block) {
412 reader->skip_empty_blocks();
413 }
414
415 blk = reader->block.get();
416
417 // This is the equivalent of TSIOBufferBlockReadStart()
418 buf = blk->start() + reader->start_offset;
419 read_done = blk->read_avail() - reader->start_offset;
420
421 if ((read_done > 0) && ((read_done <= read_avail))) {
422 memcpy(info, buf, read_done);
423 reader->consume(read_done);
424 read_avail -= read_done;
425 info += read_done;
426 client_bytes += read_done;
427 }
428 }
429 client_response[client_bytes] = '\0';
430 return;
431 }
432
433 reader = chunked_handler.dechunked_reader;
434 do {
435 if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
436 chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
437 }
438
439 if (!dechunk_body()) {
440 break;
441 }
442
443 /* Read the data out of the reader */
444 read_avail = reader->read_avail();
445 while (read_avail > 0) {
446 if (reader->block) {
447 reader->skip_empty_blocks();
448 }
449
450 IOBufferBlock *blk = reader->block.get();
451
452 // This is the equivalent of TSIOBufferBlockReadStart()
453 buf = blk->start() + reader->start_offset;
454 read_done = blk->read_avail() - reader->start_offset;
455
456 if ((read_done > 0) && (read_done <= read_avail)) {
457 memcpy(info, buf, read_done);
458 reader->consume(read_done);
459 read_avail -= read_done;
460 info += read_done;
461 client_bytes += read_done;
462 }
463 }
464 } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
465
466 client_response[client_bytes] = '\0';
467 return;
468 }
469
470 void
process_fetch_read(int event)471 FetchSM::process_fetch_read(int event)
472 {
473 Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
474 int64_t bytes;
475 int bytes_used;
476 int64_t total_bytes_copied = 0;
477
478 switch (event) {
479 case TS_EVENT_VCONN_READ_READY:
480 // duplicate the bytes for backward compatibility with TSFetchUrl()
481 if (!(fetch_flags & TS_FETCH_FLAGS_STREAM)) {
482 bytes = resp_reader->read_avail();
483 Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);
484
485 while (total_bytes_copied < bytes) {
486 int64_t actual_bytes_copied;
487 actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
488 Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
489 if (actual_bytes_copied <= 0) {
490 break;
491 }
492 total_bytes_copied += actual_bytes_copied;
493 }
494 Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
495 resp_reader->consume(total_bytes_copied);
496 }
497
498 if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
499 if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, false) == PARSE_RESULT_DONE) {
500 header_done = true;
501 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
502 return InvokePluginExt();
503 } else {
504 InvokePlugin(callback_events.success_event_id, (void *)&client_response_hdr);
505 }
506 }
507 } else {
508 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
509 return InvokePluginExt();
510 }
511 }
512 read_vio->reenable();
513 break;
514 case TS_EVENT_VCONN_READ_COMPLETE:
515 case TS_EVENT_VCONN_EOS:
516 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
517 return InvokePluginExt(event);
518 }
519 if (callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
520 get_info_from_buffer(resp_reader);
521 InvokePlugin(callback_events.success_event_id, (void *)this);
522 }
523 Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
524 cleanUp();
525 break;
526 case TS_EVENT_ERROR:
527 default:
528 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
529 return InvokePluginExt(event);
530 }
531 InvokePlugin(callback_events.failure_event_id, nullptr);
532 cleanUp();
533 break;
534 }
535 }
536
537 void
process_fetch_write(int event)538 FetchSM::process_fetch_write(int event)
539 {
540 Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
541 switch (event) {
542 case TS_EVENT_VCONN_WRITE_COMPLETE:
543 req_finished = true;
544 break;
545 case TS_EVENT_VCONN_WRITE_READY:
546 // data is processed in chunks of 32k; if there is more than 32k
547 // of input data, we have to continue reenabling until all data is
548 // read (we have already written all the data to the buffer)
549 if (req_reader->read_avail() > 0) {
550 ((PluginVC *)http_vc)->reenable(write_vio);
551 }
552 break;
553 case TS_EVENT_ERROR:
554 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
555 return InvokePluginExt(event);
556 }
557 InvokePlugin(callback_events.failure_event_id, nullptr);
558 cleanUp();
559 break;
560 default:
561 break;
562 }
563 }
564
565 int
fetch_handler(int event,void * edata)566 FetchSM::fetch_handler(int event, void *edata)
567 {
568 Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
569
570 if (edata == read_vio) {
571 process_fetch_read(event);
572 } else if (edata == write_vio) {
573 process_fetch_write(event);
574 } else {
575 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
576 InvokePluginExt(event);
577 return 1;
578 }
579 InvokePlugin(callback_events.failure_event_id, nullptr);
580 cleanUp();
581 }
582 return 1;
583 }
584
585 void
ext_init(Continuation * cont,const char * method,const char * url,const char * version,const sockaddr * client_addr,int flags)586 FetchSM::ext_init(Continuation *cont, const char *method, const char *url, const char *version, const sockaddr *client_addr,
587 int flags)
588 {
589 init_comm();
590
591 if (flags & TS_FETCH_FLAGS_NEWLOCK) {
592 mutex = new_ProxyMutex();
593 cont_mutex = cont->mutex;
594 } else {
595 mutex = cont->mutex;
596 }
597
598 contp = cont;
599 _addr.assign(client_addr);
600
601 //
602 // Enable stream IO automatically.
603 //
604 fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
605 if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
606 set_internal_request(false);
607 }
608
609 //
610 // These options are not used when enable
611 // stream IO.
612 //
613 memset(&callback_options, 0, sizeof(callback_options));
614 memset(&callback_events, 0, sizeof(callback_events));
615
616 int method_len = strlen(method);
617 req_buffer->write(method, method_len);
618 req_buffer->write(" ", 1);
619 req_buffer->write(url, strlen(url));
620 req_buffer->write(" ", 1);
621 req_buffer->write(version, strlen(version));
622 req_buffer->write("\r\n", 2);
623
624 if ((method_len == HTTP_LEN_HEAD) && !memcmp(method, HTTP_METHOD_HEAD, HTTP_LEN_HEAD)) {
625 is_method_head = true;
626 }
627 }
628
629 void
ext_add_header(const char * name,int name_len,const char * value,int value_len)630 FetchSM::ext_add_header(const char *name, int name_len, const char *value, int value_len)
631 {
632 if (TS_MIME_LEN_CONTENT_LENGTH == name_len && !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
633 req_content_length = atoll(value);
634 }
635
636 req_buffer->write(name, name_len);
637 req_buffer->write(": ", 2);
638 req_buffer->write(value, value_len);
639 req_buffer->write("\r\n", 2);
640 }
641
642 void
ext_launch()643 FetchSM::ext_launch()
644 {
645 req_buffer->write("\r\n", 2);
646 httpConnect();
647 }
648
649 void
ext_write_data(const void * data,size_t len)650 FetchSM::ext_write_data(const void *data, size_t len)
651 {
652 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
653 MUTEX_TAKE_LOCK(mutex, this_ethread());
654 }
655 req_buffer->write(data, len);
656
657 Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
658 write_vio->reenable();
659
660 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
661 MUTEX_UNTAKE_LOCK(mutex, this_ethread());
662 }
663 }
664
665 ssize_t
ext_read_data(char * buf,size_t len)666 FetchSM::ext_read_data(char *buf, size_t len)
667 {
668 const char *start;
669 TSIOBufferReader reader;
670 TSIOBufferBlock blk, next_blk;
671 int64_t already, blk_len, need, wavail;
672
673 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
674 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
675 if (!lock.is_locked()) {
676 return 0;
677 }
678 }
679
680 if (!header_done) {
681 return 0;
682 }
683
684 if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
685 reader = (tsapi_bufferreader *)chunked_handler.dechunked_reader;
686 } else {
687 reader = (TSIOBufferReader)resp_reader;
688 }
689
690 already = 0;
691 blk = TSIOBufferReaderStart(reader);
692
693 while (blk) {
694 wavail = len - already;
695
696 next_blk = TSIOBufferBlockNext(blk);
697 start = TSIOBufferBlockReadStart(blk, reader, &blk_len);
698
699 need = blk_len > wavail ? wavail : blk_len;
700
701 memcpy(&buf[already], start, need);
702 already += need;
703
704 if (already >= static_cast<int64_t>(len)) {
705 break;
706 }
707
708 blk = next_blk;
709 }
710
711 resp_received_body_len += already;
712 TSIOBufferReaderConsume(reader, already);
713
714 read_vio->reenable();
715 return already;
716 }
717
718 void
ext_destroy()719 FetchSM::ext_destroy()
720 {
721 contp = nullptr;
722
723 if (recursion) {
724 return;
725 }
726
727 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
728 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
729 if (!lock.is_locked()) {
730 eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
731 return;
732 }
733 }
734
735 cleanUp();
736 }
737
738 void
ext_set_user_data(void * data)739 FetchSM::ext_set_user_data(void *data)
740 {
741 user_data = data;
742 }
743
744 void *
ext_get_user_data()745 FetchSM::ext_get_user_data()
746 {
747 return user_data;
748 }
749
750 TSMBuffer
resp_hdr_bufp()751 FetchSM::resp_hdr_bufp()
752 {
753 HdrHeapSDKHandle *heap;
754 heap = (HdrHeapSDKHandle *)&client_response_hdr;
755
756 return (TSMBuffer)heap;
757 }
758
759 TSMLoc
resp_hdr_mloc()760 FetchSM::resp_hdr_mloc()
761 {
762 return (TSMLoc)client_response_hdr.m_http;
763 }
764