1 // -*- Mode: C++; -*-
2 //                            Package   : omniORB
3 // callHandle.cc              Created on: 16/05/2001
4 //                            Author    : Duncan Grisby (dpg1)
5 //
6 //    Copyright (C) 2003-2012 Apasphere Ltd
7 //    Copyright (C) 2001 AT&T Laboratories Cambridge
8 //
9 //    This file is part of the omniORB library
10 //
11 //    The omniORB library is free software; you can redistribute it and/or
12 //    modify it under the terms of the GNU Lesser General Public
13 //    License as published by the Free Software Foundation; either
14 //    version 2.1 of the License, or (at your option) any later version.
15 //
16 //    This library is distributed in the hope that it will be useful,
17 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
18 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19 //    Lesser General Public License for more details.
20 //
21 //    You should have received a copy of the GNU Lesser General Public
22 //    License along with this library. If not, see http://www.gnu.org/licenses/
23 //
24 //
25 // Description:
26 //
27 //   Call handle used during remote or in-process operation dispatch.
28 
29 #include <omniORB4/CORBA.h>
30 #include <omniORB4/omniORB.h>
31 #include <omniORB4/callHandle.h>
32 #include <omniORB4/callDescriptor.h>
33 #include <omniORB4/omniServant.h>
34 #include <omniORB4/IOP_C.h>
35 #include <poacurrentimpl.h>
36 #include <invoker.h>
37 #include <giopStream.h>
38 #include <giopStrand.h>
39 #include <giopRope.h>
40 #include <omniORB4/giopEndpoint.h>
41 
42 OMNI_USING_NAMESPACE(omni)
43 
44 
45 static void
46 dealWithUserException(cdrMemoryStream& stream,
47 		      omniCallDescriptor* desc,
48 		      CORBA::UserException& ex);
49 
50 
51 #ifdef HAS_Cplusplus_Namespace
52 namespace {
53 #endif
54   class PostInvoker {
55   public:
PostInvoker(omniCallHandle::PostInvokeHook * hook)56     inline PostInvoker(omniCallHandle::PostInvokeHook* hook)
57       : pd_hook(hook) {}
~PostInvoker()58     inline ~PostInvoker() {
59       if (pd_hook)
60 	pd_hook->postinvoke();
61     }
62   private:
63     omniCallHandle::PostInvokeHook* pd_hook;
64   };
65 
66   class MainThreadTask : public omniTask {
67   public:
MainThreadTask(omniServant * servant,omniCallDescriptor & desc,omni_tracedmutex * mu,omni_tracedcondition * cond)68     inline MainThreadTask(omniServant* servant, omniCallDescriptor& desc,
69 			  omni_tracedmutex* mu, omni_tracedcondition* cond)
70       : omniTask(omniTask::DedicatedThread),
71 	pd_servant(servant),
72 	pd_desc(desc),
73 	pd_mu(mu),
74 	pd_cond(cond),
75 	pd_except(0),
76 	pd_done(0)
77     {
78       if (omniORB::trace(25)) {
79 	omniORB::logger l;
80 	l << "Preparing to dispatch '" << desc.op() << "' to main thread\n";
81       }
82     }
83 
84     void execute();
85     // Called by the async invoker. Performs the upcall. If an
86     // exception occurs, places a copy in pd_except.
87 
88     void wait();
89     // Wait for execute() to finish. Throws the exception in pd_except
90     // if there is one.
91 
92   private:
93     omniServant*          pd_servant;
94     omniCallDescriptor&   pd_desc;
95     omni_tracedmutex*     pd_mu;
96     omni_tracedcondition* pd_cond;
97     CORBA::Exception*     pd_except;
98     int                   pd_done;
99   };
100 
101 #ifdef HAS_Cplusplus_Namespace
102 }
103 #endif
104 
105 
106 void
upcall(omniServant * servant,omniCallDescriptor & desc)107 omniCallHandle::upcall(omniServant* servant, omniCallDescriptor& desc)
108 {
109   OMNIORB_ASSERT(pd_localId);
110   desc.poa(pd_poa);
111   desc.localId(pd_localId);
112 
113   _OMNI_NS(poaCurrentStackInsert) insert(&desc, pd_self_thread);
114 
115   if (pd_iop_s) { // Remote call
116     pd_iop_s->ReceiveRequest(desc);
117     {
118       PostInvoker postinvoker(pd_postinvoke_hook);
119 
120       if (!pd_mainthread_mu) {
121 	desc.doLocalCall(servant);
122       }
123       else {
124 	// Main thread dispatch
125 	MainThreadTask mtt(servant, desc,
126 			   pd_mainthread_mu, pd_mainthread_cond);
127 	int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
128 	mtt.wait();
129       }
130     }
131     pd_iop_s->SendReply();
132   }
133   else { // In process call
134 
135     if (pd_call_desc == &desc) {
136       // Fast case -- call descriptor can invoke directly on the servant
137       PostInvoker postinvoker(pd_postinvoke_hook);
138 
139       if (!pd_mainthread_mu) {
140 	desc.doLocalCall(servant);
141       }
142       else {
143 	// Main thread dispatch
144 	MainThreadTask mtt(servant, desc,
145 			   pd_mainthread_mu, pd_mainthread_cond);
146 	int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
147 	mtt.wait();
148       }
149     }
150     else {
151       // Cannot call directly -- use a memory stream
152       if (omniORB::traceInvocations) {
153 	omniORB::logger l;
154 	l << "In process indirect call '" << desc.op() << "'\n";
155       }
156       cdrMemoryStream stream;
157       pd_call_desc->initialiseCall(stream);
158       pd_call_desc->marshalArguments(stream);
159       stream.clearValueTracker();
160 
161       if (omniORB::trace(30)) {
162         omniORB::logs(30, "Indirect call buffer:");
163         _OMNI_NS(giopStream)::dumpbuf((unsigned char*)stream.bufPtr(),
164                                       stream.bufSize());
165       }
166 
167       desc.unmarshalArguments(stream);
168       stream.clearValueTracker();
169 
170       try {
171 	PostInvoker postinvoker(pd_postinvoke_hook);
172 
173 	if (!pd_mainthread_mu) {
174 	  desc.doLocalCall(servant);
175 	}
176 	else {
177 	  // Main thread dispatch
178 	  MainThreadTask mtt(servant, desc,
179 			     pd_mainthread_mu, pd_mainthread_cond);
180 	  int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
181 	  mtt.wait();
182 	}
183 	stream.rewindPtrs();
184 
185 	desc.marshalReturnedValues(stream);
186 	stream.clearValueTracker();
187 
188 	pd_call_desc->unmarshalReturnedValues(stream);
189       }
190 #ifdef HAS_Cplusplus_catch_exception_by_base
191       catch (CORBA::UserException& ex) {
192 	stream.rewindPtrs();
193 	stream.clearValueTracker();
194 	dealWithUserException(stream, pd_call_desc, ex);
195       }
196 #else
197       catch (omniORB::StubUserException& uex) {
198 	try {
199 	  CORBA::UserException& ex = *((CORBA::UserException*)uex.ex());
200 	  stream.rewindPtrs();
201 	  stream.clearValueTracker();
202 	  dealWithUserException(stream, pd_call_desc, ex);
203 	}
204 	catch (...) {
205 	  delete uex.ex();  // ?? Possible memory leak?
206 	  throw;
207 	}
208 	delete uex.ex();
209       }
210 #endif
211     }
212   }
213 }
214 
215 
216 static void
dealWithUserException(cdrMemoryStream & stream,omniCallDescriptor * desc,CORBA::UserException & ex)217 dealWithUserException(cdrMemoryStream& stream,
218 		      omniCallDescriptor* desc,
219 		      CORBA::UserException& ex)
220 {
221   int size;
222   const char* repoId = ex._NP_repoId(&size);
223 
224   if (omniORB::trace(25)) {
225     omniORB::logger l;
226     l << "Handling in-process user exception '" << repoId << "'\n";
227   }
228 
229   ex._NP_marshal(stream);
230   stream.clearValueTracker();
231   desc->userException(stream, 0, repoId);
232 }
233 
234 void
SkipRequestBody()235 omniCallHandle::SkipRequestBody()
236 {
237   if (pd_iop_s)
238     pd_iop_s->SkipRequestBody();
239 }
240 
241 
242 giopConnection*
connection()243 omniCallHandle::connection()
244 {
245   if (pd_iop_s) {
246     cdrStream&  stream  = pd_iop_s->getStream();
247     giopStream* gstream = giopStream::downcast(&stream);
248     if (gstream)
249       return gstream->strand().connection;
250   }
251   return 0;
252 }
253 
254 
255 const char*
myaddress()256 omniCallHandle::myaddress()
257 {
258   giopConnection* conn = connection();
259   return conn ? conn->myaddress() : 0;
260 }
261 
262 const char*
peeraddress()263 omniCallHandle::peeraddress()
264 {
265   giopConnection* conn = connection();
266   return conn ? conn->peeraddress() : 0;
267 }
268 
269 const char*
peeridentity()270 omniCallHandle::peeridentity()
271 {
272   giopConnection* conn = connection();
273   return conn ? conn->peeridentity() : 0;
274 }
275 
276 void*
peerdetails()277 omniCallHandle::peerdetails()
278 {
279   giopConnection* conn = connection();
280   return conn ? conn->peerdetails() : 0;
281 }
282 
283 
~PostInvokeHook()284 omniCallHandle::PostInvokeHook::~PostInvokeHook() {}
285 
286 
287 void
execute()288 MainThreadTask::execute()
289 {
290   if (omniORB::traceInvocations) {
291     omniORB::logger l;
292     l << "Main thread dispatch '" << pd_desc.op() << "'\n";
293   }
294 
295   try {
296     _OMNI_NS(poaCurrentStackInsert) insert(&pd_desc);
297     pd_desc.doLocalCall(pd_servant);
298   }
299 #ifdef HAS_Cplusplus_catch_exception_by_base
300   catch (CORBA::Exception& ex) {
301     pd_except = CORBA::Exception::_duplicate(&ex);
302   }
303 #else
304 #  define DUPLICATE_AND_STORE(name) \
305   catch (CORBA::name& ex) { \
306     pd_except = CORBA::Exception::_duplicate(&ex); \
307   }
308 
309   OMNIORB_FOR_EACH_SYS_EXCEPTION(DUPLICATE_AND_STORE)
310 #  undef DUPLICATE_AND_STORE
311 
312   catch (omniORB::StubUserException& uex) {
313     pd_except = CORBA::Exception::_duplicate(uex.ex());
314   }
315 #endif
316   catch (...) {
317     CORBA::UNKNOWN ex;
318     pd_except = CORBA::Exception::_duplicate(&ex);
319   }
320 
321   {
322     // Wake up the dispatch thread
323     omni_tracedmutex_lock l(*pd_mu);
324     pd_done = 1;
325     pd_cond->broadcast();
326   }
327 }
328 
329 void
wait()330 MainThreadTask::wait()
331 {
332   {
333     omni_tracedmutex_lock l(*pd_mu);
334     while (!pd_done)
335       pd_cond->wait();
336   }
337   if (pd_except) {
338     // This interesting construction contrives to ask the
339     // heap-allocated exception to throw a copy of itself, then
340     // deletes it.
341     try {
342       pd_except->_raise();
343     }
344     catch (...) {
345       delete pd_except;
346       throw;
347     }
348   }
349 }
350