1 /*
2 * The Spread Toolkit.
3 *
4 * The contents of this file are subject to the Spread Open-Source
5 * License, Version 1.0 (the ``License''); you may not use
6 * this file except in compliance with the License. You may obtain a
7 * copy of the License at:
8 *
9 * http://www.spread.org/license/
10 *
11 * or in the file ``license.txt'' found in this distribution.
12 *
13 * Software distributed under the License is distributed on an AS IS basis,
14 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
15 * for the specific language governing rights and limitations under the
16 * License.
17 *
18 * The Creators of Spread are:
19 * Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
20 *
21 * Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
22 *
23 * All Rights Reserved.
24 *
25 * Major Contributor(s):
26 * ---------------
27 * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
28 * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
29 * Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
30 * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
31 *
32 */
33
34
35 #include "arch.h"
36
37 #include <string.h>
38 #include <stdlib.h>
39 #include <stdio.h>
40 #include <assert.h>
41
42 #ifndef ARCH_PC_WIN95
43
44 #include <errno.h>
45 #include <unistd.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #include <sys/stat.h>
49
50 #ifdef HAVE_SYS_UIO_H
51 #include <sys/uio.h>
52 #endif
53
54 #include <netinet/in.h>
55 #include <netinet/tcp.h>
56 #include <sys/un.h>
57 #include <signal.h>
58 #include <sys/ioctl.h>
59
60 #else /* ARCH_PC_WIN95 */
61
62 #include <winsock.h>
63 #define ioctl ioctlsocket
64
65 #endif /* ARCH_PC_WIN95 */
66
67 #include "spread_params.h"
68 #include "net_types.h"
69 #include "sp_events.h"
70 #include "objects.h"
71 #include "memory.h"
72 #include "session.h"
73 #include "sess_types.h"
74
75 #define ext_sess_body
76 #include "sess_body.h"
77 #undef ext_sess_body
78
79 #include "groups.h"
80 #include "log.h"
81 #include "status.h"
82 #include "alarm.h"
83 #include "objects.h"
84 #include "prot_body.h"
85 #if ( SPREAD_PROTOCOL > 3 )
86 #include "queues.h"
87 #endif
88 #include "message.h"
89 #include "acm.h"
90
91 static sp_time Badger_timeout = { 0, 100000 };
92
93 static message_obj New_mess;
94
95 static int Accept_inet_mbox_num;
96 static mailbox Accept_inet_mbox[MAX_INTERFACES_PROC];
97 static mailbox Accept_unix_mbox;
98
99 static int Protocol_threshold;
100
101 #define SESSION_FD_HASH_SIZE 256
102 static session *Sessions_hash_head[SESSION_FD_HASH_SIZE];
103 static session *Sessions_head;
104 static session *Sessions_tail;
105 static session *Sessions_free;
106
107 static void Sess_attach_accept(void);
108 static void Sess_detach_accept(void);
109 static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p);
110 static void Sess_accept( mailbox mbox, int domain, void *dummy );
111 static void Sess_accept_continue( mailbox, int, void * );
112 static void Sess_read( mailbox mbox, int domain, void *dummy );
113 static void Sess_badger( mailbox mbox, void *dummy );
114 static void Sess_kill( mailbox mbox );
115 static void Sess_handle_join( message_link *mess_link );
116 static void Sess_handle_leave( message_link *mess_link );
117 static void Sess_handle_kill( message_link *mess_link );
118 static void Sess_deliver_reject( message_obj *msg );
119 static void Sess_create_reject_message ( message_obj *msg );
120 static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
121
/*
 * Enable SO_REUSEADDR on the socket `mbox`; raises an EXIT alarm when the
 * option cannot be set.
 *
 * The argument is parenthesised so that any expression can be passed, and
 * the local is given a macro-specific name so it cannot shadow (or be
 * confused with) a variable of the calling function.
 */
#define ACTIVATE_PORT_REUSE(mbox) do { \
        int port_reuse_on_ = 1; \
        if (setsockopt((mbox), SOL_SOCKET, SO_REUSEADDR, (void *) &port_reuse_on_, sizeof(port_reuse_on_)) < 0) \
            Alarm( EXIT, "Sess_init: Error setting SO_REUSEADDR socket option\n" ); \
    } while (0)
127
Sess_get_session_index(int mbox)128 int Sess_get_session_index (int mbox)
129 {
130 session *tmp;
131 unsigned char *c = (unsigned char *) &mbox;
132 unsigned int i;
133
134 i = c[0] ^ c[1] ^ c[2] ^ c[3];
135
136 Alarm( NONE, "Sess_get_session_index: mbox %d hashed to %u\n", mbox, i);
137 for (tmp = Sessions_hash_head[i]; tmp; tmp = tmp->hash_next)
138 if (tmp->mbox == mbox)
139 return (tmp - Sessions);
140
141 return -1;
142 }
143
/*
 * Enter a session into the mbox hash table.
 *
 * The bucket is the XOR of the first four bytes of ses->mbox (the same
 * function Sess_get_session_index() uses); the session is pushed onto the
 * front of that bucket's chain.
 */
static void Sess_hash_session (session *ses)
{
    const unsigned char *bytes = (const unsigned char *) &ses->mbox;
    unsigned int bucket = bytes[0] ^ bytes[1] ^ bytes[2] ^ bytes[3];
    session **chain_head = &Sessions_hash_head[bucket];

    ses->hash_next = *chain_head;
    *chain_head = ses;
}
154
/*
 * Remove a session from the mbox hash table and clear its hash_next link.
 *
 * The bucket is derived from ses->mbox exactly as in Sess_hash_session(),
 * so ses->mbox must still hold the value the session was hashed with.
 *
 * If the session is not found in its bucket (empty bucket, or the session
 * was never hashed) a message is printed and the table is left untouched;
 * the previous implementation dereferenced a NULL pointer in that case.
 */
static void Sess_unhash_session (session *ses)
{
    const unsigned char *bytes = (const unsigned char *) &ses->mbox;
    unsigned int bucket = bytes[0] ^ bytes[1] ^ bytes[2] ^ bytes[3];
    session **link;

    /* Walk the links rather than the nodes so that the head of the chain
     * needs no special case. */
    for (link = &Sessions_hash_head[bucket]; *link != NULL; link = &(*link)->hash_next)
    {
        if (*link == ses)
        {
            *link = ses->hash_next;
            ses->hash_next = NULL;
            return;
        }
    }

    Alarm( PRINT, "Sess_unhash_session: session on mailbox %d is not in the hash table\n", ses->mbox );
}
175
Sess_get_free_session(void)176 static session *Sess_get_free_session (void)
177 {
178 session *ses;
179
180 if ((ses = Sessions_free) == NULL)
181 {
182 Alarm (EXIT, "Sess_get_free_session: BUG ! No free sessions !\n");
183 }
184
185 Sessions_free = Sessions_free->sort_next;
186
187 return ses;
188 }
189
/*
 * Push a session slot onto the free list (chained through sort_next).
 * The slot is not cleared and is not unlinked from any other list here.
 */
static void Sess_free_session (session *ses)
{
    session *previous_top = Sessions_free;

    Sessions_free = ses;
    ses->sort_next = previous_top;
}
195
/*
 * Take a free slot, copy `template` into it and link it into the list of
 * sessions in use.
 *
 * where    - session in front of which the new one is inserted, or NULL
 *            to append at the tail of the list.
 * template - contents for the new session; the whole structure is copied,
 *            after which the sort links of the copy are overwritten.
 *
 * Returns the position of the new session relative to Sessions.
 */
static int Sess_insert_new_session (session *where, session *template)
{
    session *fresh = Sess_get_free_session();

    memmove(fresh, template, sizeof (*template));

    if (where != NULL)
    {
        /* Insert immediately before `where`. */
        session *before = where->sort_prev;

        fresh->sort_next = where;
        fresh->sort_prev = before;
        where->sort_prev = fresh;

        if (before != NULL)
            before->sort_next = fresh;
        else
            Sessions_head = fresh;      /* `where` was the head */

        return(fresh - Sessions);
    }

    /* Append at the end of the list (which may be empty). */
    fresh->sort_next = NULL;
    fresh->sort_prev = Sessions_tail;

    if (Sessions_tail != NULL)
        Sessions_tail->sort_next = fresh;
    else
        Sessions_head = fresh;          /* list was empty */

    Sessions_tail = fresh;

    return(fresh - Sessions);
}
242
/*
 * Unlink a session from the list of sessions in use and return its slot
 * to the free list.
 *
 * A missing neighbour on either side means the session is the head or the
 * tail (or both), in which case the corresponding list end is moved.
 */
static void Sess_remove_session (session *ses)
{
    session *before = ses->sort_prev;
    session *after  = ses->sort_next;

    if (before != NULL)
        before->sort_next = after;
    else
        Sessions_head = after;

    if (after != NULL)
        after->sort_prev = before;
    else
        Sessions_tail = before;

    Sess_free_session(ses);
}
279
Sess_init_sessions(void)280 static void Sess_init_sessions (void)
281 {
282 int i;
283
284 for (i = 0; i < SESSION_FD_HASH_SIZE; i++)
285 Sessions_hash_head[i] = NULL;
286
287 Sessions_free = Sessions_head = Sessions_tail = NULL;
288
289 for (i = 0; i < MAX_SESSIONS; i++)
290 Sess_free_session( &Sessions[i] );
291 }
292
/*
 * Count the bits that are set in `field` within the half-open bit range
 * [first_index, last_index), where bit 0 is the least significant bit.
 *
 * field       - value to inspect.
 * first_index - first bit position examined; must be in 0..31.
 * last_index  - one past the last bit position examined; must be in 0..32
 *               and not smaller than first_index.
 *
 * Returns the number of set bits in the range (0 for an empty range).
 * The argument constraints are checked with assert().
 */
int count_bits_set( int32u field, int first_index, int last_index)
{
    int i;
    int count = 0;
    int32u bitfield;

    assert(first_index >= 0 && first_index < 32);
    assert(last_index >= 0 && last_index <= 32);
    assert(last_index >= first_index);

    /* Build the mask in the unsigned type: shifting the int constant 1
     * left by 31 would overflow a signed int, which is undefined. */
    bitfield = (int32u) 1 << first_index;
    for ( i = first_index; i < last_index; i++, bitfield <<= 1 )
    {
        if ( field & bitfield )
            count++;
    }
    return(count);
}
310
Sess_init()311 void Sess_init()
312 {
313 struct sockaddr_in inet_addr;
314 int16 port;
315 int ret, i;
316 mailbox mbox;
317
318 #ifndef ARCH_PC_WIN95
319
320 struct sockaddr_un unix_addr;
321 char name[80];
322
323 signal( SIGPIPE, SIG_IGN );
324
325 #endif /* ARCH_PC_WIN95 */
326
327 ret = Mem_init_object( MESSAGE_LINK, sizeof(message_link), 1000, 0);
328 if (ret < 0)
329 {
330 Alarm(EXIT, "Sess_init: Failure to Initialize MESSAGE_LINK memory objects\n");
331 }
332
333 ret = Mem_init_object( DOWN_LINK, sizeof(down_link), 200, 0);
334 if (ret < 0)
335 {
336 Alarm(EXIT, "Sess_Init: Failure to Initialize DOWN_LINK memory objects\n");
337 }
338
339 Sess_init_sessions ();
340
341 Num_sessions = 0;
342 GlobalStatus.num_sessions = Num_sessions;
343 GlobalStatus.message_delivered = 0;
344 My = Conf_my();
345 port = My.port;
346 Session_threshold = LOW_PRIORITY;
347
348 /* Initializing the protocol */
349 Protocol_threshold = LOW_PRIORITY;
350
351 Prot_init_down_queues();
352 Prot_set_down_queue( NORMAL_DOWNQUEUE );
353 Prot_init();
354
355 /* Initiation of the INET socket */
356 memset(&inet_addr.sin_zero, 0, sizeof(inet_addr.sin_zero));
357
358 inet_addr.sin_family = AF_INET;
359 inet_addr.sin_port = htons(port);
360 Accept_inet_mbox_num = 0;
361
362 /* Bind to all interfaces specified in config file */
363 for ( i=0; i < My.num_if; i++)
364 {
365 if (Is_IfType_Client(My.ifc[i].type) || Is_IfType_Any(My.ifc[i].type) )
366 {
367 port_reuse type;
368 if( (mbox = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1)
369 Alarm( EXIT, "Sess_init: INET sock error\n" );
370 type = Conf_get_port_reuse_type();
371 if (type == port_reuse_on)
372 ACTIVATE_PORT_REUSE(mbox);
373
374 if (Is_IfType_Any(My.ifc[i].type) )
375 inet_addr.sin_addr.s_addr = INADDR_ANY;
376 else
377 {
378 if (type == port_reuse_auto)
379 ACTIVATE_PORT_REUSE(mbox);
380 inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
381 }
382 if( bind( mbox, (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
383 {
384 Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
385 exit(0);
386 }
387 inet_addr.sin_addr.s_addr = ntohl(inet_addr.sin_addr.s_addr);
388 Alarm( SESSION, "Sess_init: INET bind for port %d interface %d.%d.%d.%d ok\n", port,
389 IP1(inet_addr.sin_addr.s_addr), IP2(inet_addr.sin_addr.s_addr),
390 IP3(inet_addr.sin_addr.s_addr), IP4(inet_addr.sin_addr.s_addr) );
391
392 if( listen( mbox, 25 ) < 0 )
393 Alarm( EXIT, "Sess_init: INET unable to listen\n" );
394
395 Accept_inet_mbox[Accept_inet_mbox_num] = mbox;
396 Accept_inet_mbox_num++;
397 Alarm( SESSION, "Sess_init: INET went ok on mailbox %d\n", mbox );
398 }
399 }
400
401 #ifndef ARCH_PC_WIN95
402
403 /* Initiation of the UNIX socket */
404
405 if( (mbox = socket( AF_UNIX, SOCK_STREAM, 0 ) ) == -1)
406 Alarm( EXIT, "Sess_init: UNIX sock error\n" );
407
408 unix_addr.sun_family = AF_UNIX;
409 sprintf( name, "%s/spread.sock", _PATH_SPREAD_PIDDIR );
410 strcpy( unix_addr.sun_path, name );
411 unlink( name );
412
413 if( bind( mbox, (struct sockaddr *)&unix_addr, sizeof(unix_addr) ) == -1)
414 {
415 Alarm( PRINT, "Sess_init: UNIX unable to bind to name %s, already running \n" , name );
416 exit(0);
417 }
418 Alarm( SESSION, "Sess_init: UNIX bind for name %s ok\n", name );
419
420 chmod( name, 0666 );
421
422 if( listen( mbox, 5 ) < 0 )
423 Alarm( EXIT, "Sess_init: UNIX unable to listen\n" );
424
425 Accept_unix_mbox = mbox;
426 Alarm( SESSION, "Sess_init: UNIX went ok on mailbox %d\n", mbox );
427
428 #endif /* ARCH_PC_WIN95 */
429
430 Sess_attach_accept();
431
432 Message_populate_with_buffers(&New_mess);
433
434 G_init();
435
436 Alarm( SESSION, "Sess_init: ended ok\n" );
437 }
438
Sess_set_active_threshold()439 void Sess_set_active_threshold()
440 {
441 /* This function is used only by the session (and groups) layer */
442
443 if( Protocol_threshold > Session_threshold )
444 E_set_active_threshold( Protocol_threshold );
445 else E_set_active_threshold( Session_threshold );
446 }
447
Sess_block_user(int xxx)448 void Sess_block_user(int xxx)
449 {
450
451 Alarm(EXIT,"Sess_block_user: NOT IMPLEMENTED!\n");
452 }
453
Sess_unblock_user(int xxx)454 void Sess_unblock_user(int xxx)
455 {
456
457 Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
458 }
Sess_block_users_level()459 void Sess_block_users_level()
460 {
461 /* This function is used only by lower layers (protocol) */
462 if( Protocol_threshold < MEDIUM_PRIORITY )
463 {
464 Protocol_threshold = MEDIUM_PRIORITY;
465 Sess_set_active_threshold();
466 }
467 }
468
Sess_unblock_users_level()469 void Sess_unblock_users_level()
470 {
471 /* This function is used only by lower layers (protocol) */
472 if( Protocol_threshold > LOW_PRIORITY )
473 {
474 Protocol_threshold = LOW_PRIORITY;
475 Sess_set_active_threshold();
476 }
477 }
478
Sess_attach_accept()479 static void Sess_attach_accept()
480 {
481 int i;
482 for ( i=0; i < Accept_inet_mbox_num; i++)
483 {
484 E_attach_fd( Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
485 E_attach_fd( Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
486 }
487 #ifndef ARCH_PC_WIN95
488
489 E_attach_fd( Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
490
491 #endif /* ARCH_PC_WIN95 */
492
493 }
494
Sess_detach_accept()495 static void Sess_detach_accept()
496 {
497 int i;
498 for (i=0; i < Accept_inet_mbox_num; i++)
499 {
500 E_detach_fd( Accept_inet_mbox[i], READ_FD );
501 E_detach_fd( Accept_inet_mbox[i], EXCEPT_FD );
502 }
503 #ifndef ARCH_PC_WIN95
504
505 E_detach_fd( Accept_unix_mbox, READ_FD );
506
507 #endif /* ARCH_PC_WIN95 */
508
509 }
510
Sess_accept_continue2(int d1,void * d2)511 void Sess_accept_continue2(int d1, void *d2)
512 {
513 Sess_accept_continue(0,0,NULL);
514 }
515
516
/*
 * Event handler for a listening socket: accept one new client connection.
 *
 * mbox   - listening socket that became ready.
 * domain - AF_INET or AF_UNIX (any other value raises an EXIT alarm).
 * dummy  - unused.
 *
 * The connection is recorded in the scratch slot Sessions[MAX_SESSIONS].
 * It is rejected with REJECT_QUOTA when MAX_SESSIONS sessions already
 * exist.  Otherwise the socket buffers are enlarged, TCP_NODELAY is set
 * for INET connections, the accept handlers are detached, and
 * Sess_accept_continue() is scheduled to run when the client has written
 * its connect message or after one second, whichever comes first.
 *
 * Changes relative to the previous version:
 *  - inet_addr is zeroed before accept(), so a failing accept() no longer
 *    causes an uninitialised address to be read;
 *  - the peer address is converted with ntohl() (network to host order;
 *    the value is the same as with the htonl() used before);
 *  - the TCP_NODELAY option length is sizeof(on) rather than a literal 4.
 */
static void Sess_accept( mailbox mbox, int domain, void *dummy )
{
    struct sockaddr_in inet_addr;
    socklen_t inet_len;
    sockopt_len_t onlen;
    sp_time accept_delay;
    char response;
    int ret;
    int i;
    int32 on;
    session *pending = &Sessions[MAX_SESSIONS];

    if( domain == AF_INET )
    {
        memset( &inet_addr, 0, sizeof(inet_addr) );
        inet_len = sizeof(inet_addr);
        pending->mbox = accept( mbox, (struct sockaddr *)&inet_addr, &inet_len );

        pending->type = AF_INET;
        /*
         * sender's machine ip address is: ntohl(inet_addr.sin_addr.s_addr)
         * sender's assigned port is     : ntohs(inet_addr.sin_port)
         */
        pending->address = ntohl(inet_addr.sin_addr.s_addr);
    }else if( domain == AF_UNIX ){
        /* no need for return values for AF_UNIX on the accept */
        pending->mbox = accept( mbox, 0, 0 );

        pending->type = AF_UNIX;
        pending->address = 0;
    }else Alarm( EXIT, "Sess_accept: Unknown domain %d on mailbox %d\n", domain, mbox );

    if( pending->mbox < 0 )
    {
        Alarm( SESSION, "Sess_accept: accept failed for domain %d\n", domain );
        return;
    }
    if( Num_sessions == MAX_SESSIONS )
    {
        response = REJECT_QUOTA;
        send( pending->mbox, &response, sizeof(response), 0 );
        close( pending->mbox );
        Alarm( SESSION, "Sess_accept: rejecting session due to quota\n" );
        return;
    }

    if ( ( (i = Sess_get_session_index(pending->mbox)) != -1)
         && (Sessions[i].mbox == pending->mbox)
         && (Is_op_session( Sessions[i].status )) )
    {
        /* This is impossible as the mbox must have been closed to be returned by accept */
        Alarm(EXIT, "Sess_accept: BUG! Accepted new FD %d that is currently in use(ses %d).\n", Sessions[i].mbox, i);
    }

    /* Grow the send and receive buffers in 5 KB steps from 10 KB up to
     * 200 KB, stopping at the first size that cannot be set or that the
     * socket reports as smaller than requested. */
    for( i=10; i <= 200; i+=5 )
    {
        on = 1024*i;
        onlen = sizeof(on);

        ret = setsockopt( pending->mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, onlen);
        if (ret < 0 ) break;

        ret = setsockopt( pending->mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, onlen);
        if (ret < 0 ) break;

        ret= getsockopt( pending->mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, &onlen );
        if( on < i*1024 ) break;
        Alarm( NONE, "Sess_accept: set sndbuf %d, ret is %d\n", on, ret );

        onlen = sizeof(on);
        ret= getsockopt( pending->mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, &onlen );
        if( on < i*1024 ) break;
        Alarm( NONE, "Sess_accept: set rcvbuf %d, ret is %d\n", on, ret );
    }
    Alarm( SESSION, "Sess_accept: set sndbuf/rcvbuf to %d\n", 1024*(i-5) );

    if ( domain == AF_INET ) {
        on = 1;
        ret = setsockopt( pending->mbox, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
        if (ret < 0)
            Alarm(PRINT, "Setting TCP_NODELAY failed with error: %s\n", sock_strerror(sock_errno));
        else
            Alarm( SESSION, "Setting TCP_NODELAY on socket %d\n", pending->mbox );
    }

    /* delaying for the private name to be written */
    /* No further connection is accepted while this one is pending: the
     * scratch slot can hold only one connection at a time. */
    Sess_detach_accept();
    E_attach_fd( pending->mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
    E_attach_fd( pending->mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
    accept_delay.sec = 1;
    accept_delay.usec= 0;
    E_queue( Sess_accept_continue2, 1, NULL, accept_delay );
}
607
/*
 * Second stage of accepting a client.  Runs when the pending socket in
 * Sessions[MAX_SESSIONS] becomes readable or exceptional, or through
 * Sess_accept_continue2() when the timer queued by Sess_accept() expires.
 * The three arguments are not used.
 *
 * Steps:
 *  1. cancel the timer, detach the pending socket and re-attach the
 *     accept handlers;
 *  2. read the connect message without blocking and validate the library
 *     version, the flags byte and the private name (a name of length 0
 *     gets a generated name); every failure closes the socket, and most
 *     first send a REJECT_* code to the client;
 *  3. copy the pending session into a real slot, placed in front of the
 *     position found while checking that the name is unique, mark it
 *     pre-authorised and hash it by mbox;
 *  4. for libraries older than 3.16.0 decide immediately via the "IP" or
 *     "NULL" method; otherwise send the list of allowed authentication
 *     methods and wait for the reply in Sess_recv_client_auth().
 *
 * Changes relative to the previous version:
 *  - in the legacy "IP" path the session info and the auth callback are
 *    checked for NULL, and the session is denied instead of a NULL
 *    pointer being used (same handling as in Sess_recv_client_auth());
 *  - the arguments of the "r%u-%u" name format are converted to unsigned
 *    to match the conversion specifiers.
 */
void Sess_accept_continue(mailbox d1, int d2, void *d3)
{
    char response;
    int legal_private_name;
    int unique_private_name;
    int name_len;
    int ioctl_cmd;
    char version[3];
    char conn[2];
    char priv_user_name[MAX_PRIVATE_NAME];
    int ret, i, sess_location, rnum;
    char *allowed_auth_list;
    unsigned char list_len;
    session *tmp_ses;
    session *pending = &Sessions[MAX_SESSIONS];

    E_dequeue( Sess_accept_continue2, 1, NULL );
    E_detach_fd( pending->mbox, READ_FD );
    E_detach_fd( pending->mbox, EXCEPT_FD );
    Sess_attach_accept();

    /* set file descriptor to non blocking */
    ioctl_cmd = 1;
    ret = ioctl( pending->mbox, FIONBIO, &ioctl_cmd);

    /*
     * connect message looks like:
     *
     * byte - version of lib
     * byte - subversion of lib
     * (optional) byte - patchversion of lib (only if version.subversion > 3.14)
     * byte - 1/0 with or without groups
     * byte - len of name
     * len bytes - name
     *
     */
    /* version checking 3.01 is minimal */

    version[0] = version[1] = version[2] = 0;

    ret = recv( pending->mbox, version, 2, 0 );
    if( ret < 0 )
    {
        Alarm( SESSION, "Sess_accept_continue: reading version.subversion failed on mailbox %d\n",
               pending->mbox );
        close( pending->mbox );
        return;
    }
    if ( version[0]*10000 + version[1]*100 + version[2] > 31400 )
    {
        ret = recv( pending->mbox, &version[2], 1, 0 );
        if( ret < 0 )
        {
            Alarm( SESSION, "Sess_accept_continue: reading patch_version failed on mailbox %d\n",
                   pending->mbox );
            close( pending->mbox );
            return;
        }
    }
    if( version[0]*10000 + version[1]*100 + version[2] < 30100 )
    {
        response = REJECT_VERSION;
        send( pending->mbox, &response, sizeof(response), 0 );
        Alarm( SESSION, "Sess_accept_continue: version %d.%d.%d is not supported\n",
               version[0], version[1], version[2] );
        close( pending->mbox );
        return;
    }

    pending->lib_version[0] = version[0];
    pending->lib_version[1] = version[1];
    pending->lib_version[2] = version[2];

    ret = recv( pending->mbox, conn, 2, 0 );
    if( ret < 0 )
    {
        Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
               pending->mbox );
        close( pending->mbox );
        return;

    }else if( ret < 2 ){
        response = REJECT_NO_NAME;
        send( pending->mbox, &response, sizeof(response), 0 );
        Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
               pending->mbox );
        close( pending->mbox );
        return;
    }

    /* conn[0]: an odd value requests membership messages; the value
     * divided by 16 is stored as the session priority. */
    if( ((int)conn[0] % 2) == 1 ) pending->status = Set_memb_session( pending->status );
    else pending->status = Clear_memb_session( pending->status );
    pending->priority = (int)conn[0] / 16 ;

    name_len = (int)conn[1];
    if( name_len > MAX_PRIVATE_NAME || name_len < 0 )
    {
        response = REJECT_ILLEGAL_NAME;
        send( pending->mbox, &response, sizeof(response), 0 );
        Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
               name_len, ret,
               pending->mbox );
        close( pending->mbox );
        return;
    }

    /* reading private name */
    for( i=0; i < MAX_PRIVATE_NAME+1; i++ )
        pending->name[i] = 0;

    if (name_len == 0)
    {
        /* Assign a random user name to this user. Currently this is a random 4 digit number followed
         * by the mbox of the users connection.
         */
        char newname[MAX_PRIVATE_NAME];
        unique_private_name = 0;
        while ( !unique_private_name )
        {
            memset(newname, '\0', MAX_PRIVATE_NAME);
            rnum = (int) (9999.0*get_rand()/(RAND_MAX+1.0));
            snprintf(newname, MAX_PRIVATE_NAME, "r%u-%u", (unsigned int) rnum, (unsigned int) pending->mbox);
            memcpy( pending->name, newname, MAX_PRIVATE_NAME);

            /* checking if private name is unique */
            /* The walk stops at the first session whose name is not
             * smaller than the new one; tmp_ses is then the insertion
             * point (NULL means append). */
            for( unique_private_name=1, tmp_ses = Sessions_head; tmp_ses; tmp_ses = tmp_ses->sort_next )
            {
                ret = strcmp( pending->name, tmp_ses->name );
                if( ret <= 0 )
                {
                    if( ret == 0 ) unique_private_name = 0;
                    break;
                }
            }
        }
    } else {
        /* receive user name from client and validate it */
        ret = recv( pending->mbox, priv_user_name, MAX_PRIVATE_NAME, 0 );
        if( ret < 0 )
        {
            Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
                   pending->mbox );
            close( pending->mbox );
            return;
        }else if( ret != name_len )
        {
            response = REJECT_ILLEGAL_NAME;
            send( pending->mbox, &response, sizeof(response), 0 );
            Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
                   name_len, ret, pending->mbox );
            close( pending->mbox );
            return;
        }
        memcpy( pending->name, priv_user_name, name_len );

        /* checking if private name is legal */
        for( legal_private_name=1, i=0; i < name_len; i++ )
            if( pending->name[i] <= '#' ||
                pending->name[i] > '~' ) legal_private_name = 0;
        if( !legal_private_name )
        {
            response = REJECT_ILLEGAL_NAME;
            send( pending->mbox, &response, sizeof(response), 0 );
            Alarm( SESSION, "Sess_accept_continue: illegal private name %s on mailbox %d\n",
                   pending->name,
                   pending->mbox );
            close( pending->mbox );
            return;
        }

        /* checking if private name is unique */
        /* As above, tmp_ses ends up as the insertion point. */
        for( unique_private_name=1, tmp_ses = Sessions_head; tmp_ses; tmp_ses = tmp_ses->sort_next )
        {
            ret = strcmp( pending->name, tmp_ses->name );
            if( ret <= 0 )
            {
                if( ret == 0 ) unique_private_name = 0;
                break;
            }
        }
        if( !unique_private_name )
        {
            response = REJECT_NOT_UNIQUE;
            send( pending->mbox, &response, sizeof(response), 0 );
            Alarm( SESSION, "Sess_accept_continue: non unique private name %s on mailbox %d\n",
                   pending->name,
                   pending->mbox );
            close( pending->mbox );
            return;
        }
    }

    /* set file descriptor back to blocking */
    ioctl_cmd = 0;
    ret = ioctl( pending->mbox, FIONBIO, &ioctl_cmd);

    /* Insert the new session just before the point we already
     * found while checking unique private name... */
    sess_location = Sess_insert_new_session (tmp_ses, pending);

    Num_sessions++;
    GlobalStatus.num_sessions = Num_sessions;

    Sessions[sess_location].status = Set_preauth_session( Sessions[sess_location].status );

    Sess_hash_session (&Sessions[sess_location]);

    /* OLD client library without authentication/authorization code */
    if ( version[0]*10000 + version[1]*100 + version[2] < 31600 )
    {
        Acm_acp_fill_ops( &(Sessions[sess_location].acp_ops) );

        /* If IP access control is enabled, then check it.
         */
        if ( Acm_auth_query_allowed("IP") )
        {
            void (*auth_open)(struct session_auth_info *);
            struct session_auth_info *sess_auth_p;

            sess_auth_p = Acm_auth_create_sess_info_forIP(Sessions[sess_location].mbox);
            if ( NULL == sess_auth_p )
            {
                Alarm( SESSION, "Sess_accept_continue: unable to create IP auth info, closing session %d on mailbox %d\n",
                       sess_location, Sessions[sess_location].mbox );
                Sess_session_denied(sess_location);
                return;
            }
            auth_open = Acm_auth_get_auth_client_connection_byname("IP");
            if ( auth_open == NULL )
            {
                Alarm( PRINT, "Sess_accept_continue: no handler for the IP auth method, closing session %d on mailbox %d\n",
                       sess_location, Sessions[sess_location].mbox );
                dispose( sess_auth_p );
                Sess_session_denied(sess_location);
                return;
            }
            auth_open(sess_auth_p);
            return;
        }
        /* If no IP authentication enabled, then try to use NULL */
        if ( Acm_auth_query_allowed("NULL") )
            Sess_session_authorized(sess_location);
        else
            Sess_session_denied(sess_location);
        return;
    }

    allowed_auth_list = Acm_auth_get_allowed_list();
    list_len = strlen(allowed_auth_list);

    send( Sessions[sess_location].mbox, &list_len, 1, 0 );
    send( Sessions[sess_location].mbox, allowed_auth_list, list_len, 0 );

    /* If no AllowedAuthMethods are declared, reject all sessions.
     * To maintain old behavior of allowing all, just add the NULL
     * method to the allowed list and then all clients will be allowed.
     */
    if (list_len == 0)
    {
        Sess_session_denied(sess_location);
        return;
    }

    /* Now wait for client reply */
    E_attach_fd( Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
    E_attach_fd( Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
}
859
/*
 * Event handler for a pre-authorised session: read the list of
 * authentication methods chosen by the client and start the first one.
 *
 * mbox    - socket of the session.
 * dummy   - unused.
 * dummy_p - unused.
 *
 * The client must send exactly MAX_AUTH_NAME * MAX_AUTH_METHODS bytes,
 * laid out as MAX_AUTH_METHODS slots of MAX_AUTH_NAME bytes each; the
 * list ends at the first slot that starts with a NUL byte.  The session
 * is denied when the read fails or is short, when a requested method is
 * not allowed, or when no authentication can be set up.
 *
 * Changes relative to the previous version (the buffer comes straight
 * from the network and must not be trusted):
 *  - the last byte of every slot is forced to NUL, so each method name is
 *    a terminated string that stays inside its slot;
 *  - the scan over the slots is limited to MAX_AUTH_METHODS, so a list
 *    that fills every slot no longer makes the scan run past the buffer.
 */
static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p)
{
    int ret, i, ioctl_cmd, ses;
    char auth_name[MAX_AUTH_NAME * MAX_AUTH_METHODS];
    void (*auth_open)(struct session_auth_info *);
    struct session_auth_info *sess_auth_p;

    ses = Sess_get_session_index(mbox);
    if( ses < 0 || ses >= MAX_SESSIONS ) {
        Alarm( PRINT, "Sess_recv_client_auth: Illegal mbox %d for receiving client auth. Cannot deny or allow\n", mbox);
        return;
    }
    if (!Is_preauth_session(Sessions[ses].status) )
    {
        Alarm( EXIT, "Sess_recv_client_auth: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
    }

    E_detach_fd(mbox, READ_FD);
    E_detach_fd(mbox, EXCEPT_FD);

    /* set file descriptor to non blocking */
    ioctl_cmd = 1;
    ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

    /* FIXME: Support partial reads by storing the portion read so far and requeuing */
    ret = recv( mbox, auth_name, MAX_AUTH_NAME * MAX_AUTH_METHODS, 0 );
    if( ret < 0 )
    {
        Alarm( SESSION, "Sess_recv_client_auth: reading auth string failed on mailbox %d\n", mbox );
        Sess_session_denied(ses);
        return;
    }
    if( ret < (MAX_AUTH_NAME * MAX_AUTH_METHODS) )
    {
        Alarm( SESSION, "Sess_recv_client_auth: reading auth string SHORT on mailbox %d\n", mbox );
        Sess_session_denied(ses);
        return;
    }

    /* set file descriptor back to blocking */
    ioctl_cmd = 0;
    ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

    /* Terminate every slot before any of them is used as a string. */
    for ( i = 0; i < MAX_AUTH_METHODS; i++ )
        auth_name[(i + 1) * MAX_AUTH_NAME - 1] = '\0';

    for ( i = 0; i < MAX_AUTH_METHODS && auth_name[i * MAX_AUTH_NAME] != '\0'; i++ )
    {
        Alarm( SESSION, "Sess_recv_client_auth: Client requested %s type authentication\n", &auth_name[i * MAX_AUTH_NAME]);
        if ( !Acm_auth_query_allowed(&auth_name[i * MAX_AUTH_NAME]) )
        {
            Alarm( SESSION, "Sess_recv_client_auth: received non-allowed auth method %s, closing session %d on mailbox %d\n", &auth_name[i * MAX_AUTH_NAME], ses, mbox);
            Sess_session_denied(ses);
            return;
        }
    }

    /* Register default permit all ops access control policy. */
    Acm_acp_fill_ops( &(Sessions[ses].acp_ops) );

    sess_auth_p = Acm_auth_create_sess_info(mbox, auth_name);
    if ( NULL == sess_auth_p )
    {
        Alarm( SESSION, "Sess_recv_client_auth: no valid auth_methods set or received: auth method %s, closing session %d on mailbox %d\n", auth_name, ses, mbox);
        Sess_session_denied(ses);
        return;
    }
    auth_open = Acm_auth_get_auth_client_connection( sess_auth_p->required_auth_methods[0] );
    if (auth_open == NULL)
    {
        Alarm(PRINT, "Sess_recv_client_auth: Illegal auth_method_id (%d) tried\n", sess_auth_p->required_auth_methods[0]);
        dispose( sess_auth_p );
        Sess_session_denied( ses );
        return;
    }
    /* The result is reported back via
     * Sess_session_report_auth_result(). */
    auth_open( sess_auth_p );
}
935
Sess_session_report_auth_result(struct session_auth_info * sess_auth_h,int authenticated_p)936 void Sess_session_report_auth_result(struct session_auth_info *sess_auth_h, int authenticated_p )
937 {
938 int ses, authid, num_auths;
939 int permit_count, decision, i;
940 void (*auth_open)(struct session_auth_info *);
941
942 ses = Sess_get_session_index(sess_auth_h->mbox);
943 if( ses < 0 || ses >= MAX_SESSIONS ) {
944 Alarm( PRINT, "Sess_session_report_auth_result: Illegal mbox %d for authentication. Cannot deny or allow\n", sess_auth_h->mbox);
945 dispose( sess_auth_h );
946 return;
947 }
948 num_auths = sess_auth_h->num_required_auths;
949 /* finished another method. See if entire set of checks is complete */
950 sess_auth_h->required_auth_results[sess_auth_h->completed_required_auths] = authenticated_p;
951 sess_auth_h->completed_required_auths++;
952 if (sess_auth_h->completed_required_auths < num_auths)
953 {
954 authid = sess_auth_h->required_auth_methods[sess_auth_h->completed_required_auths];
955 auth_open = Acm_auth_get_auth_client_connection(authid);
956 if (auth_open == NULL)
957 {
958 Alarm(PRINT, "Sess_session_report_auth_result: Illegal auth_method_id (%d) tried\n", authid);
959 dispose( sess_auth_h );
960 Sess_session_denied( ses );
961 return;
962 }
963 auth_open(sess_auth_h);
964 return;
965 }
966 permit_count = 0;
967 for (i = 0; i < num_auths; i++)
968 if ( sess_auth_h->required_auth_results[i] ) permit_count++;
969
970 dispose( sess_auth_h );
971
972 if ( permit_count < num_auths )
973 {
974 /* session is denied if any authentication method fails */
975 Sess_session_denied( ses );
976 return;
977 }
978 decision = Sessions[ses].acp_ops.open_connection(Sessions[ses].name);
979 if (decision != ACM_ACCESS_ALLOWED)
980 {
981 Sess_session_denied( ses );
982 return;
983 }
984 /* NOTE: Here and in the backwards compat support in Sess_accept_cont are the only places
985 * that should authorize connections
986 */
987 Sess_session_authorized( ses );
988 }
Sess_session_denied(int ses)989 void Sess_session_denied(int ses)
990 {
991 char response;
992
993 if (!Is_preauth_session(Sessions[ses].status) )
994 {
995 Alarm( EXIT, "Sess_session_denied: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
996 }
997
998 response = REJECT_AUTH;
999 send( Sessions[ses].mbox, &response, sizeof(response), 0 );
1000 Alarm( SESSION, "Sess_session_denied: Authorization denied for %s on mailbox %d\n",
1001 Sessions[ses].name,
1002 Sessions[ses].mbox );
1003 close( Sessions[ses].mbox );
1004
1005 Sess_unhash_session (&Sessions[ses]);
1006 Sess_remove_session (&Sessions[ses]);
1007 Num_sessions--;
1008 GlobalStatus.num_sessions = Num_sessions;
1009
1010 return;
1011 }
1012
Sess_session_authorized(int ses)1013 void Sess_session_authorized(int ses)
1014 {
1015 char ip[16];
1016 char response;
1017 unsigned int name_len;
1018 char private_group_name[MAX_GROUP_NAME];
1019
1020 if (!Is_preauth_session(Sessions[ses].status) )
1021 {
1022 Alarm( EXIT, "Sess_session_authorized: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
1023 }
1024
1025 /*
1026 * accept message looks like:
1027 *
1028 * byte - ACCEPT_SESSION code
1029 * byte - version of spread
1030 * byte - subversion of spread
1031 * (optional) byte - patch version of spread (only if library is 3.15.0 or greater)
1032 * byte - len of name
1033 * len bytes - name
1034 *
1035 */
1036 Sessions[ses].num_mess = 0;
1037 response = ACCEPT_SESSION;
1038 send( Sessions[ses].mbox, &response, 1, 0 );
1039
1040 response = SP_MAJOR_VERSION;
1041 send( Sessions[ses].mbox, &response, 1, 0 );
1042 response = SP_MINOR_VERSION;
1043 send( Sessions[ses].mbox, &response, 1, 0 );
1044
1045 if (Sessions[ses].lib_version[0]*10000 + Sessions[ses].lib_version[1]*100 + Sessions[ses].lib_version[2] >= 31500)
1046 {
1047 response = SP_PATCH_VERSION;
1048 send( Sessions[ses].mbox, &response, 1, 0 );
1049 }
1050
1051 sprintf(private_group_name, "#%s#%s", Sessions[ses].name, My.name );
1052 name_len = strlen( private_group_name );
1053 /* sending the len of the private group in one byte */
1054 response = name_len;
1055 send( Sessions[ses].mbox, &response, 1, 0 );
1056 /* sending the private group name */
1057 send( Sessions[ses].mbox, private_group_name, name_len, 0 );
1058
1059 E_attach_fd( Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL,
1060 LOW_PRIORITY );
1061 E_attach_fd( Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL,
1062 LOW_PRIORITY );
1063
1064 Sessions[ses].status = Set_op_session( Sessions[ses].status );
1065 Sessions[ses].status = Clear_preauth_session( Sessions[ses].status );
1066
1067 Prot_Create_Local_Session(&Sessions[ses]);
1068
1069 Message_reset_current_location(&(Sessions[ses].read) );
1070 Message_reset_current_location(&(Sessions[ses].write) );
1071 Sessions[ses].read.in_mess_head = 1;
1072
1073 Log_sess_connect( Sessions[ses].mbox, Sessions[ses].address,
1074 Sessions[ses].name );
1075
1076 Conf_id_to_str( Sessions[ses].address, ip );
1077 Alarm( SESSION, "Sess_session_authorized: Accepting from %s with private name %s on mailbox %d\n",
1078 ip,
1079 Sessions[ses].name,
1080 Sessions[ses].mbox );
1081 }
/*
 * Sess_validate_read_header
 *
 * Sanity-check a message header received from a client before any of its
 * fields are trusted.
 *
 * mbox      - mailbox the header arrived on (not used by the checks).
 * ses       - index into Sessions[] of the sending session.
 * head_size - header size, subtracted from MAX_MESSAGE_BODY_LEN when
 *             bounding the combined size of data and group names.
 * head_ptr  - header to validate; its private_group_name is forcibly
 *             NUL-terminated in place.
 *
 * Returns 0 when every check passes, -1 (after logging the reason) otherwise.
 */
static int Sess_validate_read_header( mailbox mbox, int ses, int head_size, message_header *head_ptr)
{
    char    private_name[MAX_PRIVATE_NAME+1];
    char    proc_name[MAX_PROC_NAME];
    int     ret, type_bits, memb_bits;

    /* Disallow more than one message type being set or more than one Join/Leave/Kill being set;
     * exactly one bit across the two ranges must be set. */
    type_bits = count_bits_set( head_ptr->type, 0, 6 );
    memb_bits = count_bits_set( head_ptr->type, 16, 19 );
    if ( (type_bits > 1) || (memb_bits > 1) || ((type_bits + memb_bits) != 1) )
    {
        Alarm( SESSION, "Sess_validate_read_header: Message has illegal type field 0x%x\n", head_ptr->type);
        return(-1);
    }

    /* The name came off the wire: make sure it is terminated before parsing it. */
    head_ptr->private_group_name[MAX_GROUP_NAME -1] = '\0';

    ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
    if( ret < 0 )
    {
        Alarm( SESSION, "Sess_validate_read_header: Message has illegal private_group_name (priv, proc)\n");
        return(-1);
    }
    /* The daemon part of the sender name must be this daemon ... */
    if( strncmp( proc_name, My.name, MAX_PROC_NAME ) != 0 )
    {
        Alarm( SESSION, "Sess_validate_read_header: proc name %s is not my name %s\n",
               proc_name, My.name );
        return(-1);
    }
    /* ... and the private part must be the session that owns this connection. */
    if( strncmp( private_name, Sessions[ses].name, MAX_PRIVATE_NAME ) != 0 )
    {
        Alarm( PRINT, "Sess_validate_read_header: Session %s trying to make session %s do something\n",
               private_name, Sessions[ses].name );
        return(-1);
    }

    if ( (head_ptr->num_groups < 0) || (head_ptr->num_groups > MAX_GROUPS_PER_MESSAGE) )
    {
        /* The original format string had no conversion for num_groups, so the value was never logged. */
        Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large num_groups field %d\n", head_ptr->num_groups);
        return(-1);
    }
    /* Only the bits covered by these two masks may be set in the hint field. */
    if ( head_ptr->hint & ~( 0x80000080 | 0x00ffff00 ) )
    {
        Alarm( SESSION, "Sess_validate_read_header: Message has illegal hint field 0x%x\n", head_ptr->hint);
        return(-1);
    }

    if ( (head_ptr->data_len < 0 ) || ( head_ptr->data_len > MAX_MESSAGE_BODY_LEN) )
    {
        Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large data_len %d\n", head_ptr->data_len);
        return(-1);
    }
    /* Data plus the group-name array must fit in what is left after the header. */
    if ( (head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups) > (MAX_MESSAGE_BODY_LEN - head_size ) )
    {
        Alarm( SESSION, "Sess_validate_read_header: Message + Groups is too large (%d + %d = %d). MAX size is: %d\n",
               head_ptr->data_len, MAX_GROUP_NAME * head_ptr->num_groups,
               head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups,
               MAX_MESSAGE_BODY_LEN - head_size );
        return(-1);
    }
    /* Passed all checks, so valid */
    return( 0 );
}
1144
/*
 * Sess_read
 *
 * Read callback for a client mailbox (attached via E_attach_fd).  Reads
 * as much of the current client message as is available without blocking
 * and, once a whole message has arrived, runs access control on it and
 * hands it to the protocol layer.
 *
 * mbox  - mailbox with data (or an exceptional condition) pending.
 * dummy - not used.
 * d2    - not used.
 *
 * Reading is resumable: progress is kept in Sessions[ses].read and the
 * partially filled message in Sessions[ses].read_mess, so the function
 * simply returns when recv() delivers less than requested and continues on
 * the next callback.  The socket is switched to non-blocking mode only
 * around the recv() calls and switched back before returning.
 *
 * A receive error, a closed connection or an invalid header kills the
 * session through Sess_kill().
 */
static void Sess_read( mailbox mbox, int dummy, void *d2 )
{
    message_header  *head_ptr, *msg_head;
    message_obj     *msg;
    scatter         *scat;
    down_link       *down_ptr;
    message_link    *mess_link;
    int             packet_index, byte_index, to_read;
    int             len, remain, ret;
    int             head_size, data_frag_len;
    int             ses, ioctl_cmd;
    char            *head_cbuf;

    ses = Sess_get_session_index(mbox);
    if( ses < 0 || ses >= MAX_SESSIONS ) {
        Alarm( PRINT, "Sess_read: Illegal mbox %d for read\n", mbox);
        return;
    }
    /* Start a fresh message object unless one is already partially read. */
    if (Sessions[ses].read_mess == NULL)
        Sessions[ses].read_mess = Message_new_message();

    msg = Sessions[ses].read_mess;
    head_ptr = Message_get_message_header(msg);
    head_cbuf = (char *) head_ptr;
    head_size = Message_get_header_size();

    /* set file descriptor to non blocking */
    ioctl_cmd = 1;
    ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

    if ( Sessions[ses].read.in_mess_head == 1 )
    {
        /* read up to size of message_header */
        len = Sessions[ses].read.cur_byte;
        remain = sizeof(message_header) - len;
        ret = recv( mbox, (char *) &head_cbuf[len], remain, 0 );
        if( ret == remain )
        {
            Sessions[ses].read.cur_byte += ret;
            Message_set_location_begin_body(&(Sessions[ses].read) );
        } else if (ret > 0 ) {
            /* partial header: remember how far we got and wait for more */
            Sessions[ses].read.cur_byte += ret;
            ioctl_cmd = 0;
            ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
            return;
        } else {
            /* error reading */
            if ( (ret == -1) && ( (sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK) ) ) {
                ioctl_cmd = 0;
                ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                return;
            }
            Alarm( SESSION, "Sess_read: failed receiving header on session %d: ret %d: error: %s \n", mbox, ret, sock_strerror(sock_errno) );
            /* Restore blocking mode while the descriptor is still open;
             * Sess_kill() closes it. */
            ioctl_cmd = 0;
            ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
            Sess_kill( mbox );
            return;
        }
        /* When we get here we have a complete header */
        ioctl_cmd = 0;
        ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);

        /* Flipping message header to my form if needed */
        if( !Same_endian( head_ptr->type ) )
        {
            Flip_mess( head_ptr );
        }
        /* Setting endian to my endian on the header */
        head_ptr->type = Set_endian( head_ptr->type );

        /* Validate all fields */
        Alarm( SESSION, "Sess_read: Message has type field 0x%x\n", head_ptr->type);
        ret = Sess_validate_read_header( mbox, ses, head_size, head_ptr);
        if (ret < 0 )
        {
            /* invalid header */
            Sess_kill(mbox);
            return;
        }
    } /* finished reading and validating header */

    /*
     * to do recvmsg, but need to trick with the starting AFTER the header
     * on the first packet, and then to return the big_scatter to original
     * form. read at *most* head_ptr->data_len (set scatter lengths accordingly
     * everytime here from scratch!
     *
     * ret = recvmsg( mbox, &msg, 0 );
     * if( ret <=0 )
     * {
     *      Alarm( SESSION, "Sess_read: failed receiving message on session %d\n", mbox );
     *      Sess_kill( mbox );
     *      return;
     * }
     */

    /* read the rest of the message if needed, reserving room at the beginning
     * of the first fragment(scat buf) for the message header and the lts and seq fields. */

    /* enable non-blocking io */
    ioctl_cmd = 1;
    ret = ioctl( mbox, FIONBIO, &ioctl_cmd);

    data_frag_len = Message_get_data_fragment_len();
    scat = Message_get_data_scatter(msg);
    /* The body is the group-name array followed by the data; subtract what was read already. */
    remain = ( head_ptr->data_len + MAX_GROUP_NAME*head_ptr->num_groups ) - Sessions[ses].read.total_bytes;
    for( ; remain > 0; remain -= ret )
    {
        packet_index = Sessions[ses].read.cur_element;
        byte_index = Sessions[ses].read.cur_byte;
        if (packet_index >= scat->num_elements)
        {
            /* We are beginning a new fragment -- so allocate it */
            assert(byte_index == 0);
            Message_add_scat_element(msg);
        }
        /* Never read past the end of the current fragment or of the message. */
        to_read = ( data_frag_len - byte_index );
        if( to_read > remain ) to_read = remain;
        ret = recv( mbox, &scat->elements[packet_index].buf[byte_index],
                    to_read, 0 );
        if( ret == to_read )
        {
            /* fragment (or message) complete: advance to the next element */
            Sessions[ses].read.cur_byte = 0;
            Sessions[ses].read.cur_element++;
            Sessions[ses].read.total_bytes += ret;
        } else if (ret > 0 ) {
            /* short read: save progress and wait for the next callback */
            Sessions[ses].read.cur_byte += ret;
            Sessions[ses].read.total_bytes += ret;
            ioctl_cmd = 0;
            ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
            return;
        } else {
            if ( (ret == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) ) {
                ioctl_cmd = 0;
                ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
                return;
            }
            Alarm( SESSION, "Sess_read: failed receiving message on session %d, ret is %d: error: %s\n", mbox, ret, sock_strerror(sock_errno) );
            Alarm( SESSION, "Sess_read: failed recv msg more info: len read: %d, remain: %d, to_read: %d, pkt_index: %d, b_index: %d, scat_nums: %d\n",Sessions[ses].read.total_bytes, remain, to_read, packet_index, byte_index, scat->num_elements );
            /* Restore blocking mode while the descriptor is still open;
             * Sess_kill() closes it. */
            ioctl_cmd = 0;
            ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
            Sess_kill( mbox );
            return;
        }
    }

    /* We now have a complete message */
    ioctl_cmd = 0;
    ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);

    /* reset active read_mess to empty; from here on msg is owned by this call */
    Message_reset_current_location(&(Sessions[ses].read));
    Sessions[ses].read.in_mess_head = 1;
    Sessions[ses].read_mess = NULL;

    Message_element_len_fixup(msg);

#ifdef PROBE_LATENCY
    if (Is_latency_mess( head_ptr->type ) )
    {
        int32u initial_offset, htime_offset;
        int32u *p_time_offset;
        sp_time cur_time, ncur_time;

        initial_offset = MAX_GROUP_NAME*head_ptr->num_groups + head_size;
        p_time_offset = (int32u *) &msg->body.elements[0].buf[initial_offset];
        htime_offset = ntohl(*p_time_offset);
        Alarm(SESSION, "Sess_read: Msg Data at %d with time_offset %u \n", initial_offset, htime_offset);
        cur_time = E_get_time();
        ncur_time.sec = htonl(cur_time.sec);
        ncur_time.usec = htonl(cur_time.usec);
        memcpy(&(msg->body.elements[0].buf[htime_offset + initial_offset]), &ncur_time, sizeof(sp_time));
        Alarm(SESSION, "Sess_read: timestamped time (%d, %d) in byte %d of message\n", cur_time.sec, cur_time.usec, htime_offset);
        *p_time_offset = htonl(htime_offset + sizeof(sp_time) );
    }
#endif /* PROBE_LATENCY */

    /* Do ACM access control checks */
    /* Note, disconnects (Is_kill_mess) are not limited. Someone can always cut themselves off */
    if ( Is_leave_mess( head_ptr->type ) )
    {
        char *groups_ptr;
        int decision;
        groups_ptr = Message_get_first_group( msg );
        decision = Sessions[ses].acp_ops.leave_group( head_ptr->private_group_name, groups_ptr, NULL);
        if (decision != ACM_ACCESS_ALLOWED)
        {
            /* bounce the request back to the sender as a reject */
            head_ptr->type = (head_ptr->type & ~LEAVE_MESS);
            head_ptr->type |= CAUSED_BY_LEAVE;
            Sess_create_reject_message( msg );
            Sess_deliver_reject( msg );
            return;
        }
    }
    if ( Is_join_mess( head_ptr->type ) )
    {
        char *groups_ptr;
        int decision;
        groups_ptr = Message_get_first_group( msg );

        /* Make sure we don't let a join happen if the limit has been reached. */
        if( G_get_num_local( groups_ptr ) == MAX_LOCAL_GROUP_MEMBERS ) {
            Alarm( PRINT, "Sess_read: Attempt by session %s to join group %s "
                   "failed: too many local members.\n", head_ptr->private_group_name, groups_ptr );
            head_ptr->type = (head_ptr->type & ~JOIN_MESS);
            head_ptr->type |= CAUSED_BY_JOIN;
            Sess_create_reject_message( msg );
            Sess_deliver_reject( msg );
            return;
        }

        decision = Sessions[ses].acp_ops.join_group( head_ptr->private_group_name, groups_ptr, NULL);
        if (decision != ACM_ACCESS_ALLOWED)
        {
            head_ptr->type = (head_ptr->type & ~JOIN_MESS);
            head_ptr->type |= CAUSED_BY_JOIN;
            Sess_create_reject_message( msg );
            Sess_deliver_reject( msg );
            return;
        }
    }
    if ( Is_only_regular_mess( head_ptr->type ) )
    {
        char *groups_ptr;
        char target_groups[MAX_GROUPS_PER_MESSAGE][MAX_GROUP_NAME];
        int decision, num_p2p_dest;
        groups_ptr = Message_get_groups_array( msg );
        /* Destinations starting with '#' (private groups) are checked with p2p_send ... */
        num_p2p_dest = Sess_get_p2p_dests(head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, target_groups);
        if (num_p2p_dest)
        {
            decision = Sessions[ses].acp_ops.p2p_send( head_ptr->private_group_name, num_p2p_dest, target_groups, head_ptr->type, ( (head_ptr->hint >> 8) & 0x0000ffff) );
            if (decision != ACM_ACCESS_ALLOWED)
            {
                Sess_create_reject_message( msg );
                Sess_deliver_reject( msg );
                return;
            }
        }
        /* ... and if any other destination exists the whole list is checked with mcast_send. */
        if (head_ptr->num_groups > num_p2p_dest)
        {
            decision = Sessions[ses].acp_ops.mcast_send( head_ptr->private_group_name, head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, head_ptr->type, ( (head_ptr->hint >> 8) & 0x0000ffff) );
            if (decision != ACM_ACCESS_ALLOWED)
            {
                Sess_create_reject_message( msg );
                Sess_deliver_reject( msg );
                return;
            }
        }
    }

    /* create new down_link and big_scatter */
    down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), mbox, 0);
    if (down_ptr == NULL)
    {
        /* NOTE(review): msg is no longer referenced by the session at this
         * point; confirm who releases it on this path. */
        Alarm( SESSION, "Sess_read: Session has illegal message type 0x%x\n", head_ptr->type);
        Sess_kill( mbox );
        return;
    }
    down_ptr->mess = msg;
    msg_head = Message_get_message_header(down_ptr->mess);

    if (Is_kill_mess(msg_head->type) )
    {
        /* We are going to overwrite the group that is sent from the library.
         * it is not needed for kill messages, so we shrink the data field to ignore it
         */
        /* NOTE(review): the read location was reset above before total_bytes
         * is read here; confirm the value passed is the intended one. */
        len = Message_kill_mess_fixup(down_ptr->mess, Sessions[ses].read.total_bytes - MAX_GROUP_NAME, mbox);

        /* A bug in both 3.13 and 4 I think is that if we get a DISCONNECT message
         * from the client and we process and deliver that before discovering the
         * closed socket ourselves and calling Sess_kill(), then the session
         * is in the wrong state and we will crash when we try to finish delivery.
         */
        Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
                             Sessions[ses].num_mess );
        /* clear his structure */
        while( Sessions[ses].num_mess > 0 )
        {
            mess_link = Sessions[ses].first;
            Sessions[ses].first = Sessions[ses].first->next;
            Sess_dispose_message( mess_link );
            Sessions[ses].num_mess--;
        }

        /* close the mailbox and mark it unoperational */
        E_dequeue( Sess_badger, mbox, NULL );
        E_detach_fd( mbox, READ_FD );
        E_detach_fd( mbox, EXCEPT_FD );
        close( mbox );
        /* the mailbox is closed but the entry still points to it */
        Sessions[ses].status = Clear_op_session( Sessions[ses].status );
        Alarm( SESSION, "Sess_read: disconnecting session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
    }

    Alarm( SESSION, "Sess_read: queueing message of type %d with len %d to the protocol\n",
           down_ptr->type, Sessions[ses].read.total_bytes );
    Prot_new_message( down_ptr, Sessions[ses].down_queue );
}
1466
Sess_get_p2p_dests(int num_groups,char groups[][MAX_GROUP_NAME],char dests[][MAX_GROUP_NAME])1467 static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] )
1468 {
1469 int i, num_p2p_targets = 0;
1470 for( i=0; i < num_groups; i++ )
1471 {
1472 if( groups[i][0] == '#' )
1473 {
1474 /* private group */
1475 memcpy( dests[num_p2p_targets], groups[i], MAX_GROUP_NAME);
1476 num_p2p_targets++;
1477 }
1478 }
1479 return( num_p2p_targets );
1480 }
1481 /* Take a message received from a client and change it into the form of
1482 * a reject message. Destination groups, user data, mess_type and type field
1483 * are all preserved to give the sender information about what message was
1484 * rejected.
1485 */
Sess_create_reject_message(message_obj * msg)1486 static void Sess_create_reject_message ( message_obj *msg )
1487 {
1488 message_header *head_ptr;
1489 int32u old_type;
1490
1491 head_ptr = Message_get_message_header(msg);
1492
1493 old_type = head_ptr->type;
1494 head_ptr->type = REJECT_MESS;
1495 head_ptr->type = Set_endian( head_ptr->type );
1496 /* If original message was SELF_DISCARD, then maintain that state */
1497 if (Is_self_discard( old_type) ) head_ptr->type |= SELF_DISCARD;
1498
1499 Message_add_oldtype_to_reject( msg, old_type );
1500
1501 Alarm( PRINT, "Sess_create_reject_mess: Created Reject for sender %s type 0x%x oldtype 0x%x for first group %s\n",
1502 head_ptr->private_group_name, head_ptr->type, old_type, Message_get_first_group( msg ) );
1503 return;
1504 }
1505
/*
 * Sess_deliver_reject
 *
 * Wrap an already prepared reject message in a new message_link and push
 * it through Sess_deliver_message().
 *
 * msg - reject message to deliver.
 *
 * Failure to allocate the link raises Alarm(EXIT).
 */
static void Sess_deliver_reject( message_obj *msg )
{
    message_link *reject_link = new(MESSAGE_LINK);

    if( reject_link != NULL )
    {
        reject_link->mess = msg;
        reject_link->next = NULL;
        Sess_deliver_message( reject_link );
        return;
    }

    Alarm(EXIT, "Sess_deliver_reject: Failed to allocate a new MESSAGE_LINK.\n");
}
1522
/*
 * Sess_write
 *
 * Deliver one message to one local session.  If nothing is queued for the
 * session, the message is written straight to its mailbox without
 * blocking; whatever could not be written is queued on the session and
 * Sess_badger is scheduled to finish the job.
 *
 * ses       - index into Sessions[] of the receiving session.
 * mess_link - link holding the message to deliver.
 * needed    - in/out.  Zero on entry means mess_link itself may be queued
 *             (it is then set to 1); non-zero means mess_link is spoken
 *             for, so a fresh link holding the result of
 *             Message_copy_message() is queued and the value is
 *             incremented.
 *
 * Nothing happens for a session that is not operational.  A session that
 * already has MAX_SESSION_MESSAGES queued is killed instead.
 */
void Sess_write( int ses, message_link *mess_link, int *needed )
{
    message_header  *head_ptr;
    message_obj     *msg;
    message_link    *queue_link;
    scatter         *scat;
    char            *first_data_ptr;
    int             first_data_len;
    int             nonblock;
    int             sent;
    int             total_to_send;
    int             len_sent;
    int             elem;

    if( !Is_op_session( Sessions[ses].status ) ) return;

    if( Sessions[ses].num_mess >= MAX_SESSION_MESSAGES )
    {
        Alarm( SESSION,
               "Sess_write: killing mbox %d for not reading\n",
               Sessions[ses].mbox );
        Sess_kill( Sessions[ses].mbox );
        return;
    }

    /* Give the existing backlog a chance to drain before adding to it. */
    if( Sessions[ses].num_mess > 0 )
        Sess_badger( Sessions[ses].mbox , NULL);

    msg = mess_link->mess;
    Obj_Inc_Refcount(msg);
    scat = Message_get_data_scatter(msg);

    /* Bytes to put on the wire: all scatter elements ... */
    total_to_send = 0;
    for( elem = 0; elem < scat->num_elements; elem++ )
        total_to_send += scat->elements[elem].len;

    /* ... plus the message_header, which is sent as well */
    total_to_send += Message_get_non_body_header_size();

    head_ptr = Message_get_message_header(msg);
#ifdef PROBE_LATENCY
    if (Is_latency_mess( head_ptr->type ) )
    {
        int32u initial_offset, htime_offset;
        int32u *p_time_offset;
        sp_time cur_time, ncur_time;

        initial_offset = MAX_GROUP_NAME*head_ptr->num_groups;
        p_time_offset = (int32u *) &msg->body.elements[0].buf[initial_offset];
        htime_offset = ntohl(*p_time_offset);
        Alarm(SESSION, "Sess_write: Msg Data at %d with timeoffset %u\n", initial_offset, htime_offset);
        cur_time = E_get_time();
        ncur_time.sec = htonl(cur_time.sec);
        ncur_time.usec = htonl(cur_time.usec);
        memcpy(&(msg->body.elements[0].buf[htime_offset + initial_offset]), &ncur_time, sizeof(sp_time));
        Alarm(SESSION, "Sess_write: timestamped time (%d, %d) in byte %d of message\n", cur_time.sec, cur_time.usec, htime_offset);
        *p_time_offset = htonl(htime_offset + sizeof(sp_time) );
    }
#endif
    len_sent = 0;
    if( Sessions[ses].num_mess == 0 )
    {
        /* Nothing ahead of us in the queue: try to write right now,
         * with the descriptor temporarily non-blocking. */
        nonblock = 1;
        ioctl( Sessions[ses].mbox, FIONBIO, &nonblock );

        /* The header goes first and is handled specially. */
        sent = send( Sessions[ses].mbox, (char *) head_ptr,
                     sizeof(message_header), 0 );
        if( sent > 0 ) len_sent += sent;
        if( sent == sizeof(message_header) )
        {
            /* Then the data part of the first buffer, also special. */
            first_data_ptr = Message_get_first_data_ptr(msg);
            first_data_len = Message_get_first_data_len(msg);
            sent = send( Sessions[ses].mbox, first_data_ptr,
                         first_data_len, 0 );
            if( sent > 0 ) len_sent += sent;
            if( sent == first_data_len )
            {
                /* Finally every buffer after the first, until one comes up short. */
                for( elem = 1; elem < scat->num_elements; elem++ )
                {
                    sent = send( Sessions[ses].mbox, scat->elements[elem].buf,
                                 scat->elements[elem].len, 0 );
                    if( sent > 0 ) len_sent += sent;
                    if( sent != scat->elements[elem].len ) break;
                }
            }
        }

        /* set file descriptor back to blocking */
        nonblock = 0;
        ioctl( Sessions[ses].mbox, FIONBIO, &nonblock );
    }

    if( len_sent >= total_to_send )
    {
        /* everything went out; nothing to queue */
        Message_Dec_Refcount(msg);
        return;
    }

    /* this message has to be linked */
    if( !*needed )
    {
        /* should link mess_link itself */
        queue_link = mess_link;
        *needed = 1;
    }else{
        /* create a copy of mess_link and link it */
        queue_link = new(MESSAGE_LINK);
        if( queue_link == NULL ) {
            Alarm(EXIT, "Sess_write: Failed to allocate a new MESSAGE_LINK.\n");
            return;
        }
        queue_link->mess = Message_copy_message(msg);
        ++*needed;
    }

    /* link the message */
    queue_link->next = 0;
    if( Sessions[ses].num_mess != 0 )
    {
        /* This guy was already badgered */
        Sessions[ses].last->next = queue_link;
        Sessions[ses].last = queue_link;
    }else{
        Sessions[ses].first = queue_link;
        Sessions[ses].last = queue_link;
        /* setting cur_element and cur_byte to where the direct write stopped */
        Message_calculate_current_location(queue_link->mess, len_sent, &(Sessions[ses].write) );

        /* We will need to badger this guy */
        E_queue( Sess_badger, Sessions[ses].mbox, NULL, Badger_timeout );
    }
    Sessions[ses].num_mess++;

    Message_Dec_Refcount(msg);
}
1663
/*
 * Sess_badger
 *
 * Try to drain the queue of messages waiting to be written to a session's
 * mailbox.  Messages are written in order, without blocking, resuming
 * from the position saved in Sessions[ses].write; the first short or
 * failed send() stops the attempt.  If anything is still queued afterwards
 * the function re-queues itself with Badger_timeout.
 *
 * mbox  - mailbox of the session to service.
 * dummy - not used.
 *
 * Does nothing for an unknown mailbox, a non-operational session or an
 * empty queue.
 */
static void Sess_badger( mailbox mbox, void *dummy )
{
    message_link    *done_link;
    message_obj     *msg;
    scatter         *scat;
    char            *head_bytes;
    int             ses;
    int             keep_writing;
    int             nonblock;
    int             chunk_len, offset;
    int             elem;
    int             sent;

    Alarm( SESSION, "Sess_badger: for mbox %d\n", mbox );
    ses = Sess_get_session_index( mbox );
    if( ses < 0 || ses >= MAX_SESSIONS ) return;
    if( !Is_op_session( Sessions[ses].status ) ) return;

    if( Sessions[ses].num_mess <= 0 ) return;

    /* set file descriptor to non blocking */
    nonblock = 1;
    ioctl( mbox, FIONBIO, &nonblock );

    keep_writing = 1;
    while( keep_writing && Sessions[ses].num_mess > 0 )
    {
        msg = Sessions[ses].first->mess;
        Obj_Inc_Refcount(msg);

        /* The message header has not (completely) gone out yet: send the
         * rest of it, then move the write position to the start of the body. */
        if( Sessions[ses].write.in_mess_head == 1 )
        {
            head_bytes = (char *) Message_get_message_header(msg);
            offset = Sessions[ses].write.cur_byte;
            chunk_len = sizeof(message_header) - offset;
            sent = send( mbox, &head_bytes[offset], chunk_len, 0 );
            if( sent != chunk_len )
            {
                if( sent > 0 ) Sessions[ses].write.cur_byte = offset + sent;
                keep_writing = 0;
            }else{
                Message_set_location_begin_body( &(Sessions[ses].write) );
            }
        }

        /* Next, if we still can, send some body of this message. */
        if( keep_writing )
        {
            scat = Message_get_data_scatter(msg);
            elem = Sessions[ses].write.cur_element;
            offset = Sessions[ses].write.cur_byte;
            while( elem < scat->num_elements )
            {
                chunk_len = scat->elements[elem].len - offset;
                sent = send( mbox, &(scat->elements[elem].buf[offset]), chunk_len, 0 );
                if( sent != chunk_len )
                {
                    /* short write: remember the position inside this element */
                    if( sent > 0 ) Sessions[ses].write.cur_byte += sent;
                    keep_writing = 0;
                    break;
                }
                /* element done: continue at the start of the next one */
                Sessions[ses].write.cur_byte = 0;
                Sessions[ses].write.cur_element++;
                elem++;
                offset = 0;
            }
        }

        if( keep_writing )
        {
            /* whole message written: unlink and free it */
            done_link = Sessions[ses].first;
            Sessions[ses].first = Sessions[ses].first->next;
            Sessions[ses].num_mess--;
            Message_reset_current_location( &(Sessions[ses].write) );
            Sess_dispose_message( done_link );
        }
        Message_Dec_Refcount(msg);
    } /* loop per message */

    /* set file descriptor back to blocking */
    nonblock = 0;
    ioctl( mbox, FIONBIO, &nonblock );

    /* still something left: come back later */
    if( Sessions[ses].num_mess > 0 ) E_queue( Sess_badger, mbox, NULL, Badger_timeout );
}
1750
/*
 * Sess_kill
 *
 * Forcefully disconnect the session that owns a mailbox: inject a
 * KILL_MESS | SAFE_MESS message for its private group into the protocol,
 * throw away every message still queued for it and any partially read
 * message, stop watching the mailbox, close it and mark the session
 * non-operational.  The Sessions[] entry itself is left in place.
 *
 * mbox - mailbox of the session to kill.
 *
 * An unknown mailbox is logged and ignored.  A session that is already
 * non-operational is logged and left alone, so that no second kill message
 * is generated and the descriptor is not closed twice.  A session still in
 * the pre-authorization state raises Alarm(EXIT).
 */
static void Sess_kill( mailbox mbox )
{
    int             ses;
    message_link    *mess_link;
    message_obj     *kill_mess;
    message_header  *kill_head;
    down_link       *down_ptr;
    int             head_size;
    char            private_send_group[MAX_GROUP_NAME];

    head_size = Message_get_header_size();
    ses = Sess_get_session_index(mbox);
    if( ses < 0 || ses >= MAX_SESSIONS ) {
        Alarm( PRINT, "Sess_kill: Illegal mbox %d for killing. Cannot cleanup\n", mbox);
        return;
    }
    if( !Is_op_session( Sessions[ses].status ) )
    {
        if ( !Is_preauth_session( Sessions[ses].status ) )
            Alarm( PRINT,
                   "Sess_kill: session %s with mailbox %d is already killed (status %d)\n",
                   Sessions[ses].name, mbox, Sessions[ses].status );
        else
            Alarm( EXIT,
                   "Sess_kill: BUG! session %s with mailbox %d killed before authentication and authorization completed (status 0x%x)\n",
                   Sessions[ses].name, mbox, Sessions[ses].status);
        /* Nothing to tear down; falling through would repeat the teardown. */
        return;
    }

    /* send a message to kill this session */
    sprintf( private_send_group, "#%s#%s", Sessions[ses].name, My.name );
    kill_mess = Message_create_message(KILL_MESS | SAFE_MESS, private_send_group);
    kill_head = Message_get_message_header(kill_mess);
    Message_kill_mess_fixup(kill_mess, head_size, mbox );

    down_ptr = Prot_Create_Down_Link(kill_mess, Message_get_packet_type(kill_head->type), mbox, 0);
    if( down_ptr == NULL )
    {
        /* Sess_read treats NULL here as an illegal type; for our own kill message it is a bug. */
        Alarm( EXIT, "Sess_kill: BUG! Failed to create down link for kill message of session %s\n",
               Sessions[ses].name );
        return;
    }
    down_ptr->mess = kill_mess;

    Obj_Inc_Refcount(down_ptr->mess);
    Prot_new_message( down_ptr, Sessions[ses].down_queue );
    Message_Dec_Refcount(kill_mess);

    Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
                         Sessions[ses].num_mess );
    /* clear his structure */
    while( Sessions[ses].num_mess > 0 )
    {
        mess_link = Sessions[ses].first;
        Sessions[ses].first = Sessions[ses].first->next;
        Sess_dispose_message( mess_link );
        Sessions[ses].num_mess--;
    }
    /* reset active read_mess to empty */
    Message_reset_current_location(&(Sessions[ses].read));
    Sessions[ses].read.in_mess_head = 1;
    if (Sessions[ses].read_mess != NULL)
        Message_dispose_message( Sessions[ses].read_mess );
    Sessions[ses].read_mess = NULL;

    /* close the mailbox and mark it unoperational */
    E_dequeue( Sess_badger, mbox, NULL );
    E_detach_fd( mbox, READ_FD );
    E_detach_fd( mbox, EXCEPT_FD );
    close(mbox);
    /* the mailbox is closed but the entry still points to it */
    Sessions[ses].status = Clear_op_session( Sessions[ses].status );
    Alarm( SESSION, "Sess_kill: killing session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
}
1818
/*
 * Sess_deliver_message
 *
 * Dispatch a message to its handler or to the local sessions it is
 * addressed to.
 *
 * mess_link - link holding the message.  Join, leave, kill and groups
 *             messages are handed, link included, to their handlers.  For
 *             any other message the link is either queued on a session by
 *             Sess_write() or disposed of here when no session kept it.
 *
 * Reject messages go only to the session named in the header's
 * private_group_name; everything else is resolved through
 * G_analize_groups().  When the message is marked self-discard and its
 * sender belongs to this daemon, the sender's own session is skipped.
 */
void Sess_deliver_message( message_link *mess_link )
{
    static int      target_sessions[MAX_SESSIONS];
    int             num_target_sessions;
    int             source_ses;
    message_header  *head_ptr;
    message_obj     *msg;
    char            *target_groups;
    char            private_name[MAX_PRIVATE_NAME+1];
    char            proc_name[MAX_PROC_NAME];
    int             needed;
    int             ret;
    int             i;

    msg = mess_link->mess;
    Obj_Inc_Refcount(msg);
    head_ptr = Message_get_message_header(msg);

    Message_endian_correct_message_header(msg);

    if( Is_join_mess( head_ptr->type ) )
    {
        Sess_handle_join( mess_link );
        Message_Dec_Refcount(msg);
        return;
    }

    if( Is_leave_mess( head_ptr->type ) )
    {
        Sess_handle_leave( mess_link );
        Message_Dec_Refcount(msg);
        return;
    }

    if( Is_kill_mess( head_ptr->type ) )
    {
        Sess_handle_kill( mess_link );
        Message_Dec_Refcount(msg);
        return;
    }

    if( Is_groups_mess( head_ptr->type ) )
    {
        G_handle_groups( mess_link );
        Message_Dec_Refcount(msg);
        return;
    }

    /* regular message */

    GlobalStatus.message_delivered++;

    /* Setting endian to my endian on the header */
    head_ptr->type = Set_endian( head_ptr->type );

    /* analyze message groups to sessions */
    if ( Is_reject_mess(head_ptr->type) ) {
        /* A reject has exactly one possible target: the session that sent
         * the rejected message.  Sess_get_session() returns a negative value
         * when that session cannot be found; such an index must never reach
         * Sess_write(), which uses it to index Sessions[]. */
        num_target_sessions = 0;
        ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
        if( ret >= 0 )
        {
            target_sessions[0] = Sess_get_session( private_name );
            if( target_sessions[0] >= 0 ) num_target_sessions = 1;
        }
        if( num_target_sessions == 0 )
            Alarm( SESSION, "Sess_deliver_message: dropping reject message for unknown session %s\n",
                   head_ptr->private_group_name );
    } else {
        target_groups = Message_get_groups_array(msg);
        num_target_sessions = G_analize_groups( head_ptr->num_groups,
                                                (char (*)[MAX_GROUP_NAME])target_groups,
                                                target_sessions ) ;
    }

    /* if self_discard, sender is local and a target then eliminate sender from targets */
    source_ses = -1;
    if( num_target_sessions > 0 && Is_self_discard( head_ptr->type ) )
    {
        /* only trust proc_name / private_name when the name parsed cleanly */
        ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
        if( ret >= 0 && strcmp( My.name, proc_name ) == 0 )
        {
            source_ses = Sess_get_session( private_name );
        }
    }

    /* needed tells Sess_write whether mess_link has already been queued
     * on an earlier target; it stays 0 if no session kept the link. */
    needed = 0;
    for( i = 0; i < num_target_sessions ; i++ )
    {
        if( source_ses == target_sessions[i] ) continue; /* self_discard */
        Sess_write( target_sessions[i], mess_link, &needed );
    }

    Message_Dec_Refcount(msg);
    if( !needed ) Sess_dispose_message( mess_link );
}
1905
/*
 * Sess_deliver_reg_memb
 *
 * Pass a regular membership and its id on, unchanged, to
 * G_handle_reg_memb().
 */
void Sess_deliver_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
    G_handle_reg_memb( reg_memb, reg_memb_id );
    return;
}
1910
/*
 * Sess_deliver_trans_memb
 *
 * Pass a transitional membership and its id on, unchanged, to
 * G_handle_trans_memb().
 */
void Sess_deliver_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
    G_handle_trans_memb( trans_memb, trans_memb_id );
    return;
}
1915
/*
 * Sess_handle_join
 *
 * Processes a join message: reads the sender's private group name from the
 * message header and the first group name carried by the message, passes
 * both to G_handle_join(), and then disposes of the message link.
 *
 *   mess_link - link to the join message; it is released via
 *               Sess_dispose_message() before returning, so the caller
 *               must not use it afterwards.
 */
static	void	Sess_handle_join( message_link *mess_link )
{
	message_header	*hdr    = Message_get_message_header(mess_link->mess);
	char		*target = Message_get_first_group(mess_link->mess);

	G_handle_join( hdr->private_group_name, target );

	/* the message has been fully consumed at this point */
	Sess_dispose_message( mess_link );
}
1929
/*
 * Sess_handle_leave
 *
 * Processes a leave message: reads the sender's private group name from the
 * message header and the first group name carried by the message, passes
 * both to G_handle_leave(), and then disposes of the message link.
 *
 *   mess_link - link to the leave message; it is released via
 *               Sess_dispose_message() before returning, so the caller
 *               must not use it afterwards.
 */
static	void	Sess_handle_leave( message_link *mess_link )
{
	message_header	*hdr    = Message_get_message_header(mess_link->mess);
	char		*target = Message_get_first_group(mess_link->mess);

	G_handle_leave( hdr->private_group_name, target );

	/* the message has been fully consumed at this point */
	Sess_dispose_message( mess_link );
}
1942
/*
 * Sess_handle_kill
 *
 * Processes a kill message for the member named in the message header.
 *
 * Steps, in order:
 *   1. Split the header's private group name into a private name and a
 *      process name with G_private_to_names(); a negative result is
 *      reported through Alarm( EXIT, ... ).
 *   2. Pass the private group name to G_handle_kill().
 *   3. If the process name equals My.name, look the session up by its
 *      private name and, when found, unhash it, destroy its local protocol
 *      state, remove it, and update the session counters.  Finding a session
 *      whose status still satisfies Is_op_session() is reported through
 *      Alarm( EXIT, ... ).
 *   4. Call Prot_kill_session() on the message and dispose of the link.
 *
 *   mess_link - link to the kill message; released via
 *               Sess_dispose_message() before returning.
 */
static	void	Sess_handle_kill( message_link *mess_link )
{
	message_header	*kill_head = Message_get_message_header(mess_link->mess);
	char		private_name[MAX_PRIVATE_NAME+1];
	char		proc_name[MAX_PROC_NAME];
	int		ses;

	if( G_private_to_names( kill_head->private_group_name, private_name, proc_name ) < 0 )
	{
		Alarm( EXIT, "Sess_handle_kill: Illegal private name to kill %s\n",
			kill_head->private_group_name );
	}

	G_handle_kill( kill_head->private_group_name );

	/* A session lookup only happens when the killed member belongs to this daemon. */
	ses = ( strcmp( My.name, proc_name ) == 0 ) ? Sess_get_session( private_name ) : -1;

	if( ses >= 0 )
	{
		session	*victim = &Sessions[ses];

		/* the session is expected to be non-operational by the time the kill arrives */
		if( Is_op_session( victim->status ) )
		{
			Alarm( EXIT, "Sess_handle_kill: killing unkilled session bug!\n");
		}

		Sess_unhash_session( victim );
		Prot_Destroy_Local_Session( victim );
		Sess_remove_session( victim );

		Num_sessions--;
		GlobalStatus.num_sessions = Num_sessions;
	}

	Prot_kill_session(mess_link->mess);

	Sess_dispose_message( mess_link );
}
1979
/*
 * Sess_dispose_message
 *
 * Releases a message link together with the message it refers to.
 * The message is handed to Message_dispose_message() first, then the link
 * object itself is handed to dispose().
 *
 *   mess_link - the link to release; it is dereferenced, so it must be a
 *               valid pointer, and it must not be used after this call.
 */
void	Sess_dispose_message( message_link *mess_link )
{
	/* release the payload while the link that points at it is still valid */
	Message_dispose_message(mess_link->mess);

	/* then release the link itself */
	dispose( mess_link );
}
1985
Sess_get_session(char * name)1986 int Sess_get_session( char *name )
1987 {
1988 int ret;
1989 session *ses;
1990
1991 for( ses = Sessions_head; ses; ses = ses->sort_next )
1992 {
1993 ret = strcmp( ses->name, name );
1994 if( ret < 0 ) continue;
1995 if( ret == 0 ) return( ses - Sessions );
1996 if( ret > 0 ) return( -1 );
1997 }
1998 return( -1 );
1999 }
2000
/*
 * Flip_mess
 *
 * Applies Flip_int32() in place to three fields of a message header:
 * type, num_groups and data_len.  No other header field is modified.
 *
 *   head_ptr - header to convert; it is dereferenced, so it must be a
 *              valid pointer.
 */
void	Flip_mess( message_header *head_ptr )
{
	message_header	*h = head_ptr;

	h->type       = Flip_int32( h->type );
	h->num_groups = Flip_int32( h->num_groups );
	h->data_len   = Flip_int32( h->data_len );
}
2007
2008