1 /** @file
2 Licensed to the Apache Software Foundation (ASF) under one
3 or more contributor license agreements. See the NOTICE file
4 distributed with this work for additional information
5 regarding copyright ownership. The ASF licenses this file
6 to you under the Apache License, Version 2.0 (the
7 "License"); you may not use this file except in compliance
8 with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 */
18
19 #include "server.h"
20
21 #include "Config.h"
22 #include "ContentRange.h"
23 #include "response.h"
24 #include "transfer.h"
25 #include "util.h"
26
27 #include "ts/experimental.h"
28
29 #include <cinttypes>
30
31 namespace
32 {
33 ContentRange
contentRangeFrom(HttpHeader const & header)34 contentRangeFrom(HttpHeader const &header)
35 {
36 ContentRange bcr;
37
38 /* Pull content length off the response header
39 and manipulate it into a client response header
40 */
41 char rangestr[1024];
42 int rangelen = sizeof(rangestr);
43
44 // look for expected Content-Range field
45 bool const hasContentRange(header.valueForKey(TS_MIME_FIELD_CONTENT_RANGE, TS_MIME_LEN_CONTENT_RANGE, rangestr, &rangelen));
46
47 if (!hasContentRange) {
48 DEBUG_LOG("invalid response header, no Content-Range");
49 } else {
50 // ensure null termination
51 rangestr[rangelen] = '\0';
52 if (!bcr.fromStringClosed(rangestr)) {
53 DEBUG_LOG("invalid response header, malformed Content-Range, %s", rangestr);
54 }
55 }
56
57 return bcr;
58 }
59
60 int64_t
contentLengthFrom(HttpHeader const & header)61 contentLengthFrom(HttpHeader const &header)
62 {
63 int64_t bytes = 0;
64
65 char constr[1024];
66 int conlen = sizeof(constr);
67
68 // look for expected Content-Length field
69 bool const hasContentLength(header.valueForKey(TS_MIME_FIELD_CONTENT_LENGTH, TS_MIME_LEN_CONTENT_LENGTH, constr, &conlen));
70
71 if (!hasContentLength) {
72 DEBUG_LOG("invalid response header, no Content-Length");
73 bytes = INT64_MAX;
74 } else {
75 // ensure null termination
76 constr[conlen] = '\0';
77 char *endptr = nullptr;
78 bytes = std::max(static_cast<int64_t>(0), static_cast<int64_t>(strtoll(constr, &endptr, 10)));
79 }
80
81 return bytes;
82 }
83
// Outcome of handleFirstServerHeader(), consumed by handle_server_resp().
enum class HeaderState {
  Good,     // header rewritten and sent to the client, block data follows
  Fail,     // handling stopped; handle_server_resp() moves to BlockState::Fail
  Passthru, // upstream answered with something other than a 206, relay as is
};
90
91 HeaderState
handleFirstServerHeader(Data * const data,TSCont const contp)92 handleFirstServerHeader(Data *const data, TSCont const contp)
93 {
94 HttpHeader header(data->m_resp_hdrmgr.m_buffer, data->m_resp_hdrmgr.m_lochdr);
95
96 if (TSIsDebugTagSet(PLUGIN_NAME)) {
97 DEBUG_LOG("First header\n%s", header.toString().c_str());
98 }
99
100 data->m_dnstream.setupVioWrite(contp, INT64_MAX);
101
102 TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
103 TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf;
104
105 // only process a 206, everything else gets a (possibly incomplete)
106 // pass through
107 if (TS_HTTP_STATUS_PARTIAL_CONTENT != header.status()) {
108 DEBUG_LOG("Initial response other than 206: %d", header.status());
109
110 // Should run TSVIONSetBytes(output_io, hlen + bodybytes);
111 int64_t const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr);
112 int64_t const clen = contentLengthFrom(header);
113 DEBUG_LOG("Passthru bytes: header: %" PRId64 " body: %" PRId64, hlen, clen);
114 if (clen != INT64_MAX) {
115 TSVIONBytesSet(output_vio, hlen + clen);
116 } else {
117 TSVIONBytesSet(output_vio, clen);
118 }
119 TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf);
120 return HeaderState::Passthru;
121 }
122
123 ContentRange const blockcr = contentRangeFrom(header);
124
125 // 206 with bad content range -- should NEVER happen.
126 if (!blockcr.isValid()) {
127 std::string const msg502 = string502(header.version());
128 TSVIONBytesSet(output_vio, msg502.size());
129 TSIOBufferWrite(output_buf, msg502.data(), msg502.size());
130 TSVIOReenable(output_vio);
131 return HeaderState::Fail;
132 }
133
134 // set the resource content length from block response
135 data->m_contentlen = blockcr.m_length;
136
137 // special case last N bytes
138 if (data->m_req_range.isEndBytes()) {
139 data->m_req_range.m_end += data->m_contentlen;
140 data->m_req_range.m_beg += data->m_contentlen;
141 data->m_req_range.m_beg = std::max(static_cast<int64_t>(0), data->m_req_range.m_beg);
142 } else {
143 // fix up request range end now that we have the content length
144 data->m_req_range.m_end = std::min(data->m_contentlen, data->m_req_range.m_end);
145 }
146
147 int64_t const bodybytes = data->m_req_range.size();
148
149 // range begins past end of data but inside last block, send 416
150 bool const send416 = (bodybytes <= 0 || TS_HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE == data->m_statustype);
151 if (send416) {
152 std::string const &bodystr = bodyString416();
153 form416HeaderAndBody(header, data->m_contentlen, bodystr);
154
155 int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr);
156 int64_t const blen = bodystr.size();
157
158 TSVIONBytesSet(output_vio, int64_t(hlen) + blen);
159 TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf);
160 TSIOBufferWrite(output_buf, bodystr.data(), bodystr.size());
161 TSVIOReenable(output_vio);
162 data->m_upstream.m_read.close();
163 return HeaderState::Fail;
164 }
165
166 // save data header string
167 data->m_datelen = sizeof(data->m_date);
168 header.valueForKey(TS_MIME_FIELD_DATE, TS_MIME_LEN_DATE, data->m_date, &data->m_datelen);
169
170 // save weak cache header identifiers (rfc7232 section 2)
171 data->m_etaglen = sizeof(data->m_etag);
172 header.valueForKey(TS_MIME_FIELD_ETAG, TS_MIME_LEN_ETAG, data->m_etag, &data->m_etaglen);
173 data->m_lastmodifiedlen = sizeof(data->m_lastmodified);
174 header.valueForKey(TS_MIME_FIELD_LAST_MODIFIED, TS_MIME_LEN_LAST_MODIFIED, data->m_lastmodified, &data->m_lastmodifiedlen);
175
176 // size of the first block payload
177 data->m_blockexpected = blockcr.rangeSize();
178
179 // Now we can set up the expected client response
180 if (TS_HTTP_STATUS_PARTIAL_CONTENT == data->m_statustype) {
181 ContentRange respcr;
182 respcr.m_beg = data->m_req_range.m_beg;
183 respcr.m_end = data->m_req_range.m_end;
184 respcr.m_length = data->m_contentlen;
185
186 char rangestr[1024];
187 int rangelen = sizeof(rangestr);
188 bool const crstat = respcr.toStringClosed(rangestr, &rangelen);
189
190 // corner case, return 500 ??
191 if (!crstat) {
192 data->m_upstream.close();
193 data->m_dnstream.close();
194
195 ERROR_LOG("Bad/invalid response content range");
196 return HeaderState::Fail;
197 }
198
199 header.setKeyVal(TS_MIME_FIELD_CONTENT_RANGE, TS_MIME_LEN_CONTENT_RANGE, rangestr, rangelen);
200 } else if (TS_HTTP_STATUS_OK == data->m_statustype) {
201 header.setStatus(TS_HTTP_STATUS_OK);
202 static char const *const reason = TSHttpHdrReasonLookup(TS_HTTP_STATUS_OK);
203 header.setReason(reason, strlen(reason));
204 header.removeKey(TS_MIME_FIELD_CONTENT_RANGE, TS_MIME_LEN_CONTENT_RANGE);
205 }
206
207 char bufstr[1024];
208 int const buflen = snprintf(bufstr, sizeof(bufstr), "%" PRId64, bodybytes);
209 header.setKeyVal(TS_MIME_FIELD_CONTENT_LENGTH, TS_MIME_LEN_CONTENT_LENGTH, bufstr, buflen);
210
211 // add the response header length to the total bytes to send
212 int const hbytes = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr);
213
214 TSVIONBytesSet(output_vio, hbytes + bodybytes);
215 data->m_bytestosend = hbytes + bodybytes;
216 TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf);
217 data->m_bytessent = hbytes;
218 TSVIOReenable(output_vio);
219
220 return HeaderState::Good;
221 }
222
223 void
logSliceError(char const * const message,Data const * const data,HttpHeader const & header_resp)224 logSliceError(char const *const message, Data const *const data, HttpHeader const &header_resp)
225 {
226 Config *const config = data->m_config;
227
228 bool const logToError = config->canLogError();
229
230 // always write block stitch errors while in debug mode
231 if (!logToError && !TSIsDebugTagSet(PLUGIN_NAME)) {
232 return;
233 }
234
235 HttpHeader const header_req(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr);
236
237 TSHRTime const timenowus = TShrtime();
238 int64_t const msecs = timenowus / 1000000;
239 int64_t const secs = msecs / 1000;
240 int64_t const ms = msecs % 1000;
241
242 // Gather information on the request, must delete urlstr
243 int urllen = 0;
244 char *const urlstr = header_req.urlString(&urllen);
245
246 char urlpstr[16384];
247 size_t urlplen = sizeof(urlpstr);
248 TSStringPercentEncode(urlstr, urllen, urlpstr, urlplen, &urlplen, nullptr);
249
250 if (nullptr != urlstr) {
251 TSfree(urlstr);
252 }
253
254 // uas
255 char uasstr[8192];
256 int uaslen = sizeof(uasstr);
257 header_req.valueForKey(TS_MIME_FIELD_USER_AGENT, TS_MIME_LEN_USER_AGENT, uasstr, &uaslen);
258
259 // raw range request
260 char rangestr[1024];
261 int rangelen = sizeof(rangestr);
262 header_req.valueForKey(SLICER_MIME_FIELD_INFO, strlen(SLICER_MIME_FIELD_INFO), rangestr, &rangelen);
263
264 // Normalized range request
265 ContentRange const crange(data->m_req_range.m_beg, data->m_req_range.m_end, data->m_contentlen);
266 char normstr[1024];
267 int normlen = sizeof(normstr);
268 crange.toStringClosed(normstr, &normlen);
269
270 // block range request
271 int64_t const blockbeg = data->m_blocknum * data->m_config->m_blockbytes;
272 int64_t const blockend = std::min(blockbeg + data->m_config->m_blockbytes, data->m_contentlen);
273
274 // Block response data
275 TSHttpStatus const statusgot = header_resp.status();
276
277 // content range
278 char crstr[1024];
279 int crlen = sizeof(crstr);
280 header_resp.valueForKey(TS_MIME_FIELD_CONTENT_RANGE, TS_MIME_LEN_CONTENT_RANGE, crstr, &crlen);
281
282 // etag
283 char etagstr[1024];
284 int etaglen = sizeof(etagstr);
285 header_resp.valueForKey(TS_MIME_FIELD_ETAG, TS_MIME_LEN_ETAG, etagstr, &etaglen);
286
287 // last modified
288 time_t lmgot = 0;
289 header_resp.timeForKey(TS_MIME_FIELD_LAST_MODIFIED, TS_MIME_LEN_LAST_MODIFIED, &lmgot);
290
291 // cc
292 char ccstr[2048];
293 int cclen = sizeof(ccstr);
294 header_resp.valueForKey(TS_MIME_FIELD_CACHE_CONTROL, TS_MIME_LEN_CACHE_CONTROL, ccstr, &cclen);
295
296 // via tag
297 char viastr[8192];
298 int vialen = sizeof(viastr);
299 header_resp.valueForKey(TS_MIME_FIELD_VIA, TS_MIME_LEN_VIA, viastr, &vialen);
300
301 char etagexpstr[1024];
302 size_t etagexplen = sizeof(etagexpstr);
303 TSStringPercentEncode(data->m_etag, data->m_etaglen, etagexpstr, etagexplen, &etagexplen, nullptr);
304
305 char etaggotstr[1024];
306 size_t etaggotlen = sizeof(etaggotstr);
307 TSStringPercentEncode(etagstr, etaglen, etaggotstr, etaggotlen, &etaggotlen, nullptr);
308
309 DEBUG_LOG("Logging Block Stitch error");
310
311 ERROR_LOG("%" PRId64 ".%" PRId64 " reason=\"%s\""
312 " uri=\"%.*s\""
313 " uas=\"%.*s\""
314 " req_range=\"%.*s\""
315 " norm_range=\"%.*s\""
316
317 " etag_exp=\"%.*s\""
318 " lm_exp=\"%.*s\""
319
320 " blk_range=\"%" PRId64 "-%" PRId64 "\""
321
322 " status_got=\"%d\""
323 " cr_got=\"%.*s\""
324 " etag_got=\"%.*s\""
325 " lm_got=\"%jd\""
326 " cc=\"%.*s\""
327 " via=\"%.*s\" - attempting to recover",
328 secs, ms, message, (int)urlplen, urlpstr, uaslen, uasstr, rangelen, rangestr, normlen, normstr, (int)etagexplen,
329 etagexpstr, data->m_lastmodifiedlen, data->m_lastmodified, blockbeg, blockend - 1, statusgot, crlen, crstr,
330 (int)etaggotlen, etaggotstr, static_cast<intmax_t>(lmgot), cclen, ccstr, vialen, viastr);
331 }
332
333 bool
handleNextServerHeader(Data * const data,TSCont const contp)334 handleNextServerHeader(Data *const data, TSCont const contp)
335 {
336 // block response header
337 HttpHeader header(data->m_resp_hdrmgr.m_buffer, data->m_resp_hdrmgr.m_lochdr);
338 if (TSIsDebugTagSet(PLUGIN_NAME)) {
339 DEBUG_LOG("Next Header:\n%s", header.toString().c_str());
340 }
341
342 bool same = true;
343
344 switch (header.status()) {
345 case TS_HTTP_STATUS_NOT_FOUND:
346 // need to reissue reference slice
347 logSliceError("404 internal block response (asset gone)", data, header);
348 same = false;
349 break;
350 case TS_HTTP_STATUS_PARTIAL_CONTENT:
351 break;
352 default:
353 DEBUG_LOG("Non 206/404 internal block response encountered");
354 return false;
355 break;
356 }
357
358 // can't parse the content range header, abort -- might be too strict
359 ContentRange blockcr;
360
361 if (same) {
362 blockcr = contentRangeFrom(header);
363 if (!blockcr.isValid() || blockcr.m_length != data->m_contentlen) {
364 logSliceError("Mismatch/Bad block Content-Range", data, header);
365 same = false;
366 }
367 }
368
369 if (same) {
370 // prefer the etag but use Last-Modified if we must.
371 char etag[8192];
372 int etaglen = sizeof(etag);
373 header.valueForKey(TS_MIME_FIELD_ETAG, TS_MIME_LEN_ETAG, etag, &etaglen);
374
375 if (0 < data->m_etaglen || 0 < etaglen) {
376 same = data->m_etaglen == etaglen && 0 == strncmp(etag, data->m_etag, etaglen);
377 if (!same) {
378 logSliceError("Mismatch block Etag", data, header);
379 }
380 } else {
381 char lastmodified[33];
382 int lastmodifiedlen = sizeof(lastmodified);
383 header.valueForKey(TS_MIME_FIELD_LAST_MODIFIED, TS_MIME_LEN_LAST_MODIFIED, lastmodified, &lastmodifiedlen);
384 if (0 < data->m_lastmodifiedlen || 0 < lastmodifiedlen) {
385 same = data->m_lastmodifiedlen == lastmodifiedlen && 0 == strncmp(lastmodified, data->m_lastmodified, lastmodifiedlen);
386 if (!same) {
387 logSliceError("Mismatch block Last-Modified", data, header);
388 }
389 }
390 }
391 }
392
393 // Header mismatch
394 if (same) {
395 // If we were in reference block refetch mode and the headers
396 // still match there is a problem
397 if (BlockState::ActiveRef == data->m_blockstate) {
398 ERROR_LOG("Reference block refetched, got the same block back again");
399 return false;
400 }
401 } else {
402 switch (data->m_blockstate) {
403 case BlockState::Active: {
404 data->m_upstream.abort();
405
406 // Refetch the current interior slice
407 data->m_blockstate = BlockState::PendingInt;
408
409 time_t date = 0;
410 header.timeForKey(TS_MIME_FIELD_DATE, TS_MIME_LEN_DATE, &date);
411
412 // Ask for any slice newer than the cached one
413 time_t const dateims = date + 1;
414
415 DEBUG_LOG("Attempting to reissue interior slice block request with IMS header time: %jd", static_cast<intmax_t>(dateims));
416
417 // add special CRR IMS header to the request
418 HttpHeader headerreq(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr);
419 if (!headerreq.setKeyTime(X_CRR_IMS_HEADER.data(), X_CRR_IMS_HEADER.size(), dateims)) {
420 ERROR_LOG("Failed setting '%.*s'", (int)X_CRR_IMS_HEADER.size(), X_CRR_IMS_HEADER.data());
421 return false;
422 }
423
424 } break;
425 case BlockState::ActiveInt: {
426 data->m_upstream.abort();
427
428 // New interior slice still mismatches, refetch the reference slice
429 data->m_blockstate = BlockState::PendingRef;
430
431 // convert reference date header to time_t
432 time_t const date = TSMimeParseDate(data->m_date, data->m_datelen);
433
434 // Ask for any slice newer than the cached one
435 time_t const dateims = date + 1;
436
437 DEBUG_LOG("Attempting to reissue reference slice block request with IMS header time: %jd", static_cast<intmax_t>(dateims));
438
439 // add special CRR IMS header to the request
440 HttpHeader headerreq(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr);
441 if (!headerreq.setKeyTime(X_CRR_IMS_HEADER.data(), X_CRR_IMS_HEADER.size(), dateims)) {
442 ERROR_LOG("Failed setting '%.*s'", (int)X_CRR_IMS_HEADER.size(), X_CRR_IMS_HEADER.data());
443 return false;
444 }
445
446 // Reset for first block
447 if (Config::RefType::First == data->m_config->m_reftype) {
448 data->m_blocknum = 0;
449 } else {
450 data->m_blocknum = data->m_req_range.firstBlockFor(data->m_config->m_blockbytes);
451 }
452
453 return true;
454
455 } break;
456 // Refetch the reference slice
457 case BlockState::ActiveRef: {
458 // In this state the reference changed otherwise the asset is toast
459 // reset the content length (if content length drove the mismatch)
460 data->m_contentlen = blockcr.m_length;
461 return true;
462 } break;
463 default:
464 break;
465 }
466 }
467
468 data->m_blockexpected = blockcr.rangeSize();
469
470 return true;
471 }
472
473 } // namespace
474
475 // this is called every time the server has data for us
476 void
handle_server_resp(TSCont contp,TSEvent event,Data * const data)477 handle_server_resp(TSCont contp, TSEvent event, Data *const data)
478 {
479 switch (event) {
480 case TS_EVENT_VCONN_READ_READY: {
481 if (data->m_blockstate == BlockState::Passthru) {
482 transfer_all_bytes(data);
483 return;
484 }
485
486 // has block response header been parsed??
487 if (!data->m_server_block_header_parsed) {
488 int64_t consumed = 0;
489 TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
490 TSVIO const input_vio = data->m_upstream.m_read.m_vio;
491 TSParseResult const res = data->m_resp_hdrmgr.populateFrom(data->m_http_parser, reader, TSHttpHdrParseResp, &consumed);
492
493 TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
494
495 // the server response header didn't fit into the input buffer.
496 // wait for more data from upstream
497 if (TS_PARSE_CONT == res) {
498 return;
499 }
500
501 bool headerStat = false;
502
503 if (TS_PARSE_DONE == res) {
504 if (!data->m_server_first_header_parsed) {
505 HeaderState const state = handleFirstServerHeader(data, contp);
506
507 data->m_server_first_header_parsed = true;
508 switch (state) {
509 case HeaderState::Fail:
510 data->m_blockstate = BlockState::Fail;
511 headerStat = false;
512 break;
513 case HeaderState::Passthru: {
514 data->m_blockstate = BlockState::Passthru;
515 transfer_all_bytes(data);
516 DEBUG_LOG("Going into a passthru state");
517 return;
518 } break;
519 case HeaderState::Good:
520 default:
521 headerStat = true;
522 break;
523 }
524 } else {
525 headerStat = handleNextServerHeader(data, contp);
526 }
527
528 data->m_server_block_header_parsed = true;
529 }
530
531 // kill the upstream and allow dnstream to clean up
532 if (!headerStat) {
533 data->m_upstream.abort();
534 data->m_blockstate = BlockState::Fail;
535 if (data->m_dnstream.m_write.isOpen()) {
536 TSVIOReenable(data->m_dnstream.m_write.m_vio);
537 } else {
538 shutdown(contp, data);
539 }
540 return;
541 }
542
543 // header may have been successfully parsed but with caveats
544 switch (data->m_blockstate) {
545 // request new version of current internal slice
546 case BlockState::PendingInt:
547 case BlockState::PendingRef: {
548 if (!request_block(contp, data)) {
549 data->m_blockstate = BlockState::Fail;
550 if (data->m_dnstream.m_write.isOpen()) {
551 TSVIOReenable(data->m_dnstream.m_write.m_vio);
552 } else {
553 shutdown(contp, data);
554 }
555 }
556 return;
557 } break;
558 case BlockState::ActiveRef: {
559 // Mark the reference block for "skip".
560 int64_t const blockbytes = data->m_config->m_blockbytes;
561 int64_t const firstblock = data->m_req_range.firstBlockFor(blockbytes);
562 int64_t const blockpos = firstblock * blockbytes;
563 int64_t const firstblockbytes = std::min(blockbytes, data->m_contentlen - blockpos);
564 data->m_blockskip = firstblockbytes;
565
566 // Check if we should abort the client
567 if (data->m_dnstream.isOpen()) {
568 TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
569 int64_t const output_done = TSVIONDoneGet(output_vio);
570 int64_t const output_sent = data->m_bytessent;
571 if (output_done == output_sent) {
572 data->m_dnstream.abort();
573 }
574 }
575 } break;
576 default: {
577 // how much to normally fast forward into this data block
578 data->m_blockskip = data->m_req_range.skipBytesForBlock(data->m_config->m_blockbytes, data->m_blocknum);
579 } break;
580 }
581 }
582
583 transfer_content_bytes(data);
584 } break;
585 case TS_EVENT_VCONN_READ_COMPLETE: {
586 // fprintf(stderr, "%p: TS_EVENT_VCONN_READ_COMPLETE\n", data);
587 } break;
588 case TS_EVENT_VCONN_EOS: {
589 switch (data->m_blockstate) {
590 case BlockState::ActiveRef:
591 case BlockState::Passthru: {
592 transfer_all_bytes(data);
593 data->m_upstream.close();
594 TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
595 if (nullptr != output_vio) {
596 TSVIOReenable(output_vio);
597 } else {
598 shutdown(contp, data);
599 }
600 return;
601 } break;
602 default:
603 break;
604 }
605
606 // corner condition, good source header + 0 length aborted content
607 // results in no header being read, just an EOS.
608 // trying to delete the upstream will crash ATS (??)
609 if (0 == data->m_blockexpected) {
610 shutdown(contp, data); // this will crash if first block
611 return;
612 }
613
614 transfer_content_bytes(data);
615
616 data->m_upstream.close();
617 data->m_blockstate = BlockState::Pending;
618
619 // check for block truncation
620 if (data->m_blockconsumed < data->m_blockexpected) {
621 DEBUG_LOG("%p handle_server_resp truncation: %" PRId64 "\n", data, data->m_blockexpected - data->m_blockconsumed);
622 data->m_blockstate = BlockState::Fail;
623 // shutdown(contp, data);
624 return;
625 }
626
627 // prepare for the next request block
628 ++data->m_blocknum;
629
630 // when we get a "bytes=-<end>" last N bytes request the plugin
631 // issues a speculative request for the first block
632 // in that case fast forward to the real first in range block
633 // Btw this isn't implemented yet, to be handled
634 int64_t const firstblock = data->m_req_range.firstBlockFor(data->m_config->m_blockbytes);
635 if (data->m_blocknum < firstblock) {
636 data->m_blocknum = firstblock;
637 }
638
639 // continue processing blocks?
640 if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, data->m_blocknum)) {
641 // Don't immediately request the next slice if the client
642 // isn't keeping up
643
644 if (data->m_dnstream.m_write.isOpen()) {
645 bool start_next_block = true;
646
647 // check throttle condition
648 TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
649 int64_t const output_done = TSVIONDoneGet(output_vio);
650 int64_t const output_sent = data->m_bytessent;
651 int64_t const threshout = data->m_config->m_blockbytes;
652 int64_t const buffered = output_sent - output_done;
653
654 if (threshout < buffered) {
655 start_next_block = false;
656 DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, buffered);
657 }
658
659 if (start_next_block) {
660 if (!request_block(contp, data)) {
661 data->m_blockstate = BlockState::Fail;
662 abort(contp, data);
663 return;
664 }
665 }
666 }
667 } else {
668 data->m_upstream.close();
669 data->m_blockstate = BlockState::Done;
670 if (!data->m_dnstream.m_write.isOpen()) {
671 shutdown(contp, data);
672 }
673 }
674 } break;
675 default: {
676 DEBUG_LOG("%p handle_server_resp uhandled event: %s", data, TSHttpEventNameLookup(event));
677 } break;
678 }
679 }
680