1 /*
2  * Main implementation file for interface to Forwarding Plane Manager.
3  *
4  * Copyright (C) 2012 by Open Source Routing.
5  * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
6  *
7  * This file is part of GNU Zebra.
8  *
9  * GNU Zebra is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2, or (at your option) any
12  * later version.
13  *
14  * GNU Zebra is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with GNU Zebra; see the file COPYING.  If not, write to the Free
21  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
22  * 02111-1307, USA.
23  */
24 
25 #include <zebra.h>
26 
27 #include "log.h"
28 #include "stream.h"
29 #include "thread.h"
30 #include "network.h"
31 #include "command.h"
32 
33 #include "zebra/rib.h"
34 
35 #include "fpm/fpm.h"
36 #include "zebra_fpm.h"
37 #include "zebra_fpm_private.h"
38 
39 /*
40  * Interval at which we attempt to connect to the FPM.
41  */
42 #define ZFPM_CONNECT_RETRY_IVL   5
43 
44 /*
45  * Sizes of outgoing and incoming stream buffers for writing/reading
46  * FPM messages.
47  */
48 #define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
49 #define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
50 
51 /*
52  * The maximum number of times the FPM socket write callback can call
53  * 'write' before it yields.
54  */
55 #define ZFPM_MAX_WRITES_PER_RUN 10
56 
57 /*
58  * Interval over which we collect statistics.
59  */
60 #define ZFPM_STATS_IVL_SECS        10
61 
62 /*
63  * Structure that holds state for iterating over all route_node
64  * structures that are candidates for being communicated to the FPM.
65  */
66 typedef struct zfpm_rnodes_iter_t_
67 {
68   rib_tables_iter_t tables_iter;
69   route_table_iter_t iter;
70 } zfpm_rnodes_iter_t;
71 
72 /*
73  * Statistics.
74  */
75 typedef struct zfpm_stats_t_ {
76   unsigned long connect_calls;
77   unsigned long connect_no_sock;
78 
79   unsigned long read_cb_calls;
80 
81   unsigned long write_cb_calls;
82   unsigned long write_calls;
83   unsigned long partial_writes;
84   unsigned long max_writes_hit;
85   unsigned long t_write_yields;
86 
87   unsigned long nop_deletes_skipped;
88   unsigned long route_adds;
89   unsigned long route_dels;
90 
91   unsigned long updates_triggered;
92   unsigned long redundant_triggers;
93   unsigned long non_fpm_table_triggers;
94 
95   unsigned long dests_del_after_update;
96 
97   unsigned long t_conn_down_starts;
98   unsigned long t_conn_down_dests_processed;
99   unsigned long t_conn_down_yields;
100   unsigned long t_conn_down_finishes;
101 
102   unsigned long t_conn_up_starts;
103   unsigned long t_conn_up_dests_processed;
104   unsigned long t_conn_up_yields;
105   unsigned long t_conn_up_aborts;
106   unsigned long t_conn_up_finishes;
107 
108 } zfpm_stats_t;
109 
110 /*
111  * States for the FPM state machine.
112  */
113 typedef enum {
114 
115   /*
116    * In this state we are not yet ready to connect to the FPM. This
117    * can happen when this module is disabled, or if we're cleaning up
118    * after a connection has gone down.
119    */
120   ZFPM_STATE_IDLE,
121 
122   /*
123    * Ready to talk to the FPM and periodically trying to connect to
124    * it.
125    */
126   ZFPM_STATE_ACTIVE,
127 
128   /*
129    * In the middle of bringing up a TCP connection. Specifically,
130    * waiting for a connect() call to complete asynchronously.
131    */
132   ZFPM_STATE_CONNECTING,
133 
134   /*
135    * TCP connection to the FPM is up.
136    */
137   ZFPM_STATE_ESTABLISHED
138 
139 } zfpm_state_t;
140 
141 /*
142  * Message format to be used to communicate with the FPM.
143  */
144 typedef enum
145 {
146   ZFPM_MSG_FORMAT_NONE,
147   ZFPM_MSG_FORMAT_NETLINK,
148   ZFPM_MSG_FORMAT_PROTOBUF,
149 } zfpm_msg_format_e;
150 /*
151  * Globals.
152  */
153 typedef struct zfpm_glob_t_
154 {
155 
156   /*
157    * True if the FPM module has been enabled.
158    */
159   int enabled;
160 
161   /*
162    * Message format to be used to communicate with the fpm.
163    */
164   zfpm_msg_format_e message_format;
165 
166   struct thread_master *master;
167 
168   zfpm_state_t state;
169 
170   in_addr_t   fpm_server;
171   /*
172    * Port on which the FPM is running.
173    */
174   int fpm_port;
175 
176   /*
177    * List of rib_dest_t structures to be processed
178    */
179   TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;
180 
181   /*
182    * Stream socket to the FPM.
183    */
184   int sock;
185 
186   /*
187    * Buffers for messages to/from the FPM.
188    */
189   struct stream *obuf;
190   struct stream *ibuf;
191 
192   /*
193    * Threads for I/O.
194    */
195   struct thread *t_connect;
196   struct thread *t_write;
197   struct thread *t_read;
198 
199   /*
200    * Thread to clean up after the TCP connection to the FPM goes down
201    * and the state that belongs to it.
202    */
203   struct thread *t_conn_down;
204 
205   struct {
206     zfpm_rnodes_iter_t iter;
207   } t_conn_down_state;
208 
209   /*
210    * Thread to take actions once the TCP conn to the FPM comes up, and
211    * the state that belongs to it.
212    */
213   struct thread *t_conn_up;
214 
215   struct {
216     zfpm_rnodes_iter_t iter;
217   } t_conn_up_state;
218 
219   unsigned long connect_calls;
220   time_t last_connect_call_time;
221 
222   /*
223    * Stats from the start of the current statistics interval up to
224    * now. These are the counters we typically update in the code.
225    */
226   zfpm_stats_t stats;
227 
228   /*
229    * Statistics that were gathered in the last collection interval.
230    */
231   zfpm_stats_t last_ivl_stats;
232 
233   /*
234    * Cumulative stats from the last clear to the start of the current
235    * statistics interval.
236    */
237   zfpm_stats_t cumulative_stats;
238 
239   /*
240    * Stats interval timer.
241    */
242   struct thread *t_stats;
243 
244   /*
245    * If non-zero, the last time when statistics were cleared.
246    */
247   time_t last_stats_clear_time;
248 
249 } zfpm_glob_t;
250 
251 static zfpm_glob_t zfpm_glob_space;
252 static zfpm_glob_t *zfpm_g = &zfpm_glob_space;
253 
254 static int zfpm_read_cb (struct thread *thread);
255 static int zfpm_write_cb (struct thread *thread);
256 
257 static void zfpm_set_state (zfpm_state_t state, const char *reason);
258 static void zfpm_start_connect_timer (const char *reason);
259 static void zfpm_start_stats_timer (void);
260 
261 /*
262  * zfpm_thread_should_yield
263  */
264 static inline int
zfpm_thread_should_yield(struct thread * t)265 zfpm_thread_should_yield (struct thread *t)
266 {
267   return thread_should_yield (t);
268 }
269 
270 /*
271  * zfpm_state_to_str
272  */
273 static const char *
zfpm_state_to_str(zfpm_state_t state)274 zfpm_state_to_str (zfpm_state_t state)
275 {
276   switch (state)
277     {
278 
279     case ZFPM_STATE_IDLE:
280       return "idle";
281 
282     case ZFPM_STATE_ACTIVE:
283       return "active";
284 
285     case ZFPM_STATE_CONNECTING:
286       return "connecting";
287 
288     case ZFPM_STATE_ESTABLISHED:
289       return "established";
290 
291     default:
292       return "unknown";
293     }
294 }
295 
296 /*
297  * zfpm_get_time
298  */
299 static time_t
zfpm_get_time(void)300 zfpm_get_time (void)
301 {
302   struct timeval tv;
303 
304   if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
305     zlog_warn ("FPM: quagga_gettime failed!!");
306 
307   return tv.tv_sec;
308 }
309 
310 /*
311  * zfpm_get_elapsed_time
312  *
313  * Returns the time elapsed (in seconds) since the given time.
314  */
315 static time_t
zfpm_get_elapsed_time(time_t reference)316 zfpm_get_elapsed_time (time_t reference)
317 {
318   time_t now;
319 
320   now = zfpm_get_time ();
321 
322   if (now < reference)
323     {
324       assert (0);
325       return 0;
326     }
327 
328   return now - reference;
329 }
330 
331 /*
332  * zfpm_is_table_for_fpm
333  *
334  * Returns TRUE if the the given table is to be communicated to the
335  * FPM.
336  */
337 static inline int
zfpm_is_table_for_fpm(struct route_table * table)338 zfpm_is_table_for_fpm (struct route_table *table)
339 {
340   rib_table_info_t *info;
341 
342   info = rib_table_info (table);
343 
344   /*
345    * We only send the unicast tables in the main instance to the FPM
346    * at this point.
347    */
348   if (info->zvrf->vrf_id != 0)
349     return 0;
350 
351   if (info->safi != SAFI_UNICAST)
352     return 0;
353 
354   return 1;
355 }
356 
357 /*
358  * zfpm_rnodes_iter_init
359  */
360 static inline void
zfpm_rnodes_iter_init(zfpm_rnodes_iter_t * iter)361 zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
362 {
363   memset (iter, 0, sizeof (*iter));
364   rib_tables_iter_init (&iter->tables_iter);
365 
366   /*
367    * This is a hack, but it makes implementing 'next' easier by
368    * ensuring that route_table_iter_next() will return NULL the first
369    * time we call it.
370    */
371   route_table_iter_init (&iter->iter, NULL);
372   route_table_iter_cleanup (&iter->iter);
373 }
374 
375 /*
376  * zfpm_rnodes_iter_next
377  */
378 static inline struct route_node *
zfpm_rnodes_iter_next(zfpm_rnodes_iter_t * iter)379 zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
380 {
381   struct route_node *rn;
382   struct route_table *table;
383 
384   while (1)
385     {
386       rn = route_table_iter_next (&iter->iter);
387       if (rn)
388 	return rn;
389 
390       /*
391        * We've made our way through this table, go to the next one.
392        */
393       route_table_iter_cleanup (&iter->iter);
394 
395       while ((table = rib_tables_iter_next (&iter->tables_iter)))
396 	{
397 	  if (zfpm_is_table_for_fpm (table))
398 	    break;
399 	}
400 
401       if (!table)
402 	return NULL;
403 
404       route_table_iter_init (&iter->iter, table);
405     }
406 
407   return NULL;
408 }
409 
410 /*
411  * zfpm_rnodes_iter_pause
412  */
413 static inline void
zfpm_rnodes_iter_pause(zfpm_rnodes_iter_t * iter)414 zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
415 {
416   route_table_iter_pause (&iter->iter);
417 }
418 
419 /*
420  * zfpm_rnodes_iter_cleanup
421  */
422 static inline void
zfpm_rnodes_iter_cleanup(zfpm_rnodes_iter_t * iter)423 zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
424 {
425   route_table_iter_cleanup (&iter->iter);
426   rib_tables_iter_cleanup (&iter->tables_iter);
427 }
428 
429 /*
430  * zfpm_stats_init
431  *
432  * Initialize a statistics block.
433  */
434 static inline void
zfpm_stats_init(zfpm_stats_t * stats)435 zfpm_stats_init (zfpm_stats_t *stats)
436 {
437   memset (stats, 0, sizeof (*stats));
438 }
439 
440 /*
441  * zfpm_stats_reset
442  */
443 static inline void
zfpm_stats_reset(zfpm_stats_t * stats)444 zfpm_stats_reset (zfpm_stats_t *stats)
445 {
446   zfpm_stats_init (stats);
447 }
448 
449 /*
450  * zfpm_stats_copy
451  */
452 static inline void
zfpm_stats_copy(const zfpm_stats_t * src,zfpm_stats_t * dest)453 zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
454 {
455   memcpy (dest, src, sizeof (*dest));
456 }
457 
458 /*
459  * zfpm_stats_compose
460  *
461  * Total up the statistics in two stats structures ('s1 and 's2') and
462  * return the result in the third argument, 'result'. Note that the
463  * pointer 'result' may be the same as 's1' or 's2'.
464  *
465  * For simplicity, the implementation below assumes that the stats
466  * structure is composed entirely of counters. This can easily be
467  * changed when necessary.
468  */
469 static void
zfpm_stats_compose(const zfpm_stats_t * s1,const zfpm_stats_t * s2,zfpm_stats_t * result)470 zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
471 		    zfpm_stats_t *result)
472 {
473   const unsigned long *p1, *p2;
474   unsigned long *result_p;
475   int i, num_counters;
476 
477   p1 = (const unsigned long *) s1;
478   p2 = (const unsigned long *) s2;
479   result_p = (unsigned long *) result;
480 
481   num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));
482 
483   for (i = 0; i < num_counters; i++)
484     {
485       result_p[i] = p1[i] + p2[i];
486     }
487 }
488 
489 /*
490  * zfpm_read_on
491  */
492 static inline void
zfpm_read_on(void)493 zfpm_read_on (void)
494 {
495   assert (!zfpm_g->t_read);
496   assert (zfpm_g->sock >= 0);
497 
498   THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
499 		  zfpm_g->sock);
500 }
501 
502 /*
503  * zfpm_write_on
504  */
505 static inline void
zfpm_write_on(void)506 zfpm_write_on (void)
507 {
508   assert (!zfpm_g->t_write);
509   assert (zfpm_g->sock >= 0);
510 
511   THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
512 		   zfpm_g->sock);
513 }
514 
515 /*
516  * zfpm_read_off
517  */
518 static inline void
zfpm_read_off(void)519 zfpm_read_off (void)
520 {
521   THREAD_READ_OFF (zfpm_g->t_read);
522 }
523 
524 /*
525  * zfpm_write_off
526  */
527 static inline void
zfpm_write_off(void)528 zfpm_write_off (void)
529 {
530   THREAD_WRITE_OFF (zfpm_g->t_write);
531 }
532 
533 /*
534  * zfpm_conn_up_thread_cb
535  *
536  * Callback for actions to be taken when the connection to the FPM
537  * comes up.
538  */
539 static int
zfpm_conn_up_thread_cb(struct thread * thread)540 zfpm_conn_up_thread_cb (struct thread *thread)
541 {
542   struct route_node *rnode;
543   zfpm_rnodes_iter_t *iter;
544   rib_dest_t *dest;
545 
546   assert (zfpm_g->t_conn_up);
547   zfpm_g->t_conn_up = NULL;
548 
549   iter = &zfpm_g->t_conn_up_state.iter;
550 
551   if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
552     {
553       zfpm_debug ("Connection not up anymore, conn_up thread aborting");
554       zfpm_g->stats.t_conn_up_aborts++;
555       goto done;
556     }
557 
558   while ((rnode = zfpm_rnodes_iter_next (iter)))
559     {
560       dest = rib_dest_from_rnode (rnode);
561 
562       if (dest)
563 	{
564 	  zfpm_g->stats.t_conn_up_dests_processed++;
565 	  zfpm_trigger_update (rnode, NULL);
566 	}
567 
568       /*
569        * Yield if need be.
570        */
571       if (!zfpm_thread_should_yield (thread))
572 	continue;
573 
574       zfpm_g->stats.t_conn_up_yields++;
575       zfpm_rnodes_iter_pause (iter);
576       zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
577 						 zfpm_conn_up_thread_cb,
578 						 0, 0);
579       return 0;
580     }
581 
582   zfpm_g->stats.t_conn_up_finishes++;
583 
584  done:
585   zfpm_rnodes_iter_cleanup (iter);
586   return 0;
587 }
588 
589 /*
590  * zfpm_connection_up
591  *
592  * Called when the connection to the FPM comes up.
593  */
594 static void
zfpm_connection_up(const char * detail)595 zfpm_connection_up (const char *detail)
596 {
597   assert (zfpm_g->sock >= 0);
598   zfpm_read_on ();
599   zfpm_write_on ();
600   zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);
601 
602   /*
603    * Start thread to push existing routes to the FPM.
604    */
605   assert (!zfpm_g->t_conn_up);
606 
607   zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);
608 
609   zfpm_debug ("Starting conn_up thread");
610   zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
611 					     zfpm_conn_up_thread_cb, 0, 0);
612   zfpm_g->stats.t_conn_up_starts++;
613 }
614 
615 /*
616  * zfpm_connect_check
617  *
618  * Check if an asynchronous connect() to the FPM is complete.
619  */
620 static void
zfpm_connect_check()621 zfpm_connect_check ()
622 {
623   int status;
624   socklen_t slen;
625   int ret;
626 
627   zfpm_read_off ();
628   zfpm_write_off ();
629 
630   slen = sizeof (status);
631   ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
632 		    &slen);
633 
634   if (ret >= 0 && status == 0)
635     {
636       zfpm_connection_up ("async connect complete");
637       return;
638     }
639 
640   /*
641    * getsockopt() failed or indicated an error on the socket.
642    */
643   close (zfpm_g->sock);
644   zfpm_g->sock = -1;
645 
646   zfpm_start_connect_timer ("getsockopt() after async connect failed");
647   return;
648 }
649 
650 /*
651  * zfpm_conn_down_thread_cb
652  *
653  * Callback that is invoked to clean up state after the TCP connection
654  * to the FPM goes down.
655  */
656 static int
zfpm_conn_down_thread_cb(struct thread * thread)657 zfpm_conn_down_thread_cb (struct thread *thread)
658 {
659   struct route_node *rnode;
660   zfpm_rnodes_iter_t *iter;
661   rib_dest_t *dest;
662 
663   assert (zfpm_g->state == ZFPM_STATE_IDLE);
664 
665   assert (zfpm_g->t_conn_down);
666   zfpm_g->t_conn_down = NULL;
667 
668   iter = &zfpm_g->t_conn_down_state.iter;
669 
670   while ((rnode = zfpm_rnodes_iter_next (iter)))
671     {
672       dest = rib_dest_from_rnode (rnode);
673 
674       if (dest)
675 	{
676 	  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
677 	    {
678 	      TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
679 	    }
680 
681 	  UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
682 	  UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
683 
684 	  zfpm_g->stats.t_conn_down_dests_processed++;
685 
686 	  /*
687 	   * Check if the dest should be deleted.
688 	   */
689 	  rib_gc_dest(rnode);
690 	}
691 
692       /*
693        * Yield if need be.
694        */
695       if (!zfpm_thread_should_yield (thread))
696 	continue;
697 
698       zfpm_g->stats.t_conn_down_yields++;
699       zfpm_rnodes_iter_pause (iter);
700       zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
701 						   zfpm_conn_down_thread_cb,
702 						   0, 0);
703       return 0;
704     }
705 
706   zfpm_g->stats.t_conn_down_finishes++;
707   zfpm_rnodes_iter_cleanup (iter);
708 
709   /*
710    * Start the process of connecting to the FPM again.
711    */
712   zfpm_start_connect_timer ("cleanup complete");
713   return 0;
714 }
715 
716 /*
717  * zfpm_connection_down
718  *
719  * Called when the connection to the FPM has gone down.
720  */
721 static void
zfpm_connection_down(const char * detail)722 zfpm_connection_down (const char *detail)
723 {
724   if (!detail)
725     detail = "unknown";
726 
727   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
728 
729   zlog_info ("connection to the FPM has gone down: %s", detail);
730 
731   zfpm_read_off ();
732   zfpm_write_off ();
733 
734   stream_reset (zfpm_g->ibuf);
735   stream_reset (zfpm_g->obuf);
736 
737   if (zfpm_g->sock >= 0) {
738     close (zfpm_g->sock);
739     zfpm_g->sock = -1;
740   }
741 
742   /*
743    * Start thread to clean up state after the connection goes down.
744    */
745   assert (!zfpm_g->t_conn_down);
746   zfpm_debug ("Starting conn_down thread");
747   zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
748   zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
749 					       zfpm_conn_down_thread_cb, 0, 0);
750   zfpm_g->stats.t_conn_down_starts++;
751 
752   zfpm_set_state (ZFPM_STATE_IDLE, detail);
753 }
754 
755 /*
756  * zfpm_read_cb
757  */
758 static int
zfpm_read_cb(struct thread * thread)759 zfpm_read_cb (struct thread *thread)
760 {
761   size_t already;
762   struct stream *ibuf;
763   uint16_t msg_len;
764   fpm_msg_hdr_t *hdr;
765 
766   zfpm_g->stats.read_cb_calls++;
767   assert (zfpm_g->t_read);
768   zfpm_g->t_read = NULL;
769 
770   /*
771    * Check if async connect is now done.
772    */
773   if (zfpm_g->state == ZFPM_STATE_CONNECTING)
774     {
775       zfpm_connect_check();
776       return 0;
777     }
778 
779   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
780   assert (zfpm_g->sock >= 0);
781 
782   ibuf = zfpm_g->ibuf;
783 
784   already = stream_get_endp (ibuf);
785   if (already < FPM_MSG_HDR_LEN)
786     {
787       ssize_t nbyte;
788 
789       nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
790       if (nbyte == 0 || nbyte == -1)
791 	{
792 	  zfpm_connection_down ("closed socket in read");
793 	  return 0;
794 	}
795 
796       if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
797 	goto done;
798 
799       already = FPM_MSG_HDR_LEN;
800     }
801 
802   stream_set_getp (ibuf, 0);
803 
804   hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);
805 
806   if (!fpm_msg_hdr_ok (hdr))
807     {
808       zfpm_connection_down ("invalid message header");
809       return 0;
810     }
811 
812   msg_len = fpm_msg_len (hdr);
813 
814   /*
815    * Read out the rest of the packet.
816    */
817   if (already < msg_len)
818     {
819       ssize_t nbyte;
820 
821       nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);
822 
823       if (nbyte == 0 || nbyte == -1)
824 	{
825 	  zfpm_connection_down ("failed to read message");
826 	  return 0;
827 	}
828 
829       if (nbyte != (ssize_t) (msg_len - already))
830 	goto done;
831     }
832 
833   zfpm_debug ("Read out a full fpm message");
834 
835   /*
836    * Just throw it away for now.
837    */
838   stream_reset (ibuf);
839 
840  done:
841   zfpm_read_on ();
842   return 0;
843 }
844 
845 /*
846  * zfpm_writes_pending
847  *
848  * Returns TRUE if we may have something to write to the FPM.
849  */
850 static int
zfpm_writes_pending(void)851 zfpm_writes_pending (void)
852 {
853 
854   /*
855    * Check if there is any data in the outbound buffer that has not
856    * been written to the socket yet.
857    */
858   if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
859     return 1;
860 
861   /*
862    * Check if there are any prefixes on the outbound queue.
863    */
864   if (!TAILQ_EMPTY (&zfpm_g->dest_q))
865     return 1;
866 
867   return 0;
868 }
869 
870 /*
871  * zfpm_encode_route
872  *
873  * Encode a message to the FPM with information about the given route.
874  *
875  * Returns the number of bytes written to the buffer. 0 or a negative
876  * value indicates an error.
877  */
878 static inline int
zfpm_encode_route(rib_dest_t * dest,struct rib * rib,char * in_buf,size_t in_buf_len,fpm_msg_type_e * msg_type)879 zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
880 		   size_t in_buf_len, fpm_msg_type_e *msg_type)
881 {
882   size_t len;
883   int cmd;
884   len = 0;
885 
886   *msg_type = FPM_MSG_TYPE_NONE;
887 
888   switch (zfpm_g->message_format) {
889 
890   case ZFPM_MSG_FORMAT_PROTOBUF:
891 #ifdef HAVE_PROTOBUF
892     len = zfpm_protobuf_encode_route (dest, rib, (uint8_t *) in_buf,
893 				      in_buf_len);
894     *msg_type = FPM_MSG_TYPE_PROTOBUF;
895 #endif
896     break;
897 
898   case ZFPM_MSG_FORMAT_NETLINK:
899 #ifdef HAVE_NETLINK
900     *msg_type = FPM_MSG_TYPE_NETLINK;
901     cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;
902     len = zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);
903     assert(fpm_msg_align(len) == len);
904     *msg_type = FPM_MSG_TYPE_NETLINK;
905 #endif /* HAVE_NETLINK */
906     break;
907 
908   default:
909     break;
910   }
911 
912   return len;
913 
914 }
915 
916 /*
917  * zfpm_route_for_update
918  *
919  * Returns the rib that is to be sent to the FPM for a given dest.
920  */
921 struct rib *
zfpm_route_for_update(rib_dest_t * dest)922 zfpm_route_for_update (rib_dest_t *dest)
923 {
924   struct rib *rib;
925 
926   RIB_DEST_FOREACH_ROUTE (dest, rib)
927     {
928       if (!CHECK_FLAG (rib->status, RIB_ENTRY_SELECTED_FIB))
929 	continue;
930 
931       return rib;
932     }
933 
934   /*
935    * We have no route for this destination.
936    */
937   return NULL;
938 }
939 
940 /*
941  * zfpm_build_updates
942  *
943  * Process the outgoing queue and write messages to the outbound
944  * buffer.
945  */
946 static void
zfpm_build_updates(void)947 zfpm_build_updates (void)
948 {
949   struct stream *s;
950   rib_dest_t *dest;
951   unsigned char *buf, *data, *buf_end;
952   size_t msg_len;
953   size_t data_len;
954   fpm_msg_hdr_t *hdr;
955   struct rib *rib;
956   int is_add, write_msg;
957   fpm_msg_type_e msg_type;
958 
959   s = zfpm_g->obuf;
960 
961   assert (stream_empty (s));
962 
963   do {
964 
965     /*
966      * Make sure there is enough space to write another message.
967      */
968     if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
969       break;
970 
971     buf = STREAM_DATA (s) + stream_get_endp (s);
972     buf_end = buf + STREAM_WRITEABLE (s);
973 
974     dest = TAILQ_FIRST (&zfpm_g->dest_q);
975     if (!dest)
976       break;
977 
978     assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));
979 
980     hdr = (fpm_msg_hdr_t *) buf;
981     hdr->version = FPM_PROTO_VERSION;
982 
983     data = fpm_msg_data (hdr);
984 
985     rib = zfpm_route_for_update (dest);
986     is_add = rib ? 1 : 0;
987 
988     write_msg = 1;
989 
990     /*
991      * If this is a route deletion, and we have not sent the route to
992      * the FPM previously, skip it.
993      */
994     if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
995       {
996 	write_msg = 0;
997 	zfpm_g->stats.nop_deletes_skipped++;
998       }
999 
1000     if (write_msg) {
1001       data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data,
1002 				    &msg_type);
1003 
1004       assert (data_len);
1005       if (data_len)
1006 	{
1007 	  hdr->msg_type = msg_type;
1008 	  msg_len = fpm_data_len_to_msg_len (data_len);
1009 	  hdr->msg_len = htons (msg_len);
1010 	  stream_forward_endp (s, msg_len);
1011 
1012 	  if (is_add)
1013 	    zfpm_g->stats.route_adds++;
1014 	  else
1015 	    zfpm_g->stats.route_dels++;
1016 	}
1017     }
1018 
1019     /*
1020      * Remove the dest from the queue, and reset the flag.
1021      */
1022     UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1023     TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
1024 
1025     if (is_add)
1026       {
1027 	SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1028       }
1029     else
1030       {
1031 	UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1032       }
1033 
1034     /*
1035      * Delete the destination if necessary.
1036      */
1037     if (rib_gc_dest (dest->rnode))
1038       zfpm_g->stats.dests_del_after_update++;
1039 
1040   } while (1);
1041 
1042 }
1043 
1044 /*
1045  * zfpm_write_cb
1046  */
1047 static int
zfpm_write_cb(struct thread * thread)1048 zfpm_write_cb (struct thread *thread)
1049 {
1050   struct stream *s;
1051   int num_writes;
1052 
1053   zfpm_g->stats.write_cb_calls++;
1054   assert (zfpm_g->t_write);
1055   zfpm_g->t_write = NULL;
1056 
1057   /*
1058    * Check if async connect is now done.
1059    */
1060   if (zfpm_g->state == ZFPM_STATE_CONNECTING)
1061     {
1062       zfpm_connect_check ();
1063       return 0;
1064     }
1065 
1066   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
1067   assert (zfpm_g->sock >= 0);
1068 
1069   num_writes = 0;
1070 
1071   do
1072     {
1073       int bytes_to_write, bytes_written;
1074 
1075       s = zfpm_g->obuf;
1076 
1077       /*
1078        * If the stream is empty, try fill it up with data.
1079        */
1080       if (stream_empty (s))
1081 	{
1082 	  zfpm_build_updates ();
1083 	}
1084 
1085       bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
1086       if (!bytes_to_write)
1087 	break;
1088 
1089       bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
1090       zfpm_g->stats.write_calls++;
1091       num_writes++;
1092 
1093       if (bytes_written < 0)
1094 	{
1095 	  if (ERRNO_IO_RETRY (errno))
1096 	    break;
1097 
1098 	  zfpm_connection_down ("failed to write to socket");
1099 	  return 0;
1100 	}
1101 
1102       if (bytes_written != bytes_to_write)
1103 	{
1104 
1105 	  /*
1106 	   * Partial write.
1107 	   */
1108 	  stream_forward_getp (s, bytes_written);
1109 	  zfpm_g->stats.partial_writes++;
1110 	  break;
1111 	}
1112 
1113       /*
1114        * We've written out the entire contents of the stream.
1115        */
1116       stream_reset (s);
1117 
1118       if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
1119 	{
1120 	  zfpm_g->stats.max_writes_hit++;
1121 	  break;
1122 	}
1123 
1124       if (zfpm_thread_should_yield (thread))
1125 	{
1126 	  zfpm_g->stats.t_write_yields++;
1127 	  break;
1128 	}
1129     } while (1);
1130 
1131   if (zfpm_writes_pending ())
1132       zfpm_write_on ();
1133 
1134   return 0;
1135 }
1136 
1137 /*
1138  * zfpm_connect_cb
1139  */
1140 static int
zfpm_connect_cb(struct thread * t)1141 zfpm_connect_cb (struct thread *t)
1142 {
1143   int sock, ret;
1144   struct sockaddr_in serv;
1145 
1146   assert (zfpm_g->t_connect);
1147   zfpm_g->t_connect = NULL;
1148   assert (zfpm_g->state == ZFPM_STATE_ACTIVE);
1149 
1150   sock = socket (AF_INET, SOCK_STREAM, 0);
1151   if (sock < 0)
1152     {
1153       zfpm_debug ("Failed to create socket for connect(): %s", strerror(errno));
1154       zfpm_g->stats.connect_no_sock++;
1155       return 0;
1156     }
1157 
1158   set_nonblocking(sock);
1159 
1160   /* Make server socket. */
1161   memset (&serv, 0, sizeof (serv));
1162   serv.sin_family = AF_INET;
1163   serv.sin_port = htons (zfpm_g->fpm_port);
1164 #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
1165   serv.sin_len = sizeof (struct sockaddr_in);
1166 #endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
1167   if (!zfpm_g->fpm_server)
1168     serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
1169   else
1170     serv.sin_addr.s_addr = (zfpm_g->fpm_server);
1171 
1172   /*
1173    * Connect to the FPM.
1174    */
1175   zfpm_g->connect_calls++;
1176   zfpm_g->stats.connect_calls++;
1177   zfpm_g->last_connect_call_time = zfpm_get_time ();
1178 
1179   ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
1180   if (ret >= 0)
1181     {
1182       zfpm_g->sock = sock;
1183       zfpm_connection_up ("connect succeeded");
1184       return 1;
1185     }
1186 
1187   if (errno == EINPROGRESS)
1188     {
1189       zfpm_g->sock = sock;
1190       zfpm_read_on ();
1191       zfpm_write_on ();
1192       zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
1193       return 0;
1194     }
1195 
1196   zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
1197   close (sock);
1198 
1199   /*
1200    * Restart timer for retrying connection.
1201    */
1202   zfpm_start_connect_timer ("connect() failed");
1203   return 0;
1204 }
1205 
1206 /*
1207  * zfpm_set_state
1208  *
1209  * Move state machine into the given state.
1210  */
1211 static void
zfpm_set_state(zfpm_state_t state,const char * reason)1212 zfpm_set_state (zfpm_state_t state, const char *reason)
1213 {
1214   zfpm_state_t cur_state = zfpm_g->state;
1215 
1216   if (!reason)
1217     reason = "Unknown";
1218 
1219   if (state == cur_state)
1220     return;
1221 
1222   zfpm_debug("beginning state transition %s -> %s. Reason: %s",
1223 	     zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
1224 	     reason);
1225 
1226   switch (state) {
1227 
1228   case ZFPM_STATE_IDLE:
1229     assert (cur_state == ZFPM_STATE_ESTABLISHED);
1230     break;
1231 
1232   case ZFPM_STATE_ACTIVE:
1233      assert (cur_state == ZFPM_STATE_IDLE ||
1234 	     cur_state == ZFPM_STATE_CONNECTING);
1235     assert (zfpm_g->t_connect);
1236     break;
1237 
1238   case ZFPM_STATE_CONNECTING:
1239     assert (zfpm_g->sock);
1240     assert (cur_state == ZFPM_STATE_ACTIVE);
1241     assert (zfpm_g->t_read);
1242     assert (zfpm_g->t_write);
1243     break;
1244 
1245   case ZFPM_STATE_ESTABLISHED:
1246     assert (cur_state == ZFPM_STATE_ACTIVE ||
1247 	    cur_state == ZFPM_STATE_CONNECTING);
1248     assert (zfpm_g->sock);
1249     assert (zfpm_g->t_read);
1250     assert (zfpm_g->t_write);
1251     break;
1252   }
1253 
1254   zfpm_g->state = state;
1255 }
1256 
1257 /*
1258  * zfpm_calc_connect_delay
1259  *
1260  * Returns the number of seconds after which we should attempt to
1261  * reconnect to the FPM.
1262  */
1263 static long
zfpm_calc_connect_delay(void)1264 zfpm_calc_connect_delay (void)
1265 {
1266   time_t elapsed;
1267 
1268   /*
1269    * Return 0 if this is our first attempt to connect.
1270    */
1271   if (zfpm_g->connect_calls == 0)
1272     {
1273       return 0;
1274     }
1275 
1276   elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);
1277 
1278   if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1279     return 0;
1280   }
1281 
1282   return ZFPM_CONNECT_RETRY_IVL - elapsed;
1283 }
1284 
1285 /*
1286  * zfpm_start_connect_timer
1287  */
1288 static void
zfpm_start_connect_timer(const char * reason)1289 zfpm_start_connect_timer (const char *reason)
1290 {
1291   long delay_secs;
1292 
1293   assert (!zfpm_g->t_connect);
1294   assert (zfpm_g->sock < 0);
1295 
1296   assert(zfpm_g->state == ZFPM_STATE_IDLE ||
1297 	 zfpm_g->state == ZFPM_STATE_ACTIVE ||
1298 	 zfpm_g->state == ZFPM_STATE_CONNECTING);
1299 
1300   delay_secs = zfpm_calc_connect_delay();
1301   zfpm_debug ("scheduling connect in %ld seconds", delay_secs);
1302 
1303   THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
1304 		   delay_secs);
1305   zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
1306 }
1307 
1308 /*
1309  * zfpm_is_enabled
1310  *
1311  * Returns TRUE if the zebra FPM module has been enabled.
1312  */
1313 static inline int
zfpm_is_enabled(void)1314 zfpm_is_enabled (void)
1315 {
1316   return zfpm_g->enabled;
1317 }
1318 
1319 /*
1320  * zfpm_conn_is_up
1321  *
1322  * Returns TRUE if the connection to the FPM is up.
1323  */
1324 static inline int
zfpm_conn_is_up(void)1325 zfpm_conn_is_up (void)
1326 {
1327   if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1328     return 0;
1329 
1330   assert (zfpm_g->sock >= 0);
1331 
1332   return 1;
1333 }
1334 
1335 /*
1336  * zfpm_trigger_update
1337  *
1338  * The zebra code invokes this function to indicate that we should
1339  * send an update to the FPM about the given route_node.
1340  */
1341 void
zfpm_trigger_update(struct route_node * rn,const char * reason)1342 zfpm_trigger_update (struct route_node *rn, const char *reason)
1343 {
1344   rib_dest_t *dest;
1345   char buf[PREFIX_STRLEN];
1346 
1347   /*
1348    * Ignore if the connection is down. We will update the FPM about
1349    * all destinations once the connection comes up.
1350    */
1351   if (!zfpm_conn_is_up ())
1352     return;
1353 
1354   dest = rib_dest_from_rnode (rn);
1355 
1356   /*
1357    * Ignore the trigger if the dest is not in a table that we would
1358    * send to the FPM.
1359    */
1360   if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
1361     {
1362       zfpm_g->stats.non_fpm_table_triggers++;
1363       return;
1364     }
1365 
1366   if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
1367     zfpm_g->stats.redundant_triggers++;
1368     return;
1369   }
1370 
1371   if (reason)
1372     {
1373       zfpm_debug ("%s triggering update to FPM - Reason: %s",
1374 		  prefix2str (&rn->p, buf, sizeof(buf)), reason);
1375     }
1376 
1377   SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1378   TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
1379   zfpm_g->stats.updates_triggered++;
1380 
1381   /*
1382    * Make sure that writes are enabled.
1383    */
1384   if (zfpm_g->t_write)
1385     return;
1386 
1387   zfpm_write_on ();
1388 }
1389 
1390 /*
1391  * zfpm_stats_timer_cb
1392  */
1393 static int
zfpm_stats_timer_cb(struct thread * t)1394 zfpm_stats_timer_cb (struct thread *t)
1395 {
1396   assert (zfpm_g->t_stats);
1397   zfpm_g->t_stats = NULL;
1398 
1399   /*
1400    * Remember the stats collected in the last interval for display
1401    * purposes.
1402    */
1403   zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);
1404 
1405   /*
1406    * Add the current set of stats into the cumulative statistics.
1407    */
1408   zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1409 		      &zfpm_g->cumulative_stats);
1410 
1411   /*
1412    * Start collecting stats afresh over the next interval.
1413    */
1414   zfpm_stats_reset (&zfpm_g->stats);
1415 
1416   zfpm_start_stats_timer ();
1417 
1418   return 0;
1419 }
1420 
1421 /*
1422  * zfpm_stop_stats_timer
1423  */
1424 static void
zfpm_stop_stats_timer(void)1425 zfpm_stop_stats_timer (void)
1426 {
1427   if (!zfpm_g->t_stats)
1428     return;
1429 
1430   zfpm_debug ("Stopping existing stats timer");
1431   THREAD_TIMER_OFF (zfpm_g->t_stats);
1432 }
1433 
1434 /*
1435  * zfpm_start_stats_timer
1436  */
1437 void
zfpm_start_stats_timer(void)1438 zfpm_start_stats_timer (void)
1439 {
1440   assert (!zfpm_g->t_stats);
1441 
1442   THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
1443 		   ZFPM_STATS_IVL_SECS);
1444 }
1445 
1446 /*
1447  * Helper macro for zfpm_show_stats() below.
1448  */
1449 #define ZFPM_SHOW_STAT(counter)						\
1450   do {									\
1451     vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter,	\
1452 	     zfpm_g->last_ivl_stats.counter, VTY_NEWLINE);		\
1453   } while (0)
1454 
1455 /*
1456  * zfpm_show_stats
1457  */
1458 static void
zfpm_show_stats(struct vty * vty)1459 zfpm_show_stats (struct vty *vty)
1460 {
1461   zfpm_stats_t total_stats;
1462   time_t elapsed;
1463 
1464   vty_out (vty, "%s%-40s %10s     Last %2d secs%s%s", VTY_NEWLINE, "Counter",
1465 	   "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);
1466 
1467   /*
1468    * Compute the total stats up to this instant.
1469    */
1470   zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1471 		      &total_stats);
1472 
1473   ZFPM_SHOW_STAT (connect_calls);
1474   ZFPM_SHOW_STAT (connect_no_sock);
1475   ZFPM_SHOW_STAT (read_cb_calls);
1476   ZFPM_SHOW_STAT (write_cb_calls);
1477   ZFPM_SHOW_STAT (write_calls);
1478   ZFPM_SHOW_STAT (partial_writes);
1479   ZFPM_SHOW_STAT (max_writes_hit);
1480   ZFPM_SHOW_STAT (t_write_yields);
1481   ZFPM_SHOW_STAT (nop_deletes_skipped);
1482   ZFPM_SHOW_STAT (route_adds);
1483   ZFPM_SHOW_STAT (route_dels);
1484   ZFPM_SHOW_STAT (updates_triggered);
1485   ZFPM_SHOW_STAT (non_fpm_table_triggers);
1486   ZFPM_SHOW_STAT (redundant_triggers);
1487   ZFPM_SHOW_STAT (dests_del_after_update);
1488   ZFPM_SHOW_STAT (t_conn_down_starts);
1489   ZFPM_SHOW_STAT (t_conn_down_dests_processed);
1490   ZFPM_SHOW_STAT (t_conn_down_yields);
1491   ZFPM_SHOW_STAT (t_conn_down_finishes);
1492   ZFPM_SHOW_STAT (t_conn_up_starts);
1493   ZFPM_SHOW_STAT (t_conn_up_dests_processed);
1494   ZFPM_SHOW_STAT (t_conn_up_yields);
1495   ZFPM_SHOW_STAT (t_conn_up_aborts);
1496   ZFPM_SHOW_STAT (t_conn_up_finishes);
1497 
1498   if (!zfpm_g->last_stats_clear_time)
1499     return;
1500 
1501   elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);
1502 
1503   vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
1504 	   (unsigned long) elapsed, VTY_NEWLINE);
1505 }
1506 
1507 /*
1508  * zfpm_clear_stats
1509  */
1510 static void
zfpm_clear_stats(struct vty * vty)1511 zfpm_clear_stats (struct vty *vty)
1512 {
1513   if (!zfpm_is_enabled ())
1514     {
1515       vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
1516       return;
1517     }
1518 
1519   zfpm_stats_reset (&zfpm_g->stats);
1520   zfpm_stats_reset (&zfpm_g->last_ivl_stats);
1521   zfpm_stats_reset (&zfpm_g->cumulative_stats);
1522 
1523   zfpm_stop_stats_timer ();
1524   zfpm_start_stats_timer ();
1525 
1526   zfpm_g->last_stats_clear_time = zfpm_get_time();
1527 
1528   vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
1529 }
1530 
1531 /*
1532  * show_zebra_fpm_stats
1533  */
1534 DEFUN (show_zebra_fpm_stats,
1535        show_zebra_fpm_stats_cmd,
1536        "show zebra fpm stats",
1537        SHOW_STR
1538        "Zebra information\n"
1539        "Forwarding Path Manager information\n"
1540        "Statistics\n")
1541 {
1542   zfpm_show_stats (vty);
1543   return CMD_SUCCESS;
1544 }
1545 
1546 /*
1547  * clear_zebra_fpm_stats
1548  */
1549 DEFUN (clear_zebra_fpm_stats,
1550        clear_zebra_fpm_stats_cmd,
1551        "clear zebra fpm stats",
1552        CLEAR_STR
1553        "Zebra information\n"
1554        "Clear Forwarding Path Manager information\n"
1555        "Statistics\n")
1556 {
1557   zfpm_clear_stats (vty);
1558   return CMD_SUCCESS;
1559 }
1560 
1561 /*
1562  * update fpm connection information
1563  */
1564 DEFUN ( fpm_remote_ip,
1565         fpm_remote_ip_cmd,
1566         "fpm connection ip A.B.C.D port <1-65535>",
1567         "fpm connection remote ip and port\n"
1568         "Remote fpm server ip A.B.C.D\n"
1569         "Enter ip ")
1570 {
1571 
1572    in_addr_t fpm_server;
1573    uint32_t port_no;
1574 
1575    fpm_server = inet_addr (argv[0]);
1576    if (fpm_server == INADDR_NONE)
1577      return CMD_ERR_INCOMPLETE;
1578 
1579    port_no = atoi (argv[1]);
1580    if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
1581      return CMD_ERR_INCOMPLETE;
1582 
1583    zfpm_g->fpm_server = fpm_server;
1584    zfpm_g->fpm_port = port_no;
1585 
1586 
1587    return CMD_SUCCESS;
1588 }
1589 
1590 DEFUN ( no_fpm_remote_ip,
1591         no_fpm_remote_ip_cmd,
1592         "no fpm connection ip A.B.C.D port <1-65535>",
1593         "fpm connection remote ip and port\n"
1594         "Connection\n"
1595         "Remote fpm server ip A.B.C.D\n"
1596         "Enter ip ")
1597 {
1598    if (zfpm_g->fpm_server != inet_addr (argv[0]) ||
1599               zfpm_g->fpm_port !=  atoi (argv[1]))
1600        return CMD_ERR_NO_MATCH;
1601 
1602    zfpm_g->fpm_server = FPM_DEFAULT_IP;
1603    zfpm_g->fpm_port = FPM_DEFAULT_PORT;
1604 
1605    return CMD_SUCCESS;
1606 }
1607 
1608 
1609 /*
1610  * zfpm_init_message_format
1611  */
1612 static inline void
zfpm_init_message_format(const char * format)1613 zfpm_init_message_format (const char *format)
1614 {
1615   int have_netlink, have_protobuf;
1616 
1617   have_netlink = have_protobuf = 0;
1618 
1619 #ifdef HAVE_NETLINK
1620   have_netlink = 1;
1621 #endif
1622 
1623 #ifdef HAVE_PROTOBUF
1624   have_protobuf = 1;
1625 #endif
1626 
1627   zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1628 
1629   if (!format)
1630     {
1631       if (have_netlink)
1632 	{
1633 	  zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1634 	}
1635       else if (have_protobuf)
1636 	{
1637 	  zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1638 	}
1639       return;
1640     }
1641 
1642   if (!strcmp ("netlink", format))
1643     {
1644       if (!have_netlink)
1645 	{
1646 	  zlog_err ("FPM netlink message format is not available");
1647 	  return;
1648 	}
1649       zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1650       return;
1651     }
1652 
1653   if (!strcmp ("protobuf", format))
1654     {
1655       if (!have_protobuf)
1656 	{
1657 	  zlog_err ("FPM protobuf message format is not available");
1658 	  return;
1659 	}
1660       zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1661       return;
1662     }
1663 
1664   zlog_warn ("Unknown fpm format '%s'", format);
1665 }
1666 
1667 /**
1668  * fpm_remote_srv_write
1669  *
1670  * Module to write remote fpm connection
1671  *
1672  * Returns ZERO on success.
1673  */
1674 
fpm_remote_srv_write(struct vty * vty)1675 int fpm_remote_srv_write (struct vty *vty )
1676 {
1677    struct in_addr in;
1678 
1679    in.s_addr = zfpm_g->fpm_server;
1680 
1681    if (zfpm_g->fpm_server != FPM_DEFAULT_IP ||
1682           zfpm_g->fpm_port != FPM_DEFAULT_PORT)
1683       vty_out (vty,"fpm connection ip %s port %d%s", inet_ntoa (in),zfpm_g->fpm_port,VTY_NEWLINE);
1684 
1685    return 0;
1686 }
1687 
1688 
1689 /**
1690  * zfpm_init
1691  *
1692  * One-time initialization of the Zebra FPM module.
1693  *
1694  * @param[in] port port at which FPM is running.
1695  * @param[in] enable TRUE if the zebra FPM module should be enabled
1696  * @param[in] format to use to talk to the FPM. Can be 'netink' or 'protobuf'.
1697  *
1698  * Returns TRUE on success.
1699  */
1700 int
zfpm_init(struct thread_master * master,int enable,uint16_t port,const char * format)1701 zfpm_init (struct thread_master *master, int enable, uint16_t port,
1702 	   const char *format)
1703 {
1704   static int initialized = 0;
1705 
1706   if (initialized) {
1707     return 1;
1708   }
1709 
1710   initialized = 1;
1711 
1712   memset (zfpm_g, 0, sizeof (*zfpm_g));
1713   zfpm_g->master = master;
1714   TAILQ_INIT(&zfpm_g->dest_q);
1715   zfpm_g->sock = -1;
1716   zfpm_g->state = ZFPM_STATE_IDLE;
1717 
1718   zfpm_stats_init (&zfpm_g->stats);
1719   zfpm_stats_init (&zfpm_g->last_ivl_stats);
1720   zfpm_stats_init (&zfpm_g->cumulative_stats);
1721 
1722   install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
1723   install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
1724   install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
1725   install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);
1726 
1727   zfpm_init_message_format(format);
1728 
1729   /*
1730    * Disable FPM interface if no suitable format is available.
1731    */
1732   if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
1733       enable = 0;
1734 
1735   zfpm_g->enabled = enable;
1736 
1737   if (!enable) {
1738     return 1;
1739   }
1740 
1741   if (!zfpm_g->fpm_server)
1742      zfpm_g->fpm_server = FPM_DEFAULT_IP;
1743 
1744   if (!port)
1745     port = FPM_DEFAULT_PORT;
1746 
1747   zfpm_g->fpm_port = port;
1748 
1749   zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
1750   zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);
1751 
1752   zfpm_start_stats_timer ();
1753   zfpm_start_connect_timer ("initialized");
1754 
1755   return 1;
1756 }
1757