1 /**
2   Licensed to the Apache Software Foundation (ASF) under one
3   or more contributor license agreements.  See the NOTICE file
4   distributed with this work for additional information
5   regarding copyright ownership.  The ASF licenses this file
6   to you under the Apache License, Version 2.0 (the
7   "License"); you may not use this file except in compliance
8   with the License.  You may obtain a copy of the License at
9 
10       http://www.apache.org/licenses/LICENSE-2.0
11 
12   Unless required by applicable law or agreed to in writing, software
13   distributed under the License is distributed on an "AS IS" BASIS,
14   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   See the License for the specific language governing permissions and
16   limitations under the License.
17  */
18 
19 /**
20  * @file TransformationPlugin.cc
21  */
22 
23 #include "tscpp/api/TransformationPlugin.h"
24 
25 #include "ts/ts.h"
26 #include <cstddef>
27 #include <cinttypes>
28 #include "utils_internal.h"
29 #include "logging_internal.h"
30 #include "tscpp/api/noncopyable.h"
31 #include "tscpp/api/Continuation.h"
32 
33 #ifndef INT64_MAX
34 #define INT64_MAX (9223372036854775807LL)
35 #endif
36 
37 namespace atscppapi
38 {
39 namespace detail
40 {
41   class ResumeAfterPauseCont : public Continuation
42   {
43   public:
ResumeAfterPauseCont()44     ResumeAfterPauseCont() : Continuation() {}
45 
ResumeAfterPauseCont(Continuation::Mutex m)46     ResumeAfterPauseCont(Continuation::Mutex m) : Continuation(m) {}
47 
48   protected:
49     int _run(TSEvent event, void *edata) override;
50   };
51 
52 } // end namespace detail
53 
54 /**
55  * @private
56  */
57 struct TransformationPluginState : noncopyable, public detail::ResumeAfterPauseCont {
58   TSVConn vconn_;
59   Transaction &transaction_;
60   TransformationPlugin &transformation_plugin_;
61   TransformationPlugin::Type type_;
62   TSVIO output_vio_; // this gets initialized on an output().
63   TSHttpTxn txn_;
64   TSIOBuffer output_buffer_;
65   TSIOBufferReader output_buffer_reader_;
66   int64_t bytes_written_;
67   bool paused_;
68 
69   // We can only send a single WRITE_COMPLETE even though
70   // we may receive an immediate event after we've sent a
71   // write complete, so we'll keep track of whether or not we've
72   // sent the input end our write complete.
73   bool input_complete_dispatched_;
74 
75   std::string request_xform_output_; // in case of request xform, data produced is buffered here
76 
TransformationPluginStateatscppapi::TransformationPluginState77   TransformationPluginState(atscppapi::Transaction &transaction, TransformationPlugin &transformation_plugin,
78                             TransformationPlugin::Type type, TSHttpTxn txn)
79     : vconn_(nullptr),
80       transaction_(transaction),
81       transformation_plugin_(transformation_plugin),
82       type_(type),
83       output_vio_(nullptr),
84       txn_(txn),
85       output_buffer_(nullptr),
86       output_buffer_reader_(nullptr),
87       bytes_written_(0),
88       paused_(false),
89       input_complete_dispatched_(false)
90   {
91     output_buffer_        = TSIOBufferCreate();
92     output_buffer_reader_ = TSIOBufferReaderAlloc(output_buffer_);
93   };
94 
~TransformationPluginStateatscppapi::TransformationPluginState95   ~TransformationPluginState() override
96   {
97     if (output_buffer_reader_) {
98       TSIOBufferReaderFree(output_buffer_reader_);
99       output_buffer_reader_ = nullptr;
100     }
101 
102     if (output_buffer_) {
103       TSIOBufferDestroy(output_buffer_);
104       output_buffer_ = nullptr;
105     }
106   }
107 };
108 
109 } // end namespace atscppapi
110 
111 using namespace atscppapi;
112 
113 namespace
114 {
115 using ResumeAfterPauseCont = atscppapi::detail::ResumeAfterPauseCont;
116 
117 void
cleanupTransformation(TSCont contp)118 cleanupTransformation(TSCont contp)
119 {
120   LOG_DEBUG("Destroying transformation contp=%p", contp);
121   TSContDataSet(contp, reinterpret_cast<void *>(0xDEADDEAD));
122   TSContDestroy(contp);
123 }
124 
125 int
handleTransformationPluginRead(TSCont contp,TransformationPluginState * state)126 handleTransformationPluginRead(TSCont contp, TransformationPluginState *state)
127 {
128   // Traffic Server naming is quite confusing, in this context the write_vio
129   // is actually the vio we read from.
130   TSVIO write_vio = TSVConnWriteVIOGet(contp);
131   if (write_vio) {
132     if (state->paused_) {
133       LOG_DEBUG("Transformation contp=%p write_vio=%p, is paused", contp, write_vio);
134       return 0;
135     }
136 
137     int64_t to_read = TSVIONTodoGet(write_vio);
138     LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64, contp, write_vio, to_read);
139 
140     if (to_read > 0) {
141       /*
142        * The amount of data left to read needs to be truncated by
143        * the amount of data actually in the read buffer.
144        **/
145       int64_t avail = TSIOBufferReaderAvail(TSVIOReaderGet(write_vio));
146       LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64 ", buffer reader avail=%" PRId64, contp, write_vio,
147                 to_read, avail);
148 
149       if (to_read > avail) {
150         to_read = avail;
151         LOG_DEBUG("Transformation contp=%p write_vio=%p, to read > avail, fixing to_read to be equal to avail. to_read=%" PRId64
152                   ", buffer reader avail=%" PRId64,
153                   contp, write_vio, to_read, avail);
154       }
155 
156       if (to_read > 0) {
157         /* Create a buffer and a buffer reader */
158         TSIOBuffer input_buffer       = TSIOBufferCreate();
159         TSIOBufferReader input_reader = TSIOBufferReaderAlloc(input_buffer);
160 
161         /* Copy the data from the read buffer to the input buffer. */
162         TSIOBufferCopy(input_buffer, TSVIOReaderGet(write_vio), to_read, 0);
163 
164         /* Tell the read buffer that we have read the data and are no
165          longer interested in it. */
166         TSIOBufferReaderConsume(TSVIOReaderGet(write_vio), to_read);
167 
168         /* Modify the read VIO to reflect how much data we've completed. */
169         TSVIONDoneSet(write_vio, TSVIONDoneGet(write_vio) + to_read);
170 
171         std::string in_data = utils::internal::consumeFromTSIOBufferReader(input_reader);
172         LOG_DEBUG("Transformation contp=%p write_vio=%p consumed %ld bytes from bufferreader", contp, write_vio, in_data.length());
173 
174         /* Clean up the buffer and reader */
175         TSIOBufferReaderFree(input_reader);
176         TSIOBufferDestroy(input_buffer);
177 
178         /* Now call the client to tell them about data */
179         if (in_data.length() > 0) {
180           state->transformation_plugin_.consume(in_data);
181         }
182       }
183 
184       /* now that we've finished reading we will check if there is anything left to read. */
185       TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null
186 
187       if (TSVIONTodoGet(write_vio) > 0) {
188         LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p still has bytes left to process, todo > 0.", contp, write_vio,
189                   vio_cont);
190 
191         if (to_read > 0) {
192           TSVIOReenable(write_vio);
193 
194           /* Call back the read VIO continuation to let it know that we are ready for more data. */
195           if (vio_cont) {
196             TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_READY), write_vio);
197           }
198         }
199       } else {
200         LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process, will send WRITE_COMPLETE.",
201                   contp, write_vio, vio_cont);
202 
203         /* Call back the write VIO continuation to let it know that we have completed the write operation. */
204         if (!state->input_complete_dispatched_) {
205           state->transformation_plugin_.handleInputComplete();
206           state->input_complete_dispatched_ = true;
207           if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
208             TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
209           }
210         }
211       }
212     } else {
213       TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null?
214       LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process.", contp, write_vio, vio_cont);
215 
216       /* Call back the write VIO continuation to let it know that we have completed the write operation. */
217       if (!state->input_complete_dispatched_) {
218         state->transformation_plugin_.handleInputComplete();
219         state->input_complete_dispatched_ = true;
220         if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
221           TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
222         }
223       }
224     }
225   } else {
226     LOG_ERROR("Transformation contp=%p write_vio=%p was nullptr!", contp, write_vio);
227   }
228   return 0;
229 }
230 
231 int
handleTransformationPluginEvents(TSCont contp,TSEvent event,void * edata)232 handleTransformationPluginEvents(TSCont contp, TSEvent event, void *edata)
233 {
234   TransformationPluginState *state = static_cast<TransformationPluginState *>(TSContDataGet(contp));
235   LOG_DEBUG("Transformation contp=%p event=%d edata=%p tshttptxn=%p", contp, event, edata, state->txn_);
236 
237   // The first thing you always do is check if the VConn is closed.
238   int connection_closed = TSVConnClosedGet(state->vconn_);
239   if (connection_closed) {
240     LOG_DEBUG("Transformation contp=%p tshttptxn=%p is closed connection_closed=%d ", contp, state->txn_, connection_closed);
241     // we'll do the cleanupTransformation in the TransformationPlugin destructor.
242     return 0;
243   }
244 
245   if (event == TS_EVENT_VCONN_WRITE_COMPLETE) {
246     TSVConn output_vconn = TSTransformOutputVConnGet(state->vconn_);
247     LOG_DEBUG("Transformation contp=%p tshttptxn=%p received WRITE_COMPLETE, shutting down outputvconn=%p ", contp, state->txn_,
248               output_vconn);
249     TSVConnShutdown(output_vconn, 0, 1); // The other end is done reading our output
250     return 0;
251   } else if (event == TS_EVENT_ERROR) {
252     TSVIO write_vio;
253     /* Get the write VIO for the write operation that was
254      performed on ourself. This VIO contains the continuation of
255      our parent transformation. */
256     write_vio       = TSVConnWriteVIOGet(state->vconn_);
257     TSCont vio_cont = TSVIOContGet(write_vio);
258     LOG_ERROR("Transformation contp=%p tshttptxn=%p received EVENT_ERROR forwarding to write_vio=%p viocont=%p", contp, state->txn_,
259               write_vio, vio_cont);
260     if (vio_cont) {
261       TSContCall(vio_cont, TS_EVENT_ERROR, write_vio);
262     }
263     return 0;
264   }
265 
266   // All other events including WRITE_READY will just attempt to transform more data.
267   return handleTransformationPluginRead(state->vconn_, state);
268 }
269 
270 } /* anonymous namespace */
271 
TransformationPlugin(Transaction & transaction,TransformationPlugin::Type type)272 TransformationPlugin::TransformationPlugin(Transaction &transaction, TransformationPlugin::Type type)
273   : TransactionPlugin(transaction), state_(nullptr)
274 {
275   state_         = new TransformationPluginState(transaction, *this, type, static_cast<TSHttpTxn>(transaction.getAtsHandle()));
276   state_->vconn_ = TSTransformCreate(handleTransformationPluginEvents, state_->txn_);
277   TSContDataSet(state_->vconn_, static_cast<void *>(state_)); // edata in a TransformationHandler is NOT a TSHttpTxn.
278   LOG_DEBUG("Creating TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p transformation_type=%d", this, state_->vconn_,
279             state_->txn_, type);
280   TSHttpTxnHookAdd(state_->txn_, utils::internal::convertInternalTransformationTypeToTsHook(type), state_->vconn_);
281 }
282 
~TransformationPlugin()283 TransformationPlugin::~TransformationPlugin()
284 {
285   LOG_DEBUG("Destroying TransformationPlugin=%p", this);
286   cleanupTransformation(state_->vconn_);
287   delete state_;
288 }
289 
290 void
pause()291 TransformationPlugin::pause()
292 {
293   if (state_->paused_) {
294     LOG_ERROR("Can not pause transformation, already paused  TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
295               state_->vconn_, state_->txn_);
296   } else if (state_->input_complete_dispatched_) {
297     LOG_ERROR("Can not pause transformation (transformation completed) TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
298               state_->vconn_, state_->txn_);
299   } else {
300     state_->paused_ = true;
301     if (!static_cast<bool>(static_cast<ResumeAfterPauseCont *>(state_))) {
302       *static_cast<ResumeAfterPauseCont *>(state_) = ResumeAfterPauseCont(TSContMutexGet(reinterpret_cast<TSCont>(state_->txn_)));
303     }
304   }
305 }
306 
307 bool
isPaused() const308 TransformationPlugin::isPaused() const
309 {
310   return state_->paused_;
311 }
312 
313 Continuation &
resumeCont()314 TransformationPlugin::resumeCont()
315 {
316   TSReleaseAssert(state_->paused_);
317 
318   // The cast to a pointer to the intermediate base class ResumeAfterPauseCont is not strictly necessary.  It is
319   // possible that the transform plugin might want to defer work to other continuations in the future.  This would
320   // naturally result in TransactionPluginState having Continuation as an indirect base class multiple times, making
321   // disambiguation necessary when converting.
322   //
323   return *static_cast<ResumeAfterPauseCont *>(state_);
324 }
325 
326 int
_run(TSEvent event,void * edata)327 ResumeAfterPauseCont::_run(TSEvent event, void *edata)
328 {
329   auto state     = static_cast<TransformationPluginState *>(this);
330   state->paused_ = false;
331   handleTransformationPluginRead(state->vconn_, state);
332 
333   return TS_SUCCESS;
334 }
335 
336 size_t
doProduce(std::string_view data)337 TransformationPlugin::doProduce(std::string_view data)
338 {
339   LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p producing output with length=%ld", this, state_->txn_, data.length());
340   int64_t write_length = static_cast<int64_t>(data.length());
341   if (!write_length) {
342     return 0;
343   }
344 
345   if (!state_->output_vio_) {
346     TSVConn output_vconn = TSTransformOutputVConnGet(state_->vconn_);
347     LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p will issue a TSVConnWrite, output_vconn=%p.", this, state_->txn_, output_vconn);
348     if (output_vconn) {
349       // If you're confused about the following reference the traffic server transformation docs.
350       // You always write INT64_MAX, this basically says you're not sure how much data you're going to write
351       state_->output_vio_ = TSVConnWrite(output_vconn, state_->vconn_, state_->output_buffer_reader_, INT64_MAX);
352     } else {
353       LOG_ERROR("TransformationPlugin=%p tshttptxn=%p output_vconn=%p cannot issue TSVConnWrite due to null output vconn.", this,
354                 state_->txn_, output_vconn);
355       return 0;
356     }
357 
358     if (!state_->output_vio_) {
359       LOG_ERROR("TransformationPlugin=%p tshttptxn=%p state_->output_vio=%p, TSVConnWrite failed.", this, state_->txn_,
360                 state_->output_vio_);
361       return 0;
362     }
363   }
364 
365   // Finally we can copy this data into the output_buffer
366   int64_t bytes_written = TSIOBufferWrite(state_->output_buffer_, data.data(), write_length);
367   state_->bytes_written_ += bytes_written; // So we can set BytesDone on outputComplete().
368   LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p write to TSIOBuffer %" PRId64 " bytes total bytes written %" PRId64, this,
369             state_->txn_, bytes_written, state_->bytes_written_);
370 
371   // Sanity Checks
372   if (bytes_written != write_length) {
373     LOG_ERROR("TransformationPlugin=%p tshttptxn=%p bytes written < expected. bytes_written=%" PRId64 " write_length=%" PRId64,
374               this, state_->txn_, bytes_written, write_length);
375   }
376 
377   int connection_closed = TSVConnClosedGet(state_->vconn_);
378   LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d", this, state_->txn_, state_->vconn_,
379             connection_closed);
380 
381   if (!connection_closed) {
382     TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
383   } else {
384     LOG_ERROR(
385       "TransformationPlugin=%p tshttptxn=%p output_vio=%p connection_closed=%d : Couldn't reenable output vio (connection closed).",
386       this, state_->txn_, state_->output_vio_, connection_closed);
387   }
388 
389   return static_cast<size_t>(bytes_written);
390 }
391 
392 size_t
produce(std::string_view data)393 TransformationPlugin::produce(std::string_view data)
394 {
395   if (state_->type_ == REQUEST_TRANSFORMATION) {
396     state_->request_xform_output_.append(data.data(), data.length());
397     return data.size();
398   } else if (state_->type_ == SINK_TRANSFORMATION) {
399     LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a sink transform. Not producing any output", this,
400               state_->txn_);
401     return 0;
402   } else {
403     return doProduce(data);
404   }
405 }
406 
407 size_t
setOutputComplete()408 TransformationPlugin::setOutputComplete()
409 {
410   if (state_->type_ == SINK_TRANSFORMATION) {
411     // There's no output stream for a sink transform, so we do nothing
412     //
413     // Warning: don't try to shutdown the VConn, since the default implementation (DummyVConnection)
414     // has a stubbed out shutdown/close implementation
415     return 0;
416   } else if (state_->type_ == REQUEST_TRANSFORMATION) {
417     doProduce(state_->request_xform_output_);
418   }
419 
420   int connection_closed = TSVConnClosedGet(state_->vconn_);
421   LOG_DEBUG("OutputComplete TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d, total bytes written=%" PRId64, this,
422             state_->txn_, state_->vconn_, connection_closed, state_->bytes_written_);
423 
424   if (!connection_closed && !state_->output_vio_) {
425     LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p output complete without writing any data, initiating write of 0 bytes.", this,
426               state_->txn_);
427 
428     // We're done without ever outputting anything, to correctly
429     // clean up we'll initiate a write then immediately set it to 0 bytes done.
430     state_->output_vio_ = TSVConnWrite(TSTransformOutputVConnGet(state_->vconn_), state_->vconn_, state_->output_buffer_reader_, 0);
431 
432     if (state_->output_vio_) {
433       TSVIONDoneSet(state_->output_vio_, 0);
434       TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
435     } else {
436       LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p because VConnWrite failed.", this,
437                 state_->txn_, state_->output_vio_);
438     }
439 
440     return 0;
441   }
442 
443   if (!connection_closed) {
444     // So there is a possible race condition here, if we wake up a dead
445     // VIO it can cause a segfault, so we must check that the VCONN is not dead.
446     int connection_closed = TSVConnClosedGet(state_->vconn_);
447     if (!connection_closed) {
448       TSVIONBytesSet(state_->output_vio_, state_->bytes_written_);
449       TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
450     } else {
451       LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this,
452                 state_->txn_, state_->output_vio_, connection_closed);
453     }
454   } else {
455     LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this, state_->txn_,
456               state_->output_vio_, connection_closed);
457   }
458 
459   return state_->bytes_written_;
460 }
461