1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22 
23   @section thoughts Transform thoughts
24 
25     - Must be able to handle a chain of transformations.
26     - Any transformation in the chain may fail.
27       Failure options:
28         - abort the client (if transformed data already sent)
29         - serve the client the untransformed document
30         - remove the failing transformation from the chain and attempt the transformation again (difficult to do)
31         - never send untransformed document to client if client would not understand it (e.g. a set top box)
32     - Must be able to change response header fields up until the point that TRANSFORM_READ_READY is sent to the user.
33 
34   @section usage Transform usage
35 
36     -# transformProcessor.open (cont, hooks); - returns "tvc", a TransformVConnection if 'hooks != NULL'
37     -# tvc->do_io_write (cont, nbytes, buffer1);
38     -# cont->handleEvent (TRANSFORM_READ_READY, NULL);
39     -# tvc->do_io_read (cont, nbytes, buffer2);
40     -# tvc->do_io_close ();
41 
42   @section visualization Transform visualization
43 
44   @verbatim
45          +----+     +----+     +----+     +----+
46     -IB->| T1 |-B1->| T2 |-B2->| T3 |-B3->| T4 |-OB->
47          +----+     +----+     +----+     +----+
48   @endverbatim
49 
50   Data flows into the first transform in the form of the buffer
51   passed to TransformVConnection::do_io_write (IB). Data flows
52   out of the last transform in the form of the buffer passed to
53   TransformVConnection::do_io_read (OB). Between each transformation is
54   another buffer (B1, B2 and B3).
55 
56   A transformation is a Continuation. The continuation is called with the
57   event TRANSFORM_IO_WRITE to initialize the write and TRANSFORM_IO_READ
58   to initialize the read.
59 
60 */
61 
62 #include "ProxyConfig.h"
63 #include "P_Net.h"
64 #include "TransformInternal.h"
65 #include "HdrUtils.h"
66 #include "Log.h"
67 
68 TransformProcessor transformProcessor;
69 
70 /*-------------------------------------------------------------------------
71   -------------------------------------------------------------------------*/
72 
73 void
start()74 TransformProcessor::start()
75 {
76 }
77 
78 /*-------------------------------------------------------------------------
79   -------------------------------------------------------------------------*/
80 
81 VConnection *
open(Continuation * cont,APIHook * hooks)82 TransformProcessor::open(Continuation *cont, APIHook *hooks)
83 {
84   if (hooks) {
85     return new TransformVConnection(cont, hooks);
86   } else {
87     return nullptr;
88   }
89 }
90 
91 /*-------------------------------------------------------------------------
92   -------------------------------------------------------------------------*/
93 
94 INKVConnInternal *
null_transform(ProxyMutex * mutex)95 TransformProcessor::null_transform(ProxyMutex *mutex)
96 {
97   return new NullTransform(mutex);
98 }
99 
100 /*-------------------------------------------------------------------------
101   -------------------------------------------------------------------------*/
102 
103 INKVConnInternal *
range_transform(ProxyMutex * mut,RangeRecord * ranges,int num_fields,HTTPHdr * transform_resp,const char * content_type,int content_type_len,int64_t content_length)104 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
105                                     const char *content_type, int content_type_len, int64_t content_length)
106 {
107   RangeTransform *range_transform =
108     new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
109   return range_transform;
110 }
111 
112 /*-------------------------------------------------------------------------
113   -------------------------------------------------------------------------*/
114 
TransformTerminus(TransformVConnection * tvc)115 TransformTerminus::TransformTerminus(TransformVConnection *tvc)
116   : VConnection(tvc->mutex),
117     m_tvc(tvc),
118     m_read_vio(),
119     m_write_vio(),
120     m_event_count(0),
121     m_deletable(0),
122     m_closed(0),
123     m_called_user(0)
124 {
125   SET_HANDLER(&TransformTerminus::handle_event);
126 }
127 
128 #define RETRY()                                             \
129   if (ink_atomic_increment((int *)&m_event_count, 1) < 0) { \
130     ink_assert(!"not reached");                             \
131   }                                                         \
132   this_ethread()->schedule_in(this, HRTIME_MSECONDS(10));   \
133   return 0;
134 
135 int
handle_event(int event,void *)136 TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
137 {
138   int val;
139 
140   m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
141 
142   val = ink_atomic_increment(&m_event_count, -1);
143 
144   Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);
145 
146   if (val <= 0) {
147     ink_assert(!"not reached");
148   }
149 
150   m_deletable = m_deletable && (val == 1);
151 
152   if (m_closed != 0 && m_tvc->m_closed != 0) {
153     if (m_deletable) {
154       Debug("transform", "TransformVConnection destroy [0x%lx]", (long)m_tvc);
155       delete m_tvc;
156       return 0;
157     }
158   } else if (m_write_vio.op == VIO::WRITE) {
159     if (m_read_vio.op == VIO::NONE) {
160       if (!m_called_user) {
161         Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]", m_event_count, event, (long)m_tvc,
162               (long)m_tvc->m_cont);
163 
164         m_called_user = 1;
165         // It is our belief this is safe to pass a reference, i.e. its scope
166         // and locking ought to be safe across the lifetime of the continuation.
167         m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
168       }
169     } else {
170       int64_t towrite;
171 
172       MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
173       if (!trylock1.is_locked()) {
174         RETRY();
175       }
176 
177       MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
178       if (!trylock2.is_locked()) {
179         RETRY();
180       }
181 
182       if (m_closed != 0) {
183         return 0;
184       }
185 
186       if (m_write_vio.op == VIO::NONE) {
187         return 0;
188       }
189 
190       towrite = m_write_vio.ntodo();
191       if (towrite > 0) {
192         if (towrite > m_write_vio.get_reader()->read_avail()) {
193           towrite = m_write_vio.get_reader()->read_avail();
194         }
195         if (towrite > m_read_vio.ntodo()) {
196           towrite = m_read_vio.ntodo();
197         }
198 
199         if (towrite > 0) {
200           m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
201           m_read_vio.ndone += towrite;
202 
203           m_write_vio.get_reader()->consume(towrite);
204           m_write_vio.ndone += towrite;
205         }
206       }
207 
208       if (m_write_vio.ntodo() > 0) {
209         if (towrite > 0) {
210           m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
211         }
212       } else {
213         m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
214       }
215 
216       // We could have closed on the write callback
217       if (m_closed != 0 && m_tvc->m_closed != 0) {
218         return 0;
219       }
220 
221       if (m_read_vio.ntodo() > 0) {
222         if (m_write_vio.ntodo() <= 0) {
223           m_read_vio.cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
224         } else if (towrite > 0) {
225           m_read_vio.cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
226         }
227       } else {
228         m_read_vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
229       }
230     }
231   } else {
232     MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
233     if (!trylock2.is_locked()) {
234       RETRY();
235     }
236 
237     if (m_closed != 0) {
238       // The terminus was closed, but the enclosing transform
239       // vconnection wasn't. If the terminus was aborted then we
240       // call the read_vio cont back with VC_EVENT_ERROR. If it
241       // was closed normally then we call it back with
242       // VC_EVENT_EOS. If a read operation hasn't been initiated
243       // yet and we haven't called the user back then we call
244       // the user back instead of the read_vio cont (which won't
245       // exist).
246       if (m_tvc->m_closed == 0) {
247         int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;
248 
249         if (!m_called_user) {
250           m_called_user = 1;
251           m_tvc->m_cont->handleEvent(ev, &m_read_vio);
252         } else {
253           ink_assert(m_read_vio.cont != nullptr);
254           m_read_vio.cont->handleEvent(ev, &m_read_vio);
255         }
256       }
257 
258       return 0;
259     }
260   }
261 
262   return 0;
263 }
264 
265 /*-------------------------------------------------------------------------
266   -------------------------------------------------------------------------*/
267 
268 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)269 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
270 {
271   m_read_vio.buffer.writer_for(buf);
272   m_read_vio.op = VIO::READ;
273   m_read_vio.set_continuation(c);
274   m_read_vio.nbytes    = nbytes;
275   m_read_vio.ndone     = 0;
276   m_read_vio.vc_server = this;
277 
278   if (ink_atomic_increment(&m_event_count, 1) < 0) {
279     ink_assert(!"not reached");
280   }
281   Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
282 
283   this_ethread()->schedule_imm_local(this);
284 
285   return &m_read_vio;
286 }
287 
288 /*-------------------------------------------------------------------------
289   -------------------------------------------------------------------------*/
290 
291 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool owner)292 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
293 {
294   // In the process of eliminating 'owner' mode so asserting against it
295   ink_assert(!owner);
296   m_write_vio.buffer.reader_for(buf);
297   m_write_vio.op = VIO::WRITE;
298   m_write_vio.set_continuation(c);
299   m_write_vio.nbytes    = nbytes;
300   m_write_vio.ndone     = 0;
301   m_write_vio.vc_server = this;
302 
303   if (ink_atomic_increment(&m_event_count, 1) < 0) {
304     ink_assert(!"not reached");
305   }
306   Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
307 
308   this_ethread()->schedule_imm_local(this);
309 
310   return &m_write_vio;
311 }
312 
313 /*-------------------------------------------------------------------------
314   -------------------------------------------------------------------------*/
315 
316 void
do_io_close(int error)317 TransformTerminus::do_io_close(int error)
318 {
319   if (ink_atomic_increment(&m_event_count, 1) < 0) {
320     ink_assert(!"not reached");
321   }
322 
323   INK_WRITE_MEMORY_BARRIER;
324 
325   if (error != -1) {
326     lerrno   = error;
327     m_closed = TS_VC_CLOSE_ABORT;
328   } else {
329     m_closed = TS_VC_CLOSE_NORMAL;
330   }
331 
332   m_read_vio.op = VIO::NONE;
333   m_read_vio.buffer.clear();
334 
335   m_write_vio.op = VIO::NONE;
336   m_write_vio.buffer.clear();
337 
338   this_ethread()->schedule_imm_local(this);
339 }
340 
341 /*-------------------------------------------------------------------------
342   -------------------------------------------------------------------------*/
343 
344 void
do_io_shutdown(ShutdownHowTo_t howto)345 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
346 {
347   if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
348     m_read_vio.op = VIO::NONE;
349     m_read_vio.buffer.clear();
350   }
351 
352   if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
353     m_write_vio.op = VIO::NONE;
354     m_write_vio.buffer.clear();
355   }
356 }
357 
358 /*-------------------------------------------------------------------------
359   -------------------------------------------------------------------------*/
360 
361 void
reenable(VIO * vio)362 TransformTerminus::reenable(VIO *vio)
363 {
364   ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
365 
366   if (m_event_count == 0) {
367     if (ink_atomic_increment(&m_event_count, 1) < 0) {
368       ink_assert(!"not reached");
369     }
370     Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
371     this_ethread()->schedule_imm_local(this);
372   } else {
373     Debug("transform", "[TransformTerminus::reenable] skipping due to "
374                        "pending events");
375   }
376 }
377 
378 /*-------------------------------------------------------------------------
379   -------------------------------------------------------------------------*/
380 
TransformVConnection(Continuation * cont,APIHook * hooks)381 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
382   : TransformVCChain(cont->mutex.get()), m_cont(cont), m_terminus(this), m_closed(0)
383 {
384   INKVConnInternal *xform;
385 
386   SET_HANDLER(&TransformVConnection::handle_event);
387 
388   ink_assert(hooks != nullptr);
389 
390   m_transform = hooks->m_cont;
391   while (hooks->m_link.next) {
392     xform = (INKVConnInternal *)hooks->m_cont;
393     hooks = hooks->m_link.next;
394     xform->do_io_transform(hooks->m_cont);
395   }
396   xform = (INKVConnInternal *)hooks->m_cont;
397   xform->do_io_transform(&m_terminus);
398 
399   Debug("transform", "TransformVConnection create [0x%lx]", (long)this);
400 }
401 
402 /*-------------------------------------------------------------------------
403   -------------------------------------------------------------------------*/
404 
~TransformVConnection()405 TransformVConnection::~TransformVConnection()
406 {
407   // Clear the continuations in terminus VConnections so that
408   //  mutex's get released (INKqa05596)
409   m_terminus.m_read_vio.set_continuation(nullptr);
410   m_terminus.m_write_vio.set_continuation(nullptr);
411   m_terminus.mutex = nullptr;
412   this->mutex      = nullptr;
413 }
414 
415 /*-------------------------------------------------------------------------
416   -------------------------------------------------------------------------*/
417 
418 int
handle_event(int,void *)419 TransformVConnection::handle_event(int /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */)
420 {
421   ink_assert(!"not reached");
422   return 0;
423 }
424 
425 /*-------------------------------------------------------------------------
426   -------------------------------------------------------------------------*/
427 
428 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)429 TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
430 {
431   Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long)c, (long)this);
432 
433   return m_terminus.do_io_read(c, nbytes, buf);
434 }
435 
436 /*-------------------------------------------------------------------------
437   -------------------------------------------------------------------------*/
438 
439 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * buf,bool)440 TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool /* owner ATS_UNUSED */)
441 {
442   Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long)c, (long)this);
443 
444   return m_transform->do_io_write(c, nbytes, buf);
445 }
446 
447 /*-------------------------------------------------------------------------
448   -------------------------------------------------------------------------*/
449 
450 void
do_io_close(int error)451 TransformVConnection::do_io_close(int error)
452 {
453   Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long)this);
454 
455   if (m_closed != 0) {
456     return;
457   }
458 
459   if (error != -1) {
460     m_closed = TS_VC_CLOSE_ABORT;
461   } else {
462     m_closed = TS_VC_CLOSE_NORMAL;
463   }
464 
465   m_transform->do_io_close(error);
466   m_transform = nullptr;
467 }
468 
469 /*-------------------------------------------------------------------------
470   -------------------------------------------------------------------------*/
471 
472 void
do_io_shutdown(ShutdownHowTo_t howto)473 TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
474 {
475   ink_assert(howto == IO_SHUTDOWN_WRITE);
476 
477   Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long)this);
478 
479   m_transform->do_io_shutdown(howto);
480 }
481 
482 /*-------------------------------------------------------------------------
483   -------------------------------------------------------------------------*/
484 
485 void
reenable(VIO *)486 TransformVConnection::reenable(VIO * /* vio ATS_UNUSED */)
487 {
488   ink_assert(!"not reached");
489 }
490 
491 /*-------------------------------------------------------------------------
492   -------------------------------------------------------------------------*/
493 
494 uint64_t
backlog(uint64_t limit)495 TransformVConnection::backlog(uint64_t limit)
496 {
497   uint64_t b          = 0; // backlog
498   VConnection *raw_vc = m_transform;
499   MIOBuffer *w;
500   while (raw_vc && raw_vc != &m_terminus) {
501     INKVConnInternal *vc = static_cast<INKVConnInternal *>(raw_vc);
502     if (nullptr != (w = vc->m_read_vio.buffer.writer())) {
503       b += w->max_read_avail();
504     }
505     if (b >= limit) {
506       return b;
507     }
508     raw_vc = vc->m_output_vc;
509   }
510   if (nullptr != (w = m_terminus.m_read_vio.buffer.writer())) {
511     b += w->max_read_avail();
512   }
513   if (b >= limit) {
514     return b;
515   }
516 
517   IOBufferReader *r = m_terminus.m_write_vio.get_reader();
518   if (r) {
519     b += r->read_avail();
520   }
521   return b;
522 }
523 
524 /*-------------------------------------------------------------------------
525   -------------------------------------------------------------------------*/
526 
TransformControl()527 TransformControl::TransformControl() : Continuation(new_ProxyMutex()), m_hooks()
528 {
529   SET_HANDLER(&TransformControl::handle_event);
530 
531   m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
532 }
533 
534 /*-------------------------------------------------------------------------
535   -------------------------------------------------------------------------*/
536 
537 int
handle_event(int event,void *)538 TransformControl::handle_event(int event, void * /* edata ATS_UNUSED */)
539 {
540   switch (event) {
541   case EVENT_IMMEDIATE: {
542     char *s, *e;
543 
544     ink_assert(m_tvc == nullptr);
545     if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
546       m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
547     } else {
548       m_tvc = transformProcessor.open(this, m_hooks.head());
549     }
550     ink_assert(m_tvc != nullptr);
551 
552     m_write_buf = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
553     s           = m_write_buf->end();
554     e           = m_write_buf->buf_end();
555 
556     memset(s, 'a', e - s);
557     m_write_buf->fill(e - s);
558 
559     m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
560     break;
561   }
562 
563   case TRANSFORM_READ_READY: {
564     MIOBuffer *buf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
565 
566     m_read_buf = buf->alloc_reader();
567     m_tvc->do_io_read(this, INT64_MAX, buf);
568     break;
569   }
570 
571   case VC_EVENT_READ_COMPLETE:
572   case VC_EVENT_EOS:
573     m_tvc->do_io_close();
574 
575     free_MIOBuffer(m_read_buf->mbuf);
576     m_read_buf = nullptr;
577 
578     free_MIOBuffer(m_write_buf);
579     m_write_buf = nullptr;
580     break;
581 
582   case VC_EVENT_WRITE_COMPLETE:
583     break;
584 
585   default:
586     ink_assert(!"not reached");
587     break;
588   }
589 
590   return 0;
591 }
592 
593 /*-------------------------------------------------------------------------
594   -------------------------------------------------------------------------*/
595 
NullTransform(ProxyMutex * _mutex)596 NullTransform::NullTransform(ProxyMutex *_mutex)
597   : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(_mutex)),
598     m_output_buf(nullptr),
599     m_output_reader(nullptr),
600     m_output_vio(nullptr)
601 {
602   SET_HANDLER(&NullTransform::handle_event);
603 
604   Debug("transform", "NullTransform create [0x%lx]", (long)this);
605 }
606 
607 /*-------------------------------------------------------------------------
608   -------------------------------------------------------------------------*/
609 
~NullTransform()610 NullTransform::~NullTransform()
611 {
612   if (m_output_buf) {
613     free_MIOBuffer(m_output_buf);
614   }
615 }
616 
617 /*-------------------------------------------------------------------------
618   -------------------------------------------------------------------------*/
619 
620 int
handle_event(int event,void * edata)621 NullTransform::handle_event(int event, void *edata)
622 {
623   handle_event_count(event);
624 
625   Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
626 
627   if (m_closed) {
628     if (m_deletable) {
629       Debug("transform", "NullTransform destroy: %" PRId64 " [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
630       delete this;
631     }
632   } else {
633     switch (event) {
634     case VC_EVENT_ERROR:
635       m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
636       break;
637     case VC_EVENT_WRITE_COMPLETE:
638       ink_assert(m_output_vio == (VIO *)edata);
639 
640       // The write to the output vconnection completed. This
641       // could only be the case if the data being fed into us
642       // has also completed.
643       ink_assert(m_write_vio.ntodo() == 0);
644 
645       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
646       break;
647     case VC_EVENT_WRITE_READY:
648     default: {
649       int64_t towrite;
650       int64_t avail;
651 
652       ink_assert(m_output_vc != nullptr);
653 
654       if (!m_output_vio) {
655         m_output_buf    = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
656         m_output_reader = m_output_buf->alloc_reader();
657         m_output_vio    = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
658       }
659 
660       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
661       if (!trylock.is_locked()) {
662         retry(10);
663         return 0;
664       }
665 
666       if (m_closed) {
667         return 0;
668       }
669 
670       if (m_write_vio.op == VIO::NONE) {
671         m_output_vio->nbytes = m_write_vio.ndone;
672         m_output_vio->reenable();
673         return 0;
674       }
675 
676       towrite = m_write_vio.ntodo();
677       if (towrite > 0) {
678         avail = m_write_vio.get_reader()->read_avail();
679         if (towrite > avail) {
680           towrite = avail;
681         }
682 
683         if (towrite > 0) {
684           Debug("transform",
685                 "[NullTransform::handle_event] "
686                 "writing %" PRId64 " bytes to output",
687                 towrite);
688           m_output_buf->write(m_write_vio.get_reader(), towrite);
689 
690           m_write_vio.get_reader()->consume(towrite);
691           m_write_vio.ndone += towrite;
692         }
693       }
694 
695       if (m_write_vio.ntodo() > 0) {
696         if (towrite > 0) {
697           m_output_vio->reenable();
698           m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
699         }
700       } else {
701         m_output_vio->nbytes = m_write_vio.ndone;
702         m_output_vio->reenable();
703         m_write_vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
704       }
705 
706       break;
707     }
708     }
709   }
710 
711   return 0;
712 }
713 
714 /*-------------------------------------------------------------------------
715   Reasons the JG transform cannot currently be a plugin:
716     a) Uses the config system
717        - Easily avoided by using the plugin.config file to pass the config
718          values as parameters to the plugin initialization routine.
719     b) Uses the stat system
720        - FIXME: should probably solve this.
721   -------------------------------------------------------------------------*/
722 
723 /* the JG transform is now a plugin. All the JG code,
724    config variables and stats are removed from Transform.cc */
725 
726 /*-------------------------------------------------------------------------
727   -------------------------------------------------------------------------*/
728 
729 #if TS_HAS_TESTS
730 void
run()731 TransformTest::run()
732 {
733   if (is_action_tag_set("transform_test")) {
734     this_ethread()->schedule_imm_local(new TransformControl());
735   }
736 }
737 #endif
738 
739 /*-------------------------------------------------------------------------
740   -------------------------------------------------------------------------*/
741 
RangeTransform(ProxyMutex * mut,RangeRecord * ranges,int num_fields,HTTPHdr * transform_resp,const char * content_type,int content_type_len,int64_t content_length)742 RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp,
743                                const char *content_type, int content_type_len, int64_t content_length)
744   : INKVConnInternal(nullptr, reinterpret_cast<TSMutex>(mut)),
745     m_output_buf(nullptr),
746     m_output_reader(nullptr),
747     m_transform_resp(transform_resp),
748     m_output_vio(nullptr),
749     m_range_content_length(0),
750     m_num_range_fields(num_fields),
751     m_current_range(0),
752     m_content_type(content_type),
753     m_content_type_len(content_type_len),
754     m_ranges(ranges),
755     m_output_cl(content_length),
756     m_done(0)
757 {
758   SET_HANDLER(&RangeTransform::handle_event);
759 
760   m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
761   Debug("http_trans", "RangeTransform creation finishes");
762 }
763 
764 /*-------------------------------------------------------------------------
765   -------------------------------------------------------------------------*/
766 
~RangeTransform()767 RangeTransform::~RangeTransform()
768 {
769   if (m_output_buf) {
770     free_MIOBuffer(m_output_buf);
771   }
772 }
773 
774 /*-------------------------------------------------------------------------
775   -------------------------------------------------------------------------*/
776 
777 int
handle_event(int event,void * edata)778 RangeTransform::handle_event(int event, void *edata)
779 {
780   handle_event_count(event);
781 
782   if (m_closed) {
783     if (m_deletable) {
784       if (m_output_vc != nullptr) {
785         Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
786       } else {
787         Debug("http_trans", "RangeTransform destroy");
788       }
789       delete this;
790     }
791   } else {
792     switch (event) {
793     case VC_EVENT_ERROR:
794       m_write_vio.cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
795       break;
796     case VC_EVENT_WRITE_COMPLETE:
797       ink_assert(m_output_vio == (VIO *)edata);
798       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
799       break;
800     case VC_EVENT_WRITE_READY:
801     default:
802       ink_assert(m_output_vc != nullptr);
803 
804       if (!m_output_vio) {
805         m_output_buf    = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K);
806         m_output_reader = m_output_buf->alloc_reader();
807         m_output_vio    = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
808 
809         change_response_header();
810 
811         if (m_num_range_fields > 1) {
812           add_boundary(false);
813           add_sub_header(m_current_range);
814         }
815       }
816 
817       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
818       if (!trylock.is_locked()) {
819         retry(10);
820         return 0;
821       }
822 
823       if (m_closed) {
824         return 0;
825       }
826 
827       if (m_write_vio.op == VIO::NONE) {
828         m_output_vio->nbytes = m_done;
829         m_output_vio->reenable();
830         return 0;
831       }
832 
833       transform_to_range();
834       break;
835     }
836   }
837 
838   return 0;
839 }
840 
841 /*-------------------------------------------------------------------------
842   -------------------------------------------------------------------------*/
843 
844 void
transform_to_range()845 RangeTransform::transform_to_range()
846 {
847   IOBufferReader *reader = m_write_vio.get_reader();
848   int64_t toskip, tosend, avail;
849   const int64_t *end, *start;
850   int64_t prev_end = 0;
851   int64_t *done_byte;
852 
853   if (m_current_range >= m_num_range_fields) {
854     return;
855   }
856 
857   end       = &m_ranges[m_current_range]._end;
858   done_byte = &m_ranges[m_current_range]._done_byte;
859   start     = &m_ranges[m_current_range]._start;
860   avail     = reader->read_avail();
861 
862   while (true) {
863     if (*done_byte < (*start - 1)) {
864       toskip = *start - *done_byte - 1;
865 
866       if (toskip > avail) {
867         toskip = avail;
868       }
869 
870       if (toskip > 0) {
871         reader->consume(toskip);
872         *done_byte += toskip;
873         avail = reader->read_avail();
874       }
875     }
876 
877     if (avail > 0) {
878       tosend = *end - *done_byte;
879 
880       if (tosend > avail) {
881         tosend = avail;
882       }
883 
884       m_output_buf->write(reader, tosend);
885       reader->consume(tosend);
886 
887       m_done += tosend;
888       *done_byte += tosend;
889     }
890 
891     if (*done_byte == *end) {
892       prev_end = *end;
893     }
894 
895     // move to next Range if done one
896     // ignore bad Range: _done_byte -1, _end -1
897     while (*done_byte == *end) {
898       m_current_range++;
899 
900       if (m_current_range == m_num_range_fields) {
901         if (m_num_range_fields > 1) {
902           m_done += m_output_buf->write("\r\n", 2);
903           add_boundary(true);
904         }
905 
906         Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
907         m_output_vio->nbytes = m_done;
908         m_output_vio->reenable();
909 
910         // if we are detaching before processing all the
911         //   input data, send VC_EVENT_EOS to let the upstream know
912         //   that it can rely on us consuming any more data
913         int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
914         m_write_vio.cont->handleEvent(cb_event, &m_write_vio);
915         return;
916       }
917 
918       end       = &m_ranges[m_current_range]._end;
919       done_byte = &m_ranges[m_current_range]._done_byte;
920       start     = &m_ranges[m_current_range]._start;
921 
922       // if this is a good Range
923       if (*end != -1) {
924         m_done += m_output_buf->write("\r\n", 2);
925         add_boundary(false);
926         add_sub_header(m_current_range);
927 
928         // keep this part for future support of out-of-order Range
929         // if this is NOT a sequential Range compared to the previous one -
930         // start of current Range is larger than the end of last Range, do
931         // not need to go back to the start of the IOBuffereReader.
932         // Otherwise, reset the IOBufferReader.
933         // if ( *start > prev_end )
934         *done_byte = prev_end;
935         // else
936         //  reader->reset();
937 
938         break;
939       }
940     }
941 
942     // When we need to read and there is nothing available
943     avail = reader->read_avail();
944     if (avail == 0) {
945       break;
946     }
947   }
948 
949   m_output_vio->reenable();
950   m_write_vio.cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
951 }
952 
953 /*-------------------------------------------------------------------------
954   -------------------------------------------------------------------------*/
955 
956 /*
957  * these two need be changed at the same time
958  */
959 
960 static char bound[]      = "RANGE_SEPARATOR";
961 static char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
962 static char cont_type[]  = "Content-type: ";
963 static char cont_range[] = "Content-range: bytes ";
964 
965 void
add_boundary(bool end)966 RangeTransform::add_boundary(bool end)
967 {
968   m_done += m_output_buf->write("--", 2);
969   m_done += m_output_buf->write(bound, sizeof(bound) - 1);
970 
971   if (end) {
972     m_done += m_output_buf->write("--", 2);
973   }
974 
975   m_done += m_output_buf->write("\r\n", 2);
976 }
977 
978 /*-------------------------------------------------------------------------
979   -------------------------------------------------------------------------*/
980 
981 #define RANGE_NUMBERS_LENGTH 60
982 
983 void
add_sub_header(int index)984 RangeTransform::add_sub_header(int index)
985 {
986   // this should be large enough to hold three integers!
987   char numbers[RANGE_NUMBERS_LENGTH];
988   int len;
989 
990   m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
991   if (m_content_type) {
992     m_done += m_output_buf->write(m_content_type, m_content_type_len);
993   }
994   m_done += m_output_buf->write("\r\n", 2);
995   m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
996 
997   snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end,
998            m_output_cl);
999   len = strlen(numbers);
1000   if (len < RANGE_NUMBERS_LENGTH) {
1001     m_done += m_output_buf->write(numbers, len);
1002   }
1003   m_done += m_output_buf->write("\r\n\r\n", 4);
1004 }
1005 
1006 /*-------------------------------------------------------------------------
1007   -------------------------------------------------------------------------*/
1008 
1009 /*
1010  * this function changes the response header to reflect this is
1011  * a Range response.
1012  */
1013 
1014 void
change_response_header()1015 RangeTransform::change_response_header()
1016 {
1017   MIMEField *field;
1018   char *reason_phrase;
1019   HTTPStatus status_code;
1020 
1021   ink_release_assert(m_transform_resp);
1022 
1023   status_code = HTTP_STATUS_PARTIAL_CONTENT;
1024   m_transform_resp->status_set(status_code);
1025   reason_phrase = const_cast<char *>(http_hdr_reason_lookup(status_code));
1026   m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
1027 
1028   if (m_num_range_fields > 1) {
1029     // set the right Content-Type for multiple entry Range
1030     field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1031 
1032     if (field != nullptr) {
1033       m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1034     }
1035 
1036     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
1037     field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
1038 
1039     m_transform_resp->field_attach(field);
1040   } else {
1041     char numbers[RANGE_NUMBERS_LENGTH];
1042     m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1043     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
1044     snprintf(numbers, sizeof(numbers), "bytes %" PRId64 "-%" PRId64 "/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
1045     field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
1046     m_transform_resp->field_attach(field);
1047   }
1048 }
1049 
1050 #undef RANGE_NUMBERS_LENGTH
1051