1 // -*- Mode: C++; -*-
2 // Package : omniORB
3 // callHandle.cc Created on: 16/05/2001
4 // Author : Duncan Grisby (dpg1)
5 //
6 // Copyright (C) 2003-2012 Apasphere Ltd
7 // Copyright (C) 2001 AT&T Laboratories Cambridge
8 //
9 // This file is part of the omniORB library
10 //
11 // The omniORB library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library. If not, see http://www.gnu.org/licenses/
23 //
24 //
25 // Description:
26 //
27 // Call handle used during remote or in-process operation dispatch.
28
29 #include <omniORB4/CORBA.h>
30 #include <omniORB4/omniORB.h>
31 #include <omniORB4/callHandle.h>
32 #include <omniORB4/callDescriptor.h>
33 #include <omniORB4/omniServant.h>
34 #include <omniORB4/IOP_C.h>
35 #include <poacurrentimpl.h>
36 #include <invoker.h>
37 #include <giopStream.h>
38 #include <giopStrand.h>
39 #include <giopRope.h>
40 #include <omniORB4/giopEndpoint.h>
41
42 OMNI_USING_NAMESPACE(omni)
43
44
45 static void
46 dealWithUserException(cdrMemoryStream& stream,
47 omniCallDescriptor* desc,
48 CORBA::UserException& ex);
49
50
51 #ifdef HAS_Cplusplus_Namespace
52 namespace {
53 #endif
54 class PostInvoker {
55 public:
PostInvoker(omniCallHandle::PostInvokeHook * hook)56 inline PostInvoker(omniCallHandle::PostInvokeHook* hook)
57 : pd_hook(hook) {}
~PostInvoker()58 inline ~PostInvoker() {
59 if (pd_hook)
60 pd_hook->postinvoke();
61 }
62 private:
63 omniCallHandle::PostInvokeHook* pd_hook;
64 };
65
66 class MainThreadTask : public omniTask {
67 public:
MainThreadTask(omniServant * servant,omniCallDescriptor & desc,omni_tracedmutex * mu,omni_tracedcondition * cond)68 inline MainThreadTask(omniServant* servant, omniCallDescriptor& desc,
69 omni_tracedmutex* mu, omni_tracedcondition* cond)
70 : omniTask(omniTask::DedicatedThread),
71 pd_servant(servant),
72 pd_desc(desc),
73 pd_mu(mu),
74 pd_cond(cond),
75 pd_except(0),
76 pd_done(0)
77 {
78 if (omniORB::trace(25)) {
79 omniORB::logger l;
80 l << "Preparing to dispatch '" << desc.op() << "' to main thread\n";
81 }
82 }
83
84 void execute();
85 // Called by the async invoker. Performs the upcall. If an
86 // exception occurs, places a copy in pd_except.
87
88 void wait();
89 // Wait for execute() to finish. Throws the exception in pd_except
90 // if there is one.
91
92 private:
93 omniServant* pd_servant;
94 omniCallDescriptor& pd_desc;
95 omni_tracedmutex* pd_mu;
96 omni_tracedcondition* pd_cond;
97 CORBA::Exception* pd_except;
98 int pd_done;
99 };
100
101 #ifdef HAS_Cplusplus_Namespace
102 }
103 #endif
104
105
106 void
upcall(omniServant * servant,omniCallDescriptor & desc)107 omniCallHandle::upcall(omniServant* servant, omniCallDescriptor& desc)
108 {
109 OMNIORB_ASSERT(pd_localId);
110 desc.poa(pd_poa);
111 desc.localId(pd_localId);
112
113 _OMNI_NS(poaCurrentStackInsert) insert(&desc, pd_self_thread);
114
115 if (pd_iop_s) { // Remote call
116 pd_iop_s->ReceiveRequest(desc);
117 {
118 PostInvoker postinvoker(pd_postinvoke_hook);
119
120 if (!pd_mainthread_mu) {
121 desc.doLocalCall(servant);
122 }
123 else {
124 // Main thread dispatch
125 MainThreadTask mtt(servant, desc,
126 pd_mainthread_mu, pd_mainthread_cond);
127 int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
128 mtt.wait();
129 }
130 }
131 pd_iop_s->SendReply();
132 }
133 else { // In process call
134
135 if (pd_call_desc == &desc) {
136 // Fast case -- call descriptor can invoke directly on the servant
137 PostInvoker postinvoker(pd_postinvoke_hook);
138
139 if (!pd_mainthread_mu) {
140 desc.doLocalCall(servant);
141 }
142 else {
143 // Main thread dispatch
144 MainThreadTask mtt(servant, desc,
145 pd_mainthread_mu, pd_mainthread_cond);
146 int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
147 mtt.wait();
148 }
149 }
150 else {
151 // Cannot call directly -- use a memory stream
152 if (omniORB::traceInvocations) {
153 omniORB::logger l;
154 l << "In process indirect call '" << desc.op() << "'\n";
155 }
156 cdrMemoryStream stream;
157 pd_call_desc->initialiseCall(stream);
158 pd_call_desc->marshalArguments(stream);
159 stream.clearValueTracker();
160
161 if (omniORB::trace(30)) {
162 omniORB::logs(30, "Indirect call buffer:");
163 _OMNI_NS(giopStream)::dumpbuf((unsigned char*)stream.bufPtr(),
164 stream.bufSize());
165 }
166
167 desc.unmarshalArguments(stream);
168 stream.clearValueTracker();
169
170 try {
171 PostInvoker postinvoker(pd_postinvoke_hook);
172
173 if (!pd_mainthread_mu) {
174 desc.doLocalCall(servant);
175 }
176 else {
177 // Main thread dispatch
178 MainThreadTask mtt(servant, desc,
179 pd_mainthread_mu, pd_mainthread_cond);
180 int i = _OMNI_NS(orbAsyncInvoker)->insert(&mtt); OMNIORB_ASSERT(i);
181 mtt.wait();
182 }
183 stream.rewindPtrs();
184
185 desc.marshalReturnedValues(stream);
186 stream.clearValueTracker();
187
188 pd_call_desc->unmarshalReturnedValues(stream);
189 }
190 #ifdef HAS_Cplusplus_catch_exception_by_base
191 catch (CORBA::UserException& ex) {
192 stream.rewindPtrs();
193 stream.clearValueTracker();
194 dealWithUserException(stream, pd_call_desc, ex);
195 }
196 #else
197 catch (omniORB::StubUserException& uex) {
198 try {
199 CORBA::UserException& ex = *((CORBA::UserException*)uex.ex());
200 stream.rewindPtrs();
201 stream.clearValueTracker();
202 dealWithUserException(stream, pd_call_desc, ex);
203 }
204 catch (...) {
205 delete uex.ex(); // ?? Possible memory leak?
206 throw;
207 }
208 delete uex.ex();
209 }
210 #endif
211 }
212 }
213 }
214
215
216 static void
dealWithUserException(cdrMemoryStream & stream,omniCallDescriptor * desc,CORBA::UserException & ex)217 dealWithUserException(cdrMemoryStream& stream,
218 omniCallDescriptor* desc,
219 CORBA::UserException& ex)
220 {
221 int size;
222 const char* repoId = ex._NP_repoId(&size);
223
224 if (omniORB::trace(25)) {
225 omniORB::logger l;
226 l << "Handling in-process user exception '" << repoId << "'\n";
227 }
228
229 ex._NP_marshal(stream);
230 stream.clearValueTracker();
231 desc->userException(stream, 0, repoId);
232 }
233
234 void
SkipRequestBody()235 omniCallHandle::SkipRequestBody()
236 {
237 if (pd_iop_s)
238 pd_iop_s->SkipRequestBody();
239 }
240
241
242 giopConnection*
connection()243 omniCallHandle::connection()
244 {
245 if (pd_iop_s) {
246 cdrStream& stream = pd_iop_s->getStream();
247 giopStream* gstream = giopStream::downcast(&stream);
248 if (gstream)
249 return gstream->strand().connection;
250 }
251 return 0;
252 }
253
254
255 const char*
myaddress()256 omniCallHandle::myaddress()
257 {
258 giopConnection* conn = connection();
259 return conn ? conn->myaddress() : 0;
260 }
261
262 const char*
peeraddress()263 omniCallHandle::peeraddress()
264 {
265 giopConnection* conn = connection();
266 return conn ? conn->peeraddress() : 0;
267 }
268
269 const char*
peeridentity()270 omniCallHandle::peeridentity()
271 {
272 giopConnection* conn = connection();
273 return conn ? conn->peeridentity() : 0;
274 }
275
276 void*
peerdetails()277 omniCallHandle::peerdetails()
278 {
279 giopConnection* conn = connection();
280 return conn ? conn->peerdetails() : 0;
281 }
282
283
~PostInvokeHook()284 omniCallHandle::PostInvokeHook::~PostInvokeHook() {}
285
286
287 void
execute()288 MainThreadTask::execute()
289 {
290 if (omniORB::traceInvocations) {
291 omniORB::logger l;
292 l << "Main thread dispatch '" << pd_desc.op() << "'\n";
293 }
294
295 try {
296 _OMNI_NS(poaCurrentStackInsert) insert(&pd_desc);
297 pd_desc.doLocalCall(pd_servant);
298 }
299 #ifdef HAS_Cplusplus_catch_exception_by_base
300 catch (CORBA::Exception& ex) {
301 pd_except = CORBA::Exception::_duplicate(&ex);
302 }
303 #else
304 # define DUPLICATE_AND_STORE(name) \
305 catch (CORBA::name& ex) { \
306 pd_except = CORBA::Exception::_duplicate(&ex); \
307 }
308
309 OMNIORB_FOR_EACH_SYS_EXCEPTION(DUPLICATE_AND_STORE)
310 # undef DUPLICATE_AND_STORE
311
312 catch (omniORB::StubUserException& uex) {
313 pd_except = CORBA::Exception::_duplicate(uex.ex());
314 }
315 #endif
316 catch (...) {
317 CORBA::UNKNOWN ex;
318 pd_except = CORBA::Exception::_duplicate(&ex);
319 }
320
321 {
322 // Wake up the dispatch thread
323 omni_tracedmutex_lock l(*pd_mu);
324 pd_done = 1;
325 pd_cond->broadcast();
326 }
327 }
328
329 void
wait()330 MainThreadTask::wait()
331 {
332 {
333 omni_tracedmutex_lock l(*pd_mu);
334 while (!pd_done)
335 pd_cond->wait();
336 }
337 if (pd_except) {
338 // This interesting construction contrives to ask the
339 // heap-allocated exception to throw a copy of itself, then
340 // deletes it.
341 try {
342 pd_except->_raise();
343 }
344 catch (...) {
345 delete pd_except;
346 throw;
347 }
348 }
349 }
350