1 // -*- Mode: C++; -*-
2 // Package : omniORB
3 // giopRope.cc Created on: 16/01/2001
4 // Author : Sai Lai Lo (sll)
5 //
6 // Copyright (C) 2002-2018 Apasphere Ltd
7 // Copyright (C) 2001 AT&T Laboratories Cambridge
8 //
9 // This file is part of the omniORB library
10 //
11 // The omniORB library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library. If not, see http://www.gnu.org/licenses/
23 //
24 //
25 // Description:
26 //
27
28 #include <omniORB4/CORBA.h>
29 #include <omniORB4/IOP_C.h>
30 #include <omniORB4/callDescriptor.h>
31 #include <omniORB4/minorCode.h>
32 #include <omniORB4/omniInterceptors.h>
33 #include <giopRope.h>
34 #include <giopStream.h>
35 #include <giopStrand.h>
36 #include <giopStrandFlags.h>
37 #include <giopStreamImpl.h>
38 #include <giopBiDir.h>
39 #include <GIOP_C.h>
40 #include <objectAdapter.h>
41 #include <exceptiondefs.h>
42 #include <initialiser.h>
43 #include <orbOptions.h>
44 #include <orbParameters.h>
45 #include <transportRules.h>
46 #include <interceptors.h>
47 #include <libcWrapper.h>
48
49 #include <stdlib.h>
50
51 OMNI_NAMESPACE_BEGIN(omni)
52
53 ////////////////////////////////////////////////////////////////////////////
54 // Configuration options //
55 ////////////////////////////////////////////////////////////////////////////
56 CORBA::Boolean orbParameters::oneCallPerConnection = 1;
57 // 1 means only one call can be in progress at any time per connection.
58 //
59 // Valid values = 0 or 1
60
61 CORBA::ULong orbParameters::maxGIOPConnectionPerServer = 5;
62 // The ORB can open more than one connection to a server depending on
63 // the number of concurrent invocations to the same server. This
64 // variable decides the maximum number of connections to use per
65 // server. If the number of concurrent invocations exceeds this
66 // number, the extra invocations would be blocked until the the
67 // outstanding ones return.
68 //
69 // Valid values = (n >= 1)
70
71 CORBA::Boolean orbParameters::immediateRopeSwitch = 0;
72 // Switch rope to use a new address immediately, rather than
73 // retrying with the existing one.
74 //
75 // Valid values = 0 or 1
76
77 CORBA::Boolean orbParameters::resolveNamesForTransportRules = 1;
78 // If true, names in IORs will be resolved when evaluating client
79 // transport rules, and remembered from then on; if false, names
80 // will not be resolved until connect time. Client transport rules
81 // based on IP address will therefore not match, but some platforms
82 // can use external knowledge to pick the best address to use if
83 // given a name to connect to.
84 //
85 // Valid values = 0 or 1
86
87 CORBA::Boolean orbParameters::retainAddressOrder = 1;
88 // For IORs with multiple addresses, determines how the address to
89 // connect to is chosen. When first estabilishing a connection, the
90 // addresses are ordered according to the client transport rules
91 // (after resolving names if resolveNamesForTransportRules is true),
92 // and the addresses are tried in priority order until one connects
93 // successfully. For as long as there is at least one connection
94 // open to the address, new connections continue to use the same
95 // address.
96 //
97 // After a failure, or after all open connections have been
98 // scavenged and closed, this parameter determines the address used
99 // to reconnect on the next call. If this parameter is true, the
100 // address order and chosen address within the order is remembered;
101 // if false, a new connection attempt causes re-evaluation of the
102 // order (in case name resolutions change), and the highest priority
103 // address is tried first.
104 //
105 // Valid values = 0 or 1
106
107
108 ///////////////////////////////////////////////////////////////////////
109 RopeLink giopRope::ropes;
110
111 ////////////////////////////////////////////////////////////////////////
giopRope(const giopAddressList & addrlist,omniIOR::IORInfo * info)112 giopRope::giopRope(const giopAddressList& addrlist, omniIOR::IORInfo* info) :
113 pd_refcount(0),
114 pd_address_in_use(0),
115 pd_maxStrands(orbParameters::maxGIOPConnectionPerServer),
116 pd_oneCallPerConnection(orbParameters::oneCallPerConnection),
117 pd_nwaiting(0),
118 pd_cond(omniTransportLock, "giopRope::pd_cond"),
119 pd_flags(0),
120 pd_ior_flags(info->flags()),
121 pd_offerBiDir(orbParameters::offerBiDirectionalGIOP),
122 pd_addrs_filtered(0),
123 pd_filtering(0)
124 {
125 {
126 giopAddressList::const_iterator i, last;
127 i = addrlist.begin();
128 last = addrlist.end();
129 for (; i != last; i++) {
130 giopAddress* a = (*i)->duplicate();
131 pd_addresses.push_back(a);
132 }
133 }
134 pd_ior_addr_size = pd_addresses.size();
135 }
136
137
138 ////////////////////////////////////////////////////////////////////////
giopRope(giopAddress * addr)139 giopRope::giopRope(giopAddress* addr) :
140 pd_refcount(0),
141 pd_address_in_use(0),
142 pd_maxStrands(orbParameters::maxGIOPConnectionPerServer),
143 pd_oneCallPerConnection(orbParameters::oneCallPerConnection),
144 pd_nwaiting(0),
145 pd_cond(omniTransportLock, "giopRope::pd_cond"),
146 pd_flags(0),
147 pd_ior_flags(0),
148 pd_offerBiDir(orbParameters::offerBiDirectionalGIOP),
149 pd_addrs_filtered(1),
150 pd_filtering(0)
151 {
152 pd_addresses.push_back(addr);
153 pd_addresses_order.push_back(0);
154 pd_ior_addr_size = 1;
155 }
156
157 ////////////////////////////////////////////////////////////////////////
~giopRope()158 giopRope::~giopRope() {
159 OMNIORB_ASSERT(pd_nwaiting == 0);
160 giopAddressList::iterator i, last;
161 i = pd_addresses.begin();
162 last = pd_addresses.end();
163 for (; i != last; i++) {
164 delete (*i);
165 }
166 }
167
168 ////////////////////////////////////////////////////////////////////////
169 IOP_C*
acquireClient(const omniIOR * ior,const CORBA::Octet * key,CORBA::ULong keysize,omniCallDescriptor * calldesc)170 giopRope::acquireClient(const omniIOR* ior,
171 const CORBA::Octet* key,
172 CORBA::ULong keysize,
173 omniCallDescriptor* calldesc)
174 {
175 GIOP::Version v = ior->getIORInfo()->version();
176 giopStreamImpl* impl = giopStreamImpl::matchVersion(v);
177 if (!impl) {
178 impl = giopStreamImpl::maxVersion();
179 v = impl->version();
180 }
181
182 omni_tracedmutex_lock sync(*omniTransportLock);
183
184 while (!pd_addrs_filtered) {
185
186 if (pd_filtering) {
187 // Another thread is filtering. Wait until it is done
188 while (pd_filtering)
189 pd_cond.wait();
190
191 // The thread doing the filtering will have filtered the
192 // addresses and set pd_addrs_filtered to 1. However, by the
193 // time this thread runs, it is possible for the addresses to
194 // have been reset again by resetAddressOrder, so we loop to
195 // check again.
196 }
197 else {
198 pd_filtering = 1;
199 filterAndSortAddressList();
200
201 pd_filtering = 0;
202 pd_addrs_filtered = 1;
203
204 pd_cond.broadcast();
205 break;
206 }
207 }
208
209 again:
210
211 unsigned int nbusy = 0;
212 unsigned int ndying = 0;
213 unsigned int nwrongver = 0;
214 CORBA::ULong max = pd_maxStrands; // snap the value now as it may
215 // change by the application any time.
216 RopeLink* p = pd_strands.next;
217 for (; p != &pd_strands; p = p->next) {
218 giopStrand* s = (giopStrand*)p;
219 switch (s->state()) {
220 case giopStrand::DYING:
221 {
222 // Bidirectional strands do not count towards the total of
223 // dying strands. This is because with a bidirectional rope,
224 // the max number of strands is one. Below, if the number of
225 // dying strands is > the max, we wait for the strands to die.
226 // However, it is possible that we are the client keeping the
227 // strand alive, leading to a deadlock. To avoid the
228 // situation, we do not count dying bidir strands, allowing us
229 // to create a new one, and release the one that is dying.
230 if (!s->isBiDir())
231 ndying++;
232
233 break;
234 }
235 case giopStrand::TIMEDOUT:
236 {
237 s->StrandList::remove();
238 s->state(giopStrand::ACTIVE);
239 s->StrandList::insert(giopStrand::active);
240 // falls through
241 }
242 case giopStrand::ACTIVE:
243 {
244 if (s->version.major != v.major || s->version.minor != v.minor) {
245 // Wrong GIOP version. Each strand can only be used
246 // for one GIOP version.
247 // If ever we allow more than one GIOP version
248 // to use one strand, make sure the client side interceptor
249 // for codeset is updated to reflect this.
250 nwrongver++;
251 }
252 else {
253 GIOP_C* g;
254 if (!giopStreamList::is_empty(s->clients)) {
255 giopStreamList* gp = s->clients.next;
256 for (; gp != &s->clients; gp = gp->next) {
257 g = (GIOP_C*)gp;
258 if (g->state() == IOP_C::UnUsed) {
259 g->initialise(ior,key,keysize,calldesc);
260 return g;
261 }
262 }
263 nbusy++;
264 }
265 else {
266 g = new GIOP_C(this,s);
267 g->impl(s->giopImpl);
268 g->initialise(ior,key,keysize,calldesc);
269 g->giopStreamList::insert(s->clients);
270 return g;
271 }
272 }
273 }
274 }
275 }
276
277 // Reach here if we haven't got a strand to grab a GIOP_C.
278 if ((nbusy + ndying) < max) {
279 // Create a new strand.
280 // Notice that we can have up to
281 // pd_maxStrands * <no. of supported GIOP versions> strands created.
282 //
283
284 // Do a sanity check here. It could be the case that this rope has
285 // no valid address to use. This can be the case if we have
286 // unmarshalled an IOR which has no profiles we can support, or
287 // the client transport rules have denied all the addresses.
288 // Notice that we do not raise an exception at the time when the
289 // IOR was unmarshalled because we would like to be able to
290 // receive and pass along object references that we ourselves
291 // cannot talk to.
292 if (pd_addresses_order.empty()) {
293 resetAddressOrder(1, 0);
294 OMNIORB_THROW(TRANSIENT,TRANSIENT_NoUsableProfile,CORBA::COMPLETED_NO);
295 }
296
297 giopStrand* s = new giopStrand(pd_addresses[pd_addresses_order[pd_address_in_use]]);
298
299 s->state(giopStrand::ACTIVE);
300 s->RopeLink::insert(pd_strands);
301 s->StrandList::insert(giopStrand::active);
302 s->version = v;
303 s->giopImpl = impl;
304 s->flags = pd_flags;
305
306 GIOP_C* g = new GIOP_C(this,s);
307 g->impl(s->giopImpl);
308 g->initialise(ior,key,keysize,calldesc);
309 g->giopStreamList::insert(s->clients);
310 return g;
311 }
312 else if (pd_oneCallPerConnection || ndying >= max) {
313 // Wait for a strand to be unused.
314 pd_nwaiting++;
315
316 const omni_time_t& deadline = calldesc->getDeadline();
317 if (deadline) {
318 if (pd_cond.timedwait(deadline) == 0) {
319 pd_nwaiting--;
320 if (orbParameters::throwTransientOnTimeOut) {
321 OMNIORB_THROW(TRANSIENT,
322 TRANSIENT_CallTimedout,
323 CORBA::COMPLETED_NO);
324 }
325 else {
326 OMNIORB_THROW(TIMEOUT,
327 TIMEOUT_CallTimedOutOnClient,
328 CORBA::COMPLETED_NO);
329 }
330 }
331 }
332 else {
333 pd_cond.wait();
334 }
335 pd_nwaiting--;
336 }
337 else {
338 // Pick a random non-dying strand.
339 OMNIORB_ASSERT(nbusy); // There must be a non-dying strand that can
340 // serve this GIOP version
341 int n = LibcWrapper::Rand() % nbusy;
342 // Pick a random and non-dying strand
343 RopeLink* p = pd_strands.next;
344 giopStrand* q = 0;
345 giopStrand* s = 0;
346 while (n >=0 && p != &pd_strands) {
347 s = (giopStrand*)p;
348 if (s->state() == giopStrand::ACTIVE &&
349 s->version.major == v.major &&
350 s->version.minor == v.minor)
351 {
352 n--;
353 if (!q) q = s;
354 }
355 else {
356 s = 0;
357 }
358 p = p->next;
359 }
360 s = (s) ? s : q;
361 // By the time we look for busy strands, it's possible that they
362 // are all dying, in which case we have to start again.
363 if (s) {
364 GIOP_C* g = new GIOP_C(this,s);
365 g->impl(s->giopImpl);
366 g->initialise(ior,key,keysize,calldesc);
367 g->giopStreamList::insert(s->clients);
368 return g;
369 }
370 }
371 goto again;
372 }
373
374 ////////////////////////////////////////////////////////////////////////
375 void
releaseClient(IOP_C * iop_c)376 giopRope::releaseClient(IOP_C* iop_c)
377 {
378 omni_tracedmutex_lock sync(*omniTransportLock);
379
380 GIOP_C* giop_c = (GIOP_C*) iop_c;
381
382 giop_c->rdUnLock();
383 giop_c->wrUnLock();
384
385 // We decide in the following what to do with this GIOP_C and the strand
386 // it is attached:
387 //
388 // 1. If the strand is used simultaneously for multiple calls, it will have
389 // multiple GIOP_Cs attached. We only want to keep at most 1 idle GIOP_C.
390 // In other words, if this is not the last GIOP_C attached to the strand
391 // we delete it. (Actually it does no harm to delete all GIOP_C
392 // irrespectively. It will be slower to do an invocation because a
393 // new GIOP_C has to be instantiated in every call.
394 //
395 // 2. If the strand is in the DYING state, we obviously should delete the
396 // GIOP_C. If this is also the last GIOP_C, we delete the strand as
397 // well. If the strand is used to support bidirectional GIOP, we
398 // also check to ensure that the GIOP_S list is empty.
399 //
400
401 giopStrand* s = &giop_c->strand();
402 giop_c->giopStreamList::remove();
403
404 CORBA::Boolean remove = 0;
405 CORBA::Boolean avail = 1;
406
407 if (giop_c->state() != IOP_C::Idle && s->state() != giopStrand::DYING) {
408 if (omniORB::trace(30)) {
409 omniORB::logger l;
410
411 if (s->connection) {
412 l << "Unexpected error encountered in talking to the server "
413 << s->connection->peeraddress()
414 << ". The connection is closed immediately. ";
415 }
416 else {
417 OMNIORB_ASSERT(s->address);
418 l << "Unexpected error encountered before talking to the server "
419 << s->address->address()
420 << ". No connection was opened.";
421 }
422 l << " GIOP_C state " << (int)giop_c->state()
423 << ", strand state " << (int)s->state() << "\n";
424 }
425 s->state(giopStrand::DYING);
426 }
427
428 if (s->state() == giopStrand::DYING) {
429
430 if (s->isBiDir() && s->isClient() && s->connection) {
431 // This is the client side of a bidirectional connection. There
432 // may be another thread using the strand on behalf of the
433 // client, so we do not delete the strand yet. Instead we
434 // re-insert the GIOP_C and start the strand's idle counter so
435 // it can be scavenged.
436
437 if (omniORB::trace(25)) {
438 omniORB::logger log;
439 log << "Strand " << (void*)s
440 << " in bi-directional client rope is dying.\n";
441 }
442 giop_c->giopStreamList::insert(s->clients);
443 s->startIdleCounter();
444 }
445 else {
446 remove = 1;
447
448 // If safeDelete() returns 1, this strand can be regarded as
449 // deleted. Therefore, we flag avail to 1 to wake up any threads
450 // waiting on the rope to have a chance to create another
451 // strand.
452 avail = s->safeDelete();
453 }
454 }
455 else if ((s->isBiDir() && !s->isClient()) ||
456 !giopStreamList::is_empty(s->clients)) {
457 // We do not cache the GIOP_C if this is server side bidirectional or
458 // we already have other GIOP_Cs active or available.
459 remove = 1;
460 avail = 0;
461 }
462 else {
463 OMNIORB_ASSERT(giop_c->state() == IOP_C::Idle);
464 giop_c->giopStreamList::insert(s->clients);
465 // The strand is definitely idle from this point onwards, we
466 // reset the idle counter so that it will be retired at the right time.
467 if (s->isClient() && !s->biDir_has_callbacks)
468 s->startIdleCounter();
469 }
470
471 if (remove) {
472 delete giop_c;
473 }
474 else {
475 giop_c->cleanup();
476 }
477
478 // If any thread is waiting for a strand to become available, we signal
479 // it here.
480 if (avail && pd_nwaiting) pd_cond.signal();
481 }
482
483 ////////////////////////////////////////////////////////////////////////
484 void
realIncrRefCount()485 giopRope::realIncrRefCount()
486 {
487 ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);
488
489 OMNIORB_ASSERT(pd_refcount >= 0);
490
491 if (pd_refcount == 0 && !RopeLink::is_empty(pd_strands)) {
492 // This Rope still has some strands in the giopStrand::active_timedout list
493 // put there by decrRefCount() when the reference count goes to 0
494 // previously. We move these stands back to the giopStrand::active list so
495 // that they can be used straight away.
496 RopeLink* p = pd_strands.next;
497 for (; p != &pd_strands; p = p->next) {
498 giopStrand* g = (giopStrand*)p;
499 if (g->state() != giopStrand::DYING) {
500 g->StrandList::remove();
501 g->state(giopStrand::ACTIVE);
502 g->StrandList::insert(giopStrand::active);
503 }
504 }
505 }
506
507 pd_refcount++;
508 }
509
510 ////////////////////////////////////////////////////////////////////////
511 void
incrRefCount()512 giopRope::incrRefCount()
513 {
514 omni_tracedmutex_lock sync(*omniTransportLock);
515 realIncrRefCount();
516 }
517
518 ////////////////////////////////////////////////////////////////////////
519 void
decrRefCount()520 giopRope::decrRefCount()
521 {
522 omni_tracedmutex_lock sync(*omniTransportLock);
523
524 pd_refcount--;
525 OMNIORB_ASSERT(pd_refcount >=0);
526
527 if (pd_refcount) return;
528
529 // This Rope is not used by any object reference.
530 //
531 // If this Rope has no strand, we can remove this instance straight
532 // away.
533 //
534 // Otherwise, we move all the strands from the giopStrand::active
535 // list to the giopStrand::active_timedout list. Eventually when
536 // all the strands are retired by the scavenger, this instance will
537 // also be deleted on the next call to selectRope(), or the module
538 // detach() at shutdown.
539
540 if (RopeLink::is_empty(pd_strands) && !pd_nwaiting) {
541 RopeLink::remove();
542 delete this;
543 }
544 else {
545 RopeLink* p = pd_strands.next;
546 for (; p != &pd_strands; p = p->next) {
547 giopStrand* g = (giopStrand*)p;
548 if (g->state() != giopStrand::DYING) {
549 g->state(giopStrand::TIMEDOUT);
550 // The strand may already be on the active_timedout list. However
551 // it is OK to remove and reinsert again.
552 g->StrandList::remove();
553 g->StrandList::insert(giopStrand::active_timedout);
554 }
555 }
556 }
557 }
558
559
560 ////////////////////////////////////////////////////////////////////////
561 void
disconnect()562 giopRope::disconnect()
563 {
564 omni_tracedmutex_lock sync(*omniTransportLock);
565
566 RopeLink* p = pd_strands.next;
567 for (; p != &pd_strands; p = p->next) {
568 giopStrand* s = (giopStrand*)p;
569
570 if (s->state() != giopStrand::DYING) {
571 if (s->connection) {
572 if (omniORB::trace(10)) {
573 omniORB::logger log;
574 log << "Force disconnect connection to "
575 << s->connection->peeraddress() << "\n";
576 }
577 s->connection->Shutdown();
578 s->state(giopStrand::DYING);
579 }
580 }
581 }
582 }
583
584 ////////////////////////////////////////////////////////////////////////
585 CORBA::Boolean
hasAddress(const giopAddress * addr)586 giopRope::hasAddress(const giopAddress* addr)
587 {
588 giopAddressList::const_iterator ai;
589 for (ai = pd_addresses.begin(); ai != pd_addresses.end(); ++ai) {
590 if (*ai == addr) {
591 return 1;
592 }
593 }
594 return 0;
595 }
596
597 ////////////////////////////////////////////////////////////////////////
598 const giopAddress*
notifyCommFailure(const giopAddress * addr,CORBA::Boolean heldlock)599 giopRope::notifyCommFailure(const giopAddress* addr,
600 CORBA::Boolean heldlock)
601 {
602 omni_optional_lock sync(*omniTransportLock, heldlock, heldlock);
603
604 const giopAddress* addr_in_use;
605
606 addr_in_use = pd_addresses[pd_addresses_order[pd_address_in_use]];
607 if (addr == addr_in_use) {
608 pd_address_in_use++;
609 if (pd_address_in_use >= pd_addresses_order.size())
610 pd_address_in_use = 0;
611 addr_in_use = pd_addresses[pd_addresses_order[pd_address_in_use]];
612
613 if (omniORB::trace(20)) {
614 omniORB::logger l;
615 l << "Switch rope to use address " << addr_in_use->address() << "\n";
616 }
617 }
618 return addr_in_use;
619 }
620
621 ////////////////////////////////////////////////////////////////////////
622 void
resetAddressOrder(CORBA::Boolean heldlock,giopStrand * strand)623 giopRope::resetAddressOrder(CORBA::Boolean heldlock, giopStrand* strand)
624 {
625 if (orbParameters::retainAddressOrder)
626 return;
627
628 omni_optional_lock sync(*omniTransportLock, heldlock, heldlock);
629
630 if (!pd_addrs_filtered || pd_filtering)
631 return;
632
633 CORBA::Boolean do_reset = 1;
634
635 RopeLink* p = pd_strands.next;
636 for (; p != &pd_strands; p = p->next) {
637 giopStrand* s = (giopStrand*)p;
638 if (s != strand) {
639 // The rope contains a strand other than the triggering strand,
640 // so we do not reset the addresses.
641 do_reset = 0;
642 break;
643 }
644 }
645
646 if (omniORB::trace(25)) {
647 omniORB::logger log;
648
649 if (do_reset)
650 log << "Reset rope addresses (";
651 else
652 log << "Rope not reset due to other active strands (";
653
654 if (pd_addresses_order.size() > pd_address_in_use) {
655 const giopAddress* addr =
656 pd_addresses[pd_addresses_order[pd_address_in_use]];
657
658 log << "current address " << addr->address();
659 }
660 else {
661 log << "no current address";
662 }
663 log << ")\n";
664 }
665
666 if (!do_reset)
667 return;
668
669 // Names may have been resolved to addresses, so we remove the
670 // resolved addresses from the end of pd_addresses.
671 while (pd_addresses.size() > pd_ior_addr_size) {
672 delete pd_addresses.back();
673 pd_addresses.pop_back();
674 }
675
676 pd_addresses_order.clear();
677 pd_addrs_filtered = 0;
678 pd_address_in_use = 0;
679 }
680
681
682 ////////////////////////////////////////////////////////////////////////
683 void
resetIdleRopeAddresses()684 giopRope::resetIdleRopeAddresses()
685 {
686 omni_tracedmutex_lock sync(*omniTransportLock);
687
688 if (orbParameters::retainAddressOrder)
689 return;
690
691 RopeLink* p = giopRope::ropes.next;
692 while (p != &giopRope::ropes) {
693 giopRope* gr = (giopRope*)p;
694
695 if (gr->pd_addrs_filtered && RopeLink::is_empty(gr->pd_strands))
696 gr->resetAddressOrder(1, 0);
697
698 p = p->next;
699 }
700 }
701
702
703 ////////////////////////////////////////////////////////////////////////
704 int
selectRope(const giopAddressList & addrlist,omniIOR::IORInfo * info,Rope * & rope,CORBA::Boolean & is_local)705 giopRope::selectRope(const giopAddressList& addrlist,
706 omniIOR::IORInfo* info,
707 Rope*& rope,
708 CORBA::Boolean& is_local)
709 {
710 giopRope* gr;
711
712 omni_tracedmutex_lock sync(*omniTransportLock);
713
714 // Check if we have to use an existing bidirectional connection for
715 // a callback
716 if (orbParameters::acceptBiDirectionalGIOP &&
717 BiDirServerRope::selectRope(addrlist, info, rope)) {
718 is_local = 0;
719 return 1;
720 }
721
722 // Check if these are our addresses
723 giopAddressList::const_iterator i, last;
724 i = addrlist.begin();
725 last = addrlist.end();
726 for (; i != last; i++) {
727 if (omniObjAdapter::matchMyEndpoints((*i)->address())) {
728 rope = 0; is_local = 1;
729 return 1;
730 }
731 }
732
733 // Check if there already exists a rope that goes to the same
734 // addresses and matches the IOR
735 RopeLink* p = giopRope::ropes.next;
736 while (p != &giopRope::ropes) {
737 gr = (giopRope*)p;
738 if (gr->match(addrlist, info)) {
739 gr->realIncrRefCount();
740 rope = (Rope*)gr; is_local = 0;
741 return 1;
742 }
743 else if (gr->pd_refcount == 0 &&
744 RopeLink::is_empty(gr->pd_strands) &&
745 !gr->pd_nwaiting) {
746 // garbage rope, remove it
747 p = p->next;
748 gr->RopeLink::remove();
749 delete gr;
750 }
751 else {
752 p = p->next;
753 }
754 }
755
756 // Reach here because we cannot find an existing rope that matches,
757 // must create a new one.
758
759 gr = 0;
760
761 if (omniInterceptorP::createRope) {
762 omniInterceptors::createRope_T::info_T iinfo(addrlist, info, gr);
763 omniInterceptorP::visit(iinfo);
764 }
765
766 if (!gr) {
767 if (orbParameters::offerBiDirectionalGIOP) {
768 gr = new BiDirClientRope(addrlist, info);
769 }
770 else {
771 gr = new giopRope(addrlist, info);
772 }
773 }
774
775 gr->RopeLink::insert(giopRope::ropes);
776 gr->realIncrRefCount();
777 rope = (Rope*)gr; is_local = 0;
778 return 1;
779 }
780
781
782 ////////////////////////////////////////////////////////////////////////
783 CORBA::Boolean
match(const giopAddressList & addrlist,omniIOR::IORInfo * info) const784 giopRope::match(const giopAddressList& addrlist, omniIOR::IORInfo* info) const
785 {
786 if ((info->flags() != pd_ior_flags) ||
787 (addrlist.size() != pd_ior_addr_size) ||
788 (orbParameters::offerBiDirectionalGIOP != pd_offerBiDir)) {
789 return 0;
790 }
791
792 giopAddressList::const_iterator i, last, j;
793 i = addrlist.begin();
794 j = pd_addresses.begin();
795 last = addrlist.end();
796 for (; i != last; i++, j++) {
797 if (!omni::ptrStrMatch((*i)->address(),(*j)->address())) return 0;
798 }
799 return 1;
800 }
801
802 ////////////////////////////////////////////////////////////////////////
803 void
filterAndSortAddressList()804 giopRope::filterAndSortAddressList()
805 {
806 // We consult the clientTransportRules to decide the preference
807 // orders for the addresses. The rules may forbid the use of some of
808 // the addresses and these will be filtered out. We then record the
809 // order of the remaining addresses in pd_addresses_order.
810
811 // First, resolve any names in pd_addresses and add their
812 // resolutions to the end.
813
814 CORBA::Boolean do_resolve = orbParameters::resolveNamesForTransportRules;
815
816 if (do_resolve) {
817 giopAddressList resolved;
818 giopAddressList::const_iterator it;
819
820 for (it = pd_addresses.begin(); it != pd_addresses.end(); ++it) {
821 giopAddress* ga = *it;
822 const char* host = ga->host();
823
824 if (host && !LibcWrapper::isipaddr(host)) {
825
826 // Unlock omniTransportLock while resolving the name.
827 omni_tracedmutex_unlock ul(*omniTransportLock);
828
829 if (omniORB::trace(25)) {
830 omniORB::logger log;
831 log << "Resolve name '" << host << "'...\n";
832 }
833
834 LibcWrapper::AddrInfo_var aiv;
835 aiv = LibcWrapper::getAddrInfo(host, 0);
836
837 LibcWrapper::AddrInfo* ai = aiv;
838
839 if (ai == 0) {
840 if (omniORB::trace(25)) {
841 omniORB::logger log;
842 log << "Unable to resolve '" << host << "'.\n";
843 }
844 }
845 else {
846 while (ai) {
847 CORBA::String_var addr = ai->asString();
848
849 if (omniORB::trace(25)) {
850 omniORB::logger log;
851 log << "Name '" << host << "' resolved to " << addr << "\n";
852 }
853 resolved.push_back(ga->duplicate(addr));
854 ai = ai->next();
855 }
856 }
857 }
858 }
859
860 if (!resolved.empty()) {
861 for (it = resolved.begin(); it != resolved.end(); ++it) {
862 pd_addresses.push_back(*it);
863 }
864 }
865 }
866
867 // For each address, find the rule that is applicable. Record the
868 // rules priority in the priority list.
869 omnivector<CORBA::ULong> priority_list;
870
871 CORBA::ULong index;
872 CORBA::ULong total = pd_addresses.size();
873
874 for (index = 0; index < total; index++) {
875 giopAddress* ga = pd_addresses[index];
876 const char* host = ga->host();
877
878 if (do_resolve && host && !LibcWrapper::isipaddr(host)) {
879 // Skip name -- it has been resolved to an address above
880 continue;
881 }
882
883 CORBA::StringSeq actions;
884 CORBA::ULong matchedRule;
885
886 if (transportRules::clientRules().match(ga->address(),
887 actions, matchedRule)) {
888
889 const char* transport = strchr(ga->type(),':');
890 OMNIORB_ASSERT(transport);
891 transport++;
892
893 CORBA::ULong i;
894 CORBA::Boolean matched = 0;
895 CORBA::ULong flags = 0;
896 CORBA::ULong priority;
897
898 for (i = 0; i < actions.length(); i++) {
899 size_t len = strlen(actions[i]);
900 if (strncmp(actions[i],transport,len) == 0) {
901 priority = (matchedRule << 16) + i;
902 matched = 1;
903 }
904 else if (strcmp(actions[i],"none") == 0) {
905 break;
906 }
907 else if (orbParameters::offerBiDirectionalGIOP &&
908 strcmp(actions[i],"bidir") == 0) {
909 flags |= GIOPSTRAND_BIDIR;
910 }
911 else if (strcmp(actions[i],"ziop") == 0) {
912 flags |= pd_ior_flags & GIOPSTRAND_COMPRESSION;
913 }
914 }
915 if (matched) {
916 pd_addresses_order.push_back(index);
917 priority_list.push_back(priority);
918 pd_flags |= flags;
919 }
920 }
921 }
922
923 // If we have more than 1 address to use, sort them according to
924 // their value in prioritylist.
925
926 if (pd_addresses_order.size() > 1) {
927 // Won't it be nice to just use stl qsort? It is tempting to just
928 // forget about old C++ compiler and use stl. Until the time has come
929 // use shell sort to sort the addresses in order.
930
931 int n = pd_addresses_order.size();
932 for (int gap=n/2; gap > 0; gap=gap/2) {
933 for (int i=gap; i < n; i++)
934 for (int j = i-gap; j>=0; j=j-gap) {
935 if (priority_list[j] > priority_list[j+gap]) {
936 CORBA::ULong temp = pd_addresses_order[j];
937 pd_addresses_order[j] = pd_addresses_order[j+gap];
938 pd_addresses_order[j+gap] = temp;
939 temp = priority_list[j];
940 priority_list[j] = priority_list[j+gap];
941 priority_list[j+gap] = temp;
942 }
943 }
944 }
945 }
946
947 #if 0
948 {
949 omniORB::logger log;
950 log << "Sorted addresses are: \n";
951 for (size_t i=0; i < pd_addresses_order.size(); i++) {
952 log << pd_addresses[pd_addresses_order[i]]->address() << "\n";
953 }
954 }
955 #endif
956 }
957
958 /////////////////////////////////////////////////////////////////////////////
959 // Handlers for Configuration Options //
960 /////////////////////////////////////////////////////////////////////////////
961
962 /////////////////////////////////////////////////////////////////////////////
963 class oneCallPerConnectionHandler : public orbOptions::Handler {
964 public:
965
oneCallPerConnectionHandler()966 oneCallPerConnectionHandler() :
967 orbOptions::Handler("oneCallPerConnection",
968 "oneCallPerConnection = 0 or 1",
969 1,
970 "-ORBoneCallPerConnection < 0 | 1 >") {}
971
972
visit(const char * value,orbOptions::Source)973 void visit(const char* value,orbOptions::Source)
974 OMNI_THROW_SPEC (orbOptions::BadParam)
975 {
976 CORBA::Boolean v;
977 if (!orbOptions::getBoolean(value,v)) {
978 throw orbOptions::BadParam(key(),value,
979 orbOptions::expect_boolean_msg);
980 }
981 orbParameters::oneCallPerConnection = v;
982 }
983
dump(orbOptions::sequenceString & result)984 void dump(orbOptions::sequenceString& result) {
985 orbOptions::addKVBoolean(key(),orbParameters::oneCallPerConnection,
986 result);
987 }
988 };
989
990 static oneCallPerConnectionHandler oneCallPerConnectionHandler_;
991
992 /////////////////////////////////////////////////////////////////////////////
993 class maxGIOPConnectionPerServerHandler : public orbOptions::Handler {
994 public:
995
maxGIOPConnectionPerServerHandler()996 maxGIOPConnectionPerServerHandler() :
997 orbOptions::Handler("maxGIOPConnectionPerServer",
998 "maxGIOPConnectionPerServer = n > 0",
999 1,
1000 "-ORBmaxGIOPConnectionPerServer < n > 0 >") {}
1001
visit(const char * value,orbOptions::Source)1002 void visit(const char* value,orbOptions::Source)
1003 OMNI_THROW_SPEC (orbOptions::BadParam)
1004 {
1005 CORBA::ULong v;
1006 if (!orbOptions::getULong(value,v) || v < 1) {
1007 throw orbOptions::BadParam(key(),value,
1008 orbOptions::expect_greater_than_zero_ulong_msg);
1009 }
1010 orbParameters::maxGIOPConnectionPerServer = v;
1011 }
1012
dump(orbOptions::sequenceString & result)1013 void dump(orbOptions::sequenceString& result) {
1014 orbOptions::addKVULong(key(),orbParameters::maxGIOPConnectionPerServer,
1015 result);
1016 }
1017
1018 };
1019
1020 static maxGIOPConnectionPerServerHandler maxGIOPConnectionPerServerHandler_;
1021
1022
1023 /////////////////////////////////////////////////////////////////////////////
1024 class immediateRopeSwitchHandler : public orbOptions::Handler {
1025 public:
1026
immediateRopeSwitchHandler()1027 immediateRopeSwitchHandler() :
1028 orbOptions::Handler("immediateAddressSwitch",
1029 "immediateAddressSwitch = 0 or 1",
1030 1,
1031 "-ORBimmediateAddressSwitch < 0 | 1 >") {}
1032
1033
visit(const char * value,orbOptions::Source)1034 void visit(const char* value,orbOptions::Source)
1035 OMNI_THROW_SPEC (orbOptions::BadParam)
1036 {
1037 CORBA::Boolean v;
1038 if (!orbOptions::getBoolean(value,v)) {
1039 throw orbOptions::BadParam(key(),value,
1040 orbOptions::expect_boolean_msg);
1041 }
1042 orbParameters::immediateRopeSwitch = v;
1043 }
1044
dump(orbOptions::sequenceString & result)1045 void dump(orbOptions::sequenceString& result) {
1046 orbOptions::addKVBoolean(key(),orbParameters::immediateRopeSwitch,
1047 result);
1048 }
1049 };
1050
1051 static immediateRopeSwitchHandler immediateRopeSwitchHandler_;
1052
1053
1054 /////////////////////////////////////////////////////////////////////////////
1055 class resolveNamesForTransportRulesHandler : public orbOptions::Handler {
1056 public:
1057
resolveNamesForTransportRulesHandler()1058 resolveNamesForTransportRulesHandler() :
1059 orbOptions::Handler("resolveNamesForTransportRules",
1060 "resolveNamesForTransportRules = 0 or 1",
1061 1,
1062 "-ORBresolveNamesForTransportRules < 0 | 1 >") {}
1063
visit(const char * value,orbOptions::Source)1064 void visit(const char* value,orbOptions::Source)
1065 OMNI_THROW_SPEC (orbOptions::BadParam)
1066 {
1067 CORBA::Boolean v;
1068 if (!orbOptions::getBoolean(value,v)) {
1069 throw orbOptions::BadParam(key(),value,
1070 orbOptions::expect_boolean_msg);
1071 }
1072 orbParameters::resolveNamesForTransportRules = v;
1073 }
1074
dump(orbOptions::sequenceString & result)1075 void dump(orbOptions::sequenceString& result) {
1076 orbOptions::addKVBoolean(key(),orbParameters::resolveNamesForTransportRules,
1077 result);
1078 }
1079
1080 };
1081
1082 static resolveNamesForTransportRulesHandler resolveNamesForTransportRulesHandler_;
1083
1084
1085 /////////////////////////////////////////////////////////////////////////////
1086 class retainAddressOrderHandler : public orbOptions::Handler {
1087 public:
1088
retainAddressOrderHandler()1089 retainAddressOrderHandler() :
1090 orbOptions::Handler("retainAddressOrder",
1091 "retainAddressOrder = 0 or 1",
1092 1,
1093 "-ORBretainAddressOrder < 0 | 1 >") {}
1094
visit(const char * value,orbOptions::Source)1095 void visit(const char* value,orbOptions::Source)
1096 OMNI_THROW_SPEC (orbOptions::BadParam)
1097 {
1098 CORBA::Boolean v;
1099 if (!orbOptions::getBoolean(value,v)) {
1100 throw orbOptions::BadParam(key(),value,
1101 orbOptions::expect_boolean_msg);
1102 }
1103 orbParameters::retainAddressOrder = v;
1104 }
1105
dump(orbOptions::sequenceString & result)1106 void dump(orbOptions::sequenceString& result) {
1107 orbOptions::addKVBoolean(key(),orbParameters::retainAddressOrder,
1108 result);
1109 }
1110
1111 };
1112
1113 static retainAddressOrderHandler retainAddressOrderHandler_;
1114
1115
1116 /////////////////////////////////////////////////////////////////////////////
1117 // Module initialiser //
1118 /////////////////////////////////////////////////////////////////////////////
1119
1120 class omni_giopRope_initialiser : public omniInitialiser {
1121 public:
1122
omni_giopRope_initialiser()1123 omni_giopRope_initialiser() {
1124 orbOptions::singleton().registerHandler(oneCallPerConnectionHandler_);
1125 orbOptions::singleton().registerHandler(maxGIOPConnectionPerServerHandler_);
1126 orbOptions::singleton().registerHandler(immediateRopeSwitchHandler_);
1127 orbOptions::singleton().registerHandler(resolveNamesForTransportRulesHandler_);
1128 orbOptions::singleton().registerHandler(retainAddressOrderHandler_);
1129 }
1130
attach()1131 void attach() {
1132 }
detach()1133 void detach() {
1134 // Get rid of any remaining ropes. By now they should all be strand-less.
1135 omni_tracedmutex_lock sync(*omniTransportLock);
1136
1137 RopeLink* p = giopRope::ropes.next;
1138 giopRope* gr;
1139 int i=0;
1140
1141 while (p != &giopRope::ropes) {
1142 gr = (giopRope*)p;
1143 OMNIORB_ASSERT(gr->pd_refcount == 0 &&
1144 RopeLink::is_empty(gr->pd_strands) &&
1145 !gr->pd_nwaiting);
1146 p = p->next;
1147 gr->RopeLink::remove();
1148 delete gr;
1149 ++i;
1150 }
1151 if (omniORB::trace(15)) {
1152 omniORB::logger l;
1153 l << i << " remaining rope" << (i == 1 ? "" : "s") << " deleted.\n";
1154 }
1155 }
1156 };
1157
1158
1159 static omni_giopRope_initialiser initialiser;
1160
1161 omniInitialiser& omni_giopRope_initialiser_ = initialiser;
1162
1163
1164 OMNI_NAMESPACE_END(omni)
1165