1 /** @file
2
3 A brief file description
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22
23 @section thoughts Transform thoughts
24
25 - Must be able to handle a chain of transformations.
26 - Any transformation in the chain may fail.
27 Failure options:
28 - abort the client (if transformed data already sent)
29 - serve the client the untransformed document
30 - remove the failing transformation from the chain and attempt the transformation again (difficult to do)
31 - never send untransformed document to client if client would not understand it (e.g. a set top box)
32 - Must be able to change response header fields up until the point that TRANSFORM_READ_READY is sent to the user.
33
34 @section usage Transform usage
35
36 -# transformProcessor.open (cont, hooks); - returns "tvc", a TransformVConnection if 'hooks != NULL'
37 -# tvc->do_io_write (cont, nbytes, buffer1);
38 -# cont->handleEvent (TRANSFORM_READ_READY, NULL);
39 -# tvc->do_io_read (cont, nbytes, buffer2);
40 -# tvc->do_io_close ();
41
42 @section visualization Transform visualization
43
44 @verbatim
45 +----+ +----+ +----+ +----+
46 -IB->| T1 |-B1->| T2 |-B2->| T3 |-B3->| T4 |-OB->
47 +----+ +----+ +----+ +----+
48 @endverbatim
49
50 Data flows into the first transform in the form of the buffer
51 passed to TransformVConnection::do_io_write (IB). Data flows
52 out of the last transform in the form of the buffer passed to
53 TransformVConnection::do_io_read (OB). Between each transformation is
54 another buffer (B1, B2 and B3).
55
56 A transformation is a Continuation. The continuation is called with the
57 event TRANSFORM_IO_WRITE to initialize the write and TRANSFORM_IO_READ
58 to initialize the read.
59
60 */
61
62 #include "ProxyConfig.h"
63 #include "P_Net.h"
64 #include "TransformInternal.h"
65 #include "HdrUtils.h"
66 #include "Log.h"
67
68 TransformProcessor transformProcessor;
69
70 /*-------------------------------------------------------------------------
71 -------------------------------------------------------------------------*/
72
73 void
start()74 TransformProcessor::start()
75 {
76 }
77
78 /*-------------------------------------------------------------------------
79 -------------------------------------------------------------------------*/
80
81 VConnection *
open(Continuation * cont,APIHook * hooks)82 TransformProcessor::open(Continuation *cont, APIHook *hooks)
83 {
84 if (hooks) {
85 return new TransformVConnection(cont, hooks);
86 } else {
87 return nullptr;
88 }
89 }
90
91 /*-------------------------------------------------------------------------
92 -------------------------------------------------------------------------*/
93
94 INKVConnInternal *
null_transform(ProxyMutex * mutex)95 TransformProcessor::null_transform(ProxyMutex *mutex)
96 {
97 return new NullTransform(mutex);
98 }
99
100 /*-------------------------------------------------------------------------
101 -------------------------------------------------------------------------*/
102
103 INKVConnInternal *
range_transform(ProxyMutex * mut,RangeRecord * ranges,int num_fields,HTTPHdr * transform_resp,const char * content_type,int content_type_len,int64_t content_length)104 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
105 const char *content_type, int content_type_len, int64_t content_length)
106 {
107 RangeTransform *range_transform =
108 new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
109 return range_transform;
110 }
111
112 /*-------------------------------------------------------------------------
113 -------------------------------------------------------------------------*/
114
TransformTerminus(TransformVConnection * tvc)115 TransformTerminus::TransformTerminus(TransformVConnection *tvc)
116 : VConnection(tvc->mutex),
117 m_tvc(tvc),
118 m_read_vio(),
119 m_write_vio(),
120 m_event_count(0),
121 m_deletable(0),
122 m_closed(0),
123 m_called_user(0)
124 {
125 SET_HANDLER(&TransformTerminus::handle_event);
126 }
127
128 #define RETRY() \
129 if (ink_atomic_increment((int *)&m_event_count, 1) < 0) { \
130 ink_assert(!"not reached"); \
131 } \
132 this_ethread()->schedule_in(this, HRTIME_MSECONDS(10)); \
133 return 0;
134
135 int
handle_event(int event,void *)136 TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
137 {
138 int val;
139
140 m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
141
142 val = ink_atomic_increment(&m_event_count, -1);
143
144 Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);
145
146 if (val <= 0) {
147 ink_assert(!"not reached");
148 }
149
150 m_deletable = m_deletable && (val == 1);
151
152 if (m_closed != 0 && m_tvc->m_closed != 0) {
153 if (m_deletable) {
154 Debug("transform", "TransformVConnection destroy [0x%lx]", (long)m_tvc);
155 delete m_tvc;
156 return 0;
157 }
158 } else if (m_write_vio.op == VIO::WRITE) {
159 if (m_read_vio.op == VIO::NONE) {
160 if (!m_called_user) {
161 Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]", m_event_count, event, (long)m_tvc,
162 (long)m_tvc->m_cont);
163
164 m_called_user = 1;
165 // It is our belief this is safe to pass a reference, i.e. its scope
166 // and locking ought to be safe across the lifetime of the continuation.
167 m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
168 }
169 } else {
170 int64_t towrite;
171
172 MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
173 if (!trylock1.is_locked()) {
174 RETRY();
175 }
176
177 MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
178 if (!trylock2.is_locked()) {
179 RETRY();
180 }
181
182 if (m_closed != 0) {
183 return 0;
184 }
185
186 if (m_write_vio.op == VIO::NONE) {
187 return 0;
188 }
189
190 towrite = m_write_vio.ntodo();
191 if (towrite > 0) {
192 if (towrite > m_write_vio.get_reader()->read_avail()) {
193 towrite = m_write_vio.get_reader()->read_avail();
194 }
195 if (towrite > m_read_vio.ntodo()) {
196 towrite = m_read_vio.ntodo();
197 }
198
199 if (towrite > 0) {
200 m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
201 m_read_vio.ndone += towrite;
202
203 m_write_vio.get_reader()->consume(towrite);
204 m_write_vio.ndone += towrite;
205 }
206 }
207
208 if (m_write_vio.ntodo() > 0) {
209 if (towrite > 0) {
210 m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
211 }
212 } else {
213 m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
214 }
215
216 // We could have closed on the write callback
217 if (m_closed != 0 && m_tvc->m_closed != 0) {
218 return 0;
219 }
220
221 if (m_read_vio.ntodo() > 0) {
222 if (m_write_vio.ntodo() <= 0) {
223 m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
224 } else if (towrite > 0) {
225 m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
226 }
227 } else {
228 m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
229 }
230 }
231 } else {
232 MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
233 if (!trylock2.is_locked()) {
234 RETRY();
235 }
236
237 if (m_closed != 0) {
238 // The terminus was closed, but the enclosing transform
239 // vconnection wasn't. If the terminus was aborted then we
240 // call the read_vio cont back with VC_EVENT_ERROR. If it
241 // was closed normally then we call it back with
242 // VC_EVENT_EOS. If a read operation hasn't been initiated
243 // yet and we haven't called the user back then we call
244 // the user back instead of the read_vio cont (which won't
245 // exist).
246 if (m_tvc->m_closed == 0) {
247 int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;
248
249 if (!m_called_user) {
250 m_called_user = 1;
251 m_tvc->m_cont->handleEvent(ev, &m_read_vio);
252 } else {
253 ink_assert(m_read_vio.cont != nullptr);
254 m_read_vio.cont->handleEvent(ev, &m_read_vio);
255 }
256 }
257
258 return 0;
259 }
260 }
261
262 return 0;
263 }
264
265 /*-------------------------------------------------------------------------
266 -------------------------------------------------------------------------*/
267
268 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)269 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
270 {
271 m_read_vio.buffer.writer_for(buf);
272 m_read_vio.op = VIO::READ;
273 m_read_vio.set_continuation(c);
274 m_read_vio.nbytes = nbytes;
275 m_read_vio.ndone = 0;
276 m_read_vio.vc_server = this;
277
278 if (ink_atomic_increment(&m_event_count, 1) < 0) {
279 ink_assert(!"not reached");
280 }
281 Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
282
283 this_ethread()->schedule_imm_local(this);
284
285 return &m_read_vio;
286 }
287
288 /*-------------------------------------------------------------------------
289 -------------------------------------------------------------------------*/
290
291 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)292 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
293 {
294 // In the process of eliminating 'owner' mode so asserting against it
295 ink_assert(!owner);
296 m_write_vio.buffer.reader_for(buf);
297 m_write_vio.op = VIO::WRITE;
298 m_write_vio.set_continuation(c);
299 m_write_vio.nbytes = nbytes;
300 m_write_vio.ndone = 0;
301 m_write_vio.vc_server = this;
302
303 if (ink_atomic_increment(&m_event_count, 1) < 0) {
304 ink_assert(!"not reached");
305 }
306 Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
307
308 this_ethread()->schedule_imm_local(this);
309
310 return &m_write_vio;
311 }
312
313 /*-------------------------------------------------------------------------
314 -------------------------------------------------------------------------*/
315
316 void
do_io_close(int error)317 TransformTerminus::do_io_close(int error)
318 {
319 if (ink_atomic_increment(&m_event_count, 1) < 0) {
320 ink_assert(!"not reached");
321 }
322
323 INK_WRITE_MEMORY_BARRIER;
324
325 if (error != -1) {
326 lerrno = error;
327 m_closed = TS_VC_CLOSE_ABORT;
328 } else {
329 m_closed = TS_VC_CLOSE_NORMAL;
330 }
331
332 m_read_vio.op = VIO::NONE;
333 m_read_vio.buffer.clear();
334
335 m_write_vio.op = VIO::NONE;
336 m_write_vio.buffer.clear();
337
338 this_ethread()->schedule_imm_local(this);
339 }
340
341 /*-------------------------------------------------------------------------
342 -------------------------------------------------------------------------*/
343
344 void
do_io_shutdown(ShutdownHowTo_t howto)345 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
346 {
347 if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
348 m_read_vio.op = VIO::NONE;
349 m_read_vio.buffer.clear();
350 }
351
352 if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
353 m_write_vio.op = VIO::NONE;
354 m_write_vio.buffer.clear();
355 }
356 }
357
358 /*-------------------------------------------------------------------------
359 -------------------------------------------------------------------------*/
360
361 void
reenable(VIO * vio)362 TransformTerminus::reenable(VIO *vio)
363 {
364 ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
365
366 if (m_event_count == 0) {
367 if (ink_atomic_increment(&m_event_count, 1) < 0) {
368 ink_assert(!"not reached");
369 }
370 Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
371 this_ethread()->schedule_imm_local(this);
372 } else {
373 Debug("transform", "[TransformTerminus::reenable] skipping due to "
374 "pending events");
375 }
376 }
377
378 /*-------------------------------------------------------------------------
379 -------------------------------------------------------------------------*/
380
TransformVConnection(Continuation * cont,APIHook * hooks)381 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
382 : TransformVCChain(cont->mutex.get()), m_cont(cont), m_terminus(this), m_closed(0)
383 {
384 INKVConnInternal *xform;
385
386 SET_HANDLER(&TransformVConnection::handle_event);
387
388 ink_assert(hooks != nullptr);
389
390 m_transform = hooks->m_cont;
391 while (hooks->m_link.next) {
392 xform = (INKVConnInternal *)hooks->m_cont;
393 hooks = hooks->m_link.next;
394 xform->do_io_transform(hooks->m_cont);
395 }
396 xform = (INKVConnInternal *)hooks->m_cont;
397 xform->do_io_transform(&m_terminus);
398
399 Debug("transform", "TransformVConnection create [0x%lx]", (long)this);
400 }
401
402 /*-------------------------------------------------------------------------
403 -------------------------------------------------------------------------*/
404
~TransformVConnection()405 TransformVConnection::~TransformVConnection()
406 {
407 // Clear the continuations in terminus VConnections so that
408 // mutex's get released (INKqa05596)
409 m_terminus.m_read_vio.set_continuation(nullptr);
410 m_terminus.m_write_vio.set_continuation(nullptr);
411 m_terminus.mutex = nullptr;
412 this->mutex = nullptr;
413 }
414
415 /*-------------------------------------------------------------------------
416 -------------------------------------------------------------------------*/
417
418 int
handle_event(int,void *)419 TransformVConnection::handle_event(int /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */)
420 {
421 ink_assert(!"not reached");
422 return 0;
423 }
424
425 /*-------------------------------------------------------------------------
426 -------------------------------------------------------------------------*/
427
428 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)429 TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
430 {
431 Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long)c, (long)this);
432
433 return m_terminus.do_io_read(c, nbytes, buf);
434 }
435
436 /*-------------------------------------------------------------------------
437 -------------------------------------------------------------------------*/
438
439 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool)440 TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool /* owner ATS_UNUSED */)
441 {
442 Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long)c, (long)this);
443
444 return m_transform->do_io_write(c, nbytes, buf);
445 }
446
447 /*-------------------------------------------------------------------------
448 -------------------------------------------------------------------------*/
449
450 void
do_io_close(int error)451 TransformVConnection::do_io_close(int error)
452 {
453 Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long)this);
454
455 if (m_closed != 0) {
456 return;
457 }
458
459 if (error != -1) {
460 m_closed = TS_VC_CLOSE_ABORT;
461 } else {
462 m_closed = TS_VC_CLOSE_NORMAL;
463 }
464
465 m_transform->do_io_close(error);
466 m_transform = nullptr;
467 }
468
469 /*-------------------------------------------------------------------------
470 -------------------------------------------------------------------------*/
471
472 void
do_io_shutdown(ShutdownHowTo_t howto)473 TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
474 {
475 ink_assert(howto == IO_SHUTDOWN_WRITE);
476
477 Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long)this);
478
479 m_transform->do_io_shutdown(howto);
480 }
481
482 /*-------------------------------------------------------------------------
483 -------------------------------------------------------------------------*/
484
485 void
reenable(VIO *)486 TransformVConnection::reenable(VIO * /* vio ATS_UNUSED */)
487 {
488 ink_assert(!"not reached");
489 }
490
491 /*-------------------------------------------------------------------------
492 -------------------------------------------------------------------------*/
493
494 uint64_t
backlog(uint64_t limit)495 TransformVConnection::backlog(uint64_t limit)
496 {
497 uint64_t b = 0; // backlog
498 VConnection *raw_vc = m_transform;
499 MIOBuffer *w;
500 while (raw_vc && raw_vc != &m_terminus) {
501 INKVConnInternal *vc = static_cast<INKVConnInternal *>(raw_vc);
502 if (nullptr != (w = vc->m_read_vio.buffer.writer())) {
503 b += w->max_read_avail();
504 }
505 if (b >= limit) {
506 return b;
507 }
508 raw_vc = vc->m_output_vc;
509 }
510 if (nullptr != (w = m_terminus.m_read_vio.buffer.writer())) {
511 b += w->max_read_avail();
512 }
513 if (b >= limit) {
514 return b;
515 }
516
517 IOBufferReader *r = m_terminus.m_write_vio.get_reader();
518 if (r) {
519 b += r->read_avail();
520 }
521 return b;
522 }
523
524 /*-------------------------------------------------------------------------
525 -------------------------------------------------------------------------*/
526
TransformControl()527 TransformControl::TransformControl() : Continuation(new_ProxyMutex()), m_hooks()
528 {
529 SET_HANDLER(&TransformControl::handle_event);
530
531 m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
532 }
533
534 /*-------------------------------------------------------------------------
535 -------------------------------------------------------------------------*/
536
537 int
handle_event(int event,void *)538 TransformControl::handle_event(int event, void * /* edata ATS_UNUSED */)
539 {
540 switch (event) {
541 case EVENT_IMMEDIATE: {
542 char *s, *e;
543
544 ink_assert(m_tvc == nullptr);
545 if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
546 m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
547 } else {
548 m_tvc = transformProcessor.open(this, m_hooks.head());
549 }
550 ink_assert(m_tvc != nullptr);
551
552 m_write_buf = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
553 s = m_write_buf->end();
554 e = m_write_buf->buf_end();
555
556 memset(s, 'a', e - s);
557 m_write_buf->fill(e - s);
558
559 m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
560 break;
561 }
562
563 case TRANSFORM_READ_READY: {
564 MIOBuffer *buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
565
566 m_read_buf = buf->alloc_reader();
567 m_tvc->do_io_read(this, INT64_MAX, buf);
568 break;
569 }
570
571 case VC_EVENT_READ_COMPLETE:
572 case VC_EVENT_EOS:
573 m_tvc->do_io_close();
574
575 free_MIOBuffer(m_read_buf->mbuf);
576 m_read_buf = nullptr;
577
578 free_MIOBuffer(m_write_buf);
579 m_write_buf = nullptr;
580 break;
581
582 case VC_EVENT_WRITE_COMPLETE:
583 break;
584
585 default:
586 ink_assert(!"not reached");
587 break;
588 }
589
590 return 0;
591 }
592
593 /*-------------------------------------------------------------------------
594 -------------------------------------------------------------------------*/
595
NullTransform(ProxyMutex * _mutex)596 NullTransform::NullTransform(ProxyMutex *_mutex)
597 : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(_mutex)),
598 m_output_buf(nullptr),
599 m_output_reader(nullptr),
600 m_output_vio(nullptr)
601 {
602 SET_HANDLER(&NullTransform::handle_event);
603
604 Debug("transform", "NullTransform create [0x%lx]", (long)this);
605 }
606
607 /*-------------------------------------------------------------------------
608 -------------------------------------------------------------------------*/
609
~NullTransform()610 NullTransform::~NullTransform()
611 {
612 if (m_output_buf) {
613 free_MIOBuffer(m_output_buf);
614 }
615 }
616
617 /*-------------------------------------------------------------------------
618 -------------------------------------------------------------------------*/
619
620 int
handle_event(int event,void * edata)621 NullTransform::handle_event(int event, void *edata)
622 {
623 handle_event_count(event);
624
625 Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
626
627 if (m_closed) {
628 if (m_deletable) {
629 Debug("transform", "NullTransform destroy: %" PRId64 " [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
630 delete this;
631 }
632 } else {
633 switch (event) {
634 case VC_EVENT_ERROR:
635 m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
636 break;
637 case VC_EVENT_WRITE_COMPLETE:
638 ink_assert(m_output_vio == (VIO *)edata);
639
640 // The write to the output vconnection completed. This
641 // could only be the case if the data being fed into us
642 // has also completed.
643 ink_assert(m_write_vio.ntodo() == 0);
644
645 m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
646 break;
647 case VC_EVENT_WRITE_READY:
648 default: {
649 int64_t towrite;
650 int64_t avail;
651
652 ink_assert(m_output_vc != nullptr);
653
654 if (!m_output_vio) {
655 m_output_buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
656 m_output_reader = m_output_buf->alloc_reader();
657 m_output_vio = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
658 }
659
660 MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
661 if (!trylock.is_locked()) {
662 retry(10);
663 return 0;
664 }
665
666 if (m_closed) {
667 return 0;
668 }
669
670 if (m_write_vio.op == VIO::NONE) {
671 m_output_vio->nbytes = m_write_vio.ndone;
672 m_output_vio->reenable();
673 return 0;
674 }
675
676 towrite = m_write_vio.ntodo();
677 if (towrite > 0) {
678 avail = m_write_vio.get_reader()->read_avail();
679 if (towrite > avail) {
680 towrite = avail;
681 }
682
683 if (towrite > 0) {
684 Debug("transform",
685 "[NullTransform::handle_event] "
686 "writing %" PRId64 " bytes to output",
687 towrite);
688 m_output_buf->write(m_write_vio.get_reader(), towrite);
689
690 m_write_vio.get_reader()->consume(towrite);
691 m_write_vio.ndone += towrite;
692 }
693 }
694
695 if (m_write_vio.ntodo() > 0) {
696 if (towrite > 0) {
697 m_output_vio->reenable();
698 m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
699 }
700 } else {
701 m_output_vio->nbytes = m_write_vio.ndone;
702 m_output_vio->reenable();
703 m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
704 }
705
706 break;
707 }
708 }
709 }
710
711 return 0;
712 }
713
714 /*-------------------------------------------------------------------------
715 Reasons the JG transform cannot currently be a plugin:
716 a) Uses the config system
717 - Easily avoided by using the plugin.config file to pass the config
718 values as parameters to the plugin initialization routine.
719 b) Uses the stat system
720 - FIXME: should probably solve this.
721 -------------------------------------------------------------------------*/
722
723 /* the JG transform is now a plugin. All the JG code,
724 config variables and stats are removed from Transform.cc */
725
726 /*-------------------------------------------------------------------------
727 -------------------------------------------------------------------------*/
728
729 #if TS_HAS_TESTS
730 void
run()731 TransformTest::run()
732 {
733 if (is_action_tag_set("transform_test")) {
734 this_ethread()->schedule_imm_local(new TransformControl());
735 }
736 }
737 #endif
738
739 /*-------------------------------------------------------------------------
740 -------------------------------------------------------------------------*/
741
RangeTransform(ProxyMutex * mut,RangeRecord * ranges,int num_fields,HTTPHdr * transform_resp,const char * content_type,int content_type_len,int64_t content_length)742 RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
743 const char *content_type, int content_type_len, int64_t content_length)
744 : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(mut)),
745 m_output_buf(nullptr),
746 m_output_reader(nullptr),
747 m_transform_resp(transform_resp),
748 m_output_vio(nullptr),
749 m_range_content_length(0),
750 m_num_range_fields(num_fields),
751 m_current_range(0),
752 m_content_type(content_type),
753 m_content_type_len(content_type_len),
754 m_ranges(ranges),
755 m_output_cl(content_length),
756 m_done(0)
757 {
758 SET_HANDLER(&RangeTransform::handle_event);
759
760 m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
761 Debug("http_trans", "RangeTransform creation finishes");
762 }
763
764 /*-------------------------------------------------------------------------
765 -------------------------------------------------------------------------*/
766
~RangeTransform()767 RangeTransform::~RangeTransform()
768 {
769 if (m_output_buf) {
770 free_MIOBuffer(m_output_buf);
771 }
772 }
773
774 /*-------------------------------------------------------------------------
775 -------------------------------------------------------------------------*/
776
777 int
handle_event(int event,void * edata)778 RangeTransform::handle_event(int event, void *edata)
779 {
780 handle_event_count(event);
781
782 if (m_closed) {
783 if (m_deletable) {
784 if (m_output_vc != nullptr) {
785 Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
786 } else {
787 Debug("http_trans", "RangeTransform destroy");
788 }
789 delete this;
790 }
791 } else {
792 switch (event) {
793 case VC_EVENT_ERROR:
794 m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
795 break;
796 case VC_EVENT_WRITE_COMPLETE:
797 ink_assert(m_output_vio == (VIO *)edata);
798 m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
799 break;
800 case VC_EVENT_WRITE_READY:
801 default:
802 ink_assert(m_output_vc != nullptr);
803
804 if (!m_output_vio) {
805 m_output_buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
806 m_output_reader = m_output_buf->alloc_reader();
807 m_output_vio = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
808
809 change_response_header();
810
811 if (m_num_range_fields > 1) {
812 add_boundary(false);
813 add_sub_header(m_current_range);
814 }
815 }
816
817 MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
818 if (!trylock.is_locked()) {
819 retry(10);
820 return 0;
821 }
822
823 if (m_closed) {
824 return 0;
825 }
826
827 if (m_write_vio.op == VIO::NONE) {
828 m_output_vio->nbytes = m_done;
829 m_output_vio->reenable();
830 return 0;
831 }
832
833 transform_to_range();
834 break;
835 }
836 }
837
838 return 0;
839 }
840
841 /*-------------------------------------------------------------------------
842 -------------------------------------------------------------------------*/
843
844 void
transform_to_range()845 RangeTransform::transform_to_range()
846 {
847 IOBufferReader *reader = m_write_vio.get_reader();
848 int64_t toskip, tosend, avail;
849 const int64_t *end, *start;
850 int64_t prev_end = 0;
851 int64_t *done_byte;
852
853 if (m_current_range >= m_num_range_fields) {
854 return;
855 }
856
857 end = &m_ranges[m_current_range]._end;
858 done_byte = &m_ranges[m_current_range]._done_byte;
859 start = &m_ranges[m_current_range]._start;
860 avail = reader->read_avail();
861
862 while (true) {
863 if (*done_byte < (*start - 1)) {
864 toskip = *start - *done_byte - 1;
865
866 if (toskip > avail) {
867 toskip = avail;
868 }
869
870 if (toskip > 0) {
871 reader->consume(toskip);
872 *done_byte += toskip;
873 avail = reader->read_avail();
874 }
875 }
876
877 if (avail > 0) {
878 tosend = *end - *done_byte;
879
880 if (tosend > avail) {
881 tosend = avail;
882 }
883
884 m_output_buf->write(reader, tosend);
885 reader->consume(tosend);
886
887 m_done += tosend;
888 *done_byte += tosend;
889 }
890
891 if (*done_byte == *end) {
892 prev_end = *end;
893 }
894
895 // move to next Range if done one
896 // ignore bad Range: _done_byte -1, _end -1
897 while (*done_byte == *end) {
898 m_current_range++;
899
900 if (m_current_range == m_num_range_fields) {
901 if (m_num_range_fields > 1) {
902 m_done += m_output_buf->write("\r\n", 2);
903 add_boundary(true);
904 }
905
906 Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
907 m_output_vio->nbytes = m_done;
908 m_output_vio->reenable();
909
910 // if we are detaching before processing all the
911 // input data, send VC_EVENT_EOS to let the upstream know
912 // that it can rely on us consuming any more data
913 int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
914 m_write_vio.cont->handleEvent(cb_event, &m_write_vio);
915 return;
916 }
917
918 end = &m_ranges[m_current_range]._end;
919 done_byte = &m_ranges[m_current_range]._done_byte;
920 start = &m_ranges[m_current_range]._start;
921
922 // if this is a good Range
923 if (*end != -1) {
924 m_done += m_output_buf->write("\r\n", 2);
925 add_boundary(false);
926 add_sub_header(m_current_range);
927
928 // keep this part for future support of out-of-order Range
929 // if this is NOT a sequential Range compared to the previous one -
930 // start of current Range is larger than the end of last Range, do
931 // not need to go back to the start of the IOBuffereReader.
932 // Otherwise, reset the IOBufferReader.
933 // if ( *start > prev_end )
934 *done_byte = prev_end;
935 // else
936 // reader->reset();
937
938 break;
939 }
940 }
941
942 // When we need to read and there is nothing available
943 avail = reader->read_avail();
944 if (avail == 0) {
945 break;
946 }
947 }
948
949 m_output_vio->reenable();
950 m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
951 }
952
953 /*-------------------------------------------------------------------------
954 -------------------------------------------------------------------------*/
955
956 /*
957 * these two need be changed at the same time
958 */
959
960 static char bound[] = "RANGE_SEPARATOR";
961 static char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
962 static char cont_type[] = "Content-type: ";
963 static char cont_range[] = "Content-range: bytes ";
964
965 void
add_boundary(bool end)966 RangeTransform::add_boundary(bool end)
967 {
968 m_done += m_output_buf->write("--", 2);
969 m_done += m_output_buf->write(bound, sizeof(bound) - 1);
970
971 if (end) {
972 m_done += m_output_buf->write("--", 2);
973 }
974
975 m_done += m_output_buf->write("\r\n", 2);
976 }
977
978 /*-------------------------------------------------------------------------
979 -------------------------------------------------------------------------*/
980
981 #define RANGE_NUMBERS_LENGTH 60
982
983 void
add_sub_header(int index)984 RangeTransform::add_sub_header(int index)
985 {
986 // this should be large enough to hold three integers!
987 char numbers[RANGE_NUMBERS_LENGTH];
988 int len;
989
990 m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
991 if (m_content_type) {
992 m_done += m_output_buf->write(m_content_type, m_content_type_len);
993 }
994 m_done += m_output_buf->write("\r\n", 2);
995 m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
996
997 snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end,
998 m_output_cl);
999 len = strlen(numbers);
1000 if (len < RANGE_NUMBERS_LENGTH) {
1001 m_done += m_output_buf->write(numbers, len);
1002 }
1003 m_done += m_output_buf->write("\r\n\r\n", 4);
1004 }
1005
1006 /*-------------------------------------------------------------------------
1007 -------------------------------------------------------------------------*/
1008
1009 /*
1010 * this function changes the response header to reflect this is
1011 * a Range response.
1012 */
1013
1014 void
change_response_header()1015 RangeTransform::change_response_header()
1016 {
1017 MIMEField *field;
1018 char *reason_phrase;
1019 HTTPStatus status_code;
1020
1021 ink_release_assert(m_transform_resp);
1022
1023 status_code = HTTP_STATUS_PARTIAL_CONTENT;
1024 m_transform_resp->status_set(status_code);
1025 reason_phrase = const_cast<char *>(http_hdr_reason_lookup(status_code));
1026 m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
1027
1028 if (m_num_range_fields > 1) {
1029 // set the right Content-Type for multiple entry Range
1030 field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1031
1032 if (field != nullptr) {
1033 m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1034 }
1035
1036 field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1037 field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
1038
1039 m_transform_resp->field_attach(field);
1040 } else {
1041 char numbers[RANGE_NUMBERS_LENGTH];
1042 m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1043 field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1044 snprintf(numbers, sizeof(numbers), "bytes %" PRId64 "-%" PRId64 "/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
1045 field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
1046 m_transform_resp->field_attach(field);
1047 }
1048 }
1049
1050 #undef RANGE_NUMBERS_LENGTH
1051