1 #include <config.h>
2 #include <string.h>
3 #include <stdio.h>
4 #include <errno.h>
5 #include <glib.h>
6 #include <glib/gprintf.h>
7 
8 #include "giop-private.h"
9 #include "giop-debug.h"
10 #include <orbit/GIOP/giop-types.h>
11 #include <orbit/GIOP/giop-recv-buffer.h>
12 
13 #undef DEBUG
14 
15 #ifdef G_ENABLE_DEBUG
16 void (*giop_debug_hook_unexpected_reply) (GIOPRecvBuffer        *buf) = NULL;
17 void (*giop_debug_hook_spoofed_reply)    (GIOPRecvBuffer        *buf,
18 					  GIOPMessageQueueEntry *ent) = NULL;
19 void (*giop_debug_hook_incoming_mangler) (GIOPRecvBuffer        *buf) = NULL;
20 #endif
21 
22 
23 #define MORE_FRAGMENTS_FOLLOW(buf) ((buf)->msg.header.flags & GIOP_FLAG_FRAGMENTED)
24 
25 /*
26  * FIXME: pretty much this whole module,
27  * and all the giop-types headers should be
28  * auto-generated and generic ...
29  */
30 
31 /* A list of GIOPMessageQueueEntrys */
32 static GList  *giop_queued_messages = NULL;
33 static GMutex *giop_queued_messages_lock = NULL;
34 
35 /* Don't do this genericaly, union's suck genericaly */
36 static gboolean
giop_GIOP_TargetAddress_demarshal(GIOPRecvBuffer * buf,GIOP_TargetAddress * value)37 giop_GIOP_TargetAddress_demarshal (GIOPRecvBuffer     *buf,
38 				   GIOP_TargetAddress *value)
39 {
40 	gboolean do_bswap = giop_msg_conversion_needed (buf);
41 
42 	buf->cur = ALIGN_ADDRESS(buf->cur, 2);
43 	if ((buf->cur + 2) > buf->end)
44 		return TRUE;
45 	if (do_bswap)
46 		value->_d = GUINT16_SWAP_LE_BE (
47 			*(guint16 *) buf->cur);
48 	else
49 		value->_d = *(guint16 *) buf->cur;
50 	buf->cur += 2;
51 
52 	switch (value->_d) {
53 	case GIOP_KeyAddr:
54 		buf->cur = ALIGN_ADDRESS (buf->cur, 4);
55 
56 		if ((buf->cur + 4) > buf->end)
57 			return TRUE;
58 		value->_u.object_key._release = CORBA_FALSE;
59 		if (do_bswap)
60 			value->_u.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
61 		else
62 			value->_u.object_key._length = *((guint32 *)buf->cur);
63 		buf->cur += 4;
64 		if ((buf->cur + value->_u.object_key._length) > buf->end ||
65 		    (buf->cur + value->_u.object_key._length) < buf->cur)
66 			return TRUE;
67 		value->_u.object_key._buffer = buf->cur;
68 		buf->cur += value->_u.object_key._length;
69 
70 		break;
71 	case GIOP_ProfileAddr:
72 		g_warning ("XXX FIXME GIOP_ProfileAddr not handled");
73 		return TRUE;
74 		break;
75 	case GIOP_ReferenceAddr:
76 		g_warning ("XXX FIXME GIOP_ReferenceAddr not handled");
77 		return TRUE;
78 		break;
79 	}
80 
81 	return FALSE;
82 }
83 
84 static gboolean
giop_IOP_ServiceContextList_demarshal(GIOPRecvBuffer * buf,IOP_ServiceContextList * value)85 giop_IOP_ServiceContextList_demarshal (GIOPRecvBuffer         *buf,
86 				       IOP_ServiceContextList *value)
87 {
88 	return ORBit_demarshal_value (
89 		TC_IOP_ServiceContextList,
90 		(gpointer *)&value, buf, NULL);
91 }
92 
93 static void
giop_IOP_ServiceContextList_free(IOP_ServiceContextList * value)94 giop_IOP_ServiceContextList_free (IOP_ServiceContextList *value)
95 {
96 	if (value)
97 		CORBA_free (value->_buffer);
98 }
99 
100 static gboolean
giop_recv_buffer_demarshal_request_1_1(GIOPRecvBuffer * buf)101 giop_recv_buffer_demarshal_request_1_1(GIOPRecvBuffer *buf)
102 {
103   gboolean do_bswap = giop_msg_conversion_needed(buf);
104   CORBA_unsigned_long oplen;
105 
106   buf->msg.u.request_1_1.service_context._buffer = NULL;
107   if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.request_1_1.service_context))
108     return TRUE;
109 
110   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
111 
112   if((buf->cur + 12) > buf->end)
113     return TRUE;
114 
115   if(do_bswap)
116     buf->msg.u.request_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
117   else
118     buf->msg.u.request_1_1.request_id = *((guint32 *)buf->cur);
119   buf->cur += 4;
120   buf->msg.u.request_1_1.response_expected = *buf->cur;
121   buf->cur += 4;
122   if(do_bswap)
123     buf->msg.u.request_1_1.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
124   else
125     buf->msg.u.request_1_1.object_key._length = *((guint32 *)buf->cur);
126   buf->cur += 4;
127 
128   if((buf->cur + buf->msg.u.request_1_1.object_key._length) > buf->end
129      || (buf->cur + buf->msg.u.request_1_1.object_key._length) < buf->cur)
130     return TRUE;
131 
132   buf->msg.u.request_1_1.object_key._buffer = buf->cur;
133   buf->msg.u.request_1_1.object_key._release = CORBA_FALSE;
134 
135   buf->cur += buf->msg.u.request_1_1.object_key._length;
136   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
137   if((buf->cur + 4) > buf->end)
138     return TRUE;
139   if(do_bswap)
140     oplen = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
141   else
142     oplen = *((guint32 *)buf->cur);
143   buf->cur += 4;
144 
145   if((buf->cur + oplen) > buf->end
146      || (buf->cur + oplen) < buf->cur)
147     return TRUE;
148 
149   buf->msg.u.request_1_1.operation = (CORBA_char *) buf->cur;
150   buf->cur += oplen;
151   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
152   if((buf->cur + 4) > buf->end)
153     return TRUE;
154 
155   if(do_bswap)
156     buf->msg.u.request_1_1.requesting_principal._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
157   else
158     buf->msg.u.request_1_1.requesting_principal._length = *((guint32 *)buf->cur);
159 
160   buf->cur += 4;
161   if((buf->cur + buf->msg.u.request_1_1.requesting_principal._length) > buf->end
162      || (buf->cur + buf->msg.u.request_1_1.requesting_principal._length) < buf->cur)
163     return TRUE;
164 
165   buf->msg.u.request_1_1.requesting_principal._buffer = buf->cur;
166   buf->msg.u.request_1_1.requesting_principal._release = CORBA_FALSE;
167   buf->cur += buf->msg.u.request_1_1.requesting_principal._length;
168 
169   return FALSE;
170 }
171 
172 static gboolean
giop_recv_buffer_demarshal_request_1_2(GIOPRecvBuffer * buf)173 giop_recv_buffer_demarshal_request_1_2(GIOPRecvBuffer *buf)
174 {
175   gboolean do_bswap = giop_msg_conversion_needed(buf);
176   CORBA_unsigned_long oplen;
177 
178   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
179 
180   if((buf->cur + 8) > buf->end)
181     return TRUE;
182 
183   if(do_bswap)
184     buf->msg.u.request_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
185   else
186     buf->msg.u.request_1_2.request_id = *((guint32 *) buf->cur);
187   buf->cur += 4;
188   buf->msg.u.request_1_2.response_flags = *buf->cur;
189   buf->cur += 4;
190 
191   if(giop_GIOP_TargetAddress_demarshal(buf, &buf->msg.u.request_1_2.target))
192     return TRUE;
193 
194   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
195   if((buf->cur + 4) > buf->end)
196     return TRUE;
197 
198   if(do_bswap)
199     oplen = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
200   else
201     oplen = *((guint32 *)buf->cur);
202   buf->cur += 4;
203 
204   if((buf->cur + oplen) > buf->end
205      || (buf->cur + oplen) < buf->cur)
206     return TRUE;
207 
208   buf->msg.u.request_1_2.operation = (CORBA_char *) buf->cur;
209   buf->cur += oplen;
210 
211   buf->msg.u.request_1_2.service_context._buffer = NULL;
212   if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.request_1_2.service_context))
213     return TRUE;
214   buf->cur = ALIGN_ADDRESS(buf->cur, 8);
215 
216   return FALSE;
217 }
218 
219 static gboolean
giop_recv_buffer_demarshal_reply_1_1(GIOPRecvBuffer * buf)220 giop_recv_buffer_demarshal_reply_1_1(GIOPRecvBuffer *buf)
221 {
222   gboolean do_bswap = giop_msg_conversion_needed(buf);
223 
224   buf->msg.u.reply_1_1.service_context._buffer = NULL;
225   if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.reply_1_1.service_context))
226     return TRUE;
227   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
228   if((buf->cur + 8) > buf->end)
229     return TRUE;
230   if(do_bswap)
231     {
232       buf->msg.u.reply_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
233       buf->cur += 4;
234       buf->msg.u.reply_1_1.reply_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
235     }
236   else
237     {
238       buf->msg.u.reply_1_1.request_id = *((guint32 *)buf->cur);
239       buf->cur += 4;
240       buf->msg.u.reply_1_1.reply_status = *((guint32 *)buf->cur);
241     }
242   buf->cur += 4;
243 
244  return FALSE;
245 }
246 
247 static gboolean
giop_recv_buffer_demarshal_reply_1_2(GIOPRecvBuffer * buf)248 giop_recv_buffer_demarshal_reply_1_2(GIOPRecvBuffer *buf)
249 {
250   gboolean do_bswap = giop_msg_conversion_needed(buf);
251 
252   buf->cur = ALIGN_ADDRESS(buf->cur, 4);
253   if((buf->cur + 8) > buf->end)
254     return TRUE;
255   if(do_bswap)
256     {
257       buf->msg.u.reply_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
258       buf->cur += 4;
259       buf->msg.u.reply_1_2.reply_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
260     }
261   else
262     {
263       buf->msg.u.reply_1_2.request_id = *((guint32 *)buf->cur);
264       buf->cur += 4;
265       buf->msg.u.reply_1_2.reply_status = *((guint32 *)buf->cur);
266     }
267   buf->cur += 4;
268 
269   buf->msg.u.reply_1_2.service_context._buffer = NULL;
270   if(giop_IOP_ServiceContextList_demarshal(buf, &buf->msg.u.reply_1_2.service_context))
271     return TRUE;
272 
273   buf->cur = ALIGN_ADDRESS(buf->cur, 8);
274   return FALSE;
275 }
276 
277 static gboolean
giop_recv_buffer_demarshal_cancel(GIOPRecvBuffer * buf)278 giop_recv_buffer_demarshal_cancel(GIOPRecvBuffer *buf)
279 {
280 	gboolean do_bswap = giop_msg_conversion_needed (buf);
281 
282 	buf->cur = ALIGN_ADDRESS (buf->cur, 4);
283 	if ((buf->cur + 4) > buf->end)
284 		return TRUE;
285 	if (do_bswap)
286 		buf->msg.u.cancel_request.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
287 	else
288 		buf->msg.u.cancel_request.request_id = *((guint32 *)buf->cur);
289 	buf->cur += 4;
290 
291 	return FALSE;
292 }
293 
294 static gboolean
giop_recv_buffer_demarshal_locate_request_1_1(GIOPRecvBuffer * buf)295 giop_recv_buffer_demarshal_locate_request_1_1(GIOPRecvBuffer *buf)
296 {
297 	gboolean do_bswap = giop_msg_conversion_needed (buf);
298 
299 	buf->cur = ALIGN_ADDRESS (buf->cur, 4);
300 
301 	if ((buf->cur + 8) > buf->end)
302 		return TRUE;
303 
304 	if (do_bswap)
305 		buf->msg.u.locate_request_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
306 	else
307 		buf->msg.u.locate_request_1_1.request_id = *((guint32 *)buf->cur);
308 	buf->cur += 4;
309 	if (do_bswap)
310 		buf->msg.u.locate_request_1_1.object_key._length = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
311 	else
312 		buf->msg.u.locate_request_1_1.object_key._length = *((guint32 *)buf->cur);
313 	buf->cur += 4;
314 
315 	if ((buf->cur + buf->msg.u.locate_request_1_1.object_key._length) > buf->end ||
316 	    (buf->cur + buf->msg.u.locate_request_1_1.object_key._length) < buf->cur)
317 		return TRUE;
318 
319 	buf->msg.u.locate_request_1_1.object_key._buffer = buf->cur;
320 	buf->msg.u.locate_request_1_1.object_key._release = CORBA_FALSE;
321 	buf->cur += buf->msg.u.locate_request_1_1.object_key._length;
322 
323 	return FALSE;
324 }
325 
326 static gboolean
giop_recv_buffer_demarshal_locate_request_1_2(GIOPRecvBuffer * buf)327 giop_recv_buffer_demarshal_locate_request_1_2(GIOPRecvBuffer *buf)
328 {
329 	gboolean do_bswap = giop_msg_conversion_needed(buf);
330 
331 	buf->cur = ALIGN_ADDRESS(buf->cur, 4);
332 
333 	if((buf->cur + 4) > buf->end)
334 		return TRUE;
335 
336 	if(do_bswap)
337 		buf->msg.u.locate_request_1_2.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
338 	else
339 		buf->msg.u.locate_request_1_2.request_id = *((guint32 *)buf->cur);
340 	buf->cur += 4;
341 
342 	return giop_GIOP_TargetAddress_demarshal (buf, &buf->msg.u.locate_request_1_2.target);
343 }
344 
345 static gboolean
giop_recv_buffer_demarshal_locate_reply_1_1(GIOPRecvBuffer * buf)346 giop_recv_buffer_demarshal_locate_reply_1_1(GIOPRecvBuffer *buf)
347 {
348 	gboolean do_bswap = giop_msg_conversion_needed(buf);
349 
350 	buf->cur = ALIGN_ADDRESS (buf->cur, 4);
351 
352 	if ((buf->cur + 8) > buf->end)
353 		return TRUE;
354 
355 	if (do_bswap) {
356 		buf->msg.u.locate_reply_1_1.request_id = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
357 		buf->cur += 4;
358 		buf->msg.u.locate_reply_1_1.locate_status = GUINT32_SWAP_LE_BE(*((guint32 *)buf->cur));
359 	} else {
360 		buf->msg.u.locate_reply_1_1.request_id = *((guint32 *)buf->cur);
361 		buf->cur += 4;
362 		buf->msg.u.locate_reply_1_1.locate_status = *((guint32 *)buf->cur);
363 	}
364 	buf->cur += 4;
365 
366 	return FALSE;
367 }
368 
369 static gboolean
giop_recv_buffer_demarshal_locate_reply_1_2(GIOPRecvBuffer * buf)370 giop_recv_buffer_demarshal_locate_reply_1_2 (GIOPRecvBuffer *buf)
371 {
372 	return giop_recv_buffer_demarshal_locate_reply_1_1 (buf);
373 }
374 
375 typedef gboolean (*GIOPDecodeFunc) (GIOPRecvBuffer *buf);
376 
377 static gboolean
giop_recv_buffer_demarshal(GIOPRecvBuffer * buf)378 giop_recv_buffer_demarshal (GIOPRecvBuffer *buf)
379 {
380 	GIOPDecodeFunc              decode_func;
381 	static const GIOPDecodeFunc decode_funcs [GIOP_NUM_MSG_TYPES] [GIOP_NUM_VERSIONS] = {
382 		/* request */
383 		{ giop_recv_buffer_demarshal_request_1_1,
384 		  giop_recv_buffer_demarshal_request_1_1,
385 		  giop_recv_buffer_demarshal_request_1_2},
386 		/* reply */
387 		{ giop_recv_buffer_demarshal_reply_1_1,
388 		  giop_recv_buffer_demarshal_reply_1_1,
389 		  giop_recv_buffer_demarshal_reply_1_2},
390 		/* cancel request */
391 		{ giop_recv_buffer_demarshal_cancel,
392 		  giop_recv_buffer_demarshal_cancel,
393 		  giop_recv_buffer_demarshal_cancel},
394 		/* locate request */
395 		{ giop_recv_buffer_demarshal_locate_request_1_1,
396 		  giop_recv_buffer_demarshal_locate_request_1_1,
397 		  giop_recv_buffer_demarshal_locate_request_1_2},
398 		/* locate reply */
399 		{ giop_recv_buffer_demarshal_locate_reply_1_1,
400 		  giop_recv_buffer_demarshal_locate_reply_1_1,
401 		  giop_recv_buffer_demarshal_locate_reply_1_2},
402 		/* close connection */
403 		{NULL, NULL, NULL},
404 		/* message error */
405 		{NULL, NULL, NULL},
406 		/* fragment */
407 		{NULL, NULL, NULL}
408 	};
409 
410 	if (buf->msg.header.message_type >= GIOP_NUM_MSG_TYPES)
411 		return TRUE;
412 
413 	if (buf->giop_version >= GIOP_NUM_VERSIONS)
414 		return TRUE;
415 
416 	decode_func = decode_funcs [buf->msg.header.message_type] [buf->giop_version];
417 
418 	if (decode_func)
419 		return decode_func (buf);
420 
421 	return FALSE;
422 }
423 
424 GIOPRecvBuffer *
giop_recv_buffer_use_encaps(guchar * mem,gulong len)425 giop_recv_buffer_use_encaps (guchar *mem, gulong len)
426 {
427 	GIOPRecvBuffer *buf = giop_recv_buffer_use_buf (NULL);
428 
429 	buf->cur = buf->message_body = mem;
430 	buf->end = buf->cur + len;
431 	buf->msg.header.message_size = len;
432 	buf->msg.header.flags = *(buf->cur++);
433 	buf->giop_version = GIOP_LATEST;
434 	buf->left_to_read = 0;
435 	buf->state = GIOP_MSG_READY;
436 	buf->free_body = FALSE;
437 
438 	return buf;
439 }
440 
441 GIOPRecvBuffer *
giop_recv_buffer_use_encaps_buf(GIOPRecvBuffer * buf)442 giop_recv_buffer_use_encaps_buf (GIOPRecvBuffer *buf)
443 {
444 	guchar             *ptr;
445 	CORBA_unsigned_long len;
446 
447 	buf->cur = ALIGN_ADDRESS (buf->cur, 4);
448 
449 	if ((buf->cur + 4) > buf->end)
450 		return NULL;
451 	len = *(CORBA_unsigned_long *) buf->cur;
452 
453 	if (giop_msg_conversion_needed (buf))
454 		len = GUINT32_SWAP_LE_BE (len);
455 
456 	buf->cur += 4;
457 	if ((buf->cur + len) > buf->end ||
458 	    (buf->cur + len) < buf->cur)
459 		return NULL;
460 
461 	ptr = buf->cur;
462 	buf->cur += len;
463 
464 	return giop_recv_buffer_use_encaps (ptr, len);
465 }
466 
467 void
giop_recv_buffer_unuse(GIOPRecvBuffer * buf)468 giop_recv_buffer_unuse (GIOPRecvBuffer *buf)
469 {
470 	if (!buf)
471 		return;
472 
473 	if (buf->free_body) {
474 		g_free (buf->message_body);
475 		buf->message_body = NULL;
476 	}
477 
478 	switch (buf->giop_version) {
479 	case GIOP_1_0:
480 	case GIOP_1_1:
481 		switch (buf->msg.header.message_type) {
482 		case GIOP_REPLY:
483 			giop_IOP_ServiceContextList_free (
484 				&buf->msg.u.reply_1_1.service_context);
485 			break;
486 		case GIOP_REQUEST:
487 			giop_IOP_ServiceContextList_free (
488 				&buf->msg.u.request_1_1.service_context);
489 			break;
490 		default:
491 			break;
492 		}
493 		break;
494 	case GIOP_1_2:
495 		switch (buf->msg.header.message_type) {
496 		case GIOP_REPLY:
497 			giop_IOP_ServiceContextList_free (
498 				&buf->msg.u.reply_1_2.service_context);
499 			break;
500 		case GIOP_REQUEST:
501 			giop_IOP_ServiceContextList_free (
502 				&buf->msg.u.request_1_2.service_context);
503 			break;
504 		default:
505 			break;
506 		}
507 		break;
508 	default:
509 		break;
510 	}
511 	if(buf->connection)
512 		giop_connection_unref (buf->connection);
513 	g_free (buf);
514 }
515 
516 static void
ent_lock(GIOPMessageQueueEntry * ent)517 ent_lock (GIOPMessageQueueEntry *ent)
518 {
519 	if (ent->src_thread)
520 		g_mutex_lock (ent->src_thread->lock);
521 }
522 
523 static void
ent_unlock(GIOPMessageQueueEntry * ent)524 ent_unlock (GIOPMessageQueueEntry *ent)
525 {
526 	if (ent->src_thread)
527 		g_mutex_unlock (ent->src_thread->lock);
528 }
529 
530 static void
giop_recv_destroy_queue_entry_T(GIOPMessageQueueEntry * ent)531 giop_recv_destroy_queue_entry_T (GIOPMessageQueueEntry *ent)
532 {
533 	if (ent->cnx) {
534 		giop_connection_unref (ent->cnx);
535 		ent->cnx = NULL;
536 	}
537 }
538 
539 void
giop_recv_list_destroy_queue_entry(GIOPMessageQueueEntry * ent)540 giop_recv_list_destroy_queue_entry (GIOPMessageQueueEntry *ent)
541 {
542 	LINK_MUTEX_LOCK (giop_queued_messages_lock);
543 #ifdef DEBUG
544 	g_warning ("Remove XX:%p:(%p) - %d", ent, ent->async_cb,
545 		   g_list_length (giop_queued_messages));
546 #endif
547 	giop_queued_messages = g_list_remove (giop_queued_messages, ent);
548 	LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
549 
550 	giop_recv_destroy_queue_entry_T (ent);
551 }
552 
553 void
giop_recv_list_setup_queue_entry(GIOPMessageQueueEntry * ent,GIOPConnection * cnx,CORBA_unsigned_long msg_type,CORBA_unsigned_long request_id)554 giop_recv_list_setup_queue_entry (GIOPMessageQueueEntry *ent,
555 				  GIOPConnection        *cnx,
556 				  CORBA_unsigned_long    msg_type,
557 				  CORBA_unsigned_long    request_id)
558 {
559 	ent->src_thread = giop_thread_self ();
560 	ent->async_cb = NULL;
561 
562 	ent->cnx = giop_connection_ref (cnx);
563 	ent->msg_type = msg_type;
564 	ent->request_id = request_id;
565 	ent->buffer = NULL;
566 
567 	LINK_MUTEX_LOCK   (giop_queued_messages_lock);
568 #ifdef DEBUG
569 	g_warning ("Push XX:%p:(%p) - %d", ent, ent->async_cb,
570 		   g_list_length (giop_queued_messages));
571 #endif
572 	giop_queued_messages = g_list_prepend (giop_queued_messages, ent);
573 	LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
574 }
575 
576 void
giop_recv_list_setup_queue_entry_async(GIOPMessageQueueEntry * ent,GIOPAsyncCallback cb)577 giop_recv_list_setup_queue_entry_async (GIOPMessageQueueEntry *ent,
578 					GIOPAsyncCallback      cb)
579 {
580 	g_return_if_fail (ent != NULL);
581 
582 	ent->async_cb = cb;
583 }
584 
585 void
giop_recv_list_zap(GIOPConnection * cnx)586 giop_recv_list_zap (GIOPConnection *cnx)
587 {
588 	GList  *l, *next;
589 	GSList *sl, *notify = NULL;
590 
591 	LINK_MUTEX_LOCK (giop_queued_messages_lock);
592 
593 	for (l = giop_queued_messages; l; l = next) {
594 		GIOPMessageQueueEntry *ent = l->data;
595 
596 		next = l->next;
597 
598 		if (ent->cnx == cnx) {
599 			ent_lock (ent);
600 
601 			dprintf (ERRORS, "Zap listener on dead cnx with buffer %p\n",
602 				 ent->buffer);
603 
604 			giop_recv_buffer_unuse (ent->buffer);
605 			ent->buffer = NULL;
606 
607 			giop_recv_destroy_queue_entry_T (ent);
608 
609 			if (giop_thread_io () && !ent->async_cb)
610 				giop_incoming_signal_T (ent->src_thread, GIOP_CLOSECONNECTION);
611 			ent_unlock (ent);
612 
613 			if (ent->async_cb)
614 				notify = g_slist_prepend (notify, ent);
615 			giop_queued_messages = g_list_delete_link (
616 				giop_queued_messages, l);
617 		}
618 	}
619 
620 	LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
621 
622 	for (sl = notify; sl; sl = sl->next) {
623 		GIOPMessageQueueEntry *ent = sl->data;
624 
625 		if (!ent->async_cb) {
626 			/* This should never happen */
627 			g_warning ("Extraordinary recv list re-enterancy");
628 			continue;
629 		}
630 
631 		giop_invoke_async (ent);
632 	}
633 	g_slist_free (notify);
634 }
635 
636 CORBA_unsigned_long
giop_recv_buffer_get_request_id(GIOPRecvBuffer * buf)637 giop_recv_buffer_get_request_id (GIOPRecvBuffer *buf)
638 {
639 	static const glong reqid_offsets [GIOP_NUM_MSG_TYPES] [GIOP_NUM_VERSIONS] = {
640 		/* GIOP_REQUEST */
641 		{ G_STRUCT_OFFSET(GIOPRecvBuffer,
642 				  msg.u.request_1_0.request_id),
643 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
644 				  msg.u.request_1_1.request_id),
645 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
646 				  msg.u.request_1_2.request_id)},
647 		/* GIOP_REPLY */
648 		{ G_STRUCT_OFFSET(GIOPRecvBuffer,
649 				  msg.u.reply_1_0.request_id),
650 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
651 				  msg.u.reply_1_1.request_id),
652 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
653 				  msg.u.reply_1_2.request_id)},
654 		/* GIOP_CANCELREQUEST */
655 		{ G_STRUCT_OFFSET(GIOPRecvBuffer,
656 				  msg.u.cancel_request.request_id),
657 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
658 				  msg.u.cancel_request.request_id),
659 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
660 				  msg.u.cancel_request.request_id)},
661 		/* GIOP_LOCATEREQUEST */
662 		{ G_STRUCT_OFFSET(GIOPRecvBuffer,
663 				  msg.u.locate_request_1_0.request_id),
664 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
665 				  msg.u.locate_request_1_1.request_id),
666 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
667 				  msg.u.locate_request_1_2.request_id)},
668 		/* GIOP_LOCATEREPLY */
669 		{ G_STRUCT_OFFSET(GIOPRecvBuffer,
670 				  msg.u.locate_reply_1_0.request_id),
671 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
672 				  msg.u.locate_reply_1_1.request_id),
673 		  G_STRUCT_OFFSET(GIOPRecvBuffer,
674 				  msg.u.locate_reply_1_2.request_id)},
675 		{0,0,0}, /* GIOP_CLOSECONNECTION */
676 		{0,0,0}, /* GIOP_MESSAGEERROR */
677 		{0,0,0} /* GIOP_FRAGMENT */
678 	};
679 	gulong offset;
680 
681 	offset = reqid_offsets [buf->msg.header.message_type] [buf->giop_version];
682 	if (!offset)
683 		return 0;
684 
685 	return G_STRUCT_MEMBER (CORBA_unsigned_long, buf, offset);
686 }
687 
688 static inline gboolean
check_got(GIOPMessageQueueEntry * ent)689 check_got (GIOPMessageQueueEntry *ent)
690 {
691 	return (ent->buffer ||
692 		!ent->cnx ||
693 		(ent->cnx->parent.status == LINK_DISCONNECTED) ||
694 		(ent->cnx->parent.status == LINK_TIMEOUT));
695 }
696 
697 GIOPRecvBuffer *
giop_recv_buffer_get(GIOPMessageQueueEntry * ent,gboolean * timeout)698 giop_recv_buffer_get (GIOPMessageQueueEntry *ent,
699 		      gboolean *timeout)
700 {
701 	GIOPThread *tdata = NULL;
702 
703 	*timeout = FALSE;
704 	tdata = giop_thread_self ();
705 
706  thread_switch:
707 	if (giop_thread_io ()) {
708 		ent_lock (ent);
709 
710 		for (; !check_got (ent); ) {
711 			if (!giop_thread_queue_empty_T (tdata)) {
712 				ent_unlock (ent);
713 				giop_thread_queue_process (tdata);
714 				ent_lock (ent);
715 			} else
716 				g_cond_wait (tdata->incoming, tdata->lock);
717 		}
718 
719 		ent_unlock (ent);
720 
721 	} else { /* non-threaded */
722 
723 		while (!ent->buffer && ent->cnx &&
724 		       (ent->cnx->parent.status != LINK_DISCONNECTED) &&
725 		       (ent->cnx->parent.status != LINK_TIMEOUT) &&
726 		       !giop_thread_io())
727 			link_main_iteration (TRUE);
728 
729 		if (giop_thread_io())
730 			goto thread_switch;
731 	}
732 
733 	if (giop_thread_io()
734 	    && ent
735 	    && ent->cnx
736 	    && ent->cnx->parent.timeout_mutex) {
737 		g_mutex_lock (ent->cnx->parent.timeout_mutex);
738 		if (ent->cnx->parent.timeout_status == LINK_TIMEOUT_UNKNOWN) {
739 			link_io_thread_remove_timeout (ent->cnx->parent.timeout_source_id);
740 			ent->cnx->parent.timeout_source_id = 0;
741 			ent->cnx->parent.timeout_status = LINK_TIMEOUT_NO;
742 			giop_connection_unref (ent->cnx); // we remove the source so we must unref the connection
743 		} else if (ent->cnx->parent.timeout_status == LINK_TIMEOUT_YES)
744 			*timeout = TRUE;
745 		g_mutex_unlock (ent->cnx->parent.timeout_mutex);
746 	}
747 
748 	giop_thread_queue_tail_wakeup (tdata);
749 	giop_recv_list_destroy_queue_entry (ent);
750 
751 	return ent->buffer;
752 }
753 
754 ORBit_ObjectKey*
giop_recv_buffer_get_objkey(GIOPRecvBuffer * buf)755 giop_recv_buffer_get_objkey (GIOPRecvBuffer *buf)
756 {
757 	switch (buf->msg.header.message_type) {
758 	case GIOP_REQUEST:
759 		switch (buf->msg.header.version [1]) {
760 		case 0:
761 			return &buf->msg.u.request_1_0.object_key;
762 			break;
763 		case 1:
764 			return &buf->msg.u.request_1_1.object_key;
765 			break;
766 		case 2:
767 			g_assert (buf->msg.u.request_1_2.target._d == GIOP_KeyAddr);
768 			return &buf->msg.u.request_1_2.target._u.object_key;
769 			break;
770 		}
771 		break;
772 
773 	case GIOP_LOCATEREQUEST:
774 		switch (buf->msg.header.version [1]) {
775 		case 0:
776 			return &buf->msg.u.locate_request_1_0.object_key;
777 			break;
778 		case 1:
779 			return &buf->msg.u.locate_request_1_1.object_key;
780 			break;
781 		case 2:
782 			g_assert (buf->msg.u.locate_request_1_2.target._d == GIOP_KeyAddr);
783 			return &buf->msg.u.locate_request_1_2.target._u.object_key;
784 			break;
785 		}
786 		break;
787 
788 	default:
789 		g_assert_not_reached ();
790 	}
791 
792 	return NULL;
793 }
794 
795 char *
giop_recv_buffer_get_opname(GIOPRecvBuffer * buf)796 giop_recv_buffer_get_opname (GIOPRecvBuffer *buf)
797 {
798 	switch(buf->msg.header.version [1]) {
799 	case 0:
800 		return buf->msg.u.request_1_0.operation;
801 		break;
802 	case 1:
803 		return buf->msg.u.request_1_1.operation;
804 		break;
805 	case 2:
806 		return buf->msg.u.request_1_2.operation;
807 		break;
808 	}
809 
810 	return NULL;
811 }
812 
813 void
giop_recv_buffer_init(void)814 giop_recv_buffer_init (void)
815 {
816 	giop_queued_messages_lock = link_mutex_new ();
817 }
818 
819 static void
giop_connection_add_frag(GIOPConnection * cnx,GIOPRecvBuffer * buf)820 giop_connection_add_frag (GIOPConnection *cnx,
821 			  GIOPRecvBuffer *buf)
822 {
823 	cnx->incoming_frags = g_list_prepend (cnx->incoming_frags,
824 					      g_list_prepend (NULL, buf));
825 }
826 
827 static GList *
giop_connection_get_frag(GIOPConnection * cnx,CORBA_unsigned_long request_id,gboolean return_first_if_none)828 giop_connection_get_frag (GIOPConnection     *cnx,
829 			  CORBA_unsigned_long request_id,
830 			  gboolean            return_first_if_none)
831 {
832 	GList *l;
833 
834 	for (l = cnx->incoming_frags; l; l = l->next) {
835 		GList *frags = l->data;
836 
837 		if (giop_recv_buffer_get_request_id (frags->data) == request_id)
838 			return l->data;
839 	}
840 
841 	/* This sucks, but it's prolly a GIOP-1.1 spec issue */
842 	if (return_first_if_none && cnx->incoming_frags) {
843 		static int warned = 0;
844 		if (!warned++)
845 			dprintf (MESSAGES, "GIOP-1.1 1 fragment set per cnx (?)");
846 
847 		return cnx->incoming_frags->data;
848 	}
849 
850 	return NULL;
851 }
852 
853 static void
giop_connection_remove_frag(GIOPConnection * cnx,GList * frags)854 giop_connection_remove_frag (GIOPConnection *cnx, GList *frags)
855 {
856 	GList *l;
857 
858 	g_return_if_fail (frags != NULL);
859 
860 	for (l = frags->next; l; l = l->next)
861 		giop_recv_buffer_unuse (l->data);
862 
863 	cnx->incoming_frags = g_list_remove (cnx->incoming_frags, frags);
864 	g_list_free (frags);
865 }
866 
867 void
giop_connection_destroy_frags(GIOPConnection * cnx)868 giop_connection_destroy_frags (GIOPConnection *cnx)
869 {
870 	GList *l;
871 
872 	for (l = cnx->incoming_frags; l; l = l->next) {
873 		GList *l2;
874 
875 		for (l2 = l->data; l2; l2 = l2->next)
876 			giop_recv_buffer_unuse (l2->data);
877 
878 		g_list_free (l->data);
879 	}
880 	g_list_free (cnx->incoming_frags);
881 	cnx->incoming_frags = NULL;
882 }
883 
884 static gboolean
alloc_buffer(GIOPRecvBuffer * buf,gpointer old_alloc,gulong body_size)885 alloc_buffer (GIOPRecvBuffer *buf, gpointer old_alloc, gulong body_size)
886 {
887 	buf->message_body = g_try_realloc (old_alloc, body_size + 12);
888 
889 	if (!buf->message_body)
890 		return TRUE;
891 
892 	/*
893 	 *   We assume that this is 8 byte aligned, for efficiency -
894 	 * so we can align to the memory address rather than the offset
895 	 * into the buffer.
896 	 */
897 	g_assert (((gulong)buf->message_body & 0x3) == 0);
898 	buf->free_body = TRUE;
899 	buf->cur = buf->message_body + 12;
900 	buf->end = buf->cur + body_size;
901 	buf->left_to_read = body_size;
902 
903 	return FALSE;
904 }
905 
906 static gboolean
concat_frags(GList * list)907 concat_frags (GList *list)
908 {
909 	GList *l;
910 	guchar *ptr;
911 	gulong length = 0;
912 	gulong initial_length;
913 	gulong initial_offset;
914 	GIOPRecvBuffer *head;
915 
916 	head = list->data;
917 
918 	length = head->msg.header.message_size;
919 	initial_offset = (head->cur - head->message_body);
920 	initial_length = (head->end - head->cur);
921 
922 	length += initial_offset - 12; /* include what we read of the header */
923 
924 	g_assert (head->free_body);
925 
926 	if (alloc_buffer (head, head->message_body, length)) {
927 		dprintf (ERRORS, "failed to allocate fragment collation buffer");
928 		return TRUE;
929 	}
930 
931 	head->left_to_read = 0;
932 	head->cur = head->message_body + initial_offset;
933 
934 	ptr = head->cur + initial_length;
935 
936 	for (l = list->next; l; l = l->next) {
937 		gulong len;
938 		GIOPRecvBuffer *buf = l->data;
939 
940 		len = buf->end - buf->cur;
941 		memcpy (ptr, buf->cur, len);
942 		ptr+= len;
943 	}
944 
945 	head->end = ptr;
946 
947 	return FALSE;
948 }
949 
950 static glong giop_initial_msg_size_limit = GIOP_INITIAL_MSG_SIZE_LIMIT;
951 
952 void
giop_recv_set_limit(glong limit)953 giop_recv_set_limit (glong limit)
954 {
955 	if (limit > 256) /* Something slightly sensible ? */
956 		giop_initial_msg_size_limit = limit;
957 }
958 
959 glong
giop_recv_get_limit(void)960 giop_recv_get_limit (void)
961 {
962 	return giop_initial_msg_size_limit;
963 }
964 
965 /**
966  * giop_recv_buffer_handle_fragmented:
967  * @buf: pointer to recv buffer pointer
968  * @cnx: current connection.
969  *
970  *   This will append @buf to the right list of buffers
971  * on the connection, forming a complete message, and
972  * re-write *@buf to the first buffer in the chain.
973  *
974  * Return value: TRUE on error else FALSE
975  **/
976 static gboolean
giop_recv_buffer_handle_fragmented(GIOPRecvBuffer ** ret_buf,GIOPConnection * cnx)977 giop_recv_buffer_handle_fragmented (GIOPRecvBuffer **ret_buf,
978 				    GIOPConnection  *cnx)
979 {
980 	GList *list;
981 	gboolean giop_1_1;
982 	gboolean error = FALSE;
983 	CORBA_long message_id;
984 	GIOPRecvBuffer *buf = *ret_buf;
985 
986 	giop_1_1 = (buf->giop_version == GIOP_1_1);
987 
988 	switch (buf->msg.header.message_type) {
989 	case GIOP_REPLY:
990 	case GIOP_LOCATEREPLY:
991 	case GIOP_REQUEST:
992 	case GIOP_LOCATEREQUEST:
993 		message_id = giop_recv_buffer_get_request_id (buf);
994 		break;
995 	case GIOP_FRAGMENT:
996 		if (!giop_1_1) {
997 			buf->cur = ALIGN_ADDRESS (buf->cur, 4);
998 
999 			if ((buf->cur + 4) > buf->end) {
1000 				dprintf (ERRORS, "incoming bogus fragment length");
1001 				return TRUE;
1002 			}
1003 			if (giop_msg_conversion_needed (buf))
1004 				message_id = GUINT32_SWAP_LE_BE (*((guint32 *)buf->cur));
1005 			else
1006 				message_id = *(guint32 *) buf->cur;
1007 			buf->cur += 4;
1008 		} else
1009 			message_id = 0;
1010 		break;
1011 	default:
1012 		dprintf (ERRORS, "Bogus fragment packet type %d",
1013 			 buf->msg.header.message_type);
1014 		return TRUE;
1015 	}
1016 
1017 	if (!(list = giop_connection_get_frag (cnx, message_id, giop_1_1))) {
1018 		if (!MORE_FRAGMENTS_FOLLOW (buf))
1019 			return TRUE;
1020 
1021 		giop_connection_add_frag (cnx, buf);
1022 
1023 	} else {
1024 		GIOPRecvBuffer *head = list->data;
1025 
1026 		*ret_buf = head;
1027 		g_assert (head->msg.header.message_type != GIOP_FRAGMENT);
1028 
1029 		/* track total length on head node */
1030 		/* (end - cur) to account for fragment (msg id) header */
1031 		head->msg.header.message_size += (buf->end - buf->cur);
1032 
1033 		list = g_list_append (list, buf);
1034 
1035 		if (!cnx->parent.is_auth &&
1036 		    buf->msg.header.message_size > giop_initial_msg_size_limit) {
1037 			dprintf (ERRORS, "Message exceeded initial size limit\n");
1038 			error = TRUE;
1039 			giop_connection_remove_frag (cnx, list);
1040 		}
1041 
1042 		if (!MORE_FRAGMENTS_FOLLOW (buf)) {
1043 			g_assert (buf->msg.header.message_type == GIOP_FRAGMENT);
1044 
1045 			/* concat all fragments - re-write & continue */
1046 			error = concat_frags (list);
1047 
1048 			giop_connection_remove_frag (cnx, list);
1049 		}
1050 	}
1051 
1052 	return error;
1053 }
1054 
1055 static gboolean
handle_reply(GIOPRecvBuffer * buf)1056 handle_reply (GIOPRecvBuffer *buf)
1057 {
1058 	GList                 *l;
1059 	gboolean               error;
1060 	GIOPMessageQueueEntry *ent;
1061 	CORBA_unsigned_long    request_id;
1062 
1063 	request_id = giop_recv_buffer_get_request_id (buf);
1064 
1065 	error = FALSE;
1066 
1067 	LINK_MUTEX_LOCK (giop_queued_messages_lock);
1068 
1069 	for (l = giop_queued_messages; l; l = l->next) {
1070 		ent = l->data;
1071 
1072 		if (ent->request_id == request_id &&
1073 		    ent->msg_type == buf->msg.header.message_type)
1074 			break;
1075 	}
1076 
1077 	ent = l ? l->data : NULL;
1078 
1079 	if (!ent) {
1080 		if (giop_recv_buffer_reply_status (buf) ==
1081 		    CORBA_SYSTEM_EXCEPTION) {
1082 			/*
1083 			 * Unexpected - but sometimes a oneway
1084 			 * method invocation on a de-activated
1085 			 * object results in us getting a bogus
1086 			 * system exception in reply.
1087 			 */
1088  		} else {
1089 #ifdef G_ENABLE_DEBUG
1090 			if (giop_debug_hook_unexpected_reply)
1091 				giop_debug_hook_unexpected_reply (buf);
1092 			else
1093 				dprintf (ERRORS, "We received an unexpected reply\n");
1094 #endif /* G_ENABLE_DEBUG */
1095 			error = TRUE;
1096 		}
1097 
1098 	} else if (ent->cnx != buf->connection) {
1099 #ifdef G_ENABLE_DEBUG
1100 		if (giop_debug_hook_spoofed_reply)
1101 			giop_debug_hook_spoofed_reply (buf, ent);
1102 #endif
1103 		dprintf (ERRORS, "We received a bogus reply\n");
1104 
1105 		error = TRUE;
1106 
1107 	} else {
1108 #ifdef DEBUG
1109 		g_warning ("Pop XX:%p:%p - %d",
1110 			   ent, ent->async_cb,
1111 			   g_list_length (giop_queued_messages));
1112 #endif
1113 		giop_queued_messages = g_list_delete_link
1114 			(giop_queued_messages, l);
1115 	}
1116 
1117 	LINK_MUTEX_UNLOCK (giop_queued_messages_lock);
1118 
1119 	if (ent && !error) {
1120 		gboolean async = FALSE;
1121 
1122 		ent_lock (ent);
1123 		ent->buffer = buf;
1124 
1125 		if (giop_thread_io () && !ent->async_cb)
1126 			giop_incoming_signal_T (ent->src_thread,
1127 						GIOP_REPLY);
1128 
1129 		else if (ent->async_cb)
1130 			async = TRUE;
1131 
1132 		ent_unlock (ent);
1133 
1134 		if (async)
1135 			giop_invoke_async (ent);
1136 
1137 		buf = NULL;
1138 	}
1139 
1140 	giop_recv_buffer_unuse (buf);
1141 
1142 	return error;
1143 }
1144 
1145 static gboolean
giop_recv_msg_reading_body(GIOPRecvBuffer * buf,gboolean is_auth)1146 giop_recv_msg_reading_body (GIOPRecvBuffer *buf,
1147 			    gboolean        is_auth)
1148 {
1149 	dprintf (GIOP, "Incoming IIOP header:\n");
1150 
1151 	do_giop_dump (stderr, (guint8 *) &buf->msg.header, 12, 0);
1152 
1153 	/* Check the header */
1154 	if (memcmp (buf->msg.header.magic, "GIOP", 4))
1155 		return TRUE;
1156 
1157 	if (buf->msg.header.message_type >= GIOP_NUM_MSG_TYPES)
1158 		return TRUE;
1159 
1160 	switch (buf->msg.header.version [0]) {
1161 	case 1:
1162 		switch (buf->msg.header.version [1]) {
1163 		case 0:
1164 			buf->giop_version = GIOP_1_0;
1165 			break;
1166 		case 1:
1167 			buf->giop_version = GIOP_1_1;
1168 			break;
1169 		case 2:
1170 			buf->giop_version = GIOP_1_2;
1171 			break;
1172 		default:
1173 			return TRUE;
1174 			break;
1175 		}
1176 		break;
1177 	default:
1178 		return TRUE;
1179 		break;
1180 	}
1181 
1182 	if ((buf->msg.header.flags & GIOP_FLAG_LITTLE_ENDIAN) != GIOP_FLAG_ENDIANNESS)
1183 		buf->msg.header.message_size = GUINT32_SWAP_LE_BE (buf->msg.header.message_size);
1184 
1185 	/* NB. at least CLOSECONNECTION has 0 length message_size */
1186 
1187 	if (!is_auth && buf->msg.header.message_size > giop_initial_msg_size_limit) {
1188 		dprintf (ERRORS, "Message exceeded unauthorized size limit\n");
1189 		return TRUE;
1190 	}
1191 
1192 	if (alloc_buffer (buf, NULL, buf->msg.header.message_size))
1193 		return TRUE;
1194 
1195 	return FALSE;
1196 }
1197 
1198 /*
1199  * FIXME: we should definately handle things more asynchronously,
1200  * perhaps even at the expense of having to go to the GSource
1201  * twice in order to get fresh input (?)
1202  * or should we poll ourselves on the source to see what's up?
1203  *
1204  * The whole locking concept here looks broken to me,
1205  * especially since 'read' can flag the connection disconnected
1206  * giving a nice deadlock.
1207  */
1208 gboolean
giop_connection_handle_input(LinkConnection * lcnx)1209 giop_connection_handle_input (LinkConnection *lcnx)
1210 {
1211 	GIOPRecvBuffer *buf;
1212 	GIOPConnection *cnx = (GIOPConnection *) lcnx;
1213 
1214 	do {
1215 		int n;
1216 
1217 		if (!cnx->incoming_msg)
1218 			cnx->incoming_msg = giop_recv_buffer_use_buf (cnx);
1219 
1220 		buf = cnx->incoming_msg;
1221 
1222 		n = link_connection_read (
1223 			lcnx, buf->cur, buf->left_to_read, FALSE);
1224 
1225 		if (n == 0) /* We'll be back */
1226 			return TRUE;
1227 
1228 		if (n < 0 || !buf->left_to_read) /* HUP */
1229 			goto msg_error;
1230 
1231 /*		fprintf (stderr, "Read %d\n", n);
1232 		giop_dump (stderr, buf->cur, n, 0); */
1233 
1234 		buf->left_to_read -= n;
1235 		buf->cur += n;
1236 
1237 		if (buf->left_to_read == 0) {
1238 
1239 #ifdef G_ENABLE_DEBUG
1240 			if (giop_debug_hook_incoming_mangler)
1241 				giop_debug_hook_incoming_mangler (buf);
1242 #endif
1243 
1244 			switch (buf->state) {
1245 
1246 			case GIOP_MSG_READING_HEADER:
1247 				if (giop_recv_msg_reading_body (buf, cnx->parent.is_auth)) {
1248 					dprintf (ERRORS, "OOB incoming msg header data\n");
1249 					goto msg_error;
1250 				}
1251 				buf->state = GIOP_MSG_READING_BODY;
1252 				break;
1253 
1254 			case GIOP_MSG_READING_BODY: {
1255 
1256 				dprintf (GIOP, "Incoming IIOP body:\n");
1257 
1258 				buf->cur = buf->message_body + 12;
1259 				if ((buf->cur + buf->msg.header.message_size) > buf->end) {
1260 					dprintf (ERRORS, "broken incoming length data\n");
1261 					goto msg_error;
1262 				}
1263 				do_giop_dump (stderr, buf->cur, buf->msg.header.message_size, 12);
1264 
1265 				buf->state = GIOP_MSG_READY;
1266 
1267 				if (giop_recv_buffer_demarshal (buf)) {
1268 					dprintf (ERRORS, "broken incoming header data\n");
1269 					goto msg_error;
1270 				}
1271 
1272 				if (MORE_FRAGMENTS_FOLLOW (buf)) {
1273 					if (giop_recv_buffer_handle_fragmented (&buf, cnx))
1274 						goto msg_error;
1275 
1276 					else {
1277 						cnx->incoming_msg = NULL;
1278 						goto frag_out;
1279 					}
1280 
1281 				} else if (buf->msg.header.message_type == GIOP_FRAGMENT) {
1282 					if (giop_recv_buffer_handle_fragmented (&buf, cnx))
1283 						goto msg_error;
1284 					/* else last fragment */
1285 				}
1286 				break;
1287 			}
1288 
1289 			case GIOP_MSG_AWAITING_FRAGMENTS:
1290 			case GIOP_MSG_READY:
1291 				g_assert_not_reached ();
1292 				break;
1293 			}
1294 		}
1295 
1296 	} while (cnx->incoming_msg &&
1297 		 buf->left_to_read > 0 &&
1298 		 buf->state != GIOP_MSG_READY);
1299 
1300 	cnx->incoming_msg = NULL;
1301 
1302 	switch (buf->msg.header.message_type) {
1303 	case GIOP_REPLY:
1304 	case GIOP_LOCATEREPLY:
1305 		dprintf (MESSAGES, "handling reply\n");
1306 		if (handle_reply (buf)) /* dodgy inbound data, pull the cnx */
1307 			link_connection_state_changed (lcnx, LINK_DISCONNECTED);
1308 		break;
1309 
1310 	case GIOP_REQUEST:
1311 		dprintf (MESSAGES, "handling request\n");
1312 		ORBit_handle_request (cnx->orb_data, buf);
1313 		break;
1314 
1315 	case GIOP_LOCATEREQUEST:
1316 		dprintf (MESSAGES, "handling locate request\n");
1317 		ORBit_handle_locate_request (cnx->orb_data, buf);
1318 		break;
1319 
1320 	case GIOP_CANCELREQUEST:
1321 	case GIOP_MESSAGEERROR:
1322 		dprintf (ERRORS, "dropping an unusual & unhandled input buffer 0x%x",
1323 			 buf->msg.header.message_type);
1324 		giop_recv_buffer_unuse (buf);
1325 		break;
1326 
1327 	case GIOP_CLOSECONNECTION:
1328 		dprintf (MESSAGES, "received close connection\n");
1329 		giop_recv_buffer_unuse (buf);
1330 		link_connection_state_changed (lcnx, LINK_DISCONNECTED);
1331 		break;
1332 
1333 	case GIOP_FRAGMENT:
1334 		dprintf (ERRORS, "Fragment got in the wrong channel\n");
1335 	default:
1336 		dprintf (ERRORS, "dropping an out of bound input buffer "
1337 			 "on the floor 0x%x\n", buf->msg.header.message_type);
1338 		goto msg_error;
1339 		break;
1340 	}
1341 
1342  frag_out:
1343 	return TRUE;
1344 
1345  msg_error:
1346 	cnx->incoming_msg = NULL;
1347 
1348 	buf->msg.header.message_type = GIOP_MESSAGEERROR;
1349 	buf->msg.header.message_size = 0;
1350 
1351 	giop_recv_buffer_unuse (buf);
1352 
1353 	/* Zap it for badness.
1354 	 * XXX We should probably handle oversized
1355 	 * messages more graciously XXX */
1356 	link_connection_state_changed (LINK_CONNECTION (cnx),
1357 				       LINK_DISCONNECTED);
1358 
1359 	return TRUE;
1360 }
1361 
1362 static gboolean
giop_timeout(gpointer data)1363 giop_timeout (gpointer data)
1364 {
1365 	gboolean retv = FALSE;
1366 	GIOPConnection *cnx = (GIOPConnection*)data;
1367 	LinkConnection *lcnx = LINK_CONNECTION (cnx);
1368 	GIOPThread *tdata = (GIOPThread *)lcnx->tdata;
1369 
1370 	g_assert (lcnx->timeout_mutex);
1371 
1372 	if (lcnx->status == LINK_DISCONNECTED) {
1373 		giop_connection_unref (cnx); // we remove the source so we must unref cnx
1374 		goto out;
1375 	}
1376 
1377 	g_mutex_lock (lcnx->timeout_mutex);
1378 	if (lcnx->timeout_status == LINK_TIMEOUT_UNKNOWN) {
1379 		lcnx->timeout_source_id = 0;
1380 		lcnx->timeout_status = LINK_TIMEOUT_YES;
1381 	} else {
1382 		g_mutex_unlock (lcnx->timeout_mutex);
1383 		retv = TRUE; // do not remove the source - the one who sets timeout_status will do that
1384 		goto out;
1385 	}
1386 	g_mutex_unlock (lcnx->timeout_mutex);
1387 
1388 	link_connection_state_changed (lcnx, LINK_TIMEOUT);
1389 
1390 	g_mutex_lock (tdata->lock); /* ent_lock */
1391 	giop_incoming_signal_T (tdata, GIOP_CLOSECONNECTION);
1392 	g_mutex_unlock (tdata->lock); /* ent_lock */
1393 
1394 	giop_connection_unref (cnx); // we remove the source so we must unref cnx
1395 
1396 out:
1397 	return retv;
1398 }
1399 
1400 void
giop_timeout_add(GIOPConnection * cnx)1401 giop_timeout_add (GIOPConnection *cnx)
1402 {
1403 	static GStaticMutex static_mutex = G_STATIC_MUTEX_INIT;
1404 	LinkConnection *lcnx = LINK_CONNECTION (cnx);
1405 
1406 	if (!giop_thread_io ())
1407 		return;
1408 	if (!lcnx->timeout_msec)
1409 		return;
1410 
1411 	g_static_mutex_lock (&static_mutex);
1412 	if (lcnx->timeout_source_id)
1413 		goto out;
1414 
1415 	giop_connection_ref (cnx); // to be unref'ed by the one who removes the timeout source
1416 
1417 	if (!lcnx->timeout_mutex)
1418 		lcnx->timeout_mutex = g_mutex_new ();
1419 
1420 	g_mutex_lock (lcnx->timeout_mutex);
1421 	lcnx->timeout_status = LINK_TIMEOUT_UNKNOWN;
1422 	g_mutex_unlock (lcnx->timeout_mutex);
1423 
1424 	lcnx->tdata = giop_thread_self ();
1425 
1426 	lcnx->timeout_source_id = link_io_thread_add_timeout (lcnx->timeout_msec, giop_timeout, (gpointer)cnx);
1427 
1428 out:
1429 	g_static_mutex_unlock (&static_mutex);
1430 }
1431 
1432 GIOPRecvBuffer *
giop_recv_buffer_use_buf(GIOPConnection * cnx)1433 giop_recv_buffer_use_buf (GIOPConnection *cnx)
1434 {
1435 	GIOPRecvBuffer *buf = NULL;
1436 
1437 	if(cnx)
1438 		giop_connection_ref (cnx);
1439 
1440 	buf = g_new0 (GIOPRecvBuffer, 1);
1441 
1442 	buf->state = GIOP_MSG_READING_HEADER;
1443 	buf->cur = (guchar *)&buf->msg.header;
1444 	buf->left_to_read = 12;
1445 	buf->connection = cnx;
1446 
1447 	return buf;
1448 }
1449