1 /**
2 Licensed to the Apache Software Foundation (ASF) under one
3 or more contributor license agreements. See the NOTICE file
4 distributed with this work for additional information
5 regarding copyright ownership. The ASF licenses this file
6 to you under the Apache License, Version 2.0 (the
7 "License"); you may not use this file except in compliance
8 with the License. You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 */
18
19 /**
20 * @file TransformationPlugin.cc
21 */
22
23 #include "tscpp/api/TransformationPlugin.h"
24
25 #include "ts/ts.h"
26 #include <cstddef>
27 #include <cinttypes>
28 #include "utils_internal.h"
29 #include "logging_internal.h"
30 #include "tscpp/api/noncopyable.h"
31 #include "tscpp/api/Continuation.h"
32
33 #ifndef INT64_MAX
34 #define INT64_MAX (9223372036854775807LL)
35 #endif
36
37 namespace atscppapi
38 {
39 namespace detail
40 {
41 class ResumeAfterPauseCont : public Continuation
42 {
43 public:
ResumeAfterPauseCont()44 ResumeAfterPauseCont() : Continuation() {}
45
ResumeAfterPauseCont(Continuation::Mutex m)46 ResumeAfterPauseCont(Continuation::Mutex m) : Continuation(m) {}
47
48 protected:
49 int _run(TSEvent event, void *edata) override;
50 };
51
52 } // end namespace detail
53
54 /**
55 * @private
56 */
57 struct TransformationPluginState : noncopyable, public detail::ResumeAfterPauseCont {
58 TSVConn vconn_;
59 Transaction &transaction_;
60 TransformationPlugin &transformation_plugin_;
61 TransformationPlugin::Type type_;
62 TSVIO output_vio_; // this gets initialized on an output().
63 TSHttpTxn txn_;
64 TSIOBuffer output_buffer_;
65 TSIOBufferReader output_buffer_reader_;
66 int64_t bytes_written_;
67 bool paused_;
68
69 // We can only send a single WRITE_COMPLETE even though
70 // we may receive an immediate event after we've sent a
71 // write complete, so we'll keep track of whether or not we've
72 // sent the input end our write complete.
73 bool input_complete_dispatched_;
74
75 std::string request_xform_output_; // in case of request xform, data produced is buffered here
76
TransformationPluginStateatscppapi::TransformationPluginState77 TransformationPluginState(atscppapi::Transaction &transaction, TransformationPlugin &transformation_plugin,
78 TransformationPlugin::Type type, TSHttpTxn txn)
79 : vconn_(nullptr),
80 transaction_(transaction),
81 transformation_plugin_(transformation_plugin),
82 type_(type),
83 output_vio_(nullptr),
84 txn_(txn),
85 output_buffer_(nullptr),
86 output_buffer_reader_(nullptr),
87 bytes_written_(0),
88 paused_(false),
89 input_complete_dispatched_(false)
90 {
91 output_buffer_ = TSIOBufferCreate();
92 output_buffer_reader_ = TSIOBufferReaderAlloc(output_buffer_);
93 };
94
~TransformationPluginStateatscppapi::TransformationPluginState95 ~TransformationPluginState() override
96 {
97 if (output_buffer_reader_) {
98 TSIOBufferReaderFree(output_buffer_reader_);
99 output_buffer_reader_ = nullptr;
100 }
101
102 if (output_buffer_) {
103 TSIOBufferDestroy(output_buffer_);
104 output_buffer_ = nullptr;
105 }
106 }
107 };
108
109 } // end namespace atscppapi
110
111 using namespace atscppapi;
112
113 namespace
114 {
115 using ResumeAfterPauseCont = atscppapi::detail::ResumeAfterPauseCont;
116
117 void
cleanupTransformation(TSCont contp)118 cleanupTransformation(TSCont contp)
119 {
120 LOG_DEBUG("Destroying transformation contp=%p", contp);
121 TSContDataSet(contp, reinterpret_cast<void *>(0xDEADDEAD));
122 TSContDestroy(contp);
123 }
124
125 int
handleTransformationPluginRead(TSCont contp,TransformationPluginState * state)126 handleTransformationPluginRead(TSCont contp, TransformationPluginState *state)
127 {
128 // Traffic Server naming is quite confusing, in this context the write_vio
129 // is actually the vio we read from.
130 TSVIO write_vio = TSVConnWriteVIOGet(contp);
131 if (write_vio) {
132 if (state->paused_) {
133 LOG_DEBUG("Transformation contp=%p write_vio=%p, is paused", contp, write_vio);
134 return 0;
135 }
136
137 int64_t to_read = TSVIONTodoGet(write_vio);
138 LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64, contp, write_vio, to_read);
139
140 if (to_read > 0) {
141 /*
142 * The amount of data left to read needs to be truncated by
143 * the amount of data actually in the read buffer.
144 **/
145 int64_t avail = TSIOBufferReaderAvail(TSVIOReaderGet(write_vio));
146 LOG_DEBUG("Transformation contp=%p write_vio=%p, to_read=%" PRId64 ", buffer reader avail=%" PRId64, contp, write_vio,
147 to_read, avail);
148
149 if (to_read > avail) {
150 to_read = avail;
151 LOG_DEBUG("Transformation contp=%p write_vio=%p, to read > avail, fixing to_read to be equal to avail. to_read=%" PRId64
152 ", buffer reader avail=%" PRId64,
153 contp, write_vio, to_read, avail);
154 }
155
156 if (to_read > 0) {
157 /* Create a buffer and a buffer reader */
158 TSIOBuffer input_buffer = TSIOBufferCreate();
159 TSIOBufferReader input_reader = TSIOBufferReaderAlloc(input_buffer);
160
161 /* Copy the data from the read buffer to the input buffer. */
162 TSIOBufferCopy(input_buffer, TSVIOReaderGet(write_vio), to_read, 0);
163
164 /* Tell the read buffer that we have read the data and are no
165 longer interested in it. */
166 TSIOBufferReaderConsume(TSVIOReaderGet(write_vio), to_read);
167
168 /* Modify the read VIO to reflect how much data we've completed. */
169 TSVIONDoneSet(write_vio, TSVIONDoneGet(write_vio) + to_read);
170
171 std::string in_data = utils::internal::consumeFromTSIOBufferReader(input_reader);
172 LOG_DEBUG("Transformation contp=%p write_vio=%p consumed %ld bytes from bufferreader", contp, write_vio, in_data.length());
173
174 /* Clean up the buffer and reader */
175 TSIOBufferReaderFree(input_reader);
176 TSIOBufferDestroy(input_buffer);
177
178 /* Now call the client to tell them about data */
179 if (in_data.length() > 0) {
180 state->transformation_plugin_.consume(in_data);
181 }
182 }
183
184 /* now that we've finished reading we will check if there is anything left to read. */
185 TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null
186
187 if (TSVIONTodoGet(write_vio) > 0) {
188 LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p still has bytes left to process, todo > 0.", contp, write_vio,
189 vio_cont);
190
191 if (to_read > 0) {
192 TSVIOReenable(write_vio);
193
194 /* Call back the read VIO continuation to let it know that we are ready for more data. */
195 if (vio_cont) {
196 TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_READY), write_vio);
197 }
198 }
199 } else {
200 LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process, will send WRITE_COMPLETE.",
201 contp, write_vio, vio_cont);
202
203 /* Call back the write VIO continuation to let it know that we have completed the write operation. */
204 if (!state->input_complete_dispatched_) {
205 state->transformation_plugin_.handleInputComplete();
206 state->input_complete_dispatched_ = true;
207 if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
208 TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
209 }
210 }
211 }
212 } else {
213 TSCont vio_cont = TSVIOContGet(write_vio); // for some reason this can occasionally be null?
214 LOG_DEBUG("Transformation contp=%p write_vio=%p, vio_cont=%p has no bytes left to process.", contp, write_vio, vio_cont);
215
216 /* Call back the write VIO continuation to let it know that we have completed the write operation. */
217 if (!state->input_complete_dispatched_) {
218 state->transformation_plugin_.handleInputComplete();
219 state->input_complete_dispatched_ = true;
220 if (vio_cont && nullptr != TSVIOBufferGet(write_vio)) {
221 TSContCall(vio_cont, static_cast<TSEvent>(TS_EVENT_VCONN_WRITE_COMPLETE), write_vio);
222 }
223 }
224 }
225 } else {
226 LOG_ERROR("Transformation contp=%p write_vio=%p was nullptr!", contp, write_vio);
227 }
228 return 0;
229 }
230
231 int
handleTransformationPluginEvents(TSCont contp,TSEvent event,void * edata)232 handleTransformationPluginEvents(TSCont contp, TSEvent event, void *edata)
233 {
234 TransformationPluginState *state = static_cast<TransformationPluginState *>(TSContDataGet(contp));
235 LOG_DEBUG("Transformation contp=%p event=%d edata=%p tshttptxn=%p", contp, event, edata, state->txn_);
236
237 // The first thing you always do is check if the VConn is closed.
238 int connection_closed = TSVConnClosedGet(state->vconn_);
239 if (connection_closed) {
240 LOG_DEBUG("Transformation contp=%p tshttptxn=%p is closed connection_closed=%d ", contp, state->txn_, connection_closed);
241 // we'll do the cleanupTransformation in the TransformationPlugin destructor.
242 return 0;
243 }
244
245 if (event == TS_EVENT_VCONN_WRITE_COMPLETE) {
246 TSVConn output_vconn = TSTransformOutputVConnGet(state->vconn_);
247 LOG_DEBUG("Transformation contp=%p tshttptxn=%p received WRITE_COMPLETE, shutting down outputvconn=%p ", contp, state->txn_,
248 output_vconn);
249 TSVConnShutdown(output_vconn, 0, 1); // The other end is done reading our output
250 return 0;
251 } else if (event == TS_EVENT_ERROR) {
252 TSVIO write_vio;
253 /* Get the write VIO for the write operation that was
254 performed on ourself. This VIO contains the continuation of
255 our parent transformation. */
256 write_vio = TSVConnWriteVIOGet(state->vconn_);
257 TSCont vio_cont = TSVIOContGet(write_vio);
258 LOG_ERROR("Transformation contp=%p tshttptxn=%p received EVENT_ERROR forwarding to write_vio=%p viocont=%p", contp, state->txn_,
259 write_vio, vio_cont);
260 if (vio_cont) {
261 TSContCall(vio_cont, TS_EVENT_ERROR, write_vio);
262 }
263 return 0;
264 }
265
266 // All other events including WRITE_READY will just attempt to transform more data.
267 return handleTransformationPluginRead(state->vconn_, state);
268 }
269
270 } /* anonymous namespace */
271
TransformationPlugin(Transaction & transaction,TransformationPlugin::Type type)272 TransformationPlugin::TransformationPlugin(Transaction &transaction, TransformationPlugin::Type type)
273 : TransactionPlugin(transaction), state_(nullptr)
274 {
275 state_ = new TransformationPluginState(transaction, *this, type, static_cast<TSHttpTxn>(transaction.getAtsHandle()));
276 state_->vconn_ = TSTransformCreate(handleTransformationPluginEvents, state_->txn_);
277 TSContDataSet(state_->vconn_, static_cast<void *>(state_)); // edata in a TransformationHandler is NOT a TSHttpTxn.
278 LOG_DEBUG("Creating TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p transformation_type=%d", this, state_->vconn_,
279 state_->txn_, type);
280 TSHttpTxnHookAdd(state_->txn_, utils::internal::convertInternalTransformationTypeToTsHook(type), state_->vconn_);
281 }
282
~TransformationPlugin()283 TransformationPlugin::~TransformationPlugin()
284 {
285 LOG_DEBUG("Destroying TransformationPlugin=%p", this);
286 cleanupTransformation(state_->vconn_);
287 delete state_;
288 }
289
290 void
pause()291 TransformationPlugin::pause()
292 {
293 if (state_->paused_) {
294 LOG_ERROR("Can not pause transformation, already paused TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
295 state_->vconn_, state_->txn_);
296 } else if (state_->input_complete_dispatched_) {
297 LOG_ERROR("Can not pause transformation (transformation completed) TransformationPlugin=%p (vconn)contp=%p tshttptxn=%p", this,
298 state_->vconn_, state_->txn_);
299 } else {
300 state_->paused_ = true;
301 if (!static_cast<bool>(static_cast<ResumeAfterPauseCont *>(state_))) {
302 *static_cast<ResumeAfterPauseCont *>(state_) = ResumeAfterPauseCont(TSContMutexGet(reinterpret_cast<TSCont>(state_->txn_)));
303 }
304 }
305 }
306
307 bool
isPaused() const308 TransformationPlugin::isPaused() const
309 {
310 return state_->paused_;
311 }
312
313 Continuation &
resumeCont()314 TransformationPlugin::resumeCont()
315 {
316 TSReleaseAssert(state_->paused_);
317
318 // The cast to a pointer to the intermediate base class ResumeAfterPauseCont is not strictly necessary. It is
319 // possible that the transform plugin might want to defer work to other continuations in the future. This would
320 // naturally result in TransactionPluginState having Continuation as an indirect base class multiple times, making
321 // disambiguation necessary when converting.
322 //
323 return *static_cast<ResumeAfterPauseCont *>(state_);
324 }
325
326 int
_run(TSEvent event,void * edata)327 ResumeAfterPauseCont::_run(TSEvent event, void *edata)
328 {
329 auto state = static_cast<TransformationPluginState *>(this);
330 state->paused_ = false;
331 handleTransformationPluginRead(state->vconn_, state);
332
333 return TS_SUCCESS;
334 }
335
336 size_t
doProduce(std::string_view data)337 TransformationPlugin::doProduce(std::string_view data)
338 {
339 LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p producing output with length=%ld", this, state_->txn_, data.length());
340 int64_t write_length = static_cast<int64_t>(data.length());
341 if (!write_length) {
342 return 0;
343 }
344
345 if (!state_->output_vio_) {
346 TSVConn output_vconn = TSTransformOutputVConnGet(state_->vconn_);
347 LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p will issue a TSVConnWrite, output_vconn=%p.", this, state_->txn_, output_vconn);
348 if (output_vconn) {
349 // If you're confused about the following reference the traffic server transformation docs.
350 // You always write INT64_MAX, this basically says you're not sure how much data you're going to write
351 state_->output_vio_ = TSVConnWrite(output_vconn, state_->vconn_, state_->output_buffer_reader_, INT64_MAX);
352 } else {
353 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p output_vconn=%p cannot issue TSVConnWrite due to null output vconn.", this,
354 state_->txn_, output_vconn);
355 return 0;
356 }
357
358 if (!state_->output_vio_) {
359 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p state_->output_vio=%p, TSVConnWrite failed.", this, state_->txn_,
360 state_->output_vio_);
361 return 0;
362 }
363 }
364
365 // Finally we can copy this data into the output_buffer
366 int64_t bytes_written = TSIOBufferWrite(state_->output_buffer_, data.data(), write_length);
367 state_->bytes_written_ += bytes_written; // So we can set BytesDone on outputComplete().
368 LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p write to TSIOBuffer %" PRId64 " bytes total bytes written %" PRId64, this,
369 state_->txn_, bytes_written, state_->bytes_written_);
370
371 // Sanity Checks
372 if (bytes_written != write_length) {
373 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p bytes written < expected. bytes_written=%" PRId64 " write_length=%" PRId64,
374 this, state_->txn_, bytes_written, write_length);
375 }
376
377 int connection_closed = TSVConnClosedGet(state_->vconn_);
378 LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d", this, state_->txn_, state_->vconn_,
379 connection_closed);
380
381 if (!connection_closed) {
382 TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
383 } else {
384 LOG_ERROR(
385 "TransformationPlugin=%p tshttptxn=%p output_vio=%p connection_closed=%d : Couldn't reenable output vio (connection closed).",
386 this, state_->txn_, state_->output_vio_, connection_closed);
387 }
388
389 return static_cast<size_t>(bytes_written);
390 }
391
392 size_t
produce(std::string_view data)393 TransformationPlugin::produce(std::string_view data)
394 {
395 if (state_->type_ == REQUEST_TRANSFORMATION) {
396 state_->request_xform_output_.append(data.data(), data.length());
397 return data.size();
398 } else if (state_->type_ == SINK_TRANSFORMATION) {
399 LOG_DEBUG("produce TransformationPlugin=%p tshttptxn=%p : This is a sink transform. Not producing any output", this,
400 state_->txn_);
401 return 0;
402 } else {
403 return doProduce(data);
404 }
405 }
406
407 size_t
setOutputComplete()408 TransformationPlugin::setOutputComplete()
409 {
410 if (state_->type_ == SINK_TRANSFORMATION) {
411 // There's no output stream for a sink transform, so we do nothing
412 //
413 // Warning: don't try to shutdown the VConn, since the default implementation (DummyVConnection)
414 // has a stubbed out shutdown/close implementation
415 return 0;
416 } else if (state_->type_ == REQUEST_TRANSFORMATION) {
417 doProduce(state_->request_xform_output_);
418 }
419
420 int connection_closed = TSVConnClosedGet(state_->vconn_);
421 LOG_DEBUG("OutputComplete TransformationPlugin=%p tshttptxn=%p vconn=%p connection_closed=%d, total bytes written=%" PRId64, this,
422 state_->txn_, state_->vconn_, connection_closed, state_->bytes_written_);
423
424 if (!connection_closed && !state_->output_vio_) {
425 LOG_DEBUG("TransformationPlugin=%p tshttptxn=%p output complete without writing any data, initiating write of 0 bytes.", this,
426 state_->txn_);
427
428 // We're done without ever outputting anything, to correctly
429 // clean up we'll initiate a write then immediately set it to 0 bytes done.
430 state_->output_vio_ = TSVConnWrite(TSTransformOutputVConnGet(state_->vconn_), state_->vconn_, state_->output_buffer_reader_, 0);
431
432 if (state_->output_vio_) {
433 TSVIONDoneSet(state_->output_vio_, 0);
434 TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
435 } else {
436 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p because VConnWrite failed.", this,
437 state_->txn_, state_->output_vio_);
438 }
439
440 return 0;
441 }
442
443 if (!connection_closed) {
444 // So there is a possible race condition here, if we wake up a dead
445 // VIO it can cause a segfault, so we must check that the VCONN is not dead.
446 int connection_closed = TSVConnClosedGet(state_->vconn_);
447 if (!connection_closed) {
448 TSVIONBytesSet(state_->output_vio_, state_->bytes_written_);
449 TSVIOReenable(state_->output_vio_); // Wake up the downstream vio
450 } else {
451 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this,
452 state_->txn_, state_->output_vio_, connection_closed);
453 }
454 } else {
455 LOG_ERROR("TransformationPlugin=%p tshttptxn=%p unable to reenable output_vio=%p connection was closed=%d.", this, state_->txn_,
456 state_->output_vio_, connection_closed);
457 }
458
459 return state_->bytes_written_;
460 }
461