1 /** @file
2
3    PluginVC and PluginVCCore: NetVConnection look-alikes that transfer data between two continuations inside Traffic Server.
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 /****************************************************************************
25
26 PluginVC.cc
27
28    Description: Allows bi-directional transfer of data from one
29 continuation to another via a mechanism that impersonates a
30 NetVC. Should implement all external attributes of NetVConnections.
31
32 Since data is transferred within Traffic Server, this is a two
33 headed beast. One NetVC on initiating side (active side) and
34 one NetVC on the receiving side (passive side).
35
36    The two NetVC subclasses, PluginVC, are part of the PluginVCCore object. All
37 three objects share the same mutex. That mutex is required
38 for doing operations that affect the shared buffers,
39 read state from the PluginVC on the other side or deal with deallocation.
40
41 To simplify the code, all data passing through the system goes initially
42 into a shared buffer. There are two shared buffers, one for each
43 direction of the connection. While it's more efficient to transfer
44 the data from one buffer to another directly, this creates a lot
45 of tricky conditions since you must be holding the lock for both
46    sides, in addition to this VC's lock. Additionally, issues like
47    watermarks are very hard to deal with. Since we try
48    to move data by IOBufferData references, the efficiency penalty shouldn't
49 be too bad and if it is a big penalty, a brave soul can reimplement
50 to move the data directly without the intermediate buffer.
51
52    Locking is a difficult issue for this multi-headed beast. In each
53    PluginVC, there are two locks. The one we got from our PluginVCCore and
54 the lock from the state machine using the PluginVC. The read side
55 lock & the write side lock must be the same. The regular net processor has
56 this constraint as well. In order to handle scheduling of retry events cleanly,
57 we have two event pointers, one for each lock. sm_lock_retry_event can only
58 be changed while holding the using state machine's lock and
59 core_lock_retry_event can only be manipulated while holding the PluginVC's
60 lock. On entry to PluginVC::main_handler, we obtain all the locks
61 before looking at the events. If we can't get all the locks
62 we reschedule the event for further retries. Since all the locks are
63 obtained in the beginning of the handler, we know we are running
64 exclusively in the later parts of the handler and we will
65 be free from do_io or reenable calls on the PluginVC.
66
67 The assumption is made (consistent with IO Core spec) that any close,
68    shutdown, reenable, or do_io_{read,write} operation is done by the caller
69 while holding the lock for that side of the operation.
70
71
72 ****************************************************************************/
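
// Illustrative usage sketch (not part of this file's build): the core hands the
// passive PluginVC to an acceptor continuation via NET_EVENT_ACCEPT and returns
// the active PluginVC from connect(). The acceptor class below is hypothetical.
//
//   struct MyAcceptor : public Continuation {
//     MyAcceptor() : Continuation(new_ProxyMutex()) { SET_HANDLER(&MyAcceptor::handle); }
//     int
//     handle(int event, void *edata)
//     {
//       if (event == NET_EVENT_ACCEPT) {
//         NetVConnection *passive_side = static_cast<NetVConnection *>(edata);
//         // start IO on the passive side with do_io_read()/do_io_write() ...
//       }
//       return 0;
//     }
//   };
//
//   MyAcceptor *acceptor  = new MyAcceptor;
//   PluginVCCore *core    = PluginVCCore::alloc(acceptor); // passive side goes to acceptor
//   PluginVC *active_side = core->connect();               // active side returned to the caller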
73
74 #include "PluginVC.h"
75 #include "P_EventSystem.h"
76 #include "P_Net.h"
77 #include "tscore/Regression.h"
78
79 #define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
80 #define PVC_DEFAULT_MAX_BYTES 32768
81 #define MIN_BLOCK_TRANSFER_BYTES 128
82
83 #define PVC_TYPE ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
84
85 PluginVC::PluginVC(PluginVCCore *core_obj)
86 : NetVConnection(),
87 magic(PLUGIN_VC_MAGIC_ALIVE),
88 vc_type(PLUGIN_VC_UNKNOWN),
89 core_obj(core_obj),
90 other_side(nullptr),
91 read_state(),
92 write_state(),
93 need_read_process(false),
94 need_write_process(false),
95 closed(false),
96 sm_lock_retry_event(nullptr),
97 core_lock_retry_event(nullptr),
98 deletable(false),
99 reentrancy_count(0),
100 active_timeout(0),
101 active_event(nullptr),
102 inactive_timeout(0),
103 inactive_timeout_at(0),
104 inactive_event(nullptr),
105 plugin_tag(nullptr),
106 plugin_id(0)
107 {
108 ink_assert(core_obj != nullptr);
109 SET_HANDLER(&PluginVC::main_handler);
110 }
111
112 PluginVC::~PluginVC()
113 {
114 mutex = nullptr;
115 }
116
117 int
118 PluginVC::main_handler(int event, void *data)
119 {
120 Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
121
122 ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
123 ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
124 ink_assert(!deletable);
125 ink_assert(data != nullptr);
126
127 Event *call_event = static_cast<Event *>(data);
128 EThread *my_ethread = mutex->thread_holding;
129 ink_release_assert(my_ethread != nullptr);
130
131 bool read_mutex_held = false;
132 bool write_mutex_held = false;
133 Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex;
134 Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
135
136 if (read_side_mutex) {
137 read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
138
139 if (!read_mutex_held) {
140 if (call_event != inactive_event) {
141 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
142 }
143 return 0;
144 }
145
146 if (read_side_mutex != read_state.vio.mutex) {
147       // It's possible someone swapped the mutex on us before
148 // we were able to grab it
149 Mutex_unlock(read_side_mutex, my_ethread);
150 if (call_event != inactive_event) {
151 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
152 }
153 return 0;
154 }
155 }
156
157 if (write_side_mutex) {
158 write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
159
160 if (!write_mutex_held) {
161 if (read_mutex_held) {
162 Mutex_unlock(read_side_mutex, my_ethread);
163 }
164 if (call_event != inactive_event) {
165 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
166 }
167 return 0;
168 }
169
170 if (write_side_mutex != write_state.vio.mutex) {
171       // It's possible someone swapped the mutex on us before
172 // we were able to grab it
173 Mutex_unlock(write_side_mutex, my_ethread);
174 if (read_mutex_held) {
175 Mutex_unlock(read_side_mutex, my_ethread);
176 }
177 if (call_event != inactive_event) {
178 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
179 }
180 return 0;
181 }
182 }
183 // We've got all the locks so there should not be any
184 // other calls active
185 ink_release_assert(reentrancy_count == 0);
186
187 if (closed) {
188 process_close();
189
190 if (read_mutex_held) {
191 Mutex_unlock(read_side_mutex, my_ethread);
192 }
193
194 if (write_mutex_held) {
195 Mutex_unlock(write_side_mutex, my_ethread);
196 }
197
198 return 0;
199 }
200 // We can get closed while we're calling back the
201 // continuation. Set the reentrancy count so we know
202 // we could be calling the continuation and that we
203 // need to defer close processing
204 reentrancy_count++;
205
206 if (call_event == active_event) {
207 process_timeout(&active_event, VC_EVENT_ACTIVE_TIMEOUT);
208 } else if (call_event == inactive_event) {
209 if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
210 process_timeout(&inactive_event, VC_EVENT_INACTIVITY_TIMEOUT);
211 }
212 } else {
213 if (call_event == sm_lock_retry_event) {
214 sm_lock_retry_event = nullptr;
215 } else {
216 ink_release_assert(call_event == core_lock_retry_event);
217 core_lock_retry_event = nullptr;
218 }
219
220 if (need_read_process) {
221 process_read_side(false);
222 }
223
224 if (need_write_process && !closed) {
225 process_write_side(false);
226 }
227 }
228
229 reentrancy_count--;
230 if (closed) {
231 process_close();
232 }
233
234 if (read_mutex_held) {
235 Mutex_unlock(read_side_mutex, my_ethread);
236 }
237
238 if (write_mutex_held) {
239 Mutex_unlock(write_side_mutex, my_ethread);
240 }
241
242 return 0;
243 }
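
// The lock-or-reschedule idiom used above (and again in process_timeout() and
// PluginVCCore::state_send_accept()) boils down to this sketch (placeholder
// names, illustrative only):
//
//   MUTEX_TRY_LOCK(lock, some_mutex, this_ethread());
//   if (!lock.is_locked()) {
//     // Couldn't get the lock without blocking; let the event system retry
//     // later instead of spinning on this thread.
//     event->schedule_in(PVC_LOCK_RETRY_TIME);
//     return 0;
//   }
//   // ... safe to touch state guarded by some_mutex until `lock` goes out of scope ...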
244
245 VIO *
246 PluginVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
247 {
248 ink_assert(!closed);
249 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
250
251 if (buf) {
252 read_state.vio.buffer.writer_for(buf);
253 } else {
254 read_state.vio.buffer.clear();
255 }
256
257 // Note: we set vio.op last because process_read_side looks at it to
258 // tell if the VConnection is active.
259 read_state.vio.mutex = c ? c->mutex : this->mutex;
260 read_state.vio.cont = c;
261 read_state.vio.nbytes = nbytes;
262 read_state.vio.ndone = 0;
263 read_state.vio.vc_server = (VConnection *)this;
264 read_state.vio.op = VIO::READ;
265
266 Debug("pvc", "[%u] %s: do_io_read for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
267
268   // Since reentrant callbacks are not allowed from do_io
269   // functions, schedule ourselves to get on a different stack
270 need_read_process = true;
271 setup_event_cb(0, &sm_lock_retry_event);
272
273 return &read_state.vio;
274 }
275
276 VIO *
277 PluginVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
278 {
279 ink_assert(!closed);
280 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
281
282 if (abuffer) {
283 ink_assert(!owner);
284 write_state.vio.buffer.reader_for(abuffer);
285 } else {
286 write_state.vio.buffer.clear();
287 }
288
289 // Note: we set vio.op last because process_write_side looks at it to
290 // tell if the VConnection is active.
291 write_state.vio.mutex = c ? c->mutex : this->mutex;
292 write_state.vio.cont = c;
293 write_state.vio.nbytes = nbytes;
294 write_state.vio.ndone = 0;
295 write_state.vio.vc_server = (VConnection *)this;
296 write_state.vio.op = VIO::WRITE;
297
298 Debug("pvc", "[%u] %s: do_io_write for %" PRId64 " bytes", core_obj->id, PVC_TYPE, nbytes);
299
300   // Since reentrant callbacks are not allowed from do_io
301   // functions, schedule ourselves to get on a different stack
302 need_write_process = true;
303 setup_event_cb(0, &sm_lock_retry_event);
304
305 return &write_state.vio;
306 }
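
// Illustrative sketch of how a state machine typically starts IO on a PluginVC
// through the plain NetVConnection interface (names here are hypothetical):
//
//   MIOBuffer *read_buf       = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
//   MIOBuffer *write_buf      = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
//   IOBufferReader *write_rdr = write_buf->alloc_reader();
//
//   // sm is a Continuation; its mutex guards both VIOs since the read side
//   // and write side of a PluginVC must share the same lock (see header comment).
//   VIO *read_vio  = vc->do_io_read(sm, INT64_MAX, read_buf);
//   VIO *write_vio = vc->do_io_write(sm, INT64_MAX, write_rdr);
//
// The state machine is then called back with VC_EVENT_READ_READY,
// VC_EVENT_WRITE_READY, VC_EVENT_*_COMPLETE, VC_EVENT_EOS or VC_EVENT_ERROR,
// with the matching VIO as the event data.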
307
308 void
309 PluginVC::reenable(VIO *vio)
310 {
311 ink_assert(!closed);
312 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
313 ink_assert(vio->mutex->thread_holding == this_ethread());
314
315 Ptr<ProxyMutex> sm_mutex = vio->mutex;
316 SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread());
317
318 Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
319
320 if (vio->op == VIO::WRITE) {
321 ink_assert(vio == &write_state.vio);
322 need_write_process = true;
323 } else if (vio->op == VIO::READ) {
324 need_read_process = true;
325 } else {
326 ink_release_assert(0);
327 }
328 setup_event_cb(0, &sm_lock_retry_event);
329 }
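
// Illustrative sketch of a read-side event handler that drains data and then
// reenables the VIO so that process_read_side() runs again (hypothetical names):
//
//   case VC_EVENT_READ_READY: {
//     VIO *vio               = static_cast<VIO *>(edata);
//     IOBufferReader *reader = vio->get_reader();
//     int64_t avail          = reader->read_avail();
//     // ... inspect or copy the available data ...
//     reader->consume(avail);
//     vio->reenable(); // wakes this PluginVC up to move more bytes
//     break;
//   }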
330
331 void
332 PluginVC::reenable_re(VIO *vio)
333 {
334 ink_assert(!closed);
335 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
336 ink_assert(vio->mutex->thread_holding == this_ethread());
337
338 Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
339
340 SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
341
342 ++reentrancy_count;
343
344 if (vio->op == VIO::WRITE) {
345 ink_assert(vio == &write_state.vio);
346 need_write_process = true;
347 process_write_side(false);
348 } else if (vio->op == VIO::READ) {
349 ink_assert(vio == &read_state.vio);
350 need_read_process = true;
351 process_read_side(false);
352 } else {
353 ink_release_assert(0);
354 }
355
356 --reentrancy_count;
357
358 // To process the close, we need the lock
359 // for the PluginVC. Schedule an event
360 // to make sure we get it
361 if (closed) {
362 setup_event_cb(0, &sm_lock_retry_event);
363 }
364 }
365
366 void
367 PluginVC::do_io_close(int /* flag ATS_UNUSED */)
368 {
369 ink_assert(!closed);
370 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
371
372 Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
373
374 SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
375 if (!closed) { // if already closed, need to do nothing.
376 closed = true;
377
378 // If re-entered then that earlier handler will clean up, otherwise set up a ping
379 // to drive that process (too dangerous to do it here).
380 if (reentrancy_count <= 0) {
381 setup_event_cb(0, &sm_lock_retry_event);
382 }
383 }
384 }
385
386 void
387 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
388 {
389 ink_assert(!closed);
390 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
391
392 switch (howto) {
393 case IO_SHUTDOWN_READ:
394 read_state.shutdown = true;
395 break;
396 case IO_SHUTDOWN_WRITE:
397 write_state.shutdown = true;
398 break;
399 case IO_SHUTDOWN_READWRITE:
400 read_state.shutdown = true;
401 write_state.shutdown = true;
402 break;
403 }
404 }
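
// Illustrative sketch of a write-side half close (hypothetical usage): after
//
//   vc->do_io_shutdown(IO_SHUTDOWN_WRITE); // no further writes from this side
//
// this side can keep reading, and once the shared buffer drains the peer's
// read side is handed VC_EVENT_EOS by process_read_side() below.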
405
406 // int PluginVC::transfer_bytes(MIOBuffer* transfer_to,
407 // IOBufferReader* transfer_from, int act_on)
408 //
409 // Takes care of transferring bytes from a reader to another buffer
410 // In the case of large transfers, we move blocks. In the case
411 // of small transfers we copy data so as to not build too many
412 // buffer blocks
413 //
414 // Args:
415 // transfer_to: buffer to copy to
416 //    transfer_from: buffer to copy from
417 //    act_on: the max number of bytes we are to copy. There must
418 // be at least act_on bytes available from transfer_from
419 //
420 // Returns number of bytes transferred
421 //
422 int64_t
423 PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from, int64_t act_on)
424 {
425 int64_t total_added = 0;
426
427 ink_assert(act_on <= transfer_from->read_avail());
428
429 while (act_on > 0) {
430 int64_t block_read_avail = transfer_from->block_read_avail();
431 int64_t to_move = std::min(act_on, block_read_avail);
432 int64_t moved = 0;
433
434 if (to_move <= 0) {
435 break;
436 }
437
438 if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
439 moved = transfer_to->write(transfer_from, to_move, 0);
440 } else {
441 // We have a really small amount of data. To make
442 // sure we don't get a huge build up of blocks which
443 // can lead to stack overflows if the buffer is destroyed
444       // before we read from it, we need to copy the data over to the new
445 // buffer instead of doing a block transfer
446 moved = transfer_to->write(transfer_from->start(), to_move);
447
448 if (moved == 0) {
449 // We are out of buffer space
450 break;
451 }
452 }
453
454 act_on -= moved;
455 transfer_from->consume(moved);
456 total_added += moved;
457 }
458
459 return total_added;
460 }
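
// For reference, the two MIOBuffer::write() overloads used above behave very
// differently: write(IOBufferReader *, len, offset) appends references to the
// source's buffer blocks (no data copy), while write(const void *, len) copies
// the bytes into the destination's current block. The choice above is, in
// sketch form:
//
//   if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
//     transfer_to->write(transfer_from, to_move, 0);        // share blocks by reference
//   } else {
//     transfer_to->write(transfer_from->start(), to_move);  // copy small pieces
//   }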
461
462 // void PluginVC::process_write_side(bool cb_ok)
463 //
464 // This function may only be called while holding
465 // this->mutex & while it is ok to callback the
466 // write side continuation
467 //
468 // Does write side processing
469 //
470 void
471 PluginVC::process_write_side(bool other_side_call)
472 {
473 ink_assert(!deletable);
474 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
475
476 MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
477
478 Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
479 need_write_process = false;
480
481 // Check write_state
482 if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
483 return;
484 }
485
486 // Check the state of our write buffer as well as ntodo
487 int64_t ntodo = write_state.vio.ntodo();
488 if (ntodo == 0) {
489 return;
490 }
491
492 IOBufferReader *reader = write_state.vio.get_reader();
493 int64_t bytes_avail = reader->read_avail();
494 int64_t act_on = std::min(bytes_avail, ntodo);
495
496 Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
497
498 if (other_side->closed || other_side->read_state.shutdown) {
499 write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
500 return;
501 }
502
503 if (act_on <= 0) {
504 if (ntodo > 0) {
505 // Notify the continuation that we are "disabling"
506       // ourselves due to having nothing to write
507 write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
508 }
509 return;
510 }
511 // Bytes available, try to transfer to the PluginVCCore
512 // intermediate buffer
513 //
514 int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
515 if (buf_space <= 0) {
516 Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
517 return;
518 }
519 act_on = std::min(act_on, buf_space);
520
521 int64_t added = transfer_bytes(core_buffer, reader, act_on);
522 if (added < 0) {
523 // Couldn't actually get the buffer space. This only
524     // happens on small transfers where the above
525     // PVC_DEFAULT_MAX_BYTES check doesn't apply
526 Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
527 return;
528 }
529
530 write_state.vio.ndone += added;
531
532 Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
533
534 if (write_state.vio.ntodo() == 0) {
535 write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
536 } else {
537 write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
538 }
539
540 update_inactive_time();
541
542 // Wake up the read side on the other side to process these bytes
543 if (!other_side->closed) {
544 if (!other_side_call) {
545 /* To clear the `need_read_process`, the mutexes must be obtained:
546 *
547 * - PluginVC::mutex
548 * - PluginVC::read_state.vio.mutex
549 *
550 */
551 if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
552         // Just return; don't touch `other_side->need_read_process`.
553 return;
554 }
555 // Acquire the lock of the read side continuation
556 EThread *my_ethread = mutex->thread_holding;
557 ink_assert(my_ethread != nullptr);
558 MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
559 if (!lock.is_locked()) {
560 Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
561 ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
562
563         // set need_read_process to force the read processing
564 other_side->need_read_process = true;
565 other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
566 return;
567 }
568
569 other_side->process_read_side(true);
570 } else {
571 other_side->read_state.vio.reenable();
572 }
573 }
574 }
575
576 // void PluginVC::process_read_side()
577 //
578 // This function may only be called while holding
579 // this->mutex & while it is ok to callback the
580 // read side continuation
581 //
582 // Does read side processing
583 //
584 void
585 PluginVC::process_read_side(bool other_side_call)
586 {
587 ink_assert(!deletable);
588 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
589
590 // TODO: Never used??
591 // MIOBuffer *core_buffer;
592
593 IOBufferReader *core_reader;
594
595 if (vc_type == PLUGIN_VC_ACTIVE) {
596 // core_buffer = core_obj->p_to_a_buffer;
597 core_reader = core_obj->p_to_a_reader;
598 } else {
599 ink_assert(vc_type == PLUGIN_VC_PASSIVE);
600 // core_buffer = core_obj->a_to_p_buffer;
601 core_reader = core_obj->a_to_p_reader;
602 }
603
604 Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
605 need_read_process = false;
606
607 // Check read_state
608 if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
609 return;
610 }
611
612 // Check the state of our read buffer as well as ntodo
613 int64_t ntodo = read_state.vio.ntodo();
614 if (ntodo == 0) {
615 return;
616 }
617
618 int64_t bytes_avail = core_reader->read_avail();
619 int64_t act_on = std::min(bytes_avail, ntodo);
620
621 Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);
622
623 if (act_on <= 0) {
624 if (other_side->closed || other_side->write_state.shutdown) {
625 read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
626 }
627 return;
628 }
629 // Bytes available, try to transfer from the PluginVCCore
630 // intermediate buffer
631 //
632 MIOBuffer *output_buffer = read_state.vio.get_writer();
633
634 int64_t water_mark = output_buffer->water_mark;
635 water_mark = std::max(water_mark, static_cast<int64_t>(PVC_DEFAULT_MAX_BYTES));
636 int64_t buf_space = water_mark - output_buffer->max_read_avail();
637 if (buf_space <= 0) {
638 Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
639 return;
640 }
641 act_on = std::min(act_on, buf_space);
642
643 int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
644 if (added <= 0) {
645 // Couldn't actually get the buffer space. This only
646     // happens on small transfers where the above
647     // PVC_DEFAULT_MAX_BYTES check doesn't apply
648 Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
649 return;
650 }
651
652 read_state.vio.ndone += added;
653
654 Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
655
656 if (read_state.vio.ntodo() == 0) {
657 read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
658 } else {
659 read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
660 }
661
662 update_inactive_time();
663
664 // Wake up the other side so it knows there is space available in
665 // intermediate buffer
666 if (!other_side->closed) {
667 if (!other_side_call) {
668 /* To clear the `need_write_process`, the mutexes must be obtained:
669 *
670 * - PluginVC::mutex
671 * - PluginVC::write_state.vio.mutex
672 *
673 */
674 if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
675         // Just return; don't touch `other_side->need_write_process`.
676 return;
677 }
678 // Acquire the lock of the write side continuation
679 EThread *my_ethread = mutex->thread_holding;
680 ink_assert(my_ethread != nullptr);
681 MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
682 if (!lock.is_locked()) {
683 Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
684 ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));
685
686         // set need_write_process to force the write processing
687 other_side->need_write_process = true;
688 other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
689 return;
690 }
691
692 other_side->process_write_side(true);
693 } else {
694 other_side->write_state.vio.reenable();
695 }
696 }
697 }
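
// Note on flow control: the read side above only fills the state machine's
// buffer up to max(water_mark, PVC_DEFAULT_MAX_BYTES). A consumer that wants
// more data buffered per wakeup can raise the water mark on the MIOBuffer it
// passes to do_io_read() (illustrative sketch, hypothetical size):
//
//   MIOBuffer *read_buf  = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
//   read_buf->water_mark = 2 * PVC_DEFAULT_MAX_BYTES; // allow up to 64KB buffered
//   vc->do_io_read(sm, INT64_MAX, read_buf);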
698
699 // void PluginVC::process_close()
700 //
701 // This function may only be called while holding
702 // this->mutex
703 //
704 //   Tries to close and deallocate the vc
705 //
706 void
707 PluginVC::process_close()
708 {
709 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
710
711 Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
712
713 if (!deletable) {
714 deletable = true;
715 }
716
717 if (sm_lock_retry_event) {
718 sm_lock_retry_event->cancel();
719 sm_lock_retry_event = nullptr;
720 }
721
722 if (core_lock_retry_event) {
723 core_lock_retry_event->cancel();
724 core_lock_retry_event = nullptr;
725 }
726
727 if (active_event) {
728 active_event->cancel();
729 active_event = nullptr;
730 }
731
732 if (inactive_event) {
733 inactive_event->cancel();
734 inactive_event = nullptr;
735 inactive_timeout_at = 0;
736 }
737 // If the other side of the PluginVC is not closed
738   // we need to force it to process both living sides
739 // of the connection in order that it recognizes
740 // the close
741 if (!other_side->closed && core_obj->connected) {
742 other_side->need_write_process = true;
743 other_side->need_read_process = true;
744 other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
745 }
746
747 core_obj->attempt_delete();
748 }
749
750 // void PluginVC::process_timeout(Event** e, int event_to_send)
751 //
752 // Handles sending timeout event to the VConnection. e is the event we got
753 // which indicates the timeout. event_to_send is the event to the
754 // vc user. e is a pointer to either inactive_event,
755 // or active_event. If we successfully send the timeout to vc user,
756 // we clear the pointer, otherwise we reschedule it.
757 //
758 // Because of the possibility of a reentrant close from the vc user, we don't want to
759 // touch any state after making the call back
760 //
761 void
762 PluginVC::process_timeout(Event **e, int event_to_send)
763 {
764 ink_assert(*e == inactive_event || *e == active_event);
765
766 if (closed) {
767 // already closed, ignore the timeout event
768 // to avoid handle_event asserting use-after-free
769 clear_event(e);
770 return;
771 }
772
773 if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
774 MUTEX_TRY_LOCK(lock, read_state.vio.mutex, (*e)->ethread);
775 if (!lock.is_locked()) {
776 if (*e == active_event) {
777       // Only reschedule active_event because inactive_event is a periodic event.
778 (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
779 }
780 return;
781 }
782 clear_event(e);
783 read_state.vio.cont->handleEvent(event_to_send, &read_state.vio);
784 } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
785 MUTEX_TRY_LOCK(lock, write_state.vio.mutex, (*e)->ethread);
786 if (!lock.is_locked()) {
787 if (*e == active_event) {
788       // Only reschedule active_event because inactive_event is a periodic event.
789 (*e)->schedule_in(PVC_LOCK_RETRY_TIME);
790 }
791 return;
792 }
793 clear_event(e);
794 write_state.vio.cont->handleEvent(event_to_send, &write_state.vio);
795 } else {
796 clear_event(e);
797 }
798 }
799
800 void
801 PluginVC::clear_event(Event **e)
802 {
803 if (e == nullptr || *e == nullptr) {
804 return;
805 }
806 if (*e == inactive_event) {
807 inactive_event->cancel();
808 inactive_timeout_at = 0;
809 }
810 *e = nullptr;
811 }
812
813 void
814 PluginVC::update_inactive_time()
815 {
816 if (inactive_event && inactive_timeout) {
817 // inactive_event->cancel();
818 // inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
819 inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
820 }
821 }
822
823 // void PluginVC::setup_event_cb(ink_hrtime in, Event** e_ptr)
824 //
825 //    Set up the event processor to call us back.
826 // We've got two different event pointers to handle
827 // locking issues
828 //
829 void
830 PluginVC::setup_event_cb(ink_hrtime in, Event **e_ptr)
831 {
832 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
833
834 if (*e_ptr == nullptr) {
835     // No event is currently pending on this pointer, so we can allocate one
836     // to call us back
837 if (in == 0) {
838 if (this_ethread()->tt == REGULAR) {
839 *e_ptr = this_ethread()->schedule_imm_local(this);
840 } else {
841 *e_ptr = eventProcessor.schedule_imm(this);
842 }
843 } else {
844 if (this_ethread()->tt == REGULAR) {
845 *e_ptr = this_ethread()->schedule_in_local(this, in);
846 } else {
847 *e_ptr = eventProcessor.schedule_in(this, in);
848 }
849 }
850 }
851 }
852
853 void
854 PluginVC::set_active_timeout(ink_hrtime timeout_in)
855 {
856 active_timeout = timeout_in;
857
858 // FIX - Do we need to handle the case where the timeout is set
859 // but no io has been done?
860 if (active_event) {
861 ink_assert(!active_event->cancelled);
862 active_event->cancel();
863 active_event = nullptr;
864 }
865
866 if (active_timeout > 0) {
867 active_event = eventProcessor.schedule_in(this, active_timeout);
868 }
869 }
870
871 void
872 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
873 {
874 inactive_timeout = timeout_in;
875 if (inactive_timeout != 0) {
876 inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
877 if (inactive_event == nullptr) {
878 inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
879 }
880 } else {
881 inactive_timeout_at = 0;
882 if (inactive_event) {
883 inactive_event->cancel();
884 inactive_event = nullptr;
885 }
886 }
887 }
888
889 void
890 PluginVC::set_default_inactivity_timeout(ink_hrtime timeout_in)
891 {
892 set_inactivity_timeout(timeout_in);
893 }
894
895 bool
896 PluginVC::is_default_inactivity_timeout()
897 {
898 return false;
899 }
900
901 void
902 PluginVC::cancel_active_timeout()
903 {
904 set_active_timeout(0);
905 }
906
907 void
908 PluginVC::cancel_inactivity_timeout()
909 {
910 set_inactivity_timeout(0);
911 }
912
913 ink_hrtime
914 PluginVC::get_active_timeout()
915 {
916 return active_timeout;
917 }
918
919 ink_hrtime
920 PluginVC::get_inactivity_timeout()
921 {
922 return inactive_timeout;
923 }
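
// Illustrative sketch of timeout handling from the state machine's side
// (hypothetical values; call these while holding the lock for that side, per
// the IO Core locking assumption in the header comment):
//
//   vc->set_inactivity_timeout(HRTIME_SECONDS(30)); // VC_EVENT_INACTIVITY_TIMEOUT when idle
//   vc->set_active_timeout(HRTIME_SECONDS(300));    // VC_EVENT_ACTIVE_TIMEOUT after 5 minutes
//   ...
//   vc->cancel_inactivity_timeout();
//   vc->cancel_active_timeout();
//
// The inactivity timeout is driven by the 1 second periodic inactive_event plus
// the inactive_timeout_at deadline checked in main_handler().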
924
925 void
926 PluginVC::add_to_keep_alive_queue()
927 {
928 // do nothing
929 }
930
931 void
932 PluginVC::remove_from_keep_alive_queue()
933 {
934 // do nothing
935 }
936
937 bool
938 PluginVC::add_to_active_queue()
939 {
940 // do nothing
941 return true;
942 }
943
944 SOCKET
945 PluginVC::get_socket()
946 {
947 // Return an invalid file descriptor
948 return ts::NO_FD;
949 }
950
951 void
952 PluginVC::set_local_addr()
953 {
954 if (vc_type == PLUGIN_VC_ACTIVE) {
955 ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
956 // local_addr = core_obj->active_addr_struct;
957 } else {
958 ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
959 // local_addr = core_obj->passive_addr_struct;
960 }
961 }
962
963 void
964 PluginVC::set_remote_addr()
965 {
966 if (vc_type == PLUGIN_VC_ACTIVE) {
967 ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
968 } else {
969 ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
970 }
971 }
972
973 void
974 PluginVC::set_remote_addr(const sockaddr * /* new_sa ATS_UNUSED */)
975 {
976 return;
977 }
978
979 void
980 PluginVC::set_mptcp_state()
981 {
982 return;
983 }
984
985 int
986 PluginVC::set_tcp_congestion_control(int ATS_UNUSED)
987 {
988 return -1;
989 }
990
991 void
992 PluginVC::apply_options()
993 {
994 // do nothing
995 }
996
997 bool
998 PluginVC::get_data(int id, void *data)
999 {
1000 if (data == nullptr) {
1001 return false;
1002 }
1003 switch (id) {
1004 case PLUGIN_VC_DATA_LOCAL:
1005 if (vc_type == PLUGIN_VC_ACTIVE) {
1006 *static_cast<void **>(data) = core_obj->active_data;
1007 } else {
1008 *static_cast<void **>(data) = core_obj->passive_data;
1009 }
1010 return true;
1011 case PLUGIN_VC_DATA_REMOTE:
1012 if (vc_type == PLUGIN_VC_ACTIVE) {
1013 *static_cast<void **>(data) = core_obj->passive_data;
1014 } else {
1015 *static_cast<void **>(data) = core_obj->active_data;
1016 }
1017 return true;
1018 case TS_API_DATA_CLOSED:
1019 *static_cast<int *>(data) = this->closed;
1020 return true;
1021 default:
1022 *static_cast<void **>(data) = nullptr;
1023 return false;
1024 }
1025 }
1026
1027 bool
1028 PluginVC::set_data(int id, void *data)
1029 {
1030 switch (id) {
1031 case PLUGIN_VC_DATA_LOCAL:
1032 if (vc_type == PLUGIN_VC_ACTIVE) {
1033 core_obj->active_data = data;
1034 } else {
1035 core_obj->passive_data = data;
1036 }
1037 return true;
1038 case PLUGIN_VC_DATA_REMOTE:
1039 if (vc_type == PLUGIN_VC_ACTIVE) {
1040 core_obj->passive_data = data;
1041 } else {
1042 core_obj->active_data = data;
1043 }
1044 return true;
1045 default:
1046 return false;
1047 }
1048 }
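
// Illustrative sketch of the user-data slots (hypothetical plugin-side code):
//
//   core->set_active_data(my_active_ctx);
//   core->set_passive_data(my_passive_ctx);
//
//   void *local = nullptr, *remote = nullptr;
//   active_vc->get_data(PLUGIN_VC_DATA_LOCAL, &local);   // -> my_active_ctx
//   active_vc->get_data(PLUGIN_VC_DATA_REMOTE, &remote); // -> my_passive_ctx
//
//   int is_closed = 0;
//   active_vc->get_data(TS_API_DATA_CLOSED, &is_closed); // 1 after do_io_close()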
1049
1050 // PluginVCCore
1051
1052 int32_t PluginVCCore::nextid;
1053
1054 PluginVCCore::~PluginVCCore() = default;
1055
1056 PluginVCCore *
1057 PluginVCCore::alloc(Continuation *acceptor)
1058 {
1059 PluginVCCore *pvc = new PluginVCCore;
1060 pvc->init();
1061 pvc->connect_to = acceptor;
1062 return pvc;
1063 }
1064
1065 void
1066 PluginVCCore::init()
1067 {
1068 mutex = new_ProxyMutex();
1069
1070 active_vc.vc_type = PLUGIN_VC_ACTIVE;
1071 active_vc.other_side = &passive_vc;
1072 active_vc.core_obj = this;
1073 active_vc.mutex = mutex;
1074 active_vc.thread = this_ethread();
1075
1076 passive_vc.vc_type = PLUGIN_VC_PASSIVE;
1077 passive_vc.other_side = &active_vc;
1078 passive_vc.core_obj = this;
1079 passive_vc.mutex = mutex;
1080 passive_vc.thread = active_vc.thread;
1081
1082 p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1083 p_to_a_reader = p_to_a_buffer->alloc_reader();
1084
1085 a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
1086 a_to_p_reader = a_to_p_buffer->alloc_reader();
1087
1088 Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
1089 }
1090
1091 void
1092 PluginVCCore::destroy()
1093 {
1094 Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
1095
1096 ink_assert(active_vc.closed == true || !connected);
1097 active_vc.mutex = nullptr;
1098 active_vc.read_state.vio.buffer.clear();
1099 active_vc.write_state.vio.buffer.clear();
1100 active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1101
1102 ink_assert(passive_vc.closed == true || !connected);
1103 passive_vc.mutex = nullptr;
1104 passive_vc.read_state.vio.buffer.clear();
1105 passive_vc.write_state.vio.buffer.clear();
1106 passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
1107
1108 if (p_to_a_buffer) {
1109 free_MIOBuffer(p_to_a_buffer);
1110 p_to_a_buffer = nullptr;
1111 }
1112
1113 if (a_to_p_buffer) {
1114 free_MIOBuffer(a_to_p_buffer);
1115 a_to_p_buffer = nullptr;
1116 }
1117
1118 this->mutex = nullptr;
1119 delete this;
1120 }
1121
1122 PluginVC *
1123 PluginVCCore::connect()
1124 {
1125 ink_release_assert(connect_to != nullptr);
1126
1127 connected = true;
1128 state_send_accept(EVENT_IMMEDIATE, nullptr);
1129
1130 return &active_vc;
1131 }
1132
1133 Action *
1134 PluginVCCore::connect_re(Continuation *c)
1135 {
1136 ink_release_assert(connect_to != nullptr);
1137
1138 EThread *my_thread = this_ethread();
1139 MUTEX_TAKE_LOCK(this->mutex, my_thread);
1140
1141 connected = true;
1142 state_send_accept(EVENT_IMMEDIATE, nullptr);
1143
1144   // We have to take our mutex because the rest of the
1145   // system expects the VC mutex to be held when calling back.
1146 // We can use take lock here instead of try lock because the
1147 // lock should never already be held.
1148
1149 c->handleEvent(NET_EVENT_OPEN, &active_vc);
1150 MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
1151
1152 return ACTION_RESULT_DONE;
1153 }
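
// Illustrative sketch of the callback-based variant (hypothetical continuations):
//
//   PluginVCCore *core = PluginVCCore::alloc(server_acceptor);
//   core->connect_re(client_sm); // client_sm gets NET_EVENT_OPEN with the
//                                // active PluginVC as the event data; the
//                                // returned Action is always ACTION_RESULT_DONE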
1154
1155 int
1156 PluginVCCore::state_send_accept_failed(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1157 {
1158 if (connect_to->mutex == nullptr) {
1159 connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1160 destroy();
1161 } else {
1162 MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1163
1164 if (lock.is_locked()) {
1165 connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, nullptr);
1166 destroy();
1167 } else {
1168 SET_HANDLER(&PluginVCCore::state_send_accept_failed);
1169 eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1170 }
1171 }
1172
1173 return 0;
1174 }
1175
1176 int
1177 PluginVCCore::state_send_accept(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1178 {
1179 if (connect_to->mutex == nullptr) {
1180 connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1181 } else {
1182 MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1183
1184 if (lock.is_locked()) {
1185 connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1186 } else {
1187 SET_HANDLER(&PluginVCCore::state_send_accept);
1188 eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1189 }
1190 }
1191
1192 return 0;
1193 }
1194
1195 // void PluginVCCore::attempt_delete()
1196 //
1197 // Mutex must be held when calling this function
1198 //
1199 void
1200 PluginVCCore::attempt_delete()
1201 {
1202 if (active_vc.deletable) {
1203 if (passive_vc.deletable) {
1204 destroy();
1205 } else if (!connected) {
1206 state_send_accept_failed(EVENT_IMMEDIATE, nullptr);
1207 }
1208 }
1209 }
1210
1211 // void PluginVCCore::kill_no_connect()
1212 //
1213 // Called to kill the PluginVCCore when the
1214 // connect call hasn't been made yet
1215 //
1216 void
1217 PluginVCCore::kill_no_connect()
1218 {
1219 ink_assert(!connected);
1220 ink_assert(!active_vc.closed);
1221 active_vc.do_io_close();
1222 }
1223
1224 void
1225 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
1226 {
1227 ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
1228 }
1229
1230 void
1231 PluginVCCore::set_passive_addr(sockaddr const *ip)
1232 {
1233 passive_addr_struct.assign(ip);
1234 }
1235
1236 void
1237 PluginVCCore::set_active_addr(in_addr_t ip, int port)
1238 {
1239 ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
1240 }
1241
1242 void
1243 PluginVCCore::set_active_addr(sockaddr const *ip)
1244 {
1245 active_addr_struct.assign(ip);
1246 }
1247
1248 void
1249 PluginVCCore::set_passive_data(void *data)
1250 {
1251 passive_data = data;
1252 }
1253
1254 void
1255 PluginVCCore::set_active_data(void *data)
1256 {
1257 active_data = data;
1258 }
1259
1260 void
1261 PluginVCCore::set_transparent(bool passive_side, bool active_side)
1262 {
1263 passive_vc.set_is_transparent(passive_side);
1264 active_vc.set_is_transparent(active_side);
1265 }
1266
1267 void
1268 PluginVCCore::set_plugin_id(int64_t id)
1269 {
1270 passive_vc.plugin_id = active_vc.plugin_id = id;
1271 }
1272
1273 void
1274 PluginVCCore::set_plugin_tag(const char *tag)
1275 {
1276 passive_vc.plugin_tag = active_vc.plugin_tag = tag;
1277 }
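
// Illustrative sketch of configuring a core before handing out its VCs, roughly
// the kind of setup done on the TSHttpConnect() path (hypothetical values):
//
//   PluginVCCore *core = PluginVCCore::alloc(http_accept_cont);
//   core->set_active_addr(client_ip, client_port);  // reported via get_remote/local_addr()
//   core->set_passive_addr(target_ip, target_port); //   on the two sides
//   core->set_transparent(true, false);
//   core->set_plugin_id(my_plugin_id);
//   core->set_plugin_tag("example-plugin");
//   PluginVC *vc = core->connect();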
1278
1279 /*************************************************************
1280 *
1281 * REGRESSION TEST STUFF
1282 *
1283 **************************************************************/
1284
1285 #if TS_HAS_TESTS
1286 class PVCTestDriver : public NetTestDriver
1287 {
1288 public:
1289 PVCTestDriver();
1290 ~PVCTestDriver() override;
1291
1292 void start_tests(RegressionTest *r_arg, int *pstatus_arg);
1293 void run_next_test();
1294 int main_handler(int event, void *data);
1295
1296 private:
1297 unsigned i = 0;
1298 unsigned completions_received = 0;
1299 };
1300
1301 PVCTestDriver::PVCTestDriver() : NetTestDriver() {}
1302
1303 PVCTestDriver::~PVCTestDriver()
1304 {
1305 mutex = nullptr;
1306 }
1307
1308 void
1309 PVCTestDriver::start_tests(RegressionTest *r_arg, int *pstatus_arg)
1310 {
1311 mutex = new_ProxyMutex();
1312 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
1313
1314 r = r_arg;
1315 pstatus = pstatus_arg;
1316 SET_HANDLER(&PVCTestDriver::main_handler);
1317
1318 run_next_test();
1319 }
1320
1321 void
1322 PVCTestDriver::run_next_test()
1323 {
1324 unsigned a_index = i * 2;
1325 unsigned p_index = a_index + 1;
1326
1327 if (p_index >= num_netvc_tests) {
1328     // We are done. FIX - PASS or FAIL?
1329 if (errors == 0) {
1330 *pstatus = REGRESSION_TEST_PASSED;
1331 } else {
1332 *pstatus = REGRESSION_TEST_FAILED;
1333 }
1334 delete this;
1335 return;
1336 }
1337 completions_received = 0;
1338 i++;
1339
1340 Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
1341
1342 NetVCTest *p = new NetVCTest;
1343 NetVCTest *a = new NetVCTest;
1344 PluginVCCore *core = PluginVCCore::alloc(p);
1345
1346 p->init_test(NET_VC_TEST_PASSIVE, this, nullptr, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
1347 PluginVC *a_vc = core->connect();
1348
1349 a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
1350 }
1351
1352 int
1353 PVCTestDriver::main_handler(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
1354 {
1355 completions_received++;
1356
1357 if (completions_received == 2) {
1358 run_next_test();
1359 }
1360
1361 return 0;
1362 }
1363
1364 EXCLUSIVE_REGRESSION_TEST(PVC)(RegressionTest *t, int /* atype ATS_UNUSED */, int *pstatus)
1365 {
1366 PVCTestDriver *driver = new PVCTestDriver;
1367 driver->start_tests(t, pstatus);
1368 }
1369 #endif
1370