1 /*********************************************************************************************************
2 * Software License Agreement (BSD License) *
3 * Author: Sebastien Decugis <sdecugis@freediameter.net> *
4 * *
5 * Copyright (c) 2019, WIDE Project and NICT *
6 * All rights reserved. *
7 * *
8 * Redistribution and use of this software in source and binary forms, with or without modification, are *
9 * permitted provided that the following conditions are met: *
10 * *
11 * * Redistributions of source code must retain the above *
12 * copyright notice, this list of conditions and the *
13 * following disclaimer. *
14 * *
15 * * Redistributions in binary form must reproduce the above *
16 * copyright notice, this list of conditions and the *
17 * following disclaimer in the documentation and/or other *
18 * materials provided with the distribution. *
19 * *
20 * * Neither the name of the WIDE Project or NICT nor the *
21 * names of its contributors may be used to endorse or *
22 * promote products derived from this software without *
23 * specific prior written permission of WIDE Project and *
24 * NICT. *
25 * *
26 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
27 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
28 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
29 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *
30 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS *
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
32 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF *
33 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
34 *********************************************************************************************************/
35
36 #include "fdcore-internal.h"
37
38 /*
39 This file implement a Peer State Machine which is a mix of:
40 - the state machine described in rfc3588bis
41 - the state machine described in rfc3539#section-3.4
42 - the following observations.
43
The delivery of Diameter messages is not always unordered: ordering matters at
the beginning and at the end of a connection's lifetime. This means we need the
agility to switch between "ordering enforced" and "ordering not enforced, to
counter Head-of-Line Blocking" modes of operation.
48
49 The connection state machine represented in RFC3588 (and RFC6733) is
50 incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA
51 exchanges (section 5.1) when the peer recovers from this state.
52 Personally I don't see the rationale for exchanging 3 messages (why 3?)
53 but, if we require at least 1 DWR/DWA exchange to be always performed
54 after the CER/CEA exchange (and initiated by the peer that sent the
55 CEA), we have a simple way to deal with our ordering problem, as resumed
56 below. Peers are: [i]nitiator, [r]esponder.
57 (1) [i] SCTP connection attempt.
58 (2) [r] accept the connection.
59 (3) [i,r] (if secure port) DTLS handshake, close on failure.
60 (4) [i] Send CER
61 (5) [r] Receive CER, send CEA using stream 0, flag "unordered" cleared.
62 [r] Immediately send a DWR after the CEA, also using stream 0,
63 flag "unordered" cleared.
64 [r] Move to STATE_OPEN_NEW state -- equivalent to OPEN except
65 that all messages are sent ordered at the moment.
66 (6) [i] receive CEA, move to OPEN state. All messages can be sent
67 unordered in OPEN state.
68 [i] As per normal operation, reply with DWA to the DWR.
69 (7) [r] Upon reception of the DWA, move to OPEN state, messages can be
70 sent unordered from this point.
71
72 Note about (5) and (6): if the Diameter Identity received in CER or CEA
73 does not match the credentials from the certificate presented during
74 TLS handshake, we may need to specify a path of clean disconnection
75 (not blocking the remote peer waiting for something).
76
This proposed mechanism removes the problem of application messages
received before the CEA by the initiator. Note that if the "old" inband
TLS handshake is used, this handshake plays the same synchronization
role as the new DWR/DWA exchange, which then becomes useless.
81
82
83 The other time where ordering is important is by the end of connection
84 lifetime, when one peer is shutting down the link for some reason
85 (reboot, overload, no activity, etc...). In case of unordered delivery,
86 we may have:
87 - peer A sends an application message followed by a DPR. Peer B receives
88 the DPR first and tears down the connection. Application message is lost.
89 - Peer B sends an application message, then receives a DPR and answers a
90 DPA. Peer A receives the DPA before the application message. The
91 application message is lost.
92
93 This situation is actually happening easily because DPR/DPA messages are
94 very short, while application messages can be quite large. Therefore,
95 they require much more time to deliver.
96
97 I really cannot see a way to counter this effect by using the ordering
98 of the messages, except by applying a timer (state STATE_CLOSING_GRACE).
This timer can also be useful when we detect that some messages have not
yet received an answer on this link, giving the application time to
complete the ongoing exchanges.
102
103 However, this problem must be balanced with the fact that the message
104 that is lost will be in many cases sent again as the failover mechanism
105 specifies.
106 */
107
108 /* The actual declaration of peer_state_str */
109 DECLARE_STATE_STR();
110
111 /* Helper for next macro */
112 #define case_str( _val ) \
113 case _val : return #_val
114
115 DECLARE_PEV_STR();
116
117 /************************************************************************/
118 /* Delayed startup */
119 /************************************************************************/
120 static int started = 0;
121 static pthread_mutex_t started_mtx = PTHREAD_MUTEX_INITIALIZER;
122 static pthread_cond_t started_cnd = PTHREAD_COND_INITIALIZER;
123
124 /* Wait for start signal */
fd_psm_waitstart()125 static int fd_psm_waitstart()
126 {
127 int ret = 0;
128 TRACE_ENTRY("");
129 CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
130 awake:
131 if (!ret && !started) {
132 pthread_cleanup_push( fd_cleanup_mutex, &started_mtx );
133 CHECK_POSIX_DO( ret = pthread_cond_wait(&started_cnd, &started_mtx), );
134 pthread_cleanup_pop( 0 );
135 goto awake;
136 }
137 CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
138 return ret;
139 }
140
141 /* Allow the state machines to start */
/* Allow the state machines to start: raise the global start flag and wake
 * every PSM thread blocked in fd_psm_waitstart().
 * Returns 0 on success, or an error code from the pthread primitives. */
int fd_psm_start()
{
	TRACE_ENTRY("");
	CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
	started = 1;
	/* Broadcast (not signal): several PSM threads may be waiting */
	CHECK_POSIX( pthread_cond_broadcast(&started_cnd) );
	CHECK_POSIX( pthread_mutex_unlock(&started_mtx) );
	return 0;
}
151
152
153 /************************************************************************/
154 /* Manage the list of active peers */
155 /************************************************************************/
156
157 /* Enter/leave OPEN state */
/* Enter the OPEN state for this peer.
 * If a credential-validation callback (p_cb2) is pending, it is invoked and the
 * function returns early -- the peer is NOT inserted in the active list in that
 * case (on validation failure the connection is terminated).
 * Otherwise: insert the peer in the global active peers list (kept sorted by
 * Diameter Identity), run the fd_peer_add callback if any, start the outgoing
 * messages thread, and refresh the expiry timer.
 * Returns 0 on success, or an error code. */
static int enter_open_state(struct fd_peer * peer)
{
	struct fd_list * li;
	/* The peer must not already be linked in the active list */
	CHECK_PARAMS( FD_IS_LIST_EMPTY(&peer->p_actives) );
	
	/* Callback registered by the credential validator (fd_peer_validate_register) */
	if (peer->p_cb2) {
		CHECK_FCT_DO( (*peer->p_cb2)(&peer->p_hdr.info),
			{
				TRACE_DEBUG(FULL, "Validation failed, terminating the connection");
				fd_psm_terminate(peer, "DO_NOT_WANT_TO_TALK_TO_YOU" );
			} );
		peer->p_cb2 = NULL;
		/* Early return: the active-list insertion and out-thread startup below
		 * are skipped on this path */
		return 0;
	}
	
	/* Insert in the active peers list, at the position keeping it sorted by Diameter Identity */
	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
	for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
		struct fd_peer * next_p = (struct fd_peer *)li->o;
		int cmp = fd_os_cmp(peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen,
					next_p->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamidlen);
		if (cmp < 0)
			break;
	}
	fd_list_insert_before(li, &peer->p_actives);
	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
	
	/* Callback registered when the peer was added, by fd_peer_add */
	if (peer->p_cb) {
		TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid);
		(*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); /* TODO: do this in a separate detached thread? */
		/* The callback is one-shot: clear it so it does not fire again */
		peer->p_cb = NULL;
		peer->p_cb_data = NULL;
	}
	
	/* Start the thread to handle outgoing messages */
	CHECK_FCT( fd_out_start(peer) );
	
	/* Update the expiry timer now */
	CHECK_FCT( fd_p_expi_update(peer) );
	
	return 0;
}
/* Leave the OPEN state: unlink the peer from the global active peers list,
 * stop the outgoing messages thread, and (unless skip_failover is set, as when
 * moving to STATE_CLOSING_GRACE) fail over the pending messages so they can be
 * routed through another peer.
 * Returns 0 on success, or an error code. */
static int leave_open_state(struct fd_peer * peer, int skip_failover)
{
	/* Remove from active peers list */
	CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
	fd_list_unlink( &peer->p_actives );
	CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
	
	/* Stop the "out" thread */
	CHECK_FCT( fd_out_stop(peer) );
	
	/* Failover the messages */
	if (!skip_failover) {
		fd_peer_failover_msg(peer);
	}
	
	return 0;
}
219
220
221 /************************************************************************/
222 /* Helpers for state changes */
223 /************************************************************************/
224
225 /* Cleanup pending events in the peer */
/* Cleanup pending events in the peer: drain the event FIFO and release any
 * payload attached to each event (connection contexts, CER messages, ...). */
void fd_psm_events_free(struct fd_peer * peer)
{
	struct fd_event * ev;
	/* Purge all events, and free the associated data if any */
	while (fd_fifo_tryget( peer->p_events, &ev ) == 0) {
		switch (ev->code) {
			case FDEVP_CNX_ESTABLISHED: {
				/* ev->data is the connection context itself; destroying it
				 * releases it, so do not fall through to free() */
				fd_cnx_destroy(ev->data);
			}
			break;
			
			case FDEVP_TERMINATE:
				/* Do not free the string since it is a constant */
				break;
			
			case FDEVP_CNX_INCOMING: {
				struct cnx_incoming * evd = ev->data;
				fd_hook_call(HOOK_MESSAGE_DROPPED, evd->cer, NULL, "Message discarded while cleaning peer state machine queue.", fd_msg_pmdl_get(evd->cer));
				CHECK_FCT_DO( fd_msg_free(evd->cer), /* continue */);
				fd_cnx_destroy(evd->cnx);
			}
			/* fallthrough: the cnx_incoming structure itself must also be freed */
			default:
				free(ev->data);
		}
		free(ev);
	}
}
253
254 /* Read state */
fd_peer_get_state(struct peer_hdr * peer)255 int fd_peer_get_state(struct peer_hdr *peer)
256 {
257 int ret;
258
259 struct fd_peer * p = (struct fd_peer *)peer;
260
261 if (!CHECK_PEER(p))
262 return -1;
263
264 CHECK_POSIX_DO( pthread_mutex_lock(&p->p_state_mtx), return -1 );
265 ret = p->p_state;
266 CHECK_POSIX_DO( pthread_mutex_unlock(&p->p_state_mtx), return -1 );
267
268 return ret;
269 }
270
271
272 /* Change state */
/* Change the state of the peer, and perform the associated side effects:
 * entering/leaving the OPEN state, failover after a grace period, purging
 * events and possibly scheduling destruction when reaching CLOSED.
 * Returns 0 on success (including no-op when the state is unchanged). */
int fd_psm_change_state(struct fd_peer * peer, int new_state)
{
	int old;
	
	TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state));
	CHECK_PARAMS( CHECK_PEER(peer) );
	
	old = fd_peer_getstate(peer);
	if (old == new_state)
		return 0;
	
	/* Log level: transitions into/out of OPEN are NOTICE (or ERROR when
	 * dropping to SUSPECT/CLOSED); all other transitions are DEBUG */
	LOG(((old == STATE_OPEN) || (new_state == STATE_OPEN)) ? ((new_state == STATE_SUSPECT || new_state == STATE_CLOSED) ? FD_LOG_ERROR : FD_LOG_NOTICE ): FD_LOG_DEBUG, "'%s'\t-> '%s'\t'%s'",
		  STATE_STR(old),
		  STATE_STR(new_state),
		  peer->p_hdr.info.pi_diamid);
	
	
	CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) );
	peer->p_state = new_state;
	CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );
	
	if (old == STATE_OPEN) {
		/* When entering the grace period, failover is deferred (see below) to
		 * give ongoing exchanges a chance to complete on this link */
		CHECK_FCT( leave_open_state(peer, new_state == STATE_CLOSING_GRACE) );
	}
	if (old == STATE_CLOSING_GRACE) {
		/* Grace period over: now fail over whatever is still pending */
		fd_peer_failover_msg(peer);
	}
	
	if (new_state == STATE_OPEN) {
		CHECK_FCT( enter_open_state(peer) );
	}
	
	if (new_state == STATE_CLOSED) {
		/* Purge event list */
		fd_psm_events_free(peer);
		
		/* Reset the counter of pending answers to send */
		peer->p_reqin_count = 0;
		
		/* If the peer is not persistent, we destroy it */
		if (peer->p_hdr.info.config.pic_flags.persist == PI_PRST_NONE) {
			CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) );
		}
	}
	
	return 0;
}
320
321 /* Set timeout timer of next event */
fd_psm_next_timeout(struct fd_peer * peer,int add_random,int delay)322 void fd_psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
323 {
324 TRACE_DEBUG(FULL, "Peer timeout reset to %d seconds%s", delay, add_random ? " (+/- 2)" : "" );
325
326 /* Initialize the timer */
327 CHECK_POSIX_DO( clock_gettime( CLOCK_REALTIME, &peer->p_psm_timer ), ASSERT(0) );
328
329 if (add_random) {
330 if (delay > 2)
331 delay -= 2;
332 else
333 delay = 0;
334
335 /* Add a random value between 0 and 4sec */
336 peer->p_psm_timer.tv_sec += random() % 4;
337 peer->p_psm_timer.tv_nsec+= random() % 1000000000L;
338 if (peer->p_psm_timer.tv_nsec >= 1000000000L) {
339 peer->p_psm_timer.tv_nsec -= 1000000000L;
340 peer->p_psm_timer.tv_sec ++;
341 }
342 }
343
344 peer->p_psm_timer.tv_sec += delay;
345
346 #ifdef SLOW_PSM
347 /* temporary for debug */
348 peer->p_psm_timer.tv_sec += 10;
349 #endif
350 }
351
352 /* Cleanup the peer */
/* Cleanup the peer: bring it back to CLOSED state and release connection
 * resources. When terminate is true, also purge and delete the events queue
 * (the PSM is going away for good). */
void fd_psm_cleanup(struct fd_peer * peer, int terminate)
{
	/* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */
	if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
		CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ );
	}
	
	/* Abort any connection attempt in progress */
	fd_p_cnx_abort(peer, terminate);
	
	/* Drop the current connection context, if any */
	fd_p_ce_clear_cnx(peer, NULL);
	
	/* Destroy the pending receiver connection, if any
	 * (presumably the one kept aside during election -- see WAITCNXACK_ELEC handling) */
	if (peer->p_receiver) {
		fd_cnx_destroy(peer->p_receiver);
		peer->p_receiver = NULL;
	}
	
	if (terminate) {
		fd_psm_events_free(peer);
		CHECK_FCT_DO( fd_fifo_del(&peer->p_events), /* continue */ );
	}
	
}
375
376
377 /************************************************************************/
378 /* The PSM thread */
379 /************************************************************************/
380 /* Cancellation cleanup : set ZOMBIE state in the peer */
cleanup_setstate(void * arg)381 void cleanup_setstate(void * arg)
382 {
383 struct fd_peer * peer = (struct fd_peer *)arg;
384 CHECK_PARAMS_DO( CHECK_PEER(peer), return );
385 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), );
386 peer->p_state = STATE_ZOMBIE;
387 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), );
388 return;
389 }
390
391 /* The state machine thread (controller) */
/* The state machine thread (controller).
 * One such thread runs per peer: it consumes events from peer->p_events and
 * drives every state transition, from the initial CLOSED state until the peer
 * terminates (ZOMBIE). Returns NULL; the thread detaches itself on exit. */
static void * p_psm_th( void * arg )
{
	struct fd_peer * peer = (struct fd_peer *)arg;
	/* Unlocked snapshot of the global start flag: used only to decide whether
	 * the first connection attempt gets a randomized delay */
	int created_started = started ? 1 : 0;
	int event;
	size_t ev_sz;
	void * ev_data;
	int cur_state;
	
	CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
	
	/* If this thread is canceled, mark the peer ZOMBIE (popped at psm_end) */
	pthread_cleanup_push( cleanup_setstate, arg );
	
	/* Set the thread name */
	{
		char buf[48];
		snprintf(buf, sizeof(buf), "PSM/%s", peer->p_hdr.info.pi_diamid);
		fd_log_threadname ( buf );
	}
	
	/* The state machine starts in CLOSED state */
	CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end );
	peer->p_state = STATE_CLOSED;
	CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end );
	
	/* Wait that the PSM are authorized to start in the daemon */
	CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end );
	
	/* Initialize the timer */
	if (peer->p_flags.pf_responder) {
		/* Responder: give the remote peer INCNX_TIMEOUT to complete the connection */
		fd_psm_next_timeout(peer, 0, INCNX_TIMEOUT);
	} else {
		/* Initiator: attempt immediately, with jitter if the daemon was already started */
		fd_psm_next_timeout(peer, created_started, 0);
	}
	
psm_loop:
	/* Get next event (or FDEVP_PSM_TIMEOUT when the timer expires) */
	TRACE_DEBUG(FULL, "'%s' in state '%s' waiting for next event.",
			peer->p_hdr.info.pi_diamid, STATE_STR(fd_peer_getstate(peer)));
	CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end );
	
	cur_state = fd_peer_getstate(peer);
	if (cur_state == -1)
		goto psm_end;
	
	TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%zd)\t'%s'",
			STATE_STR(cur_state),
			fd_pev_str(event), ev_data, ev_sz,
			peer->p_hdr.info.pi_diamid);
	
	/* Now, the action depends on the current state and the incoming event */
	
	/* The following states are impossible */
	ASSERT( cur_state != STATE_NEW );
	ASSERT( cur_state != STATE_ZOMBIE );
	ASSERT( cur_state != STATE_OPEN_HANDSHAKE ); /* because it should exist only between two loops */
	
	/* Purge invalid events */
	if (!CHECK_PEVENT(event)) {
		TRACE_DEBUG(INFO, "Invalid event received in PSM '%s' : %d", peer->p_hdr.info.pi_diamid, event);
		ASSERT(0); /* we should investigate this situation */
		goto psm_loop;
	}
	
	/* Requests to terminate the peer object */
	if (event == FDEVP_TERMINATE) {
		switch (cur_state) {
			case STATE_OPEN:
			case STATE_OPEN_NEW:
			case STATE_REOPEN:
				/* We cannot just close the connection, we have to send a DPR first */
				CHECK_FCT_DO( fd_p_dp_initiate(peer, ev_data), goto psm_end );
				goto psm_loop;
			
			/*	
			case STATE_CLOSING:
			case STATE_CLOSING_GRACE:
			case STATE_WAITCNXACK:
			case STATE_WAITCNXACK_ELEC:
			case STATE_WAITCEA:
			case STATE_SUSPECT:
			case STATE_CLOSED:
			*/
			default:
				/* In these cases, we just cleanup the peer object (if needed) and terminate */
				goto psm_end;
		}
	}
	
	/* A message was received */
	if (event == FDEVP_CNX_MSG_RECV) {
		struct msg * msg = NULL;
		struct msg_hdr * hdr;
		struct fd_cnx_rcvdata rcv_data;
		struct fd_msg_pmdl * pmdl = NULL;
		
		/* ev_data is the raw received buffer, ev_sz its length */
		rcv_data.buffer = ev_data;
		rcv_data.length = ev_sz;
		pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length);
		
		/* Parse the received buffer (on success, ownership of the buffer moves into msg) */
		CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), 
			{
				fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, NULL, peer, &rcv_data, pmdl );
				free(ev_data);
				/* Parsing failure means the stream is desynchronized: signal a connection error */
				CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
				goto psm_loop;
			} );
		
		fd_hook_associate(msg, pmdl);
		CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen), goto psm_end);
	
		/* If the current state does not allow receiving messages, just drop it */
		if (cur_state == STATE_CLOSED) {
			/* In such case, just discard the message */
			fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Message purged from queue, peer in CLOSED state", fd_msg_pmdl_get(msg));
			fd_msg_free(msg);
			goto psm_loop;
		}
		
		/* Extract the header */
		CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
		
		/* If it is an answer, associate with the request or drop */
		if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
			struct msg * req;
			/* Search matching request (same hbhid) */
			CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
			if (req == NULL) {
				fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Answer received with no corresponding sent request.", fd_msg_pmdl_get(msg));
				fd_msg_free(msg);
				goto psm_loop;
			}
			
			/* Associate */
			CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
			
		}
		
		/* Log incoming message */
		fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, peer, NULL, fd_msg_pmdl_get(msg));
		
		if (cur_state == STATE_OPEN_NEW) {
			/* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */
			fd_psm_change_state(peer, STATE_OPEN );
		}
		
		/* Now handle non-link-local messages */
		if (fd_msg_is_routable(msg)) {
			switch (cur_state) {
				/* To maximize compatibility -- should not be a security issue here */
				case STATE_REOPEN:
				case STATE_SUSPECT:
				case STATE_CLOSING:
				case STATE_CLOSING_GRACE:
					TRACE_DEBUG(FULL, "Accepted a message while not in OPEN state... ");
					/* fallthrough -- accepted and processed like in OPEN state */
				/* The standard situation : */
				case STATE_OPEN_NEW:
				case STATE_OPEN:
					/* We received a valid routable message, update the expiry timer */
					CHECK_FCT_DO( fd_p_expi_update(peer), goto psm_end );
					
					/* Set the message source and add the Route-Record */
					if (fd_g_config->cnf_rr_in_answers || (hdr->msg_flags & CMD_FLAG_REQUEST)) {
						CHECK_FCT_DO( fd_msg_source_setrr( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, fd_g_config->cnf_dict ), goto psm_end);
					}
					
					if ((hdr->msg_flags & CMD_FLAG_REQUEST)) {
						/* Mark the incoming request so that we know we have pending answers for this peer */
						CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end  );
						peer->p_reqin_count++;
						CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end  );
					}
					
					/* Requeue to the global incoming queue */
					CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end );
					
					/* Update the peer timer (only in OPEN state) */
					if ((cur_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) {
						fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_twtimer ?: fd_g_config->cnf_timer_tw);
					}
					break;
					
				/* In other states, we discard the message, it is either old or invalid to send it for the remote peer */
				case STATE_WAITCNXACK:
				case STATE_WAITCNXACK_ELEC:
				case STATE_WAITCEA:
				case STATE_CLOSED:
				default: {
					/* In such case, just discard the message */
					char buf[128];
					snprintf(buf, sizeof(buf), "Received while peer state machine was in state %s.", STATE_STR(cur_state));
					fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
					fd_msg_free(msg);
				}
			}
			goto psm_loop;
		}
		
		/* Link-local message: They must be understood by our dictionary, otherwise we return an error */
		{
			struct msg * error = NULL;
			int ret = fd_msg_parse_or_error( &msg, &error );
			if (ret != EBADMSG) {
				CHECK_FCT_DO( ret, 
					{
						char buf[256];
						snprintf(buf, sizeof(buf), "%s: An unexpected error occurred while parsing a link-local message", peer->p_hdr.info.pi_diamid);
						fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
						fd_msg_free(msg);
						goto psm_end;
					} );
			} else {
				/* EBADMSG: either msg was consumed and an error answer was built (msg == NULL),
				 * or we received an invalid answer (msg != NULL) */
				if (msg == NULL) {
					/* Send the error back to the peer */
					CHECK_FCT_DO( ret = fd_out_send(&error, NULL, peer, 0), );
					if (error) {
						char buf[256];
						/* Only if an error occurred & the message was not saved / dumped */
						snprintf(buf, sizeof(buf), "%s: error sending a message", peer->p_hdr.info.pi_diamid);
						fd_hook_call(HOOK_MESSAGE_DROPPED, error, peer, buf, fd_msg_pmdl_get(error));
						CHECK_FCT_DO( fd_msg_free(error), goto psm_end);
					}
				} else {
					char buf[256];
					/* We received an invalid answer, let's disconnect */
					snprintf(buf, sizeof(buf), "%s: Received invalid answer to Base protocol message, disconnecting...", peer->p_hdr.info.pi_diamid);
					fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
					CHECK_FCT_DO( fd_msg_free(msg), goto psm_end);
					CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
				}
				goto psm_loop;
			}
		}
		
		/* Handle the LL message and update the expiry timer appropriately */
		switch (hdr->msg_code) {
			case CC_CAPABILITIES_EXCHANGE:
				CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer),
					{
						if (msg)
							CHECK_FCT_DO( fd_msg_free(msg), );
						goto psm_reset;
					} );
				break;
			
			case CC_DISCONNECT_PEER:
				CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
				/* If the DPR handling moved us to CLOSING, the PSM terminates */
				if (fd_peer_getstate(peer) == STATE_CLOSING)
					goto psm_end;
				
				break;
			
			case CC_DEVICE_WATCHDOG:
				CHECK_FCT_DO( fd_p_dw_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
				break;
			
			default:
				/* Unknown / unexpected / invalid message -- but validated by our dictionary */
				TRACE_DEBUG(INFO, "Invalid non-routable command received: %u.", hdr->msg_code);
				if (hdr->msg_flags & CMD_FLAG_REQUEST) {
					do {
						/* Reply with an error code */
						CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ), break );
						
						/* Set the error code */
						CHECK_FCT_DO( fd_msg_rescode_set(msg, "DIAMETER_COMMAND_UNSUPPORTED", "Or maybe the P-bit or application Id are erroneous.", NULL, 1 ), break );
						
						/* Send the answer */
						CHECK_FCT_DO( fd_out_send(&msg, peer->p_cnxctx, peer, 0), break );
					} while (0);
				} else {
					/* We did ASK for it ??? */
					TRACE_DEBUG(INFO, "Received answer with erroneous 'is_routable' result...");
				}
				
				/* Cleanup the message if not done */
				if (msg) {
					char buf[256];
					snprintf(buf, sizeof(buf), "Received un-handled non-routable command from peer '%s'.", peer->p_hdr.info.pi_diamid);
					fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
					CHECK_FCT_DO( fd_msg_free(msg), /* continue */);
					msg = NULL;
				}
		};
		
		/* At this point the message must have been fully handled already */
		if (msg) {
			char buf[256];
			snprintf(buf, sizeof(buf), "Internal error ('%s'): unhandled message.", peer->p_hdr.info.pi_diamid);
			fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
			fd_msg_free(msg);
		}
		
		goto psm_loop;
	}
	
	/* The connection object is broken */
	if (event == FDEVP_CNX_ERROR) {
		switch (cur_state) {
			case STATE_WAITCNXACK_ELEC:
				/* Abort the initiating side */
				fd_p_cnx_abort(peer, 0);
				/* Process the receiver side */
				CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
				break;
			
			case STATE_WAITCEA:
			case STATE_OPEN:
			case STATE_OPEN_NEW:
			case STATE_REOPEN:
			case STATE_WAITCNXACK:
			case STATE_SUSPECT:
			default:
				/* Mark the connection problem */
				peer->p_flags.pf_cnx_pb = 1;
				
				fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "The connection was broken", NULL);
				
				/* Destroy the connection, restart the timer to a new connection attempt */
				fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
				
				/* fallthrough: reset the PSM like from CLOSED */
			case STATE_CLOSED:
				goto psm_reset;
				
			case STATE_CLOSING:
				/* We sent a DPR so we are terminating, do not wait for DPA */
				goto psm_end;
				
			case STATE_CLOSING_GRACE:
				if (peer->p_flags.pf_localterm) /* initiated here */
					goto psm_end;
				
				fd_psm_cleanup(peer, 0);
				
				/* Reset the timer for next connection attempt */
				fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
				goto psm_loop;
		}
		goto psm_loop;
	}
	
	/* The connection notified a change in endpoints */
	if (event == FDEVP_CNX_EP_CHANGE) {
		/* We actually don't care if we are in OPEN state here... */
		
		/* Cleanup the remote LL and primary addresses */
		CHECK_FCT_DO( fd_ep_filter( &peer->p_hdr.info.pi_endpoints, EP_FL_CONF | EP_FL_DISC | EP_FL_ADV ), /* ignore the error */);
		CHECK_FCT_DO( fd_ep_clearflags( &peer->p_hdr.info.pi_endpoints, EP_FL_PRIMARY ), /* ignore the error */);
		
		/* Get the new ones */
		CHECK_FCT_DO( fd_cnx_getremoteeps(peer->p_cnxctx, &peer->p_hdr.info.pi_endpoints), /* ignore the error */);
		
		/* We do not support local endpoints change currently, but it could be added here if needed (refresh fd_g_config->cnf_endpoints) */
		{
			char * buf = NULL;
			size_t len = 0;
			LOG_D("Got low layer notification (IGNORED): remote endpoint(s) changed: %s",  fd_ep_dump(&buf, &len, NULL, 0, 0, &peer->p_hdr.info.pi_endpoints) ?: "error");
			free(buf);
		}
		
		/* Done */
		goto psm_loop;
	}
	
	/* A new connection was established and CER containing this peer id was received */
	if (event == FDEVP_CNX_INCOMING) {
		struct cnx_incoming * params = ev_data;
		ASSERT(params);
		
		/* Handle the message */
		CHECK_FCT_DO( fd_p_ce_handle_newCER(&params->cer, peer, &params->cnx, params->validate), goto psm_end );
		
		/* Cleanup if needed (ownership not taken by the handler) */
		if (params->cnx) {
			fd_cnx_destroy(params->cnx);
			params->cnx = NULL;
		}
		if (params->cer) {
			CHECK_FCT_DO( fd_msg_free(params->cer), );
			params->cer = NULL;
		}
		
		/* Loop */
		free(ev_data);
		goto psm_loop;
	}
	
	/* A new connection has been established with the remote peer */
	if (event == FDEVP_CNX_ESTABLISHED) {
		struct cnxctx * cnx = ev_data;
		
		/* Release the resources of the connecting thread */
		CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
		peer->p_ini_thr = (pthread_t)NULL;
		
		switch (cur_state) {
			case STATE_WAITCNXACK_ELEC:
			case STATE_WAITCNXACK:
				LOG_D("%s: Connection established, %s", peer->p_hdr.info.pi_diamid, fd_cnx_getid(cnx));
				fd_p_ce_handle_newcnx(peer, cnx);
				break;
				
			default:
				/* Just abort the attempt and continue */
				TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing... (too slow?)", STATE_STR(cur_state));
				fd_cnx_destroy(cnx);
		}
		
		goto psm_loop;
	}
	
	/* A new connection has not been established with the remote peer */
	if (event == FDEVP_CNX_FAILED) {
		
		/* Release the resources of the connecting thread */
		CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
		peer->p_ini_thr = (pthread_t)NULL;
		
		switch (cur_state) {
			case STATE_WAITCNXACK_ELEC:
				/* Abort the initiating side */
				fd_p_cnx_abort(peer, 0);
				/* Process the receiver side */
				CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
				break;
				
			case STATE_WAITCNXACK:
				/* Go back to CLOSE */
				fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
				goto psm_reset;
				
			default:
				/* Just ignore */
				TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(cur_state));
		}
		
		goto psm_loop;
	}
	
	/* The timeout for the current state has been reached */
	if (event == FDEVP_PSM_TIMEOUT) {
		switch (cur_state) {
			case STATE_OPEN:
			case STATE_REOPEN:
			case STATE_OPEN_NEW:
				/* Time to send a DWR (or escalate if one is pending) */
				CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end );
				goto psm_loop;
				
			case STATE_CLOSED:
				LOG_D("%s: Connecting...", peer->p_hdr.info.pi_diamid);
				CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end );
				fd_psm_next_timeout(peer, 0, CNX_TIMEOUT);
				CHECK_FCT_DO( fd_p_cnx_init(peer), goto psm_end );
				goto psm_loop;
				
			case STATE_SUSPECT:
				/* Mark the connection problem */
				peer->p_flags.pf_cnx_pb = 1;
				/* fallthrough */
			case STATE_WAITCNXACK:
			case STATE_WAITCEA:
				fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "Timeout while waiting for remote peer", NULL);
				/* fallthrough */
			case STATE_CLOSING:
				/* Destroy the connection, restart the timer to a new connection attempt */
				fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
				goto psm_reset;
				
			case STATE_CLOSING_GRACE:
				/* The grace period is completed, now close */
				if (peer->p_flags.pf_localterm)
					goto psm_end;
				
				fd_psm_cleanup(peer, 0);
				/* Reset the timer for next connection attempt */
				fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
				goto psm_loop;
			
			case STATE_WAITCNXACK_ELEC:
				/* Abort the initiating side */
				fd_p_cnx_abort(peer, 0);
				/* Process the receiver side */
				CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
				goto psm_loop;
			
			default:
				ASSERT(0); /* implementation problem, we did not foresee this case? */
		}
	}
	
	/* Default action : the handling has not yet been implemented. [for debug only] */
	TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(cur_state), fd_pev_str(event));
psm_reset:
	/* Bring the peer back to CLOSED and loop, unless it is being deleted */
	if (peer->p_flags.pf_delete)
		goto psm_end;
	fd_psm_cleanup(peer, 0);
	goto psm_loop;
	
psm_end:
	cur_state = fd_peer_getstate(peer);
	if ((cur_state == STATE_CLOSING) || (cur_state == STATE_CLOSING_GRACE)) {
		LOG_N("%s: Going to ZOMBIE state (no more activity) after normal shutdown", peer->p_hdr.info.pi_diamid);
	} else {
		LOG_E("%s: Going to ZOMBIE state (no more activity) after abnormal shutdown", peer->p_hdr.info.pi_diamid);
	}
	fd_psm_cleanup(peer, 1);
	TRACE_DEBUG(INFO, "'%s'\t-> 'STATE_ZOMBIE' (terminated)\t'%s'",
			STATE_STR(fd_peer_getstate(peer)),
			peer->p_hdr.info.pi_diamid);
	pthread_cleanup_pop(1); /* set STATE_ZOMBIE */
	peer->p_psm = (pthread_t)NULL;
	pthread_detach(pthread_self());
	return NULL;
}
905
906
907 /************************************************************************/
908 /* Functions to control the PSM */
909 /************************************************************************/
910 /* Create the PSM thread of one peer structure */
/* Create the PSM thread of one peer structure.
 * The peer must be in STATE_NEW; this creates its event FIFO and spawns the
 * p_psm_th controller thread. Returns 0 on success, or an error code. */
int fd_psm_begin(struct fd_peer * peer )
{
	TRACE_ENTRY("%p", peer);
	
	/* Check the peer and state are OK */
	CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW );
	
	/* Create the FIFO for events */
	CHECK_FCT( fd_fifo_new(&peer->p_events, 0) );
	
	/* Create the PSM controller thread */
	CHECK_POSIX( pthread_create( &peer->p_psm, NULL, p_psm_th, peer ) );
	
	/* We're done */
	return 0;
}
927
928 /* End the PSM (clean ending) */
fd_psm_terminate(struct fd_peer * peer,char * reason)929 int fd_psm_terminate(struct fd_peer * peer, char * reason )
930 {
931 TRACE_ENTRY("%p", peer);
932 CHECK_PARAMS( CHECK_PEER(peer) );
933
934 if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
935 CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) );
936 } else {
937 TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid);
938 }
939 return 0;
940 }
941
942 /* End the PSM & cleanup the peer structure */
/* End the PSM & cleanup the peer structure (abrupt ending): cancel the PSM
 * thread, release the peer's resources and destroy its event queue.
 * Remaining cleanups are performed in fd_peer_free. */
void fd_psm_abord(struct fd_peer * peer )
{
	TRACE_ENTRY("%p", peer);
	
	/* Cancel PSM thread */
	CHECK_FCT_DO( fd_thr_term(&peer->p_psm), /* continue */ );
	
	/* Cleanup the data */
	fd_psm_cleanup(peer, 1);
	
	/* Destroy the event list */
	CHECK_FCT_DO( fd_fifo_del(&peer->p_events), /* continue */ );
	
	/* Remaining cleanups are performed in fd_peer_free */
	return;
}
959
960