1 /** @file
2 
3   A brief file description
4 
5   @section license License
6 
7   Licensed to the Apache Software Foundation (ASF) under one
8   or more contributor license agreements.  See the NOTICE file
9   distributed with this work for additional information
10   regarding copyright ownership.  The ASF licenses this file
11   to you under the Apache License, Version 2.0 (the
12   "License"); you may not use this file except in compliance
13   with the License.  You may obtain a copy of the License at
14 
15       http://www.apache.org/licenses/LICENSE-2.0
16 
17   Unless required by applicable law or agreed to in writing, software
18   distributed under the License is distributed on an "AS IS" BASIS,
19   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20   See the License for the specific language governing permissions and
21   limitations under the License.
22  */
23 
24 /****************************************************************************
25 
26    PluginVC.cc
27 
28    Description: Allows bi-directional transfer for data from one
29       continuation to another via a mechanism that impersonates a
30       NetVC.  Should implement all external attributes of NetVConnections.
31 
32    Since data is transferred within Traffic Server, this is a two
33    headed beast.  One NetVC on initiating side (active side) and
34    one NetVC on the receiving side (passive side).
35 
36    The two NetVC subclasses, PluginVC, are part PluginVCCore object.  All
37    three objects share the same mutex.  That mutex is required
38    for doing operations that affect the shared buffers,
39    read state from the PluginVC on the other side or deal with deallocation.
40 
41    To simplify the code, all data passing through the system goes initially
42    into a shared buffer.  There are two shared buffers, one for each
43    direction of the connection.  While it's more efficient to transfer
44    the data from one buffer to another directly, this creates a lot
45    of tricky conditions since you must be holding the lock for both
46    sides, in additional this VC's lock.  Additionally, issues like
47    watermarks are very hard to deal with.  Since we try to
48    to move data by IOBufferData references the efficiency penalty shouldn't
49    be too bad and if it is a big penalty, a brave soul can reimplement
50    to move the data directly without the intermediate buffer.
51 
52    Locking is difficult issue for this multi-headed beast.  In each
53    PluginVC, there a two locks. The one we got from our PluginVCCore and
54    the lock from the state machine using the PluginVC.  The read side
55    lock & the write side lock must be the same.  The regular net processor has
56    this constraint as well.  In order to handle scheduling of retry events cleanly,
57    we have two event pointers, one for each lock.  sm_lock_retry_event can only
58    be changed while holding the using state machine's lock and
59    core_lock_retry_event can only be manipulated while holding the PluginVC's
60    lock.  On entry to PluginVC::main_handler, we obtain all the locks
61    before looking at the events.  If we can't get all the locks
62    we reschedule the event for further retries.  Since all the locks are
63    obtained in the beginning of the handler, we know we are running
64    exclusively in the later parts of the handler and we will
65    be free from do_io or reenable calls on the PluginVC.
66 
67    The assumption is made (consistent with IO Core spec) that any close,
68    shutdown, reenable, or do_io_{read,write) operation is done by the callee
69    while holding the lock for that side of the operation.
70 
71 
72  ****************************************************************************/
73 
74 #include "PluginVC.h"
75 #include "P_EventSystem.h"
76 #include "P_Net.h"
77 #include "tscore/Regression.h"
78 
79 #define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
80 #define PVC_DEFAULT_MAX_BYTES 32768
81 #define MIN_BLOCK_TRANSFER_BYTES 128
82 
83 #define PVC_TYPE ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
84 
PluginVC(PluginVCCore * core_obj)85 PluginVC::PluginVC(PluginVCCore *core_obj)
86   : NetVConnection(),
87     magic(PLUGIN_VC_MAGIC_ALIVE),
88     vc_type(PLUGIN_VC_UNKNOWN),
89     core_obj(core_obj),
90     other_side(nullptr),
91     read_state(),
92     write_state(),
93     need_read_process(false),
94     need_write_process(false),
95     closed(false),
96     sm_lock_retry_event(nullptr),
97     core_lock_retry_event(nullptr),
98     deletable(false),
99     reentrancy_count(0),
100     active_timeout(0),
101     active_event(nullptr),
102     inactive_timeout(0),
103     inactive_timeout_at(0),
104     inactive_event(nullptr),
105     plugin_tag(nullptr),
106     plugin_id(0)
107 {
108   ink_assert(core_obj != nullptr);
109   SET_HANDLER(&PluginVC::main_handler);
110 }
111 
~PluginVC()112 PluginVC::~PluginVC()
113 {
114   mutex = nullptr;
115 }
116 
117 int
main_handler(int event,void * data)118 PluginVC::main_handler(int event, void *data)
119 {
120   Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
121 
122   ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
123   ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
124   ink_assert(!deletable);
125   ink_assert(data != nullptr);
126 
127   Event *call_event   = static_cast<Event *>(data);
128   EThread *my_ethread = mutex->thread_holding;
129   ink_release_assert(my_ethread != nullptr);
130 
131   bool read_mutex_held             = false;
132   bool write_mutex_held            = false;
133   Ptr<ProxyMutex> read_side_mutex  = read_state.vio.mutex;
134   Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
135 
136   if (read_side_mutex) {
137     read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
138 
139     if (!read_mutex_held) {
140       if (call_event != inactive_event) {
141         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
142       }
143       return 0;
144     }
145 
146     if (read_side_mutex != read_state.vio.mutex) {
147       // It's possible some swapped the mutex on us before
148       //  we were able to grab it
149       Mutex_unlock(read_side_mutex, my_ethread);
150       if (call_event != inactive_event) {
151         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
152       }
153       return 0;
154     }
155   }
156 
157   if (write_side_mutex) {
158     write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
159 
160     if (!write_mutex_held) {
161       if (read_mutex_held) {
162         Mutex_unlock(read_side_mutex, my_ethread);
163       }
164       if (call_event != inactive_event) {
165         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
166       }
167       return 0;
168     }
169 
170     if (write_side_mutex != write_state.vio.mutex) {
171       // It's possible some swapped the mutex on us before
172       //  we were able to grab it
173       Mutex_unlock(write_side_mutex, my_ethread);
174       if (read_mutex_held) {
175         Mutex_unlock(read_side_mutex, my_ethread);
176       }
177       if (call_event != inactive_event) {
178         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
179       }
180       return 0;
181     }
182   }
183   // We've got all the locks so there should not be any
184   //   other calls active
185   ink_release_assert(reentrancy_count == 0);
186 
187   if (closed) {
188     process_close();
189 
190     if (read_mutex_held) {
191       Mutex_unlock(read_side_mutex, my_ethread);
192     }
193 
194     if (write_mutex_held) {
195       Mutex_unlock(write_side_mutex, my_ethread);
196     }
197 
198     return 0;
199   }
200   // We can get closed while we're calling back the
201   //  continuation.  Set the reentrancy count so we know
202   //  we could be calling the continuation and that we
203   //  need to defer close processing
204   reentrancy_count++;
205 
206   if (call_event == active_event) {
207     process_timeout(&active_event, VC_EVENT_ACTIVE_TIMEOUT);
208   } else if (call_event == inactive_event) {
209     if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
210       process_timeout(&inactive_event, VC_EVENT_INACTIVITY_TIMEOUT);
211     }
212   } else {
213     if (call_event == sm_lock_retry_event) {
214       sm_lock_retry_event = nullptr;
215     } else {
216       ink_release_assert(call_event == core_lock_retry_event);
217       core_lock_retry_event = nullptr;
218     }
219 
220     if (need_read_process) {
221       process_read_side(false);
222     }
223 
224     if (need_write_process && !closed) {
225       process_write_side(false);
226     }
227   }
228 
229   reentrancy_count--;
230   if (closed) {
231     process_close();
232   }
233 
234   if (read_mutex_held) {
235     Mutex_unlock(read_side_mutex, my_ethread);
236   }
237 
238   if (write_mutex_held) {
239     Mutex_unlock(write_side_mutex, my_ethread);
240   }
241 
242   return 0;
243 }
244 
245 VIO *
do_io_read(Continuation * c,int64_t nbytes,MIOBuffer * buf)246 PluginVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
247 {
248   ink_assert(!closed);
249   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
250 
251   if (buf) {
252     read_state.vio.buffer.writer_for(buf);
253   } else {
254     read_state.vio.buffer.clear();
255   }
256 
257   // Note: we set vio.op last because process_read_side looks at it to
258   //  tell if the VConnection is active.
259   read_state.vio.mutex     = c ? c->mutex : this->mutex;
260   read_state.vio.cont      = c;
261   read_state.vio.nbytes    = nbytes;
262   read_state.vio.ndone     = 0;
263   read_state.vio.vc_server = (VConnection *)this;
264   read_state.vio.op        = VIO::READ;
265 
266   Debug("pvc", "[%u] %s: do_io_read for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
267 
268   // Since reentrant callbacks are not allowed on from do_io
269   //   functions schedule ourselves get on a different stack
270   need_read_process = true;
271   setup_event_cb(0, &sm_lock_retry_event);
272 
273   return &read_state.vio;
274 }
275 
276 VIO *
do_io_write(Continuation * c,int64_t nbytes,IOBufferReader * abuffer,bool owner)277 PluginVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
278 {
279   ink_assert(!closed);
280   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
281 
282   if (abuffer) {
283     ink_assert(!owner);
284     write_state.vio.buffer.reader_for(abuffer);
285   } else {
286     write_state.vio.buffer.clear();
287   }
288 
289   // Note: we set vio.op last because process_write_side looks at it to
290   //  tell if the VConnection is active.
291   write_state.vio.mutex     = c ? c->mutex : this->mutex;
292   write_state.vio.cont      = c;
293   write_state.vio.nbytes    = nbytes;
294   write_state.vio.ndone     = 0;
295   write_state.vio.vc_server = (VConnection *)this;
296   write_state.vio.op        = VIO::WRITE;
297 
298   Debug("pvc", "[%u] %s: do_io_write for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
299 
300   // Since reentrant callbacks are not allowed on from do_io
301   //   functions schedule ourselves get on a different stack
302   need_write_process = true;
303   setup_event_cb(0, &sm_lock_retry_event);
304 
305   return &write_state.vio;
306 }
307 
308 void
reenable(VIO * vio)309 PluginVC::reenable(VIO *vio)
310 {
311   ink_assert(!closed);
312   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
313   ink_assert(vio->mutex->thread_holding == this_ethread());
314 
315   Ptr<ProxyMutex> sm_mutex = vio->mutex;
316   SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread());
317 
318   Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
319 
320   if (vio->op == VIO::WRITE) {
321     ink_assert(vio == &write_state.vio);
322     need_write_process = true;
323   } else if (vio->op == VIO::READ) {
324     need_read_process = true;
325   } else {
326     ink_release_assert(0);
327   }
328   setup_event_cb(0, &sm_lock_retry_event);
329 }
330 
331 void
reenable_re(VIO * vio)332 PluginVC::reenable_re(VIO *vio)
333 {
334   ink_assert(!closed);
335   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
336   ink_assert(vio->mutex->thread_holding == this_ethread());
337 
338   Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
339 
340   SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
341 
342   ++reentrancy_count;
343 
344   if (vio->op == VIO::WRITE) {
345     ink_assert(vio == &write_state.vio);
346     need_write_process = true;
347     process_write_side(false);
348   } else if (vio->op == VIO::READ) {
349     ink_assert(vio == &read_state.vio);
350     need_read_process = true;
351     process_read_side(false);
352   } else {
353     ink_release_assert(0);
354   }
355 
356   --reentrancy_count;
357 
358   // To process the close, we need the lock
359   //   for the PluginVC.  Schedule an event
360   //   to make sure we get it
361   if (closed) {
362     setup_event_cb(0, &sm_lock_retry_event);
363   }
364 }
365 
366 void
do_io_close(int)367 PluginVC::do_io_close(int /* flag ATS_UNUSED */)
368 {
369   ink_assert(!closed);
370   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
371 
372   Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
373 
374   SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
375   if (!closed) { // if already closed, need to do nothing.
376     closed = true;
377 
378     // If re-entered then that earlier handler will clean up, otherwise set up a ping
379     // to drive that process (too dangerous to do it here).
380     if (reentrancy_count <= 0) {
381       setup_event_cb(0, &sm_lock_retry_event);
382     }
383   }
384 }
385 
386 void
do_io_shutdown(ShutdownHowTo_t howto)387 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
388 {
389   ink_assert(!closed);
390   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
391 
392   switch (howto) {
393   case IO_SHUTDOWN_READ:
394     read_state.shutdown = true;
395     break;
396   case IO_SHUTDOWN_WRITE:
397     write_state.shutdown = true;
398     break;
399   case IO_SHUTDOWN_READWRITE:
400     read_state.shutdown  = true;
401     write_state.shutdown = true;
402     break;
403   }
404 }
405 
406 // int PluginVC::transfer_bytes(MIOBuffer* transfer_to,
407 //                              IOBufferReader* transfer_from, int act_on)
408 //
409 //   Takes care of transferring bytes from a reader to another buffer
410 //      In the case of large transfers, we move blocks.  In the case
411 //      of small transfers we copy data so as to not build too many
412 //      buffer blocks
413 //
414 // Args:
415 //   transfer_to:  buffer to copy to
416 //   transfer_from:  buffer_copy_from
417 //   act_on: is the max number of bytes we are to copy.  There must
418 //          be at least act_on bytes available from transfer_from
419 //
420 // Returns number of bytes transferred
421 //
422 int64_t
transfer_bytes(MIOBuffer * transfer_to,IOBufferReader * transfer_from,int64_t act_on)423 PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from, int64_t act_on)
424 {
425   int64_t total_added = 0;
426 
427   ink_assert(act_on <= transfer_from->read_avail());
428 
429   while (act_on > 0) {
430     int64_t block_read_avail = transfer_from->block_read_avail();
431     int64_t to_move          = std::min(act_on, block_read_avail);
432     int64_t moved            = 0;
433 
434     if (to_move <= 0) {
435       break;
436     }
437 
438     if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
439       moved = transfer_to->write(transfer_from, to_move, 0);
440     } else {
441       // We have a really small amount of data.  To make
442       //  sure we don't get a huge build up of blocks which
443       //  can lead to stack overflows if the buffer is destroyed
444       //  before we read from it, we need copy over to the new
445       //  buffer instead of doing a block transfer
446       moved = transfer_to->write(transfer_from->start(), to_move);
447 
448       if (moved == 0) {
449         // We are out of buffer space
450         break;
451       }
452     }
453 
454     act_on -= moved;
455     transfer_from->consume(moved);
456     total_added += moved;
457   }
458 
459   return total_added;
460 }
461 
462 // void PluginVC::process_write_side(bool cb_ok)
463 //
464 //   This function may only be called while holding
465 //      this->mutex & while it is ok to callback the
466 //      write side continuation
467 //
468 //   Does write side processing
469 //
470 void
process_write_side(bool other_side_call)471 PluginVC::process_write_side(bool other_side_call)
472 {
473   ink_assert(!deletable);
474   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
475 
476   MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
477 
478   Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
479   need_write_process = false;
480 
481   // Check write_state
482   if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
483     return;
484   }
485 
486   // Check the state of our write buffer as well as ntodo
487   int64_t ntodo = write_state.vio.ntodo();
488   if (ntodo == 0) {
489     return;
490   }
491 
492   IOBufferReader *reader = write_state.vio.get_reader();
493   int64_t bytes_avail    = reader->read_avail();
494   int64_t act_on         = std::min(bytes_avail, ntodo);
495 
496   Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
497 
498   if (other_side->closed || other_side->read_state.shutdown) {
499     write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
500     return;
501   }
502 
503   if (act_on <= 0) {
504     if (ntodo > 0) {
505       // Notify the continuation that we are "disabling"
506       //  ourselves due to to nothing to write
507       write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
508     }
509     return;
510   }
511   // Bytes available, try to transfer to the PluginVCCore
512   //   intermediate buffer
513   //
514   int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
515   if (buf_space <= 0) {
516     Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
517     return;
518   }
519   act_on = std::min(act_on, buf_space);
520 
521   int64_t added = transfer_bytes(core_buffer, reader, act_on);
522   if (added < 0) {
523     // Couldn't actually get the buffer space.  This only
524     //   happens on small transfers with the above
525     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
526     Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
527     return;
528   }
529 
530   write_state.vio.ndone += added;
531 
532   Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
533 
534   if (write_state.vio.ntodo() == 0) {
535     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
536   } else {
537     write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
538   }
539 
540   update_inactive_time();
541 
542   // Wake up the read side on the other side to process these bytes
543   if (!other_side->closed) {
544     if (!other_side_call) {
545       /* To clear the `need_read_process`, the mutexes must be obtained:
546        *
547        *   - PluginVC::mutex
548        *   - PluginVC::read_state.vio.mutex
549        *
550        */
551       if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
552         // Just return, no touch on `other_side->need_read_process`.
553         return;
554       }
555       // Acquire the lock of the read side continuation
556       EThread *my_ethread = mutex->thread_holding;
557       ink_assert(my_ethread != nullptr);
558       MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
559       if (!lock.is_locked()) {
560         Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
561               ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
562 
563         // set need_read_process to enforce the read processing
564         other_side->need_read_process = true;
565         other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
566         return;
567       }
568 
569       other_side->process_read_side(true);
570     } else {
571       other_side->read_state.vio.reenable();
572     }
573   }
574 }
575 
576 // void PluginVC::process_read_side()
577 //
578 //   This function may only be called while holding
579 //      this->mutex & while it is ok to callback the
580 //      read side continuation
581 //
582 //   Does read side processing
583 //
584 void
process_read_side(bool other_side_call)585 PluginVC::process_read_side(bool other_side_call)
586 {
587   ink_assert(!deletable);
588   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
589 
590   // TODO: Never used??
591   // MIOBuffer *core_buffer;
592 
593   IOBufferReader *core_reader;
594 
595   if (vc_type == PLUGIN_VC_ACTIVE) {
596     // core_buffer = core_obj->p_to_a_buffer;
597     core_reader = core_obj->p_to_a_reader;
598   } else {
599     ink_assert(vc_type == PLUGIN_VC_PASSIVE);
600     // core_buffer = core_obj->a_to_p_buffer;
601     core_reader = core_obj->a_to_p_reader;
602   }
603 
604   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
605   need_read_process = false;
606 
607   // Check read_state
608   if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
609     return;
610   }
611 
612   // Check the state of our read buffer as well as ntodo
613   int64_t ntodo = read_state.vio.ntodo();
614   if (ntodo == 0) {
615     return;
616   }
617 
618   int64_t bytes_avail = core_reader->read_avail();
619   int64_t act_on      = std::min(bytes_avail, ntodo);
620 
621   Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
622 
623   if (act_on <= 0) {
624     if (other_side->closed || other_side->write_state.shutdown) {
625       read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
626     }
627     return;
628   }
629   // Bytes available, try to transfer from the PluginVCCore
630   //   intermediate buffer
631   //
632   MIOBuffer *output_buffer = read_state.vio.get_writer();
633 
634   int64_t water_mark = output_buffer->water_mark;
635   water_mark         = std::max(water_mark, static_cast<int64_t>(PVC_DEFAULT_MAX_BYTES));
636   int64_t buf_space  = water_mark - output_buffer->max_read_avail();
637   if (buf_space <= 0) {
638     Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
639     return;
640   }
641   act_on = std::min(act_on, buf_space);
642 
643   int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
644   if (added <= 0) {
645     // Couldn't actually get the buffer space.  This only
646     //   happens on small transfers with the above
647     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
648     Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
649     return;
650   }
651 
652   read_state.vio.ndone += added;
653 
654   Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
655 
656   if (read_state.vio.ntodo() == 0) {
657     read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
658   } else {
659     read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
660   }
661 
662   update_inactive_time();
663 
664   // Wake up the other side so it knows there is space available in
665   //  intermediate buffer
666   if (!other_side->closed) {
667     if (!other_side_call) {
668       /* To clear the `need_write_process`, the mutexes must be obtained:
669        *
670        *   - PluginVC::mutex
671        *   - PluginVC::write_state.vio.mutex
672        *
673        */
674       if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
675         // Just return, no touch on `other_side->need_write_process`.
676         return;
677       }
678       // Acquire the lock of the write side continuation
679       EThread *my_ethread = mutex->thread_holding;
680       ink_assert(my_ethread != nullptr);
681       MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
682       if (!lock.is_locked()) {
683         Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
684               ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
685 
686         // set need_write_process to enforce the write processing
687         other_side->need_write_process = true;
688         other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
689         return;
690       }
691 
692       other_side->process_write_side(true);
693     } else {
694       other_side->write_state.vio.reenable();
695     }
696   }
697 }
698 
699 // void PluginVC::process_read_close()
700 //
701 //   This function may only be called while holding
702 //      this->mutex
703 //
704 //   Tries to close the and dealloc the the vc
705 //
706 void
process_close()707 PluginVC::process_close()
708 {
709   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
710 
711   Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
712 
713   if (!deletable) {
714     deletable = true;
715   }
716 
717   if (sm_lock_retry_event) {
718     sm_lock_retry_event->cancel();
719     sm_lock_retry_event = nullptr;
720   }
721 
722   if (core_lock_retry_event) {
723     core_lock_retry_event->cancel();
724     core_lock_retry_event = nullptr;
725   }
726 
727   if (active_event) {
728     active_event->cancel();
729     active_event = nullptr;
730   }
731 
732   if (inactive_event) {
733     inactive_event->cancel();
734     inactive_event      = nullptr;
735     inactive_timeout_at = 0;
736   }
737   // If the other side of the PluginVC is not closed
738   //  we need to force it process both living sides
739   //  of the connection in order that it recognizes
740   //  the close
741   if (!other_side->closed && core_obj->connected) {
742     other_side->need_write_process = true;
743     other_side->need_read_process  = true;
744     other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
745   }
746 
747   core_obj->attempt_delete();
748 }
749 
750 // void PluginVC::process_timeout(Event** e, int event_to_send, Event** our_eptr)
751 //
752 //   Handles sending timeout event to the VConnection.  e is the event we got
753 //     which indicates the timeout.  event_to_send is the event to the
754 //     vc user.  e is a pointer to either inactive_event,
755 //     or active_event.  If we successfully send the timeout to vc user,
756 //     we clear the pointer, otherwise we reschedule it.
757 //
758 //   Because the possibility of reentrant close from vc user, we don't want to
759 //      touch any state after making the call back
760 //
761 void
process_timeout(Event ** e,int event_to_send)762 PluginVC::process_timeout(Event **e, int event_to_send)
763 {
764   ink_assert(*e == inactive_event || *e == active_event);
765 
766   if (closed) {
767     // already closed, ignore the timeout event
768     // to avoid handle_event asserting use-after-free
769     clear_event(e);
770     return;
771   }
772 
773   if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
774     MUTEX_TRY_LOCK(lock, read_state.vio.mutex, (*e)->ethread);
775     if (!lock.is_locked()) {
776       if (*e == active_event) {
777         // Only reschedule active_event due to inactive_event is perorid event.
778         (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
779       }
780       return;
781     }
782     clear_event(e);
783     read_state.vio.cont->handleEvent(event_to_send, &read_state.vio);
784   } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
785     MUTEX_TRY_LOCK(lock, write_state.vio.mutex, (*e)->ethread);
786     if (!lock.is_locked()) {
787       if (*e == active_event) {
788         // Only reschedule active_event due to inactive_event is perorid event.
789         (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
790       }
791       return;
792     }
793     clear_event(e);
794     write_state.vio.cont->handleEvent(event_to_send, &write_state.vio);
795   } else {
796     clear_event(e);
797   }
798 }
799 
800 void
clear_event(Event ** e)801 PluginVC::clear_event(Event **e)
802 {
803   if (e == nullptr || *e == nullptr) {
804     return;
805   }
806   if (*e == inactive_event) {
807     inactive_event->cancel();
808     inactive_timeout_at = 0;
809   }
810   *e = nullptr;
811 }
812 
813 void
update_inactive_time()814 PluginVC::update_inactive_time()
815 {
816   if (inactive_event && inactive_timeout) {
817     // inactive_event->cancel();
818     // inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
819     inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
820   }
821 }
822 
823 // void PluginVC::setup_event_cb(ink_hrtime in)
824 //
825 //    Setup up the event processor to call us back.
826 //      We've got two different event pointers to handle
827 //      locking issues
828 //
829 void
setup_event_cb(ink_hrtime in,Event ** e_ptr)830 PluginVC::setup_event_cb(ink_hrtime in, Event **e_ptr)
831 {
832   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
833 
834   if (*e_ptr == nullptr) {
835     // We locked the pointer so we can now allocate an event
836     //   to call us back
837     if (in == 0) {
838       if (this_ethread()->tt == REGULAR) {
839         *e_ptr = this_ethread()->schedule_imm_local(this);
840       } else {
841         *e_ptr = eventProcessor.schedule_imm(this);
842       }
843     } else {
844       if (this_ethread()->tt == REGULAR) {
845         *e_ptr = this_ethread()->schedule_in_local(this, in);
846       } else {
847         *e_ptr = eventProcessor.schedule_in(this, in);
848       }
849     }
850   }
851 }
852 
853 void
set_active_timeout(ink_hrtime timeout_in)854 PluginVC::set_active_timeout(ink_hrtime timeout_in)
855 {
856   active_timeout = timeout_in;
857 
858   // FIX - Do we need to handle the case where the timeout is set
859   //   but no io has been done?
860   if (active_event) {
861     ink_assert(!active_event->cancelled);
862     active_event->cancel();
863     active_event = nullptr;
864   }
865 
866   if (active_timeout > 0) {
867     active_event = eventProcessor.schedule_in(this, active_timeout);
868   }
869 }
870 
871 void
set_inactivity_timeout(ink_hrtime timeout_in)872 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
873 {
874   inactive_timeout = timeout_in;
875   if (inactive_timeout != 0) {
876     inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
877     if (inactive_event == nullptr) {
878       inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
879     }
880   } else {
881     inactive_timeout_at = 0;
882     if (inactive_event) {
883       inactive_event->cancel();
884       inactive_event = nullptr;
885     }
886   }
887 }
888 
889 void
set_default_inactivity_timeout(ink_hrtime timeout_in)890 PluginVC::set_default_inactivity_timeout(ink_hrtime timeout_in)
891 {
892   set_inactivity_timeout(timeout_in);
893 }
894 
895 bool
is_default_inactivity_timeout()896 PluginVC::is_default_inactivity_timeout()
897 {
898   return false;
899 }
900 
901 void
cancel_active_timeout()902 PluginVC::cancel_active_timeout()
903 {
904   set_active_timeout(0);
905 }
906 
907 void
cancel_inactivity_timeout()908 PluginVC::cancel_inactivity_timeout()
909 {
910   set_inactivity_timeout(0);
911 }
912 
913 ink_hrtime
get_active_timeout()914 PluginVC::get_active_timeout()
915 {
916   return active_timeout;
917 }
918 
919 ink_hrtime
get_inactivity_timeout()920 PluginVC::get_inactivity_timeout()
921 {
922   return inactive_timeout;
923 }
924 
925 void
add_to_keep_alive_queue()926 PluginVC::add_to_keep_alive_queue()
927 {
928   // do nothing
929 }
930 
931 void
remove_from_keep_alive_queue()932 PluginVC::remove_from_keep_alive_queue()
933 {
934   // do nothing
935 }
936 
937 bool
add_to_active_queue()938 PluginVC::add_to_active_queue()
939 {
940   // do nothing
941   return true;
942 }
943 
944 SOCKET
get_socket()945 PluginVC::get_socket()
946 {
947   // Return an invalid file descriptor
948   return ts::NO_FD;
949 }
950 
951 void
set_local_addr()952 PluginVC::set_local_addr()
953 {
954   if (vc_type == PLUGIN_VC_ACTIVE) {
955     ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
956     //    local_addr = core_obj->active_addr_struct;
957   } else {
958     ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
959     //    local_addr = core_obj->passive_addr_struct;
960   }
961 }
962 
963 void
set_remote_addr()964 PluginVC::set_remote_addr()
965 {
966   if (vc_type == PLUGIN_VC_ACTIVE) {
967     ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
968   } else {
969     ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
970   }
971 }
972 
973 void
set_remote_addr(const sockaddr *)974 PluginVC::set_remote_addr(const sockaddr * /* new_sa ATS_UNUSED */)
975 {
976   return;
977 }
978 
979 void
set_mptcp_state()980 PluginVC::set_mptcp_state()
981 {
982   return;
983 }
984 
985 int
set_tcp_congestion_control(int ATS_UNUSED)986 PluginVC::set_tcp_congestion_control(int ATS_UNUSED)
987 {
988   return -1;
989 }
990 
991 void
apply_options()992 PluginVC::apply_options()
993 {
994   // do nothing
995 }
996 
997 bool
get_data(int id,void * data)998 PluginVC::get_data(int id, void *data)
999 {
1000   if (data == nullptr) {
1001     return false;
1002   }
1003   switch (id) {
1004   case PLUGIN_VC_DATA_LOCAL:
1005     if (vc_type == PLUGIN_VC_ACTIVE) {
1006       *static_cast<void **>(data) = core_obj->active_data;
1007     } else {
1008       *static_cast<void **>(data) = core_obj->passive_data;
1009     }
1010     return true;
1011   case PLUGIN_VC_DATA_REMOTE:
1012     if (vc_type == PLUGIN_VC_ACTIVE) {
1013       *static_cast<void **>(data) = core_obj->passive_data;
1014     } else {
1015       *static_cast<void **>(data) = core_obj->active_data;
1016     }
1017     return true;
1018   case TS_API_DATA_CLOSED:
1019     *static_cast<int *>(data) = this->closed;
1020     return true;
1021   default:
1022     *static_cast<void **>(data) = nullptr;
1023     return false;
1024   }
1025 }
1026 
1027 bool
set_data(int id,void * data)1028 PluginVC::set_data(int id, void *data)
1029 {
1030   switch (id) {
1031   case PLUGIN_VC_DATA_LOCAL:
1032     if (vc_type == PLUGIN_VC_ACTIVE) {
1033       core_obj->active_data = data;
1034     } else {
1035       core_obj->passive_data = data;
1036     }
1037     return true;
1038   case PLUGIN_VC_DATA_REMOTE:
1039     if (vc_type == PLUGIN_VC_ACTIVE) {
1040       core_obj->passive_data = data;
1041     } else {
1042       core_obj->active_data = data;
1043     }
1044     return true;
1045   default:
1046     return false;
1047   }
1048 }
1049 
1050 // PluginVCCore
1051 
1052 int32_t PluginVCCore::nextid;
1053 
1054 PluginVCCore::~PluginVCCore() = default;
1055 
1056 PluginVCCore *
alloc(Continuation * acceptor)1057 PluginVCCore::alloc(Continuation *acceptor)
1058 {
1059   PluginVCCore *pvc = new PluginVCCore;
1060   pvc->init();
1061   pvc->connect_to = acceptor;
1062   return pvc;
1063 }
1064 
1065 void
init()1066 PluginVCCore::init()
1067 {
1068   mutex = new_ProxyMutex();
1069 
1070   active_vc.vc_type    = PLUGIN_VC_ACTIVE;
1071   active_vc.other_side = &passive_vc;
1072   active_vc.core_obj   = this;
1073   active_vc.mutex      = mutex;
1074   active_vc.thread     = this_ethread();
1075 
1076   passive_vc.vc_type    = PLUGIN_VC_PASSIVE;
1077   passive_vc.other_side = &active_vc;
1078   passive_vc.core_obj   = this;
1079   passive_vc.mutex      = mutex;
1080   passive_vc.thread     = active_vc.thread;
1081 
1082   p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1083   p_to_a_reader = p_to_a_buffer->alloc_reader();
1084 
1085   a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1086   a_to_p_reader = a_to_p_buffer->alloc_reader();
1087 
1088   Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
1089 }
1090 
1091 void
destroy()1092 PluginVCCore::destroy()
1093 {
1094   Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
1095 
1096   ink_assert(active_vc.closed == true || !connected);
1097   active_vc.mutex = nullptr;
1098   active_vc.read_state.vio.buffer.clear();
1099   active_vc.write_state.vio.buffer.clear();
1100   active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1101 
1102   ink_assert(passive_vc.closed == true || !connected);
1103   passive_vc.mutex = nullptr;
1104   passive_vc.read_state.vio.buffer.clear();
1105   passive_vc.write_state.vio.buffer.clear();
1106   passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1107 
1108   if (p_to_a_buffer) {
1109     free_MIOBuffer(p_to_a_buffer);
1110     p_to_a_buffer = nullptr;
1111   }
1112 
1113   if (a_to_p_buffer) {
1114     free_MIOBuffer(a_to_p_buffer);
1115     a_to_p_buffer = nullptr;
1116   }
1117 
1118   this->mutex = nullptr;
1119   delete this;
1120 }
1121 
1122 PluginVC *
connect()1123 PluginVCCore::connect()
1124 {
1125   ink_release_assert(connect_to != nullptr);
1126 
1127   connected = true;
1128   state_send_accept(EVENT_IMMEDIATE, nullptr);
1129 
1130   return &active_vc;
1131 }
1132 
1133 Action *
connect_re(Continuation * c)1134 PluginVCCore::connect_re(Continuation *c)
1135 {
1136   ink_release_assert(connect_to != nullptr);
1137 
1138   EThread *my_thread = this_ethread();
1139   MUTEX_TAKE_LOCK(this->mutex, my_thread);
1140 
1141   connected = true;
1142   state_send_accept(EVENT_IMMEDIATE, nullptr);
1143 
1144   // We have to take out our mutex because rest of the
1145   //   system expects the VC mutex to held when calling back.
1146   // We can use take lock here instead of try lock because the
1147   //   lock should never already be held.
1148 
1149   c->handleEvent(NET_EVENT_OPEN, &active_vc);
1150   MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
1151 
1152   return ACTION_RESULT_DONE;
1153 }
1154 
1155 int
state_send_accept_failed(int,void *)1156 PluginVCCore::state_send_accept_failed(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1157 {
1158   if (connect_to->mutex == nullptr) {
1159     connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1160     destroy();
1161   } else {
1162     MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1163 
1164     if (lock.is_locked()) {
1165       connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1166       destroy();
1167     } else {
1168       SET_HANDLER(&PluginVCCore::state_send_accept_failed);
1169       eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1170     }
1171   }
1172 
1173   return 0;
1174 }
1175 
1176 int
state_send_accept(int,void *)1177 PluginVCCore::state_send_accept(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1178 {
1179   if (connect_to->mutex == nullptr) {
1180     connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1181   } else {
1182     MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1183 
1184     if (lock.is_locked()) {
1185       connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1186     } else {
1187       SET_HANDLER(&PluginVCCore::state_send_accept);
1188       eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1189     }
1190   }
1191 
1192   return 0;
1193 }
1194 
1195 // void PluginVCCore::attempt_delete()
1196 //
1197 //  Mutex must be held when calling this function
1198 //
1199 void
attempt_delete()1200 PluginVCCore::attempt_delete()
1201 {
1202   if (active_vc.deletable) {
1203     if (passive_vc.deletable) {
1204       destroy();
1205     } else if (!connected) {
1206       state_send_accept_failed(EVENT_IMMEDIATE, nullptr);
1207     }
1208   }
1209 }
1210 
1211 // void PluginVCCore::kill_no_connect()
1212 //
1213 //   Called to kill the PluginVCCore when the
1214 //     connect call hasn't been made yet
1215 //
1216 void
kill_no_connect()1217 PluginVCCore::kill_no_connect()
1218 {
1219   ink_assert(!connected);
1220   ink_assert(!active_vc.closed);
1221   active_vc.do_io_close();
1222 }
1223 
1224 void
set_passive_addr(in_addr_t ip,int port)1225 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
1226 {
1227   ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
1228 }
1229 
1230 void
set_passive_addr(sockaddr const * ip)1231 PluginVCCore::set_passive_addr(sockaddr const *ip)
1232 {
1233   passive_addr_struct.assign(ip);
1234 }
1235 
1236 void
set_active_addr(in_addr_t ip,int port)1237 PluginVCCore::set_active_addr(in_addr_t ip, int port)
1238 {
1239   ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
1240 }
1241 
1242 void
set_active_addr(sockaddr const * ip)1243 PluginVCCore::set_active_addr(sockaddr const *ip)
1244 {
1245   active_addr_struct.assign(ip);
1246 }
1247 
1248 void
set_passive_data(void * data)1249 PluginVCCore::set_passive_data(void *data)
1250 {
1251   passive_data = data;
1252 }
1253 
1254 void
set_active_data(void * data)1255 PluginVCCore::set_active_data(void *data)
1256 {
1257   active_data = data;
1258 }
1259 
1260 void
set_transparent(bool passive_side,bool active_side)1261 PluginVCCore::set_transparent(bool passive_side, bool active_side)
1262 {
1263   passive_vc.set_is_transparent(passive_side);
1264   active_vc.set_is_transparent(active_side);
1265 }
1266 
1267 void
set_plugin_id(int64_t id)1268 PluginVCCore::set_plugin_id(int64_t id)
1269 {
1270   passive_vc.plugin_id = active_vc.plugin_id = id;
1271 }
1272 
1273 void
set_plugin_tag(const char * tag)1274 PluginVCCore::set_plugin_tag(const char *tag)
1275 {
1276   passive_vc.plugin_tag = active_vc.plugin_tag = tag;
1277 }
1278 
1279 /*************************************************************
1280  *
1281  *   REGRESSION TEST STUFF
1282  *
1283  **************************************************************/
1284 
1285 #if TS_HAS_TESTS
1286 class PVCTestDriver : public NetTestDriver
1287 {
1288 public:
1289   PVCTestDriver();
1290   ~PVCTestDriver() override;
1291 
1292   void start_tests(RegressionTest *r_arg, int *pstatus_arg);
1293   void run_next_test();
1294   int main_handler(int event, void *data);
1295 
1296 private:
1297   unsigned i                    = 0;
1298   unsigned completions_received = 0;
1299 };
1300 
PVCTestDriver()1301 PVCTestDriver::PVCTestDriver() : NetTestDriver() {}
1302 
~PVCTestDriver()1303 PVCTestDriver::~PVCTestDriver()
1304 {
1305   mutex = nullptr;
1306 }
1307 
1308 void
start_tests(RegressionTest * r_arg,int * pstatus_arg)1309 PVCTestDriver::start_tests(RegressionTest *r_arg, int *pstatus_arg)
1310 {
1311   mutex = new_ProxyMutex();
1312   MUTEX_TRY_LOCK(lock, mutex, this_ethread());
1313 
1314   r       = r_arg;
1315   pstatus = pstatus_arg;
1316   SET_HANDLER(&PVCTestDriver::main_handler);
1317 
1318   run_next_test();
1319 }
1320 
1321 void
run_next_test()1322 PVCTestDriver::run_next_test()
1323 {
1324   unsigned a_index = i * 2;
1325   unsigned p_index = a_index + 1;
1326 
1327   if (p_index >= num_netvc_tests) {
1328     // We are done - // FIX - PASS or FAIL?
1329     if (errors == 0) {
1330       *pstatus = REGRESSION_TEST_PASSED;
1331     } else {
1332       *pstatus = REGRESSION_TEST_FAILED;
1333     }
1334     delete this;
1335     return;
1336   }
1337   completions_received = 0;
1338   i++;
1339 
1340   Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
1341 
1342   NetVCTest *p       = new NetVCTest;
1343   NetVCTest *a       = new NetVCTest;
1344   PluginVCCore *core = PluginVCCore::alloc(p);
1345 
1346   p->init_test(NET_VC_TEST_PASSIVE, this, nullptr, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
1347   PluginVC *a_vc = core->connect();
1348 
1349   a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
1350 }
1351 
1352 int
main_handler(int,void *)1353 PVCTestDriver::main_handler(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1354 {
1355   completions_received++;
1356 
1357   if (completions_received == 2) {
1358     run_next_test();
1359   }
1360 
1361   return 0;
1362 }
1363 
EXCLUSIVE_REGRESSION_TEST(PVC)1364 EXCLUSIVE_REGRESSION_TEST(PVC)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus)
1365 {
1366   PVCTestDriver *driver = new PVCTestDriver;
1367   driver->start_tests(t, pstatus);
1368 }
1369 #endif
1370