1 /*********************************************************************************************************
2 * Software License Agreement (BSD License) *
3 * Author: Sebastien Decugis <sdecugis@freediameter.net> *
4 * *
5 * Copyright (c) 2019, WIDE Project and NICT *
6 * All rights reserved. *
7 * *
8 * Redistribution and use of this software in source and binary forms, with or without modification, are *
9 * permitted provided that the following conditions are met: *
10 * *
11 * * Redistributions of source code must retain the above *
12 * copyright notice, this list of conditions and the *
13 * following disclaimer. *
14 * *
15 * * Redistributions in binary form must reproduce the above *
16 * copyright notice, this list of conditions and the *
17 * following disclaimer in the documentation and/or other *
18 * materials provided with the distribution. *
19 * *
20 * * Neither the name of the WIDE Project or NICT nor the *
21 * names of its contributors may be used to endorse or *
22 * promote products derived from this software without *
23 * specific prior written permission of WIDE Project and *
24 * NICT. *
25 * *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34 *********************************************************************************************************/
35
36 #include "fdcore-internal.h"
37
38 #ifdef linux
39 /* This needs -D_USE_GNU, and since I have no idea what else that does, let's simply copy the declaration. */
40
41 /* Set thread name visible in the kernel and its interfaces. */
42 extern int pthread_setname_np (pthread_t __target_thread, const char *__name);
43 #endif
44
45 /********************************************************************************/
46 /* First part : handling the extensions callbacks */
47 /********************************************************************************/
48
49 /* Lists of the callbacks, and locks to protect them */
50 static pthread_rwlock_t rt_fwd_lock = PTHREAD_RWLOCK_INITIALIZER;
51 static struct fd_list rt_fwd_list = FD_LIST_INITIALIZER_O(rt_fwd_list, &rt_fwd_lock);
52
53 static pthread_rwlock_t rt_out_lock = PTHREAD_RWLOCK_INITIALIZER;
54 static struct fd_list rt_out_list = FD_LIST_INITIALIZER_O(rt_out_list, &rt_out_lock);
55
56 /* Items in the lists are the same */
57 struct rt_hdl {
58 struct fd_list chain; /* link in the rt_fwd_list or rt_out_list */
59 void * cbdata; /* the registered data */
60 union {
61 int order; /* This value is used to sort the list */
62 int dir; /* It is the direction for FWD handlers */
63 int prio; /* and the priority for OUT handlers */
64 };
65 union {
66 int (*rt_fwd_cb)(void * cbdata, struct msg ** msg);
67 int (*rt_out_cb)(void * cbdata, struct msg ** msg, struct fd_list * candidates);
68 };
69 };
70
71 /* Add a new entry in the list */
add_ordered(struct rt_hdl * new,struct fd_list * list)72 static int add_ordered(struct rt_hdl * new, struct fd_list * list)
73 {
74 /* The list is ordered by prio parameter */
75 struct fd_list * li;
76
77 CHECK_POSIX( pthread_rwlock_wrlock(list->o) );
78
79 for (li = list->next; li != list; li = li->next) {
80 struct rt_hdl * h = (struct rt_hdl *) li;
81 if (new->order <= h->order)
82 break;
83 }
84
85 fd_list_insert_before(li, &new->chain);
86
87 CHECK_POSIX( pthread_rwlock_unlock(list->o) );
88
89 return 0;
90 }
91
92 /* Register a new FWD callback */
fd_rt_fwd_register(int (* rt_fwd_cb)(void * cbdata,struct msg ** msg),void * cbdata,enum fd_rt_fwd_dir dir,struct fd_rt_fwd_hdl ** handler)93 int fd_rt_fwd_register ( int (*rt_fwd_cb)(void * cbdata, struct msg ** msg), void * cbdata, enum fd_rt_fwd_dir dir, struct fd_rt_fwd_hdl ** handler )
94 {
95 struct rt_hdl * new;
96
97 TRACE_ENTRY("%p %p %d %p", rt_fwd_cb, cbdata, dir, handler);
98 CHECK_PARAMS( rt_fwd_cb );
99 CHECK_PARAMS( (dir >= RT_FWD_REQ) && ( dir <= RT_FWD_ANS) );
100
101 /* Create a new container */
102 CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
103 memset(new, 0, sizeof(struct rt_hdl));
104
105 /* Write the content */
106 fd_list_init(&new->chain, NULL);
107 new->cbdata = cbdata;
108 new->dir = dir;
109 new->rt_fwd_cb = rt_fwd_cb;
110
111 /* Save this in the list */
112 CHECK_FCT( add_ordered(new, &rt_fwd_list) );
113
114 /* Give it back to the extension if needed */
115 if (handler)
116 *handler = (void *)new;
117
118 return 0;
119 }
120
121 /* Remove it */
fd_rt_fwd_unregister(struct fd_rt_fwd_hdl * handler,void ** cbdata)122 int fd_rt_fwd_unregister ( struct fd_rt_fwd_hdl * handler, void ** cbdata )
123 {
124 struct rt_hdl * del;
125 TRACE_ENTRY( "%p %p", handler, cbdata);
126 CHECK_PARAMS( handler );
127
128 del = (struct rt_hdl *)handler;
129 CHECK_PARAMS( del->chain.head == &rt_fwd_list );
130
131 /* Unlink */
132 CHECK_POSIX( pthread_rwlock_wrlock(&rt_fwd_lock) );
133 fd_list_unlink(&del->chain);
134 CHECK_POSIX( pthread_rwlock_unlock(&rt_fwd_lock) );
135
136 if (cbdata)
137 *cbdata = del->cbdata;
138
139 free(del);
140 return 0;
141 }
142
143 /* Register a new OUT callback */
fd_rt_out_register(int (* rt_out_cb)(void * cbdata,struct msg ** pmsg,struct fd_list * candidates),void * cbdata,int priority,struct fd_rt_out_hdl ** handler)144 int fd_rt_out_register ( int (*rt_out_cb)(void * cbdata, struct msg ** pmsg, struct fd_list * candidates), void * cbdata, int priority, struct fd_rt_out_hdl ** handler )
145 {
146 struct rt_hdl * new;
147
148 TRACE_ENTRY("%p %p %d %p", rt_out_cb, cbdata, priority, handler);
149 CHECK_PARAMS( rt_out_cb );
150
151 /* Create a new container */
152 CHECK_MALLOC(new = malloc(sizeof(struct rt_hdl)));
153 memset(new, 0, sizeof(struct rt_hdl));
154
155 /* Write the content */
156 fd_list_init(&new->chain, NULL);
157 new->cbdata = cbdata;
158 new->prio = priority;
159 new->rt_out_cb = rt_out_cb;
160
161 /* Save this in the list */
162 CHECK_FCT( add_ordered(new, &rt_out_list) );
163
164 /* Give it back to the extension if needed */
165 if (handler)
166 *handler = (void *)new;
167
168 return 0;
169 }
170
171 /* Remove it */
fd_rt_out_unregister(struct fd_rt_out_hdl * handler,void ** cbdata)172 int fd_rt_out_unregister ( struct fd_rt_out_hdl * handler, void ** cbdata )
173 {
174 struct rt_hdl * del;
175 TRACE_ENTRY( "%p %p", handler, cbdata);
176 CHECK_PARAMS( handler );
177
178 del = (struct rt_hdl *)handler;
179 CHECK_PARAMS( del->chain.head == &rt_out_list );
180
181 /* Unlink */
182 CHECK_POSIX( pthread_rwlock_wrlock(&rt_out_lock) );
183 fd_list_unlink(&del->chain);
184 CHECK_POSIX( pthread_rwlock_unlock(&rt_out_lock) );
185
186 if (cbdata)
187 *cbdata = del->cbdata;
188
189 free(del);
190 return 0;
191 }
192
193 /********************************************************************************/
194 /* Some default OUT routing callbacks */
195 /********************************************************************************/
196
197 /* Prevent sending to peers that do not support the message application */
dont_send_if_no_common_app(void * cbdata,struct msg ** pmsg,struct fd_list * candidates)198 static int dont_send_if_no_common_app(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
199 {
200 struct msg * msg = *pmsg;
201 struct fd_list * li;
202 struct msg_hdr * hdr;
203
204 TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
205 CHECK_PARAMS(msg && candidates);
206
207 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
208
209 /* For Base Diameter Protocol, every peer is supposed to support it, so skip */
210 if (hdr->msg_appl == 0)
211 return 0;
212
213 /* Otherwise, check that the peers support the application */
214 for (li = candidates->next; li != candidates; li = li->next) {
215 struct rtd_candidate *c = (struct rtd_candidate *) li;
216 struct fd_peer * peer;
217 struct fd_app *found;
218 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
219 if (peer && !peer->p_hdr.info.runtime.pir_relay) {
220 /* Check if the remote peer advertised the message's appli */
221 CHECK_FCT( fd_app_check(&peer->p_hdr.info.runtime.pir_apps, hdr->msg_appl, &found) );
222 if (!found)
223 c->score += FD_SCORE_NO_DELIVERY;
224 }
225 }
226
227 return 0;
228 }
229
230 /* Detect if the Destination-Host and Destination-Realm match the peer */
score_destination_avp(void * cbdata,struct msg ** pmsg,struct fd_list * candidates)231 static int score_destination_avp(void * cbdata, struct msg ** pmsg, struct fd_list * candidates)
232 {
233 struct msg * msg = *pmsg;
234 struct fd_list * li;
235 struct avp * avp;
236 union avp_value *dh = NULL, *dr = NULL;
237
238 TRACE_ENTRY("%p %p %p", cbdata, msg, candidates);
239 CHECK_PARAMS(msg && candidates);
240
241 /* Search the Destination-Host and Destination-Realm AVPs -- we could also use fd_msg_search_avp here, but this one is slightly more efficient */
242 CHECK_FCT( fd_msg_browse(msg, MSG_BRW_FIRST_CHILD, &avp, NULL) );
243 while (avp) {
244 struct avp_hdr * ahdr;
245 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
246
247 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
248 switch (ahdr->avp_code) {
249 case AC_DESTINATION_HOST:
250 /* Parse this AVP */
251 CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
252 ASSERT( ahdr->avp_value );
253 dh = ahdr->avp_value;
254 break;
255
256 case AC_DESTINATION_REALM:
257 /* Parse this AVP */
258 CHECK_FCT( fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, NULL ) );
259 ASSERT( ahdr->avp_value );
260 dr = ahdr->avp_value;
261 break;
262 }
263 }
264
265 if (dh && dr)
266 break;
267
268 /* Go to next AVP */
269 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
270 }
271
272 /* Now, check each candidate against these AVP values */
273 for (li = candidates->next; li != candidates; li = li->next) {
274 struct rtd_candidate *c = (struct rtd_candidate *) li;
275
276 #if 0 /* this is actually useless since the sending process will also ensure that the peer is still available */
277 struct fd_peer * peer;
278 /* Since the candidates list comes from the peers list, we do not have any issue with upper/lower case to find the peer object */
279 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
280 if (!peer)
281 continue; /* it has been deleted since the candidate list was generated; avoid sending to this one in that case. */
282 #endif /* 0 */
283
284 /* In the AVPs, the value comes from the network, so let's be case permissive */
285 if (dh && !fd_os_almostcasesrch(dh->os.data, dh->os.len, c->diamid, c->diamidlen, NULL) ) {
286 /* The candidate is the Destination-Host */
287 c->score += FD_SCORE_FINALDEST;
288 } else {
289 if (dr && !fd_os_almostcasesrch(dr->os.data, dr->os.len, c->realm, c->realmlen, NULL) ) {
290 /* The candidate's realm matchs the Destination-Realm */
291 c->score += FD_SCORE_REALM;
292 }
293 }
294 }
295
296 return 0;
297 }
298
299 /********************************************************************************/
300 /* Helper functions */
301 /********************************************************************************/
302
303 /* Find (first) '!' and '@' positions in a UTF-8 encoded string (User-Name AVP value) */
nai_get_indexes(union avp_value * un,int * excl_idx,int * at_idx)304 static void nai_get_indexes(union avp_value * un, int * excl_idx, int * at_idx)
305 {
306 int i;
307
308 TRACE_ENTRY("%p %p %p", un, excl_idx, at_idx);
309 CHECK_PARAMS_DO( un && excl_idx && at_idx, return );
310
311 *excl_idx = 0;
312 *at_idx = 0;
313
314 /* Search if there is a '!' before any '@' -- do we need to check it contains a '.' ? */
315 for (i = 0; i < un->os.len; i++) {
316 /* The '!' marks the decorated NAI */
317 if ( un->os.data[i] == (unsigned char) '!' ) {
318 if (!*excl_idx)
319 *excl_idx = i;
320 continue;
321 }
322 /* If we reach the realm part, we can stop */
323 if ( un->os.data[i] == (unsigned char) '@' ) {
324 *at_idx = i;
325 break;
326 }
327 /* Stop if we find a \0 in the middle */
328 if ( un->os.data[i] == 0 ) {
329 return;
330 }
331 /* Skip escaped characters */
332 if ( un->os.data[i] == (unsigned char) '\\' ) {
333 i++;
334 continue;
335 }
336 }
337
338 return;
339 }
340
341 /* Test if a User-Name AVP contains a Decorated NAI -- RFC4282, RFC5729 */
342 /* Create new User-Name and Destination-Realm values */
process_decorated_NAI(int * was_nai,union avp_value * un,union avp_value * dr)343 static int process_decorated_NAI(int * was_nai, union avp_value * un, union avp_value * dr)
344 {
345 int at_idx, sep_idx;
346 unsigned char * old_un;
347 TRACE_ENTRY("%p %p %p", was_nai, un, dr);
348 CHECK_PARAMS(was_nai && un && dr);
349
350 /* Save the decorated User-Name, for example 'homerealm.example.net!user@otherrealm.example.net' */
351 old_un = un->os.data;
352
353 /* Search the positions of the first '!' and the '@' in the string */
354 nai_get_indexes(un, &sep_idx, &at_idx);
355 if ((!sep_idx) || (sep_idx > at_idx) || !fd_os_is_valid_DiameterIdentity(old_un, sep_idx /* this is the new realm part */)) {
356 *was_nai = 0;
357 return 0;
358 }
359
360 *was_nai = 1;
361
362 /* Create the new User-Name value */
363 CHECK_MALLOC( un->os.data = malloc( at_idx ) );
364 memcpy( un->os.data, old_un + sep_idx + 1, at_idx - sep_idx ); /* user@ */
365 memcpy( un->os.data + at_idx - sep_idx, old_un, sep_idx ); /* homerealm.example.net */
366
367 /* Create the new Destination-Realm value */
368 CHECK_MALLOC( dr->os.data = realloc(dr->os.data, sep_idx) );
369 memcpy( dr->os.data, old_un, sep_idx );
370 dr->os.len = sep_idx;
371
372 TRACE_DEBUG(FULL, "Processed Decorated NAI : '%.*s' became '%.*s' (%.*s)",
373 (int)un->os.len, old_un,
374 (int)at_idx, un->os.data,
375 (int)dr->os.len, dr->os.data);
376
377 un->os.len = at_idx;
378 free(old_un);
379
380 return 0;
381 }
382
383
384 /* Function to return an error to an incoming request */
return_error(struct msg ** pmsg,char * error_code,char * error_message,struct avp * failedavp)385 static int return_error(struct msg ** pmsg, char * error_code, char * error_message, struct avp * failedavp)
386 {
387 struct fd_peer * peer;
388 int is_loc = 0;
389
390 /* Get the source of the message */
391 {
392 DiamId_t id;
393 size_t idlen;
394 CHECK_FCT( fd_msg_source_get( *pmsg, &id, &idlen ) );
395
396 if (id == NULL) {
397 is_loc = 1; /* The message was issued locally */
398 } else {
399
400 /* Search the peer with this id */
401 CHECK_FCT( fd_peer_getbyid( id, idlen, 0, (void *)&peer ) );
402
403 if (!peer) {
404 char buf[256];
405 snprintf(buf, sizeof(buf), "Unable to send error '%s' to deleted peer '%s' in reply to this message.", error_code, id);
406 fd_hook_call(HOOK_MESSAGE_DROPPED, *pmsg, NULL, buf, fd_msg_pmdl_get(*pmsg));
407 fd_msg_free(*pmsg);
408 *pmsg = NULL;
409 return 0;
410 }
411 }
412 }
413
414 /* Create the error message */
415 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, pmsg, MSGFL_ANSW_ERROR ) );
416
417 /* Set the error code */
418 CHECK_FCT( fd_msg_rescode_set(*pmsg, error_code, error_message, failedavp, 1 ) );
419
420 /* Send the answer */
421 if (is_loc) {
422 CHECK_FCT( fd_fifo_post(fd_g_incoming, pmsg) );
423 } else {
424 CHECK_FCT( fd_out_send(pmsg, NULL, peer, 1) );
425 }
426
427 /* Done */
428 return 0;
429 }
430
431
432 /****************************************************************************/
433 /* Second part : threads moving messages in the daemon */
434 /****************************************************************************/
435
436 /* The DISPATCH message processing */
msg_dispatch(struct msg * msg)437 static int msg_dispatch(struct msg * msg)
438 {
439 struct msg_hdr * hdr;
440 int is_req = 0;
441 struct session * sess;
442 enum disp_action action;
443 char * ec = NULL;
444 char * em = NULL;
445 struct msg *msgptr = msg, *error = NULL;
446
447 /* Read the message header */
448 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
449 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
450
451 /* Note: if the message is for local delivery, we should test for duplicate
452 (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
453
454 /* At this point, we need to understand the message content, so parse it */
455 CHECK_FCT_DO( fd_msg_parse_or_error( &msgptr, &error ),
456 {
457 int rescue = 0;
458 if (__ret__ != EBADMSG) {
459 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Error while parsing received answer", fd_msg_pmdl_get(msgptr));
460 fd_msg_free(msgptr);
461 } else {
462 if (!msgptr) {
463 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR2, error, NULL, NULL, fd_msg_pmdl_get(error));
464 /* error now contains the answer message to send back */
465 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &error) );
466 } else if (!error) {
467 /* We have received an invalid answer to our query */
468 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Received answer failed the dictionary / rules parsing", fd_msg_pmdl_get(msgptr));
469 fd_msg_free(msgptr);
470 } else {
471 /* We will pass the invalid received error to the application */
472 rescue = 1;
473 }
474 }
475 if (!rescue)
476 return 0; /* We are done with this message, go to the next */
477 } );
478
479 /* First, if the original request was registered with a callback and we receive the answer, call it. */
480 if ( ! is_req ) {
481 struct msg * qry;
482 void (*anscb)(void *, struct msg **) = NULL;
483 void * data = NULL;
484
485 /* Retrieve the corresponding query */
486 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
487
488 /* Retrieve any registered handler */
489 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
490
491 /* If a callback was registered, pass the message to it */
492 if (anscb != NULL) {
493
494 TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
495 (*anscb)(data, &msgptr);
496
497 /* If the message is processed, we're done */
498 if (msgptr == NULL) {
499 return 0;
500 }
501
502 /* otherwise continue the dispatching --hoping that the anscb callback did not mess with our message :) */
503 }
504 }
505
506 /* Retrieve the session of the message */
507 CHECK_FCT( fd_msg_sess_get(fd_g_config->cnf_dict, msgptr, &sess, NULL) );
508
509 /* Now, call any callback registered for the message */
510 CHECK_FCT( fd_msg_dispatch ( &msgptr, sess, &action, &ec, &em, &error) );
511
512 /* Now, act depending on msg and action and ec */
513 if (msgptr) {
514 switch ( action ) {
515 case DISP_ACT_CONT:
516 /* No callback has handled the message, let's reply with a generic error or relay it */
517 if (!fd_g_config->cnf_flags.no_fwd) {
518 /* requeue to fd_g_outgoing */
519 fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
520 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
521 break;
522 }
523 /* We don't relay => reply error */
524 em = "The message was not handled by any extension callback";
525 ec = "DIAMETER_COMMAND_UNSUPPORTED";
526 /* and continue as if an error occurred... */
527 case DISP_ACT_ERROR:
528 /* We have a problem with delivering the message */
529 if (ec == NULL) {
530 ec = "DIAMETER_UNABLE_TO_COMPLY";
531 }
532
533 if (!is_req) {
534 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, "Internal error: Answer received to locally issued request, but not handled by any handler.", fd_msg_pmdl_get(msgptr));
535 fd_msg_free(msgptr);
536 break;
537 }
538
539 /* Create an answer with the error code and message */
540 CHECK_FCT( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msgptr, 0 ) );
541 CHECK_FCT( fd_msg_rescode_set(msgptr, ec, em, NULL, 1 ) );
542
543 case DISP_ACT_SEND:
544 /* Now, send the message */
545 CHECK_FCT( fd_fifo_post(fd_g_outgoing, &msgptr) );
546 }
547 } else if (em) {
548 fd_hook_call(HOOK_MESSAGE_DROPPED, error, NULL, em, fd_msg_pmdl_get(error));
549 fd_msg_free(error);
550 }
551
552 /* We're done with dispatching this message */
553 return 0;
554 }
555
556 /* The ROUTING-IN message processing */
msg_rt_in(struct msg * msg)557 static int msg_rt_in(struct msg * msg)
558 {
559 struct msg_hdr * hdr;
560 int is_req = 0;
561 int is_err = 0;
562 DiamId_t qry_src = NULL;
563 struct msg *msgptr = msg;
564
565 /* Read the message header */
566 CHECK_FCT( fd_msg_hdr(msg, &hdr) );
567 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
568 is_err = hdr->msg_flags & CMD_FLAG_ERROR;
569
570 /* Handle incorrect bits */
571 if (is_req && is_err) {
572 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "R & E bits were set", fd_msg_pmdl_get(msgptr));
573 CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_HDR_BITS", "R & E bits were set", NULL) );
574 return 0;
575 }
576
577 /* If it is a request, we must analyze its content to decide what we do with it */
578 if (is_req) {
579 struct avp * avp, *un = NULL;
580 union avp_value * un_val = NULL, *dr_val = NULL;
581 enum status { UNKNOWN, YES, NO };
582 /* Are we Destination-Host? */
583 enum status is_dest_host = UNKNOWN;
584 /* Are we Destination-Realm? */
585 enum status is_dest_realm = UNKNOWN;
586 /* Do we support the application of the message? */
587 enum status is_local_app = UNKNOWN;
588
589 /* Check if we have local support for the message application */
590 if ( (hdr->msg_appl == 0) || (hdr->msg_appl == AI_RELAY) ) {
591 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Received a routable message with application id 0 or " _stringize(AI_RELAY) " (relay)", fd_msg_pmdl_get(msgptr));
592 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", "Routable message with application id 0 or relay", NULL) );
593 return 0;
594 } else {
595 struct fd_app * app;
596 CHECK_FCT( fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) );
597 is_local_app = (app ? YES : NO);
598 }
599
600 /* Parse the message for Dest-Host, Dest-Realm, and Route-Record */
601 CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
602 while (avp) {
603 struct avp_hdr * ahdr;
604 struct fd_pei error_info;
605 int ret;
606
607 memset(&error_info, 0, sizeof(struct fd_pei));
608
609 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
610
611 if (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) {
612 switch (ahdr->avp_code) {
613 case AC_DESTINATION_HOST:
614 /* Parse this AVP */
615 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
616 {
617 if (error_info.pei_errcode) {
618 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
619 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
620 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
621 return 0;
622 } else {
623 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Host AVP", fd_msg_pmdl_get(msgptr));
624 return ret;
625 }
626 } );
627 ASSERT( ahdr->avp_value );
628 /* Compare the Destination-Host AVP of the message with our identity */
629 if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
630 is_dest_host = YES;
631 } else {
632 is_dest_host = NO;
633 }
634 break;
635
636 case AC_DESTINATION_REALM:
637 /* Parse this AVP */
638 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
639 {
640 if (error_info.pei_errcode) {
641 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
642 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
643 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
644 return 0;
645 } else {
646 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Destination-Realm AVP", fd_msg_pmdl_get(msgptr));
647 return ret;
648 }
649 } );
650 ASSERT( ahdr->avp_value );
651 dr_val = ahdr->avp_value;
652 /* Compare the Destination-Realm AVP of the message with our identity */
653 if (!fd_os_almostcasesrch(dr_val->os.data, dr_val->os.len, fd_g_config->cnf_diamrlm, fd_g_config->cnf_diamrlm_len, NULL)) {
654 is_dest_realm = YES;
655 } else {
656 is_dest_realm = NO;
657 }
658 break;
659
660 /* we also use User-Name for decorated NAI */
661 case AC_USER_NAME:
662 /* Parse this AVP */
663 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
664 {
665 if (error_info.pei_errcode) {
666 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
667 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
668 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
669 return 0;
670 } else {
671 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing User-Name AVP", fd_msg_pmdl_get(msgptr));
672 return ret;
673 }
674 } );
675 ASSERT( ahdr->avp_value );
676 un = avp;
677 un_val = ahdr->avp_value;
678 break;
679
680 case AC_ROUTE_RECORD:
681 /* Parse this AVP */
682 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
683 {
684 if (error_info.pei_errcode) {
685 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error_info.pei_message ?: error_info.pei_errcode, fd_msg_pmdl_get(msgptr));
686 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
687 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
688 return 0;
689 } else {
690 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Unspecified error while parsing Route-Record AVP", fd_msg_pmdl_get(msgptr));
691 return ret;
692 }
693 } );
694 ASSERT( ahdr->avp_value );
695 /* Is this our own name ? */
696 if (!fd_os_almostcasesrch(ahdr->avp_value->os.data, ahdr->avp_value->os.len, fd_g_config->cnf_diamid, fd_g_config->cnf_diamid_len, NULL)) {
697 /* Yes: then we must return DIAMETER_LOOP_DETECTED according to Diameter RFC */
698 char * error = "DIAMETER_LOOP_DETECTED";
699 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, error, fd_msg_pmdl_get(msgptr));
700 CHECK_FCT( return_error( &msgptr, error, NULL, NULL) );
701 return 0;
702 }
703 break;
704
705
706 }
707 }
708
709 /* Stop when we found all 3 AVPs -- they are supposed to be at the beginning of the message, so this should be fast */
710 if ((is_dest_host != UNKNOWN) && (is_dest_realm != UNKNOWN) && un)
711 break;
712
713 /* Go to next AVP */
714 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
715 }
716
717 /* OK, now decide what we do with the request */
718
719 /* Handle the missing routing AVPs first */
720 if ( is_dest_realm == UNKNOWN ) {
721 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", fd_msg_pmdl_get(msgptr));
722 CHECK_FCT( return_error( &msgptr, "DIAMETER_COMMAND_UNSUPPORTED", "Non-routable message not supported (invalid bit ? missing Destination-Realm ?)", NULL) );
723 return 0;
724 }
725
726 /* If we are listed as Destination-Host */
727 if (is_dest_host == YES) {
728 if (is_local_app == YES) {
729 /* Ok, give the message to the dispatch thread */
730 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
731 CHECK_FCT( fd_fifo_post(fd_g_local, &msgptr) );
732 } else {
733 /* We don't support the application, reply an error */
734 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
735 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
736 }
737 return 0;
738 }
739
740 /* If the message is explicitely for someone else */
741 if ((is_dest_host == NO) || (is_dest_realm == NO)) {
742 if (fd_g_config->cnf_flags.no_fwd) {
743 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Message for another realm/host", fd_msg_pmdl_get(msgptr));
744 CHECK_FCT( return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "I am not a Diameter agent", NULL) );
745 return 0;
746 }
747 } else {
748 /* Destination-Host was not set, and Destination-Realm is matching : we may handle or pass to a fellow peer */
749 int is_nai = 0;
750
751 /* test for decorated NAI (RFC5729 section 4.4) */
752 /* Handle the decorated NAI */
753 if (un_val) {
754 CHECK_FCT_DO( process_decorated_NAI(&is_nai, un_val, dr_val),
755 {
756 /* If the process failed, we assume it is because of the AVP format */
757 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msgptr, NULL, "Failed to process decorated NAI", fd_msg_pmdl_get(msgptr));
758 CHECK_FCT( return_error( &msgptr, "DIAMETER_INVALID_AVP_VALUE", "Failed to process decorated NAI", un) );
759 return 0;
760 } );
761 }
762
763 if (is_nai) {
764 /* We have transformed the AVP, now submit it again in the queue */
765 CHECK_FCT(fd_fifo_post(fd_g_incoming, &msgptr) );
766 return 0;
767 }
768
769 if (is_local_app == YES) {
770 /* Handle localy since we are able to */
771 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
772 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
773 return 0;
774 }
775
776 if (fd_g_config->cnf_flags.no_fwd) {
777 /* We return an error */
778 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "Application unsupported", fd_msg_pmdl_get(msgptr));
779 CHECK_FCT( return_error( &msgptr, "DIAMETER_APPLICATION_UNSUPPORTED", NULL, NULL) );
780 return 0;
781 }
782 }
783
784 /* From that point, for requests, we will call the registered callbacks, then forward to another peer */
785
786 } else {
787 /* The message is an answer */
788 struct msg * qry;
789
790 /* Retrieve the corresponding query and its origin */
791 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
792 CHECK_FCT( fd_msg_source_get( qry, &qry_src, NULL ) );
793
794 if ((!qry_src) && (!is_err)) {
795 /* The message is a normal answer to a request issued localy, we do not call the callbacks chain on it. */
796 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
797 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
798 return 0;
799 }
800
801 /* From that point, for answers, we will call the registered callbacks, then pass it to the dispatch module or forward it */
802 }
803
804 /* Call all registered callbacks for this message */
805 {
806 struct fd_list * li;
807
808 CHECK_FCT( pthread_rwlock_rdlock( &rt_fwd_lock ) );
809 pthread_cleanup_push( fd_cleanup_rwlock, &rt_fwd_lock );
810
811 /* requests: dir = 1 & 2 => in order; answers = 3 & 2 => in reverse order */
812 for ( li = (is_req ? rt_fwd_list.next : rt_fwd_list.prev) ; msgptr && (li != &rt_fwd_list) ; li = (is_req ? li->next : li->prev) ) {
813 struct rt_hdl * rh = (struct rt_hdl *)li;
814 int ret;
815
816 if (is_req && (rh->dir > RT_FWD_ALL))
817 break;
818 if ((!is_req) && (rh->dir < RT_FWD_ALL))
819 break;
820
821 /* Ok, call this cb */
822 TRACE_DEBUG(ANNOYING, "Calling next FWD callback on %p : %p", msgptr, rh->rt_fwd_cb);
823 CHECK_FCT_DO( ret = (*rh->rt_fwd_cb)(rh->cbdata, &msgptr),
824 {
825 char buf[256];
826 snprintf(buf, sizeof(buf), "A FWD routing callback returned an error: %s", strerror(ret));
827 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
828 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
829 fd_msg_free(msgptr);
830 msgptr = NULL;
831 break;
832 } );
833 }
834
835 pthread_cleanup_pop(0);
836 CHECK_FCT( pthread_rwlock_unlock( &rt_fwd_lock ) );
837
838 /* If a callback has handled the message, we stop now */
839 if (!msgptr)
840 return 0;
841 }
842
843 /* Now pass the message to the next step: either forward to another peer, or dispatch to local extensions */
844 if (is_req || qry_src) {
845 fd_hook_call(HOOK_MESSAGE_ROUTING_FORWARD, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
846 CHECK_FCT(fd_fifo_post(fd_g_outgoing, &msgptr) );
847 } else {
848 fd_hook_call(HOOK_MESSAGE_ROUTING_LOCAL, msgptr, NULL, NULL, fd_msg_pmdl_get(msgptr));
849 CHECK_FCT(fd_fifo_post(fd_g_local, &msgptr) );
850 }
851
852 /* We're done with this message */
853 return 0;
854 }
855
856
857 /* The ROUTING-OUT message processing */
msg_rt_out(struct msg * msg)858 static int msg_rt_out(struct msg * msg)
859 {
860 struct rt_data * rtd = NULL;
861 struct msg_hdr * hdr;
862 int is_req = 0;
863 int ret;
864 struct fd_list * li, *candidates;
865 struct avp * avp;
866 struct rtd_candidate * c;
867 struct msg *msgptr = msg;
868 DiamId_t qry_src = NULL;
869 size_t qry_src_len = 0;
870
871 /* Read the message header */
872 CHECK_FCT( fd_msg_hdr(msgptr, &hdr) );
873 is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
874
875 /* For answers, the routing is very easy */
876 if ( ! is_req ) {
877 struct msg * qry;
878 struct msg_hdr * qry_hdr;
879 struct fd_peer * peer = NULL;
880
881 /* Retrieve the corresponding query and its origin */
882 CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
883 CHECK_FCT( fd_msg_source_get( qry, &qry_src, &qry_src_len ) );
884
885 ASSERT( qry_src ); /* if it is NULL, the message should have been in the LOCAL queue! */
886
887 /* Find the peer corresponding to this name */
888 CHECK_FCT( fd_peer_getbyid( qry_src, qry_src_len, 0, (void *) &peer ) );
889 if (fd_peer_getstate(peer) != STATE_OPEN && fd_peer_getstate(peer) != STATE_CLOSING_GRACE) {
890 char buf[128];
891 snprintf(buf, sizeof(buf), "Unable to forward answer to deleted / closed peer '%s'.", qry_src);
892 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
893 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
894 fd_msg_free(msgptr);
895 return 0;
896 }
897
898 /* We must restore the hop-by-hop id */
899 CHECK_FCT( fd_msg_hdr(qry, &qry_hdr) );
900 hdr->msg_hbhid = qry_hdr->msg_hbhid;
901
902 /* Push the message into this peer */
903 CHECK_FCT( fd_out_send(&msgptr, NULL, peer, 1) );
904
905 /* We're done with this answer */
906 return 0;
907 }
908
909 /* From that point, the message is a request */
910 CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
911 /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
912
913 /* Get the routing data out of the message if any (in case of re-transmit) */
914 CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) );
915
916 /* If there is no routing data already, let's create it */
917 if (rtd == NULL) {
918 CHECK_FCT( fd_rtd_init(&rtd) );
919
920 /* Add all peers currently in OPEN state */
921 CHECK_FCT( pthread_rwlock_rdlock(&fd_g_activ_peers_rw) );
922 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
923 struct fd_peer * p = (struct fd_peer *)li->o;
924 CHECK_FCT_DO( ret = fd_rtd_candidate_add(rtd,
925 p->p_hdr.info.pi_diamid,
926 p->p_hdr.info.pi_diamidlen,
927 p->p_hdr.info.runtime.pir_realm,
928 p->p_hdr.info.runtime.pir_realmlen),
929 { CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_activ_peers_rw), ); return ret; } );
930 }
931 CHECK_FCT( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
932
933 /* Now let's remove all peers from the Route-Records */
934 CHECK_FCT( fd_msg_browse(msgptr, MSG_BRW_FIRST_CHILD, &avp, NULL) );
935 while (avp) {
936 struct avp_hdr * ahdr;
937 struct fd_pei error_info;
938 CHECK_FCT( fd_msg_avp_hdr( avp, &ahdr ) );
939
940 if ((ahdr->avp_code == AC_ROUTE_RECORD) && (! (ahdr->avp_flags & AVP_FLAG_VENDOR)) ) {
941 /* Parse this AVP */
942 CHECK_FCT_DO( ret = fd_msg_parse_dict ( avp, fd_g_config->cnf_dict, &error_info ),
943 {
944 if (error_info.pei_errcode) {
945 CHECK_FCT( return_error( &msgptr, error_info.pei_errcode, error_info.pei_message, error_info.pei_avp) );
946 if (error_info.pei_avp_free) { fd_msg_free(error_info.pei_avp); }
947 return 0;
948 } else {
949 return ret;
950 }
951 } );
952 ASSERT( ahdr->avp_value );
953 /* Remove this value from the list. We don't need to pay special attention to the contents here. */
954 fd_rtd_candidate_del(rtd, ahdr->avp_value->os.data, ahdr->avp_value->os.len);
955 }
956
957 /* Go to next AVP */
958 CHECK_FCT( fd_msg_browse(avp, MSG_BRW_NEXT, &avp, NULL) );
959 }
960
961 /* Save the routing information in the message */
962 CHECK_FCT( fd_msg_rt_associate ( msgptr, rtd ) );
963 }
964
965 /* Note: we reset the scores and pass the message to the callbacks, maybe we could re-use the saved scores when we have received an error ? -- TODO */
966
967 /* Ok, we have our list in rtd now, let's (re)initialize the scores */
968 fd_rtd_candidate_extract(rtd, &candidates, FD_SCORE_INI);
969
970 /* Pass the list to registered callbacks (even if it is empty list) */
971 {
972 CHECK_FCT( pthread_rwlock_rdlock( &rt_out_lock ) );
973 pthread_cleanup_push( fd_cleanup_rwlock, &rt_out_lock );
974
975 /* We call the cb by reverse priority order */
976 for ( li = rt_out_list.prev ; (msgptr != NULL) && (li != &rt_out_list) ; li = li->prev ) {
977 struct rt_hdl * rh = (struct rt_hdl *)li;
978
979 TRACE_DEBUG(ANNOYING, "Calling next OUT callback on %p : %p (prio %d)", msgptr, rh->rt_out_cb, rh->prio);
980 CHECK_FCT_DO( ret = (*rh->rt_out_cb)(rh->cbdata, &msgptr, candidates),
981 {
982 char buf[256];
983 snprintf(buf, sizeof(buf), "An OUT routing callback returned an error: %s", strerror(ret));
984 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
985 fd_hook_call(HOOK_MESSAGE_DROPPED, msgptr, NULL, buf, fd_msg_pmdl_get(msgptr));
986 fd_msg_free(msgptr);
987 msgptr = NULL;
988 } );
989 }
990
991 pthread_cleanup_pop(0);
992 CHECK_FCT( pthread_rwlock_unlock( &rt_out_lock ) );
993
994 /* If an error occurred or the callback disposed of the message, go to next message */
995 if (! msgptr) {
996 return 0;
997 }
998 }
999
1000 /* Order the candidate peers by score attributed by the callbacks */
1001 CHECK_FCT( fd_rtd_candidate_reorder(candidates) );
1002
1003 /* Now try sending the message */
1004 for (li = candidates->prev; li != candidates; li = li->prev) {
1005 struct fd_peer * peer;
1006
1007 c = (struct rtd_candidate *) li;
1008
1009 /* Stop when we have reached the end of valid candidates */
1010 if (c->score < 0)
1011 break;
1012
1013 /* Search for the peer */
1014 CHECK_FCT( fd_peer_getbyid( c->diamid, c->diamidlen, 0, (void *)&peer ) );
1015
1016 if (fd_peer_getstate(peer) == STATE_OPEN) {
1017 /* Send to this one */
1018 CHECK_FCT_DO( fd_out_send(&msgptr, NULL, peer, 1), continue );
1019
1020 /* If the sending was successful */
1021 break;
1022 }
1023 }
1024
1025 /* If the message has not been sent, return an error */
1026 if (msgptr) {
1027 fd_hook_call(HOOK_MESSAGE_ROUTING_ERROR, msgptr, NULL, "No remaining suitable candidate to route the message to", fd_msg_pmdl_get(msgptr));
1028 return_error( &msgptr, "DIAMETER_UNABLE_TO_DELIVER", "No suitable candidate to route the message to", NULL);
1029 }
1030
1031 /* We're done with this message */
1032
1033 return 0;
1034 }
1035
1036
1037 /********************************************************************************/
1038 /* Management of the threads */
1039 /********************************************************************************/
1040
1041 /* Note: in the first version, we only create one thread of each kind.
1042 We could improve the scalability by using the threshold feature of the queues
1043 to create additional threads if a queue is filling up, or at least giving a configurable
1044 number of threads of each kind.
1045 */
1046
1047 /* Control of the threads */
1048 static enum { RUN = 0, STOP = 1 } order_val = RUN;
1049 static pthread_mutex_t order_state_lock = PTHREAD_MUTEX_INITIALIZER;
1050
1051 /* Threads report their status */
1052 enum thread_state { NOTRUNNING = 0, RUNNING = 1 };
cleanup_state(void * state_loc)1053 static void cleanup_state(void * state_loc)
1054 {
1055 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1056 *(enum thread_state *)state_loc = NOTRUNNING;
1057 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1058 }
1059
1060 /* This is the common thread code (same for routing and dispatching) */
process_thr(void * arg,int (* action_cb)(struct msg * msg),struct fifo * queue,char * action_name)1061 static void * process_thr(void * arg, int (*action_cb)(struct msg * msg), struct fifo * queue, char * action_name)
1062 {
1063 TRACE_ENTRY("%p %p %p %p", arg, action_cb, queue, action_name);
1064
1065 /* Set the thread name */
1066 {
1067 char buf[48];
1068 snprintf(buf, sizeof(buf), "%s (%p)", action_name, arg);
1069 fd_log_threadname ( buf );
1070 }
1071
1072 /* The thread reports its status when canceled */
1073 CHECK_PARAMS_DO(arg, return NULL);
1074 pthread_cleanup_push( cleanup_state, arg );
1075
1076 /* Mark the thread running */
1077 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1078 *(enum thread_state *)arg = RUNNING;
1079 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1080
1081 do {
1082 struct msg * msg;
1083
1084 /* Get the next message from the queue */
1085 {
1086 int ret;
1087 struct timespec ts;
1088
1089 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), goto fatal_error );
1090 ts.tv_sec += 1;
1091
1092 ret = fd_fifo_timedget ( queue, &msg, &ts );
1093 if (ret == ETIMEDOUT) {
1094 /* Test the current order */
1095 {
1096 int must_stop;
1097 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), { ASSERT(0); } ); /* we lock to flush the caches */
1098 must_stop = (order_val == STOP);
1099 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), { ASSERT(0); } );
1100 if (must_stop)
1101 goto end;
1102
1103 pthread_testcancel();
1104 }
1105 /* Ok, we are allowed to continue */
1106 continue;
1107 }
1108 if (ret == EPIPE)
1109 /* The queue was destroyed, we are probably exiting */
1110 goto end;
1111
1112 /* check if another error occurred */
1113 CHECK_FCT_DO( ret, goto fatal_error );
1114 }
1115
1116 LOG_A("%s: Picked next message", action_name);
1117
1118 /* Now process the message */
1119 CHECK_FCT_DO( (*action_cb)(msg), goto fatal_error);
1120
1121 /* We're done with this message */
1122
1123 } while (1);
1124
1125 fatal_error:
1126 TRACE_DEBUG(INFO, "An unrecoverable error occurred, %s thread is terminating...", action_name);
1127 CHECK_FCT_DO(fd_core_shutdown(), );
1128
1129 end:
1130 ; /* noop so that we get rid of "label at end of compund statement" warning */
1131 /* Mark the thread as terminated */
1132 pthread_cleanup_pop(1);
1133 return NULL;
1134 }
1135
1136 /* The dispatch thread */
dispatch_thr(void * arg)1137 static void * dispatch_thr(void * arg)
1138 {
1139 return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
1140 }
1141
1142 /* The (routing-in) thread -- see description in freeDiameter.h */
routing_in_thr(void * arg)1143 static void * routing_in_thr(void * arg)
1144 {
1145 return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
1146 }
1147
1148 /* The (routing-out) thread -- see description in freeDiameter.h */
routing_out_thr(void * arg)1149 static void * routing_out_thr(void * arg)
1150 {
1151 return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
1152 }
1153
1154
1155 /********************************************************************************/
1156 /* The functions for the other files */
1157 /********************************************************************************/
1158
1159 static pthread_t * dispatch = NULL;
1160 static enum thread_state * disp_state = NULL;
1161
1162 static pthread_t * rt_out = NULL;
1163 static enum thread_state * out_state = NULL;
1164
1165 static pthread_t * rt_in = NULL;
1166 static enum thread_state * in_state = NULL;
1167
1168 /* Initialize the routing and dispatch threads */
fd_rtdisp_init(void)1169 int fd_rtdisp_init(void)
1170 {
1171 int i;
1172
1173 /* Prepare the array for threads */
1174 CHECK_MALLOC( disp_state = calloc(fd_g_config->cnf_dispthr, sizeof(enum thread_state)) );
1175 CHECK_MALLOC( dispatch = calloc(fd_g_config->cnf_dispthr, sizeof(pthread_t)) );
1176 CHECK_MALLOC( out_state = calloc(fd_g_config->cnf_rtoutthr, sizeof(enum thread_state)) );
1177 CHECK_MALLOC( rt_out = calloc(fd_g_config->cnf_rtoutthr, sizeof(pthread_t)) );
1178 CHECK_MALLOC( in_state = calloc(fd_g_config->cnf_rtinthr, sizeof(enum thread_state)) );
1179 CHECK_MALLOC( rt_in = calloc(fd_g_config->cnf_rtinthr, sizeof(pthread_t)) );
1180
1181 /* Create the threads */
1182 for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1183 CHECK_POSIX( pthread_create( &dispatch[i], NULL, dispatch_thr, &disp_state[i] ) );
1184 #ifdef linux
1185 pthread_setname_np(dispatch[i], "fd-dispatch");
1186 #endif
1187 }
1188 for (i=0; i < fd_g_config->cnf_rtoutthr; i++) {
1189 CHECK_POSIX( pthread_create( &rt_out[i], NULL, routing_out_thr, &out_state[i] ) );
1190 #ifdef linux
1191 pthread_setname_np(rt_out[i], "fd-routing-out");
1192 #endif
1193 }
1194 for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1195 CHECK_POSIX( pthread_create( &rt_in[i], NULL, routing_in_thr, &in_state[i] ) );
1196 #ifdef linux
1197 pthread_setname_np(rt_in[i], "fd-routing-in");
1198 #endif
1199 }
1200
1201 /* Later: TODO("Set the thresholds for the queues to create more threads as needed"); */
1202
1203 /* Register the built-in callbacks */
1204 CHECK_FCT( fd_rt_out_register( dont_send_if_no_common_app, NULL, 10, NULL ) );
1205 CHECK_FCT( fd_rt_out_register( score_destination_avp, NULL, 10, NULL ) );
1206
1207 return 0;
1208 }
1209
1210 /* Ask the thread to terminate after next iteration */
fd_rtdisp_cleanstop(void)1211 int fd_rtdisp_cleanstop(void)
1212 {
1213 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1214 order_val = STOP;
1215 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1216
1217 return 0;
1218 }
1219
stop_thread_delayed(enum thread_state * st,pthread_t * thr,char * th_name)1220 static void stop_thread_delayed(enum thread_state *st, pthread_t * thr, char * th_name)
1221 {
1222 TRACE_ENTRY("%p %p", st, thr);
1223 CHECK_PARAMS_DO(st && thr, return);
1224 int terminated;
1225
1226 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1227 terminated = (*st == NOTRUNNING);
1228 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1229
1230
1231 /* Wait for a second for the thread to complete, by monitoring my_state */
1232 if (!terminated) {
1233 TRACE_DEBUG(INFO, "Waiting for the %s thread to have a chance to terminate", th_name);
1234 do {
1235 struct timespec ts, ts_final;
1236
1237 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1238
1239 ts_final.tv_sec = ts.tv_sec + 1;
1240 ts_final.tv_nsec = ts.tv_nsec;
1241
1242 while (TS_IS_INFERIOR( &ts, &ts_final )) {
1243
1244 CHECK_POSIX_DO( pthread_mutex_lock(&order_state_lock), );
1245 terminated = (*st == NOTRUNNING);
1246 CHECK_POSIX_DO( pthread_mutex_unlock(&order_state_lock), );
1247 if (terminated)
1248 break;
1249
1250 usleep(100000);
1251 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), break );
1252 }
1253 } while (0);
1254 }
1255
1256 /* Now stop the thread and reclaim its resources */
1257 CHECK_FCT_DO( fd_thr_term(thr ), /* continue */);
1258
1259 }
1260
1261 /* Stop the thread after up to one second of wait */
fd_rtdisp_fini(void)1262 int fd_rtdisp_fini(void)
1263 {
1264 int i;
1265
1266 /* Destroy the incoming queue */
1267 CHECK_FCT_DO( fd_queues_fini(&fd_g_incoming), /* ignore */);
1268
1269 /* Stop the routing IN thread */
1270 if (rt_in != NULL) {
1271 for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1272 stop_thread_delayed(&in_state[i], &rt_in[i], "IN routing");
1273 }
1274 free(rt_in);
1275 rt_in = NULL;
1276 }
1277 if (in_state != NULL) {
1278 free(in_state);
1279 in_state = NULL;
1280 }
1281
1282 /* Destroy the outgoing queue */
1283 CHECK_FCT_DO( fd_queues_fini(&fd_g_outgoing), /* ignore */);
1284
1285 /* Stop the routing OUT thread */
1286 if (rt_out != NULL) {
1287 for (i=0; i < fd_g_config->cnf_rtinthr; i++) {
1288 stop_thread_delayed(&out_state[i], &rt_out[i], "OUT routing");
1289 }
1290 free(rt_out);
1291 rt_out = NULL;
1292 }
1293 if (out_state != NULL) {
1294 free(out_state);
1295 out_state = NULL;
1296 }
1297
1298 /* Destroy the local queue */
1299 CHECK_FCT_DO( fd_queues_fini(&fd_g_local), /* ignore */);
1300
1301 /* Stop the Dispatch threads */
1302 if (dispatch != NULL) {
1303 for (i=0; i < fd_g_config->cnf_dispthr; i++) {
1304 stop_thread_delayed(&disp_state[i], &dispatch[i], "Dispatching");
1305 }
1306 free(dispatch);
1307 dispatch = NULL;
1308 }
1309 if (disp_state != NULL) {
1310 free(disp_state);
1311 disp_state = NULL;
1312 }
1313
1314 return 0;
1315 }
1316
1317 /* Cleanup handlers */
fd_rtdisp_cleanup(void)1318 int fd_rtdisp_cleanup(void)
1319 {
1320 /* Cleanup all remaining handlers */
1321 while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
1322 CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
1323 }
1324 while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
1325 CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
1326 }
1327
1328 fd_disp_unregister_all(); /* destroy remaining handlers */
1329
1330 return 0;
1331 }
1332
1333
1334 /********************************************************************************/
1335 /* For extensions to register a new appl */
1336 /********************************************************************************/
1337
1338 /* Add an application into the peer's supported apps */
fd_disp_app_support(struct dict_object * app,struct dict_object * vendor,int auth,int acct)1339 int fd_disp_app_support ( struct dict_object * app, struct dict_object * vendor, int auth, int acct )
1340 {
1341 application_id_t aid = 0;
1342 vendor_id_t vid = 0;
1343
1344 TRACE_ENTRY("%p %p %d %d", app, vendor, auth, acct);
1345 CHECK_PARAMS( app && (auth || acct) );
1346
1347 {
1348 enum dict_object_type type = 0;
1349 struct dict_application_data data;
1350 CHECK_FCT( fd_dict_gettype(app, &type) );
1351 CHECK_PARAMS( type == DICT_APPLICATION );
1352 CHECK_FCT( fd_dict_getval(app, &data) );
1353 aid = data.application_id;
1354 }
1355
1356 if (vendor) {
1357 enum dict_object_type type = 0;
1358 struct dict_vendor_data data;
1359 CHECK_FCT( fd_dict_gettype(vendor, &type) );
1360 CHECK_PARAMS( type == DICT_VENDOR );
1361 CHECK_FCT( fd_dict_getval(vendor, &data) );
1362 vid = data.vendor_id;
1363 }
1364
1365 return fd_app_merge(&fd_g_config->cnf_apps, aid, vid, auth, acct);
1366 }
1367
1368
1369
1370