1 #include <config.h>
2 #include <string.h>
3 #include <stdio.h>
4 #include <errno.h>
5 #include <glib.h>
6 #include <glib/gprintf.h>
7
8 #include "giop-private.h"
9 #include "giop-debug.h"
10 #include <orbit/GIOP/giop-types.h>
11 #include <orbit/GIOP/giop-recv-buffer.h>
12
13 #undef DEBUG
14
#ifdef G_ENABLE_DEBUG
/*
 * Optional debugging hooks; they exist only when G_ENABLE_DEBUG is
 * defined and all start out unset.
 */

/* Invoked by handle_reply() for a reply that matches no queued entry. */
void (*giop_debug_hook_unexpected_reply) (GIOPRecvBuffer *buf) = NULL;

/*
 * Invoked by handle_reply() when a reply matches a queued entry but
 * arrived on a different connection than the entry's.
 */
void (*giop_debug_hook_spoofed_reply)    (GIOPRecvBuffer        *buf,
					  GIOPMessageQueueEntry *ent) = NULL;

/* NOTE(review): no caller is visible in this part of the file. */
void (*giop_debug_hook_incoming_mangler) (GIOPRecvBuffer *buf) = NULL;
#endif
21
22
/* Non-zero when the GIOP_FLAG_FRAGMENTED bit is set in the header flags of @buf. */
#define MORE_FRAGMENTS_FOLLOW(buf) \
	(((buf)->msg.header.flags) & (GIOP_FLAG_FRAGMENTED))
24
25 /*
26 * FIXME: pretty much this whole module,
27 * and all the giop-types headers should be
28 * auto-generated and generic ...
29 */
30
31 /* A list of GIOPMessageQueueEntrys */
32 static GList *giop_queued_messages = NULL;
33 static GMutex *giop_queued_messages_lock = NULL;
34
35 /* Don't do this genericaly, union's suck genericaly */
36 static gboolean
giop_GIOP_TargetAddress_demarshal(GIOPRecvBuffer * buf,GIOP_TargetAddress * value)37 giop_GIOP_TargetAddress_demarshal (GIOPRecvBuffer *buf,
38 GIOP_TargetAddress *value)
39 {
40 gboolean do_bswap = giop_msg_conversion_needed (buf);
41
42 buf->cur = ALIGN_ADDRESS(buf->cur, 2);
43 if ((buf->cur + 2) > buf->end)
44 return TRUE;
45 if (do_bswap)
46 value->_d = GUINT16_SWAP_LE_BE (
47 *(guint16 *) buf->cur);
48 else
49 value->_d = *(guint16 *) buf->cur;
50 buf->cur += 2;
51
52 switch (value->_d) {
53 case GIOP_KeyAddr:
54 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
55
56 if ((buf->cur + 4) > buf->end)
57 return TRUE;
58 value->_u.object_key._release = CORBA_FALSE;
59 if (do_bswap)
60 value->_u.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
61 else
62 value->_u.object_key._length = *((guint32 *)buf->cur);
63 buf->cur += 4;
64 if ((buf->cur + value->_u.object_key._length) > buf->end ||
65 (buf->cur + value->_u.object_key._length) < buf->cur)
66 return TRUE;
67 value->_u.object_key._buffer = buf->cur;
68 buf->cur += value->_u.object_key._length;
69
70 break;
71 case GIOP_ProfileAddr:
72 g_warning ("XXX FIXME GIOP_ProfileAddr not handled");
73 return TRUE;
74 break;
75 case GIOP_ReferenceAddr:
76 g_warning ("XXX FIXME GIOP_ReferenceAddr not handled");
77 return TRUE;
78 break;
79 }
80
81 return FALSE;
82 }
83
84 static gboolean
giop_IOP_ServiceContextList_demarshal(GIOPRecvBuffer * buf,IOP_ServiceContextList * value)85 giop_IOP_ServiceContextList_demarshal (GIOPRecvBuffer *buf,
86 IOP_ServiceContextList *value)
87 {
88 return ORBit_demarshal_value (
89 TC_IOP_ServiceContextList,
90 (gpointer *)&value, buf, NULL);
91 }
92
93 static void
giop_IOP_ServiceContextList_free(IOP_ServiceContextList * value)94 giop_IOP_ServiceContextList_free (IOP_ServiceContextList *value)
95 {
96 if (value)
97 CORBA_free (value->_buffer);
98 }
99
100 static gboolean
giop_recv_buffer_demarshal_request_1_1(GIOPRecvBuffer * buf)101 giop_recv_buffer_demarshal_request_1_1(GIOPRecvBuffer *buf)
102 {
103 gboolean do_bswap = giop_msg_conversion_needed(buf);
104 CORBA_unsigned_long oplen;
105
106 buf->msg.u.request_1_1.service_context._buffer = NULL;
107 if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.request_1_1.service_context))
108 return TRUE;
109
110 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
111
112 if((buf->cur + 12) > buf->end)
113 return TRUE;
114
115 if(do_bswap)
116 buf->msg.u.request_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
117 else
118 buf->msg.u.request_1_1.request_id = *((guint32 *)buf->cur);
119 buf->cur += 4;
120 buf->msg.u.request_1_1.response_expected = *buf->cur;
121 buf->cur += 4;
122 if(do_bswap)
123 buf->msg.u.request_1_1.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
124 else
125 buf->msg.u.request_1_1.object_key._length = *((guint32 *)buf->cur);
126 buf->cur += 4;
127
128 if((buf->cur + buf->msg.u.request_1_1.object_key._length) > buf->end
129 || (buf->cur + buf->msg.u.request_1_1.object_key._length) < buf->cur)
130 return TRUE;
131
132 buf->msg.u.request_1_1.object_key._buffer = buf->cur;
133 buf->msg.u.request_1_1.object_key._release = CORBA_FALSE;
134
135 buf->cur += buf->msg.u.request_1_1.object_key._length;
136 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
137 if((buf->cur + 4) > buf->end)
138 return TRUE;
139 if(do_bswap)
140 oplen = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
141 else
142 oplen = *((guint32 *)buf->cur);
143 buf->cur += 4;
144
145 if((buf->cur + oplen) > buf->end
146 || (buf->cur + oplen) < buf->cur)
147 return TRUE;
148
149 buf->msg.u.request_1_1.operation = (CORBA_char *) buf->cur;
150 buf->cur += oplen;
151 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
152 if((buf->cur + 4) > buf->end)
153 return TRUE;
154
155 if(do_bswap)
156 buf->msg.u.request_1_1.requesting_principal._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
157 else
158 buf->msg.u.request_1_1.requesting_principal._length = *((guint32 *)buf->cur);
159
160 buf->cur += 4;
161 if((buf->cur + buf->msg.u.request_1_1.requesting_principal._length) > buf->end
162 || (buf->cur + buf->msg.u.request_1_1.requesting_principal._length) < buf->cur)
163 return TRUE;
164
165 buf->msg.u.request_1_1.requesting_principal._buffer = buf->cur;
166 buf->msg.u.request_1_1.requesting_principal._release = CORBA_FALSE;
167 buf->cur += buf->msg.u.request_1_1.requesting_principal._length;
168
169 return FALSE;
170 }
171
172 static gboolean
giop_recv_buffer_demarshal_request_1_2(GIOPRecvBuffer * buf)173 giop_recv_buffer_demarshal_request_1_2(GIOPRecvBuffer *buf)
174 {
175 gboolean do_bswap = giop_msg_conversion_needed(buf);
176 CORBA_unsigned_long oplen;
177
178 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
179
180 if((buf->cur + 8) > buf->end)
181 return TRUE;
182
183 if(do_bswap)
184 buf->msg.u.request_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
185 else
186 buf->msg.u.request_1_2.request_id = *((guint32 *) buf->cur);
187 buf->cur += 4;
188 buf->msg.u.request_1_2.response_flags = *buf->cur;
189 buf->cur += 4;
190
191 if(giop_GIOP_TargetAddress_demarshal(buf, &buf->msg.u.request_1_2.target))
192 return TRUE;
193
194 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
195 if((buf->cur + 4) > buf->end)
196 return TRUE;
197
198 if(do_bswap)
199 oplen = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
200 else
201 oplen = *((guint32 *)buf->cur);
202 buf->cur += 4;
203
204 if((buf->cur + oplen) > buf->end
205 || (buf->cur + oplen) < buf->cur)
206 return TRUE;
207
208 buf->msg.u.request_1_2.operation = (CORBA_char *) buf->cur;
209 buf->cur += oplen;
210
211 buf->msg.u.request_1_2.service_context._buffer = NULL;
212 if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.request_1_2.service_context))
213 return TRUE;
214 buf->cur = ALIGN_ADDRESS(buf->cur, 8);
215
216 return FALSE;
217 }
218
219 static gboolean
giop_recv_buffer_demarshal_reply_1_1(GIOPRecvBuffer * buf)220 giop_recv_buffer_demarshal_reply_1_1(GIOPRecvBuffer *buf)
221 {
222 gboolean do_bswap = giop_msg_conversion_needed(buf);
223
224 buf->msg.u.reply_1_1.service_context._buffer = NULL;
225 if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.reply_1_1.service_context))
226 return TRUE;
227 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
228 if((buf->cur + 8) > buf->end)
229 return TRUE;
230 if(do_bswap)
231 {
232 buf->msg.u.reply_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
233 buf->cur += 4;
234 buf->msg.u.reply_1_1.reply_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
235 }
236 else
237 {
238 buf->msg.u.reply_1_1.request_id = *((guint32 *)buf->cur);
239 buf->cur += 4;
240 buf->msg.u.reply_1_1.reply_status = *((guint32 *)buf->cur);
241 }
242 buf->cur += 4;
243
244 return FALSE;
245 }
246
247 static gboolean
giop_recv_buffer_demarshal_reply_1_2(GIOPRecvBuffer * buf)248 giop_recv_buffer_demarshal_reply_1_2(GIOPRecvBuffer *buf)
249 {
250 gboolean do_bswap = giop_msg_conversion_needed(buf);
251
252 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
253 if((buf->cur + 8) > buf->end)
254 return TRUE;
255 if(do_bswap)
256 {
257 buf->msg.u.reply_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
258 buf->cur += 4;
259 buf->msg.u.reply_1_2.reply_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
260 }
261 else
262 {
263 buf->msg.u.reply_1_2.request_id = *((guint32 *)buf->cur);
264 buf->cur += 4;
265 buf->msg.u.reply_1_2.reply_status = *((guint32 *)buf->cur);
266 }
267 buf->cur += 4;
268
269 buf->msg.u.reply_1_2.service_context._buffer = NULL;
270 if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.reply_1_2.service_context))
271 return TRUE;
272
273 buf->cur = ALIGN_ADDRESS(buf->cur, 8);
274 return FALSE;
275 }
276
277 static gboolean
giop_recv_buffer_demarshal_cancel(GIOPRecvBuffer * buf)278 giop_recv_buffer_demarshal_cancel(GIOPRecvBuffer *buf)
279 {
280 gboolean do_bswap = giop_msg_conversion_needed (buf);
281
282 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
283 if ((buf->cur + 4) > buf->end)
284 return TRUE;
285 if (do_bswap)
286 buf->msg.u.cancel_request.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
287 else
288 buf->msg.u.cancel_request.request_id = *((guint32 *)buf->cur);
289 buf->cur += 4;
290
291 return FALSE;
292 }
293
294 static gboolean
giop_recv_buffer_demarshal_locate_request_1_1(GIOPRecvBuffer * buf)295 giop_recv_buffer_demarshal_locate_request_1_1(GIOPRecvBuffer *buf)
296 {
297 gboolean do_bswap = giop_msg_conversion_needed (buf);
298
299 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
300
301 if ((buf->cur + 8) > buf->end)
302 return TRUE;
303
304 if (do_bswap)
305 buf->msg.u.locate_request_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
306 else
307 buf->msg.u.locate_request_1_1.request_id = *((guint32 *)buf->cur);
308 buf->cur += 4;
309 if (do_bswap)
310 buf->msg.u.locate_request_1_1.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
311 else
312 buf->msg.u.locate_request_1_1.object_key._length = *((guint32 *)buf->cur);
313 buf->cur += 4;
314
315 if ((buf->cur + buf->msg.u.locate_request_1_1.object_key._length) > buf->end ||
316 (buf->cur + buf->msg.u.locate_request_1_1.object_key._length) < buf->cur)
317 return TRUE;
318
319 buf->msg.u.locate_request_1_1.object_key._buffer = buf->cur;
320 buf->msg.u.locate_request_1_1.object_key._release = CORBA_FALSE;
321 buf->cur += buf->msg.u.locate_request_1_1.object_key._length;
322
323 return FALSE;
324 }
325
326 static gboolean
giop_recv_buffer_demarshal_locate_request_1_2(GIOPRecvBuffer * buf)327 giop_recv_buffer_demarshal_locate_request_1_2(GIOPRecvBuffer *buf)
328 {
329 gboolean do_bswap = giop_msg_conversion_needed(buf);
330
331 buf->cur = ALIGN_ADDRESS(buf->cur, 4);
332
333 if((buf->cur + 4) > buf->end)
334 return TRUE;
335
336 if(do_bswap)
337 buf->msg.u.locate_request_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
338 else
339 buf->msg.u.locate_request_1_2.request_id = *((guint32 *)buf->cur);
340 buf->cur += 4;
341
342 return giop_GIOP_TargetAddress_demarshal (buf, &buf->msg.u.locate_request_1_2.target);
343 }
344
345 static gboolean
giop_recv_buffer_demarshal_locate_reply_1_1(GIOPRecvBuffer * buf)346 giop_recv_buffer_demarshal_locate_reply_1_1(GIOPRecvBuffer *buf)
347 {
348 gboolean do_bswap = giop_msg_conversion_needed(buf);
349
350 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
351
352 if ((buf->cur + 8) > buf->end)
353 return TRUE;
354
355 if (do_bswap) {
356 buf->msg.u.locate_reply_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
357 buf->cur += 4;
358 buf->msg.u.locate_reply_1_1.locate_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
359 } else {
360 buf->msg.u.locate_reply_1_1.request_id = *((guint32 *)buf->cur);
361 buf->cur += 4;
362 buf->msg.u.locate_reply_1_1.locate_status = *((guint32 *)buf->cur);
363 }
364 buf->cur += 4;
365
366 return FALSE;
367 }
368
369 static gboolean
giop_recv_buffer_demarshal_locate_reply_1_2(GIOPRecvBuffer * buf)370 giop_recv_buffer_demarshal_locate_reply_1_2 (GIOPRecvBuffer *buf)
371 {
372 return giop_recv_buffer_demarshal_locate_reply_1_1 (buf);
373 }
374
375 typedef gboolean (*GIOPDecodeFunc) (GIOPRecvBuffer *buf);
376
377 static gboolean
giop_recv_buffer_demarshal(GIOPRecvBuffer * buf)378 giop_recv_buffer_demarshal (GIOPRecvBuffer *buf)
379 {
380 GIOPDecodeFunc decode_func;
381 static const GIOPDecodeFunc decode_funcs [GIOP_NUM_MSG_TYPES] [GIOP_NUM_VERSIONS] = {
382 /* request */
383 { giop_recv_buffer_demarshal_request_1_1,
384 giop_recv_buffer_demarshal_request_1_1,
385 giop_recv_buffer_demarshal_request_1_2},
386 /* reply */
387 { giop_recv_buffer_demarshal_reply_1_1,
388 giop_recv_buffer_demarshal_reply_1_1,
389 giop_recv_buffer_demarshal_reply_1_2},
390 /* cancel request */
391 { giop_recv_buffer_demarshal_cancel,
392 giop_recv_buffer_demarshal_cancel,
393 giop_recv_buffer_demarshal_cancel},
394 /* locate request */
395 { giop_recv_buffer_demarshal_locate_request_1_1,
396 giop_recv_buffer_demarshal_locate_request_1_1,
397 giop_recv_buffer_demarshal_locate_request_1_2},
398 /* locate reply */
399 { giop_recv_buffer_demarshal_locate_reply_1_1,
400 giop_recv_buffer_demarshal_locate_reply_1_1,
401 giop_recv_buffer_demarshal_locate_reply_1_2},
402 /* close connection */
403 {NULL, NULL, NULL},
404 /* message error */
405 {NULL, NULL, NULL},
406 /* fragment */
407 {NULL, NULL, NULL}
408 };
409
410 if (buf->msg.header.message_type >= GIOP_NUM_MSG_TYPES)
411 return TRUE;
412
413 if (buf->giop_version >= GIOP_NUM_VERSIONS)
414 return TRUE;
415
416 decode_func = decode_funcs [buf->msg.header.message_type] [buf->giop_version];
417
418 if (decode_func)
419 return decode_func (buf);
420
421 return FALSE;
422 }
423
424 GIOPRecvBuffer *
giop_recv_buffer_use_encaps(guchar * mem,gulong len)425 giop_recv_buffer_use_encaps (guchar *mem, gulong len)
426 {
427 GIOPRecvBuffer *buf = giop_recv_buffer_use_buf (NULL);
428
429 buf->cur = buf->message_body = mem;
430 buf->end = buf->cur + len;
431 buf->msg.header.message_size = len;
432 buf->msg.header.flags = *(buf->cur++);
433 buf->giop_version = GIOP_LATEST;
434 buf->left_to_read = 0;
435 buf->state = GIOP_MSG_READY;
436 buf->free_body = FALSE;
437
438 return buf;
439 }
440
441 GIOPRecvBuffer *
giop_recv_buffer_use_encaps_buf(GIOPRecvBuffer * buf)442 giop_recv_buffer_use_encaps_buf (GIOPRecvBuffer *buf)
443 {
444 guchar *ptr;
445 CORBA_unsigned_long len;
446
447 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
448
449 if ((buf->cur + 4) > buf->end)
450 return NULL;
451 len = *(CORBA_unsigned_long *) buf->cur;
452
453 if (giop_msg_conversion_needed (buf))
454 len = GUINT32_SWAP_LE_BE (len);
455
456 buf->cur += 4;
457 if ((buf->cur + len) > buf->end ||
458 (buf->cur + len) < buf->cur)
459 return NULL;
460
461 ptr = buf->cur;
462 buf->cur += len;
463
464 return giop_recv_buffer_use_encaps (ptr, len);
465 }
466
467 void
giop_recv_buffer_unuse(GIOPRecvBuffer * buf)468 giop_recv_buffer_unuse (GIOPRecvBuffer *buf)
469 {
470 if (!buf)
471 return;
472
473 if (buf->free_body) {
474 g_free (buf->message_body);
475 buf->message_body = NULL;
476 }
477
478 switch (buf->giop_version) {
479 case GIOP_1_0:
480 case GIOP_1_1:
481 switch (buf->msg.header.message_type) {
482 case GIOP_REPLY:
483 giop_IOP_ServiceContextList_free (
484 &buf->msg.u.reply_1_1.service_context);
485 break;
486 case GIOP_REQUEST:
487 giop_IOP_ServiceContextList_free (
488 &buf->msg.u.request_1_1.service_context);
489 break;
490 default:
491 break;
492 }
493 break;
494 case GIOP_1_2:
495 switch (buf->msg.header.message_type) {
496 case GIOP_REPLY:
497 giop_IOP_ServiceContextList_free (
498 &buf->msg.u.reply_1_2.service_context);
499 break;
500 case GIOP_REQUEST:
501 giop_IOP_ServiceContextList_free (
502 &buf->msg.u.request_1_2.service_context);
503 break;
504 default:
505 break;
506 }
507 break;
508 default:
509 break;
510 }
511 if(buf->connection)
512 giop_connection_unref (buf->connection);
513 g_free (buf);
514 }
515
516 static void
ent_lock(GIOPMessageQueueEntry * ent)517 ent_lock (GIOPMessageQueueEntry *ent)
518 {
519 if (ent->src_thread)
520 g_mutex_lock (ent->src_thread->lock);
521 }
522
523 static void
ent_unlock(GIOPMessageQueueEntry * ent)524 ent_unlock (GIOPMessageQueueEntry *ent)
525 {
526 if (ent->src_thread)
527 g_mutex_unlock (ent->src_thread->lock);
528 }
529
530 static void
giop_recv_destroy_queue_entry_T(GIOPMessageQueueEntry * ent)531 giop_recv_destroy_queue_entry_T (GIOPMessageQueueEntry *ent)
532 {
533 if (ent->cnx) {
534 giop_connection_unref (ent->cnx);
535 ent->cnx = NULL;
536 }
537 }
538
539 void
giop_recv_list_destroy_queue_entry(GIOPMessageQueueEntry * ent)540 giop_recv_list_destroy_queue_entry (GIOPMessageQueueEntry *ent)
541 {
542 LINK_MUTEX_LOCK (giop_queued_messages_lock);
543 #ifdef DEBUG
544 g_warning ("Remove XX:%p:(%p) - %d", ent, ent->async_cb,
545 g_list_length (giop_queued_messages));
546 #endif
547 giop_queued_messages = g_list_remove (giop_queued_messages, ent);
548 LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
549
550 giop_recv_destroy_queue_entry_T (ent);
551 }
552
553 void
giop_recv_list_setup_queue_entry(GIOPMessageQueueEntry * ent,GIOPConnection * cnx,CORBA_unsigned_long msg_type,CORBA_unsigned_long request_id)554 giop_recv_list_setup_queue_entry (GIOPMessageQueueEntry *ent,
555 GIOPConnection *cnx,
556 CORBA_unsigned_long msg_type,
557 CORBA_unsigned_long request_id)
558 {
559 ent->src_thread = giop_thread_self ();
560 ent->async_cb = NULL;
561
562 ent->cnx = giop_connection_ref (cnx);
563 ent->msg_type = msg_type;
564 ent->request_id = request_id;
565 ent->buffer = NULL;
566
567 LINK_MUTEX_LOCK (giop_queued_messages_lock);
568 #ifdef DEBUG
569 g_warning ("Push XX:%p:(%p) - %d", ent, ent->async_cb,
570 g_list_length (giop_queued_messages));
571 #endif
572 giop_queued_messages = g_list_prepend (giop_queued_messages, ent);
573 LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
574 }
575
576 void
giop_recv_list_setup_queue_entry_async(GIOPMessageQueueEntry * ent,GIOPAsyncCallback cb)577 giop_recv_list_setup_queue_entry_async (GIOPMessageQueueEntry *ent,
578 GIOPAsyncCallback cb)
579 {
580 g_return_if_fail (ent != NULL);
581
582 ent->async_cb = cb;
583 }
584
585 void
giop_recv_list_zap(GIOPConnection * cnx)586 giop_recv_list_zap (GIOPConnection *cnx)
587 {
588 GList *l, *next;
589 GSList *sl, *notify = NULL;
590
591 LINK_MUTEX_LOCK (giop_queued_messages_lock);
592
593 for (l = giop_queued_messages; l; l = next) {
594 GIOPMessageQueueEntry *ent = l->data;
595
596 next = l->next;
597
598 if (ent->cnx == cnx) {
599 ent_lock (ent);
600
601 dprintf (ERRORS, "Zap listener on dead cnx with buffer %p\n",
602 ent->buffer);
603
604 giop_recv_buffer_unuse (ent->buffer);
605 ent->buffer = NULL;
606
607 giop_recv_destroy_queue_entry_T (ent);
608
609 if (giop_thread_io () && !ent->async_cb)
610 giop_incoming_signal_T (ent->src_thread, GIOP_CLOSECONNECTION);
611 ent_unlock (ent);
612
613 if (ent->async_cb)
614 notify = g_slist_prepend (notify, ent);
615 giop_queued_messages = g_list_delete_link (
616 giop_queued_messages, l);
617 }
618 }
619
620 LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
621
622 for (sl = notify; sl; sl = sl->next) {
623 GIOPMessageQueueEntry *ent = sl->data;
624
625 if (!ent->async_cb) {
626 /* This should never happen */
627 g_warning ("Extraordinary recv list re-enterancy");
628 continue;
629 }
630
631 giop_invoke_async (ent);
632 }
633 g_slist_free (notify);
634 }
635
636 CORBA_unsigned_long
giop_recv_buffer_get_request_id(GIOPRecvBuffer * buf)637 giop_recv_buffer_get_request_id (GIOPRecvBuffer *buf)
638 {
639 static const glong reqid_offsets [GIOP_NUM_MSG_TYPES] [GIOP_NUM_VERSIONS] = {
640 /* GIOP_REQUEST */
641 { G_STRUCT_OFFSET(GIOPRecvBuffer,
642 msg.u.request_1_0.request_id),
643 G_STRUCT_OFFSET(GIOPRecvBuffer,
644 msg.u.request_1_1.request_id),
645 G_STRUCT_OFFSET(GIOPRecvBuffer,
646 msg.u.request_1_2.request_id)},
647 /* GIOP_REPLY */
648 { G_STRUCT_OFFSET(GIOPRecvBuffer,
649 msg.u.reply_1_0.request_id),
650 G_STRUCT_OFFSET(GIOPRecvBuffer,
651 msg.u.reply_1_1.request_id),
652 G_STRUCT_OFFSET(GIOPRecvBuffer,
653 msg.u.reply_1_2.request_id)},
654 /* GIOP_CANCELREQUEST */
655 { G_STRUCT_OFFSET(GIOPRecvBuffer,
656 msg.u.cancel_request.request_id),
657 G_STRUCT_OFFSET(GIOPRecvBuffer,
658 msg.u.cancel_request.request_id),
659 G_STRUCT_OFFSET(GIOPRecvBuffer,
660 msg.u.cancel_request.request_id)},
661 /* GIOP_LOCATEREQUEST */
662 { G_STRUCT_OFFSET(GIOPRecvBuffer,
663 msg.u.locate_request_1_0.request_id),
664 G_STRUCT_OFFSET(GIOPRecvBuffer,
665 msg.u.locate_request_1_1.request_id),
666 G_STRUCT_OFFSET(GIOPRecvBuffer,
667 msg.u.locate_request_1_2.request_id)},
668 /* GIOP_LOCATEREPLY */
669 { G_STRUCT_OFFSET(GIOPRecvBuffer,
670 msg.u.locate_reply_1_0.request_id),
671 G_STRUCT_OFFSET(GIOPRecvBuffer,
672 msg.u.locate_reply_1_1.request_id),
673 G_STRUCT_OFFSET(GIOPRecvBuffer,
674 msg.u.locate_reply_1_2.request_id)},
675 {0,0,0}, /* GIOP_CLOSECONNECTION */
676 {0,0,0}, /* GIOP_MESSAGEERROR */
677 {0,0,0} /* GIOP_FRAGMENT */
678 };
679 gulong offset;
680
681 offset = reqid_offsets [buf->msg.header.message_type] [buf->giop_version];
682 if (!offset)
683 return 0;
684
685 return G_STRUCT_MEMBER (CORBA_unsigned_long, buf, offset);
686 }
687
688 static inline gboolean
check_got(GIOPMessageQueueEntry * ent)689 check_got (GIOPMessageQueueEntry *ent)
690 {
691 return (ent->buffer ||
692 !ent->cnx ||
693 (ent->cnx->parent.status == LINK_DISCONNECTED) ||
694 (ent->cnx->parent.status == LINK_TIMEOUT));
695 }
696
697 GIOPRecvBuffer *
giop_recv_buffer_get(GIOPMessageQueueEntry * ent,gboolean * timeout)698 giop_recv_buffer_get (GIOPMessageQueueEntry *ent,
699 gboolean *timeout)
700 {
701 GIOPThread *tdata = NULL;
702
703 *timeout = FALSE;
704 tdata = giop_thread_self ();
705
706 thread_switch:
707 if (giop_thread_io ()) {
708 ent_lock (ent);
709
710 for (; !check_got (ent); ) {
711 if (!giop_thread_queue_empty_T (tdata)) {
712 ent_unlock (ent);
713 giop_thread_queue_process (tdata);
714 ent_lock (ent);
715 } else
716 g_cond_wait (tdata->incoming, tdata->lock);
717 }
718
719 ent_unlock (ent);
720
721 } else { /* non-threaded */
722
723 while (!ent->buffer && ent->cnx &&
724 (ent->cnx->parent.status != LINK_DISCONNECTED) &&
725 (ent->cnx->parent.status != LINK_TIMEOUT) &&
726 !giop_thread_io())
727 link_main_iteration (TRUE);
728
729 if (giop_thread_io())
730 goto thread_switch;
731 }
732
733 if (giop_thread_io()
734 && ent
735 && ent->cnx
736 && ent->cnx->parent.timeout_mutex) {
737 g_mutex_lock (ent->cnx->parent.timeout_mutex);
738 if (ent->cnx->parent.timeout_status == LINK_TIMEOUT_UNKNOWN) {
739 link_io_thread_remove_timeout (ent->cnx->parent.timeout_source_id);
740 ent->cnx->parent.timeout_source_id = 0;
741 ent->cnx->parent.timeout_status = LINK_TIMEOUT_NO;
742 giop_connection_unref (ent->cnx); // we remove the source so we must unref the connection
743 } else if (ent->cnx->parent.timeout_status == LINK_TIMEOUT_YES)
744 *timeout = TRUE;
745 g_mutex_unlock (ent->cnx->parent.timeout_mutex);
746 }
747
748 giop_thread_queue_tail_wakeup (tdata);
749 giop_recv_list_destroy_queue_entry (ent);
750
751 return ent->buffer;
752 }
753
754 ORBit_ObjectKey*
giop_recv_buffer_get_objkey(GIOPRecvBuffer * buf)755 giop_recv_buffer_get_objkey (GIOPRecvBuffer *buf)
756 {
757 switch (buf->msg.header.message_type) {
758 case GIOP_REQUEST:
759 switch (buf->msg.header.version [1]) {
760 case 0:
761 return &buf->msg.u.request_1_0.object_key;
762 break;
763 case 1:
764 return &buf->msg.u.request_1_1.object_key;
765 break;
766 case 2:
767 g_assert (buf->msg.u.request_1_2.target._d == GIOP_KeyAddr);
768 return &buf->msg.u.request_1_2.target._u.object_key;
769 break;
770 }
771 break;
772
773 case GIOP_LOCATEREQUEST:
774 switch (buf->msg.header.version [1]) {
775 case 0:
776 return &buf->msg.u.locate_request_1_0.object_key;
777 break;
778 case 1:
779 return &buf->msg.u.locate_request_1_1.object_key;
780 break;
781 case 2:
782 g_assert (buf->msg.u.locate_request_1_2.target._d == GIOP_KeyAddr);
783 return &buf->msg.u.locate_request_1_2.target._u.object_key;
784 break;
785 }
786 break;
787
788 default:
789 g_assert_not_reached ();
790 }
791
792 return NULL;
793 }
794
795 char *
giop_recv_buffer_get_opname(GIOPRecvBuffer * buf)796 giop_recv_buffer_get_opname (GIOPRecvBuffer *buf)
797 {
798 switch(buf->msg.header.version [1]) {
799 case 0:
800 return buf->msg.u.request_1_0.operation;
801 break;
802 case 1:
803 return buf->msg.u.request_1_1.operation;
804 break;
805 case 2:
806 return buf->msg.u.request_1_2.operation;
807 break;
808 }
809
810 return NULL;
811 }
812
813 void
giop_recv_buffer_init(void)814 giop_recv_buffer_init (void)
815 {
816 giop_queued_messages_lock = link_mutex_new ();
817 }
818
819 static void
giop_connection_add_frag(GIOPConnection * cnx,GIOPRecvBuffer * buf)820 giop_connection_add_frag (GIOPConnection *cnx,
821 GIOPRecvBuffer *buf)
822 {
823 cnx->incoming_frags = g_list_prepend (cnx->incoming_frags,
824 g_list_prepend (NULL, buf));
825 }
826
827 static GList *
giop_connection_get_frag(GIOPConnection * cnx,CORBA_unsigned_long request_id,gboolean return_first_if_none)828 giop_connection_get_frag (GIOPConnection *cnx,
829 CORBA_unsigned_long request_id,
830 gboolean return_first_if_none)
831 {
832 GList *l;
833
834 for (l = cnx->incoming_frags; l; l = l->next) {
835 GList *frags = l->data;
836
837 if (giop_recv_buffer_get_request_id (frags->data) == request_id)
838 return l->data;
839 }
840
841 /* This sucks, but it's prolly a GIOP-1.1 spec issue */
842 if (return_first_if_none && cnx->incoming_frags) {
843 static int warned = 0;
844 if (!warned++)
845 dprintf (MESSAGES, "GIOP-1.1 1 fragment set per cnx (?)");
846
847 return cnx->incoming_frags->data;
848 }
849
850 return NULL;
851 }
852
853 static void
giop_connection_remove_frag(GIOPConnection * cnx,GList * frags)854 giop_connection_remove_frag (GIOPConnection *cnx, GList *frags)
855 {
856 GList *l;
857
858 g_return_if_fail (frags != NULL);
859
860 for (l = frags->next; l; l = l->next)
861 giop_recv_buffer_unuse (l->data);
862
863 cnx->incoming_frags = g_list_remove (cnx->incoming_frags, frags);
864 g_list_free (frags);
865 }
866
867 void
giop_connection_destroy_frags(GIOPConnection * cnx)868 giop_connection_destroy_frags (GIOPConnection *cnx)
869 {
870 GList *l;
871
872 for (l = cnx->incoming_frags; l; l = l->next) {
873 GList *l2;
874
875 for (l2 = l->data; l2; l2 = l2->next)
876 giop_recv_buffer_unuse (l2->data);
877
878 g_list_free (l->data);
879 }
880 g_list_free (cnx->incoming_frags);
881 cnx->incoming_frags = NULL;
882 }
883
884 static gboolean
alloc_buffer(GIOPRecvBuffer * buf,gpointer old_alloc,gulong body_size)885 alloc_buffer (GIOPRecvBuffer *buf, gpointer old_alloc, gulong body_size)
886 {
887 buf->message_body = g_try_realloc (old_alloc, body_size + 12);
888
889 if (!buf->message_body)
890 return TRUE;
891
892 /*
893 * We assume that this is 8 byte aligned, for efficiency -
894 * so we can align to the memory address rather than the offset
895 * into the buffer.
896 */
897 g_assert (((gulong)buf->message_body & 0x3) == 0);
898 buf->free_body = TRUE;
899 buf->cur = buf->message_body + 12;
900 buf->end = buf->cur + body_size;
901 buf->left_to_read = body_size;
902
903 return FALSE;
904 }
905
906 static gboolean
concat_frags(GList * list)907 concat_frags (GList *list)
908 {
909 GList *l;
910 guchar *ptr;
911 gulong length = 0;
912 gulong initial_length;
913 gulong initial_offset;
914 GIOPRecvBuffer *head;
915
916 head = list->data;
917
918 length = head->msg.header.message_size;
919 initial_offset = (head->cur - head->message_body);
920 initial_length = (head->end - head->cur);
921
922 length += initial_offset - 12; /* include what we read of the header */
923
924 g_assert (head->free_body);
925
926 if (alloc_buffer (head, head->message_body, length)) {
927 dprintf (ERRORS, "failed to allocate fragment collation buffer");
928 return TRUE;
929 }
930
931 head->left_to_read = 0;
932 head->cur = head->message_body + initial_offset;
933
934 ptr = head->cur + initial_length;
935
936 for (l = list->next; l; l = l->next) {
937 gulong len;
938 GIOPRecvBuffer *buf = l->data;
939
940 len = buf->end - buf->cur;
941 memcpy (ptr, buf->cur, len);
942 ptr+= len;
943 }
944
945 head->end = ptr;
946
947 return FALSE;
948 }
949
950 static glong giop_initial_msg_size_limit = GIOP_INITIAL_MSG_SIZE_LIMIT;
951
952 void
giop_recv_set_limit(glong limit)953 giop_recv_set_limit (glong limit)
954 {
955 if (limit > 256) /* Something slightly sensible ? */
956 giop_initial_msg_size_limit = limit;
957 }
958
959 glong
giop_recv_get_limit(void)960 giop_recv_get_limit (void)
961 {
962 return giop_initial_msg_size_limit;
963 }
964
965 /**
966 * giop_recv_buffer_handle_fragmented:
967 * @buf: pointer to recv buffer pointer
968 * @cnx: current connection.
969 *
970 * This will append @buf to the right list of buffers
971 * on the connection, forming a complete message, and
972 * re-write *@buf to the first buffer in the chain.
973 *
974 * Return value: TRUE on error else FALSE
975 **/
976 static gboolean
giop_recv_buffer_handle_fragmented(GIOPRecvBuffer ** ret_buf,GIOPConnection * cnx)977 giop_recv_buffer_handle_fragmented (GIOPRecvBuffer **ret_buf,
978 GIOPConnection *cnx)
979 {
980 GList *list;
981 gboolean giop_1_1;
982 gboolean error = FALSE;
983 CORBA_long message_id;
984 GIOPRecvBuffer *buf = *ret_buf;
985
986 giop_1_1 = (buf->giop_version == GIOP_1_1);
987
988 switch (buf->msg.header.message_type) {
989 case GIOP_REPLY:
990 case GIOP_LOCATEREPLY:
991 case GIOP_REQUEST:
992 case GIOP_LOCATEREQUEST:
993 message_id = giop_recv_buffer_get_request_id (buf);
994 break;
995 case GIOP_FRAGMENT:
996 if (!giop_1_1) {
997 buf->cur = ALIGN_ADDRESS (buf->cur, 4);
998
999 if ((buf->cur + 4) > buf->end) {
1000 dprintf (ERRORS, "incoming bogus fragment length");
1001 return TRUE;
1002 }
1003 if (giop_msg_conversion_needed (buf))
1004 message_id = GUINT32_SWAP_LE_BE (*((guint32 *)buf->cur));
1005 else
1006 message_id = *(guint32 *) buf->cur;
1007 buf->cur += 4;
1008 } else
1009 message_id = 0;
1010 break;
1011 default:
1012 dprintf (ERRORS, "Bogus fragment packet type %d",
1013 buf->msg.header.message_type);
1014 return TRUE;
1015 }
1016
1017 if (!(list = giop_connection_get_frag (cnx, message_id, giop_1_1))) {
1018 if (!MORE_FRAGMENTS_FOLLOW (buf))
1019 return TRUE;
1020
1021 giop_connection_add_frag (cnx, buf);
1022
1023 } else {
1024 GIOPRecvBuffer *head = list->data;
1025
1026 *ret_buf = head;
1027 g_assert (head->msg.header.message_type != GIOP_FRAGMENT);
1028
1029 /* track total length on head node */
1030 /* (end - cur) to account for fragment (msg id) header */
1031 head->msg.header.message_size += (buf->end - buf->cur);
1032
1033 list = g_list_append (list, buf);
1034
1035 if (!cnx->parent.is_auth &&
1036 buf->msg.header.message_size > giop_initial_msg_size_limit) {
1037 dprintf (ERRORS, "Message exceeded initial size limit\n");
1038 error = TRUE;
1039 giop_connection_remove_frag (cnx, list);
1040 }
1041
1042 if (!MORE_FRAGMENTS_FOLLOW (buf)) {
1043 g_assert (buf->msg.header.message_type == GIOP_FRAGMENT);
1044
1045 /* concat all fragments - re-write & continue */
1046 error = concat_frags (list);
1047
1048 giop_connection_remove_frag (cnx, list);
1049 }
1050 }
1051
1052 return error;
1053 }
1054
1055 static gboolean
handle_reply(GIOPRecvBuffer * buf)1056 handle_reply (GIOPRecvBuffer *buf)
1057 {
1058 GList *l;
1059 gboolean error;
1060 GIOPMessageQueueEntry *ent;
1061 CORBA_unsigned_long request_id;
1062
1063 request_id = giop_recv_buffer_get_request_id (buf);
1064
1065 error = FALSE;
1066
1067 LINK_MUTEX_LOCK (giop_queued_messages_lock);
1068
1069 for (l = giop_queued_messages; l; l = l->next) {
1070 ent = l->data;
1071
1072 if (ent->request_id == request_id &&
1073 ent->msg_type == buf->msg.header.message_type)
1074 break;
1075 }
1076
1077 ent = l ? l->data : NULL;
1078
1079 if (!ent) {
1080 if (giop_recv_buffer_reply_status (buf) ==
1081 CORBA_SYSTEM_EXCEPTION) {
1082 /*
1083 * Unexpected - but sometimes a oneway
1084 * method invocation on a de-activated
1085 * object results in us getting a bogus
1086 * system exception in reply.
1087 */
1088 } else {
1089 #ifdef G_ENABLE_DEBUG
1090 if (giop_debug_hook_unexpected_reply)
1091 giop_debug_hook_unexpected_reply (buf);
1092 else
1093 dprintf (ERRORS, "We received an unexpected reply\n");
1094 #endif /* G_ENABLE_DEBUG */
1095 error = TRUE;
1096 }
1097
1098 } else if (ent->cnx != buf->connection) {
1099 #ifdef G_ENABLE_DEBUG
1100 if (giop_debug_hook_spoofed_reply)
1101 giop_debug_hook_spoofed_reply (buf, ent);
1102 #endif
1103 dprintf (ERRORS, "We received a bogus reply\n");
1104
1105 error = TRUE;
1106
1107 } else {
1108 #ifdef DEBUG
1109 g_warning ("Pop XX:%p:%p - %d",
1110 ent, ent->async_cb,
1111 g_list_length (giop_queued_messages));
1112 #endif
1113 giop_queued_messages = g_list_delete_link
1114 (giop_queued_messages, l);
1115 }
1116
1117 LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
1118
1119 if (ent && !error) {
1120 gboolean async = FALSE;
1121
1122 ent_lock (ent);
1123 ent->buffer = buf;
1124
1125 if (giop_thread_io () && !ent->async_cb)
1126 giop_incoming_signal_T (ent->src_thread,
1127 GIOP_REPLY);
1128
1129 else if (ent->async_cb)
1130 async = TRUE;
1131
1132 ent_unlock (ent);
1133
1134 if (async)
1135 giop_invoke_async (ent);
1136
1137 buf = NULL;
1138 }
1139
1140 giop_recv_buffer_unuse (buf);
1141
1142 return error;
1143 }
1144
1145 static gboolean
giop_recv_msg_reading_body(GIOPRecvBuffer * buf,gboolean is_auth)1146 giop_recv_msg_reading_body (GIOPRecvBuffer *buf,
1147 gboolean is_auth)
1148 {
1149 dprintf (GIOP, "Incoming IIOP header:\n");
1150
1151 do_giop_dump (stderr, (guint8 *) &buf->msg.header, 12, 0);
1152
1153 /* Check the header */
1154 if (memcmp (buf->msg.header.magic, "GIOP", 4))
1155 return TRUE;
1156
1157 if (buf->msg.header.message_type >= GIOP_NUM_MSG_TYPES)
1158 return TRUE;
1159
1160 switch (buf->msg.header.version [0]) {
1161 case 1:
1162 switch (buf->msg.header.version [1]) {
1163 case 0:
1164 buf->giop_version = GIOP_1_0;
1165 break;
1166 case 1:
1167 buf->giop_version = GIOP_1_1;
1168 break;
1169 case 2:
1170 buf->giop_version = GIOP_1_2;
1171 break;
1172 default:
1173 return TRUE;
1174 break;
1175 }
1176 break;
1177 default:
1178 return TRUE;
1179 break;
1180 }
1181
1182 if ((buf->msg.header.flags & GIOP_FLAG_LITTLE_ENDIAN) != GIOP_FLAG_ENDIANNESS)
1183 buf->msg.header.message_size = GUINT32_SWAP_LE_BE (buf->msg.header.message_size);
1184
1185 /* NB. at least CLOSECONNECTION has 0 length message_size */
1186
1187 if (!is_auth && buf->msg.header.message_size > giop_initial_msg_size_limit) {
1188 dprintf (ERRORS, "Message exceeded unauthorized size limit\n");
1189 return TRUE;
1190 }
1191
1192 if (alloc_buffer (buf, NULL, buf->msg.header.message_size))
1193 return TRUE;
1194
1195 return FALSE;
1196 }
1197
/*
 * FIXME: we should definitely handle things more asynchronously,
 * perhaps even at the expense of having to go to the GSource
 * twice in order to get fresh input (?)
 * or should we poll ourselves on the source to see what's up?
 *
 * The whole locking concept here looks broken to me,
 * especially since 'read' can flag the connection disconnected
 * giving a nice deadlock.
 */
1208 gboolean
giop_connection_handle_input(LinkConnection * lcnx)1209 giop_connection_handle_input (LinkConnection *lcnx)
1210 {
1211 GIOPRecvBuffer *buf;
1212 GIOPConnection *cnx = (GIOPConnection *) lcnx;
1213
1214 do {
1215 int n;
1216
1217 if (!cnx->incoming_msg)
1218 cnx->incoming_msg = giop_recv_buffer_use_buf (cnx);
1219
1220 buf = cnx->incoming_msg;
1221
1222 n = link_connection_read (
1223 lcnx, buf->cur, buf->left_to_read, FALSE);
1224
1225 if (n == 0) /* We'll be back */
1226 return TRUE;
1227
1228 if (n < 0 || !buf->left_to_read) /* HUP */
1229 goto msg_error;
1230
1231 /* fprintf (stderr, "Read %d\n", n);
1232 giop_dump (stderr, buf->cur, n, 0); */
1233
1234 buf->left_to_read -= n;
1235 buf->cur += n;
1236
1237 if (buf->left_to_read == 0) {
1238
1239 #ifdef G_ENABLE_DEBUG
1240 if (giop_debug_hook_incoming_mangler)
1241 giop_debug_hook_incoming_mangler (buf);
1242 #endif
1243
1244 switch (buf->state) {
1245
1246 case GIOP_MSG_READING_HEADER:
1247 if (giop_recv_msg_reading_body (buf, cnx->parent.is_auth)) {
1248 dprintf (ERRORS, "OOB incoming msg header data\n");
1249 goto msg_error;
1250 }
1251 buf->state = GIOP_MSG_READING_BODY;
1252 break;
1253
1254 case GIOP_MSG_READING_BODY: {
1255
1256 dprintf (GIOP, "Incoming IIOP body:\n");
1257
1258 buf->cur = buf->message_body + 12;
1259 if ((buf->cur + buf->msg.header.message_size) > buf->end) {
1260 dprintf (ERRORS, "broken incoming length data\n");
1261 goto msg_error;
1262 }
1263 do_giop_dump (stderr, buf->cur, buf->msg.header.message_size, 12);
1264
1265 buf->state = GIOP_MSG_READY;
1266
1267 if (giop_recv_buffer_demarshal (buf)) {
1268 dprintf (ERRORS, "broken incoming header data\n");
1269 goto msg_error;
1270 }
1271
1272 if (MORE_FRAGMENTS_FOLLOW (buf)) {
1273 if (giop_recv_buffer_handle_fragmented (&buf, cnx))
1274 goto msg_error;
1275
1276 else {
1277 cnx->incoming_msg = NULL;
1278 goto frag_out;
1279 }
1280
1281 } else if (buf->msg.header.message_type == GIOP_FRAGMENT) {
1282 if (giop_recv_buffer_handle_fragmented (&buf, cnx))
1283 goto msg_error;
1284 /* else last fragment */
1285 }
1286 break;
1287 }
1288
1289 case GIOP_MSG_AWAITING_FRAGMENTS:
1290 case GIOP_MSG_READY:
1291 g_assert_not_reached ();
1292 break;
1293 }
1294 }
1295
1296 } while (cnx->incoming_msg &&
1297 buf->left_to_read > 0 &&
1298 buf->state != GIOP_MSG_READY);
1299
1300 cnx->incoming_msg = NULL;
1301
1302 switch (buf->msg.header.message_type) {
1303 case GIOP_REPLY:
1304 case GIOP_LOCATEREPLY:
1305 dprintf (MESSAGES, "handling reply\n");
1306 if (handle_reply (buf)) /* dodgy inbound data, pull the cnx */
1307 link_connection_state_changed (lcnx, LINK_DISCONNECTED);
1308 break;
1309
1310 case GIOP_REQUEST:
1311 dprintf (MESSAGES, "handling request\n");
1312 ORBit_handle_request (cnx->orb_data, buf);
1313 break;
1314
1315 case GIOP_LOCATEREQUEST:
1316 dprintf (MESSAGES, "handling locate request\n");
1317 ORBit_handle_locate_request (cnx->orb_data, buf);
1318 break;
1319
1320 case GIOP_CANCELREQUEST:
1321 case GIOP_MESSAGEERROR:
1322 dprintf (ERRORS, "dropping an unusual & unhandled input buffer 0x%x",
1323 buf->msg.header.message_type);
1324 giop_recv_buffer_unuse (buf);
1325 break;
1326
1327 case GIOP_CLOSECONNECTION:
1328 dprintf (MESSAGES, "received close connection\n");
1329 giop_recv_buffer_unuse (buf);
1330 link_connection_state_changed (lcnx, LINK_DISCONNECTED);
1331 break;
1332
1333 case GIOP_FRAGMENT:
1334 dprintf (ERRORS, "Fragment got in the wrong channel\n");
1335 default:
1336 dprintf (ERRORS, "dropping an out of bound input buffer "
1337 "on the floor 0x%x\n", buf->msg.header.message_type);
1338 goto msg_error;
1339 break;
1340 }
1341
1342 frag_out:
1343 return TRUE;
1344
1345 msg_error:
1346 cnx->incoming_msg = NULL;
1347
1348 buf->msg.header.message_type = GIOP_MESSAGEERROR;
1349 buf->msg.header.message_size = 0;
1350
1351 giop_recv_buffer_unuse (buf);
1352
1353 /* Zap it for badness.
1354 * XXX We should probably handle oversized
1355 * messages more graciously XXX */
1356 link_connection_state_changed (LINK_CONNECTION (cnx),
1357 LINK_DISCONNECTED);
1358
1359 return TRUE;
1360 }
1361
1362 static gboolean
giop_timeout(gpointer data)1363 giop_timeout (gpointer data)
1364 {
1365 gboolean retv = FALSE;
1366 GIOPConnection *cnx = (GIOPConnection*)data;
1367 LinkConnection *lcnx = LINK_CONNECTION (cnx);
1368 GIOPThread *tdata = (GIOPThread *)lcnx->tdata;
1369
1370 g_assert (lcnx->timeout_mutex);
1371
1372 if (lcnx->status == LINK_DISCONNECTED) {
1373 giop_connection_unref (cnx); // we remove the source so we must unref cnx
1374 goto out;
1375 }
1376
1377 g_mutex_lock (lcnx->timeout_mutex);
1378 if (lcnx->timeout_status == LINK_TIMEOUT_UNKNOWN) {
1379 lcnx->timeout_source_id = 0;
1380 lcnx->timeout_status = LINK_TIMEOUT_YES;
1381 } else {
1382 g_mutex_unlock (lcnx->timeout_mutex);
1383 retv = TRUE; // do not remove the source - the one who sets timeout_status will do that
1384 goto out;
1385 }
1386 g_mutex_unlock (lcnx->timeout_mutex);
1387
1388 link_connection_state_changed (lcnx, LINK_TIMEOUT);
1389
1390 g_mutex_lock (tdata->lock); /* ent_lock */
1391 giop_incoming_signal_T (tdata, GIOP_CLOSECONNECTION);
1392 g_mutex_unlock (tdata->lock); /* ent_lock */
1393
1394 giop_connection_unref (cnx); // we remove the source so we must unref cnx
1395
1396 out:
1397 return retv;
1398 }
1399
1400 void
giop_timeout_add(GIOPConnection * cnx)1401 giop_timeout_add (GIOPConnection *cnx)
1402 {
1403 static GStaticMutex static_mutex = G_STATIC_MUTEX_INIT;
1404 LinkConnection *lcnx = LINK_CONNECTION (cnx);
1405
1406 if (!giop_thread_io ())
1407 return;
1408 if (!lcnx->timeout_msec)
1409 return;
1410
1411 g_static_mutex_lock (&static_mutex);
1412 if (lcnx->timeout_source_id)
1413 goto out;
1414
1415 giop_connection_ref (cnx); // to be unref'ed by the one who removes the timeout source
1416
1417 if (!lcnx->timeout_mutex)
1418 lcnx->timeout_mutex = g_mutex_new ();
1419
1420 g_mutex_lock (lcnx->timeout_mutex);
1421 lcnx->timeout_status = LINK_TIMEOUT_UNKNOWN;
1422 g_mutex_unlock (lcnx->timeout_mutex);
1423
1424 lcnx->tdata = giop_thread_self ();
1425
1426 lcnx->timeout_source_id = link_io_thread_add_timeout (lcnx->timeout_msec, giop_timeout, (gpointer)cnx);
1427
1428 out:
1429 g_static_mutex_unlock (&static_mutex);
1430 }
1431
1432 GIOPRecvBuffer *
giop_recv_buffer_use_buf(GIOPConnection * cnx)1433 giop_recv_buffer_use_buf (GIOPConnection *cnx)
1434 {
1435 GIOPRecvBuffer *buf = NULL;
1436
1437 if(cnx)
1438 giop_connection_ref (cnx);
1439
1440 buf = g_new0 (GIOPRecvBuffer, 1);
1441
1442 buf->state = GIOP_MSG_READING_HEADER;
1443 buf->cur = (guchar *)&buf->msg.header;
1444 buf->left_to_read = 12;
1445 buf->connection = cnx;
1446
1447 return buf;
1448 }
1449