1 /*
2  *
3   ***** BEGIN LICENSE BLOCK *****
4 
5   Copyright (C) 2009-2019 Olof Hagsand and Benny Holmgren
6 
7   This file is part of CLIXON.
8 
9   Licensed under the Apache License, Version 2.0 (the "License");
10   you may not use this file except in compliance with the License.
11   You may obtain a copy of the License at
12 
13     http://www.apache.org/licenses/LICENSE-2.0
14 
15   Unless required by applicable law or agreed to in writing, software
16   distributed under the License is distributed on an "AS IS" BASIS,
17   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18   See the License for the specific language governing permissions and
19   limitations under the License.
20 
21   Alternatively, the contents of this file may be used under the terms of
22   the GNU General Public License Version 3 or later (the "GPL"),
23   in which case the provisions of the GPL are applicable instead
24   of those above. If you wish to allow use of your version of this file only
25   under the terms of the GPL, and not to allow others to
26   use your version of this file under the terms of Apache License version 2,
27   indicate your decision by deleting the provisions above and replace them with
28   the  notice and other provisions required by the GPL. If you do not delete
29   the provisions above, a recipient may use your version of this file under
30   the terms of any one of the Apache License version 2 or the GPL.
31 
32   ***** END LICENSE BLOCK *****
33 
34  * Event notification streams according to RFC5277
 * The stream implementation has four parts:
 * 1) Base stream handling: stream_find/register/delete_all/get_xml
 * 2) Stream subscription handling (stream_ss_add/delete/timeout, stream_notify, etc)
38  * 3) Stream replay: stream_replay/_add
39  * 4) nginx/nchan publish code (use --enable-publish config option)
40  *
41  *
42  *             +---------------+  1             arg
43  *             | client_entry  | <----------------- +---------------+
44  *             +---------------+                +-->| subscription  |
45  *                                            /     +---------------+
46  * +---------------+        * +---------------+
47  * | clicon_handle |--------->| event_stream  |
48  * +---------------+          +---------------+
49  *                                             \  * +---------------+
50  *                                              +-->| replay        |
51  *                                                  +---------------+
52 
53  */
54 
55 #ifdef HAVE_CONFIG_H
56 #include "clixon_config.h" /* generated by config & autoconf */
57 #endif
58 
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <string.h>
62 #include <errno.h>
63 #include <inttypes.h>
64 #include <syslog.h>
65 #include <sys/time.h>
66 
67 /* cligen */
68 #include <cligen/cligen.h>
69 
70 /* clicon */
71 #include "clixon_queue.h"
72 #include "clixon_err.h"
73 #include "clixon_log.h"
74 #include "clixon_event.h"
75 #include "clixon_string.h"
76 #include "clixon_hash.h"
77 #include "clixon_handle.h"
78 #include "clixon_yang.h"
79 #include "clixon_xml.h"
80 #include "clixon_xml_io.h"
81 #include "clixon_options.h"
82 #include "clixon_data.h"
83 #include "clixon_xpath_ctx.h"
84 #include "clixon_xpath.h"
85 #include "clixon_stream.h"
86 
/* Interval [s] between runs of the stream timer, which expires subscriptions and prunes replay buffers */
88 #define STREAM_TIMER_TIMEOUT_S 5
89 
90 /*! Find an event notification stream given name
91  * @param[in]  h    Clicon handle
92  * @param[in]  name Name of stream
93  * @retval     es   Event notification stream structure
94  * @retval     NULL Not found
95  */
96 event_stream_t *
stream_find(clicon_handle h,const char * name)97 stream_find(clicon_handle h,
98 	    const char   *name)
99 {
100     event_stream_t *es0;
101     event_stream_t *es = NULL;
102 
103     es0 = clicon_stream(h);
104     if ((es = es0) != NULL)
105 	do {
106 	    if (strcmp(name, es->es_name)==0)
107 		return es;
108 	    es = NEXTQ(struct event_stream *, es);
109 	} while (es && es != es0);
110     return NULL;
111 }
112 
113 /*! Add notification event stream
114  * @param[in]  h              Clicon handle
115  * @param[in]  name           Name of stream
116  * @param[in]  description    Description of stream
117  * @param[in]  replay_enabled Set if replay possible in stream
118  * @param[in]  retention      For replay buffer how much relative to save
119  */
120 int
stream_add(clicon_handle h,const char * name,const char * description,const int replay_enabled,struct timeval * retention)121 stream_add(clicon_handle   h,
122 	   const char     *name,
123 	   const char     *description,
124 	   const int       replay_enabled,
125 	   struct timeval *retention)
126 {
127     int             retval = -1;
128     event_stream_t *es;
129 
130     if ((es = stream_find(h, name)) != NULL)
131 	goto ok;
132     if ((es = malloc(sizeof(event_stream_t))) == NULL){
133 	clicon_err(OE_XML, errno, "malloc");
134 	goto done;
135     }
136     memset(es, 0, sizeof(event_stream_t));
137     if ((es->es_name = strdup(name)) == NULL){
138 	clicon_err(OE_XML, errno, "strdup");
139 	goto done;
140     }
141     if ((es->es_description = strdup(description)) == NULL){
142 	clicon_err(OE_XML, errno, "strdup");
143 	goto done;
144     }
145     es->es_replay_enabled = replay_enabled;
146     if (retention)
147 	es->es_retention = *retention;
148     clicon_stream_append(h, es);
149  ok:
150     retval = 0;
151  done:
152     return retval;
153 }
154 
155 /*! Delete complete notification event stream list (not just single stream)
156  * @param[in] h     Clicon handle
157  * @param[in] force Force deletion of
158  */
159 int
stream_delete_all(clicon_handle h,int force)160 stream_delete_all(clicon_handle h,
161 		  int           force)
162 {
163     struct stream_replay *r;
164     struct stream_subscription *ss;
165     event_stream_t       *es;
166     event_stream_t       *head = clicon_stream(h);
167 
168     while ((es = clicon_stream(h)) != NULL){
169 	DELQ(es, head, event_stream_t *);
170 	clicon_stream_set(h, head);
171 	if (es->es_name)
172 	    free(es->es_name);
173 	if (es->es_description)
174 	    free(es->es_description);
175 	while ((ss = es->es_subscription) != NULL)
176 	    stream_ss_rm(h, es, ss, force); /* XXX in some cases leaks memory due to DONT clause in stream_ss_rm() */
177 	while ((r = es->es_replay) != NULL){
178 	    DELQ(r, es->es_replay, struct stream_replay *);
179 	    if (r->r_xml)
180 		xml_free(r->r_xml);
181 	    free(r);
182 	}
183 	free(es);
184     }
185     return 0;
186 }
187 
188 /*! Return stream definition state in XML supporting RFC 8040 and RFC5277
189  * @param[in]  h      Clicon handle
190  * @param[in]  access If set, include access/location
191  * @param[out] cb     Output buffer containing XML on exit
192  * @retval     0      OK
193  * @retval     -1     Error
194  */
195 int
stream_get_xml(clicon_handle h,int access,cbuf * cb)196 stream_get_xml(clicon_handle h,
197 	       int           access,
198 	       cbuf         *cb)
199 {
200     event_stream_t *es = NULL;
201     char           *url_prefix;
202     char           *stream_path;
203 
204     cprintf(cb, "<streams>");
205     if ((es = clicon_stream(h)) != NULL){
206 	do {
207 	    cprintf(cb, "<stream>");
208 	    cprintf(cb, "<name>%s</name>", es->es_name);
209 	    if (es->es_description)
210 		cprintf(cb, "<description>%s</description>", es->es_description);
211 	    cprintf(cb, "<replay-support>%s</replay-support>",
212 		    es->es_replay_enabled?"true":"false");
213 	    if (access){
214 		cprintf(cb, "<access>");
215 		cprintf(cb, "<encoding>xml</encoding>");
216 		url_prefix = clicon_option_str(h, "CLICON_STREAM_URL");
217 		stream_path = clicon_option_str(h, "CLICON_STREAM_PATH");
218 		cprintf(cb, "<location>%s/%s/%s</location>",
219 			url_prefix, stream_path, es->es_name);
220 		cprintf(cb, "</access>");
221 	    }
222 	    cprintf(cb, "</stream>");
223 	    es = NEXTQ(struct event_stream *, es);
224 	} while (es && es != clicon_stream(h));
225     }
226     cprintf(cb, "</streams>");
227     return 0;
228 }
229 
230 /*! Check all stream subscription stop timers, set up new timer
231  * @param[in] fd   No-op
232  * @param[in] arg  Clicon handle
233  * @note format is given by clixon_event_reg_timeout callback function (fd not needed)
234  */
235 int
stream_timer_setup(int fd,void * arg)236 stream_timer_setup(int   fd,
237 		   void *arg)
238 {
239     int                          retval = -1;
240     clicon_handle                h = (clicon_handle)arg;
241     struct timeval               now;
242     struct timeval               t;
243     struct timeval               t1 = {STREAM_TIMER_TIMEOUT_S, 0};
244     struct timeval               tret;
245     event_stream_t              *es;
246     struct stream_subscription  *ss;
247     struct stream_subscription  *ss1;
248     struct stream_replay        *r;
249     struct stream_replay        *r1;
250 
251     clicon_debug(2, "%s", __FUNCTION__);
252     /* Go thru callbacks and see if any have timed out, if so remove them
253      * Could also be done by a separate timer.
254      */
255     gettimeofday(&now, NULL);
256     /* For all event streams:
257      * 1) Go through subscriptions, if stop-time and its past, remove it
258      * XXX: but client may not be closed
259      * 2) Go throughreplay buffer and remove entries with passed retention time
260      */
261     if ((es = clicon_stream(h)) != NULL){
262 	do {
263    /* 1) Go through subscriptions, if stop-time and its past, remove it */
264 	    if ((ss = es->es_subscription) != NULL)
265 		do {
266 		    if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
267 			ss1 = NEXTQ(struct stream_subscription *, ss);
268 			/* Signal to remove stream for upper levels */
269 			if (stream_ss_rm(h, es, ss, 0) < 0)
270 			    goto done;
271 			ss = ss1;
272 		    }
273 		    else
274 			ss = NEXTQ(struct stream_subscription *, ss);
275 		} while (ss && ss != es->es_subscription);
276   /* 2) Go throughreplay buffer and remove entries with passed retention time */
277 	    if (timerisset(&es->es_retention) &&
278 		(r = es->es_replay) != NULL){
279 		timersub(&now, &es->es_retention, &tret);
280 		do {
281 		    if (timercmp(&r->r_tv, &tret, <)){
282 			r1 = NEXTQ(struct stream_replay *, r);
283 			DELQ(r, es->es_replay, struct stream_replay *);
284 			if (r->r_xml)
285 			    xml_free(r->r_xml);
286 			free(r);
287 			r = r1;
288 		    }
289 		    else
290 			r = NEXTQ(struct stream_replay *, r);
291 		} while (r && r!=es->es_replay);
292 	    }
293 	    es = NEXTQ(struct event_stream *, es);
294 	} while (es && es != clicon_stream(h));
295     }
296     /* Initiate new timer */
297     timeradd(&now, &t1, &t);
298     if (clixon_event_reg_timeout(t,
299 			  stream_timer_setup, /* this function */
300 			  h,                  /* clicon handle */
301 			  "stream timer setup") < 0)
302 	goto done;
303     retval = 0;
304  done:
305     return retval;
306 }
307 
308 #ifdef NYI
/*! Delete single notification event stream
 *
 * Placeholder: performs no work and reports success.
 * XXX notused
 * @retval 0 Always
 */
int
stream_del()
{
    int retval = 0;

    return retval;
}
317 #endif
318 
319 /*! Add an event notification callback to a stream given a callback function
320  * @param[in]  h        Clicon handle
321  * @param[in]  stream   Name of stream
322  * @param[in]  xpath    Filter selector - xpath
323  * @param[in]  startime If set, Make a replay
324  * @param[in]  stoptime If set, dont continue past this time
325  * @param[in]  fn       Callback when event occurs
326  * @param[in]  arg      Argument to use with callback. Also handle when deleting
327  * @retval     0        OK
328  * @retval     -1       Error, ie no such stream
329  */
330 struct stream_subscription *
stream_ss_add(clicon_handle h,char * stream,char * xpath,struct timeval * starttime,struct timeval * stoptime,stream_fn_t fn,void * arg)331 stream_ss_add(clicon_handle     h,
332 	      char             *stream,
333 	      char             *xpath,
334 	      struct timeval   *starttime,
335 	      struct timeval   *stoptime,
336 	      stream_fn_t       fn,
337 	      void             *arg)
338 {
339     event_stream_t             *es;
340     struct stream_subscription *ss = NULL;
341 
342     clicon_debug(1, "%s", __FUNCTION__);
343     if ((es = stream_find(h, stream)) == NULL){
344 	clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream);
345 	goto done;
346     }
347     if ((ss = malloc(sizeof(*ss))) == NULL){
348 	clicon_err(OE_CFG, errno, "malloc");
349 	goto done;
350     }
351     memset(ss, 0, sizeof(*ss));
352     if ((ss->ss_stream = strdup(stream)) == NULL){
353 	clicon_err(OE_CFG, errno, "strdup");
354 	goto done;
355     }
356     if (stoptime)
357 	ss->ss_stoptime = *stoptime;
358     if (starttime)
359 	ss->ss_starttime = *starttime;
360     if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){
361 	clicon_err(OE_CFG, errno, "strdup");
362 	goto done;
363     }
364     ss->ss_fn     = fn;
365     ss->ss_arg    = arg;
366     ADDQ(ss, es->es_subscription);
367     return ss;
368   done:
369     if (ss)
370 	free(ss);
371     return NULL;
372 }
373 
374 /*! Delete event stream subscription to a stream given a callback and arg
375  * @param[in]  h      Clicon handle
376  * @param[in]  stream Name of stream or NULL for all streams
377  * @param[in]  fn     Callback when event occurs
378  * @param[in]  arg    Argument to use with callback. Also handle when deleting
379  * @retval     0      OK
380  * @retval     -1     Error
381  */
382 int
stream_ss_rm(clicon_handle h,event_stream_t * es,struct stream_subscription * ss,int force)383 stream_ss_rm(clicon_handle                h,
384 	     event_stream_t              *es,
385 	     struct stream_subscription  *ss,
386 	     int                          force)
387 {
388     clicon_debug(1, "%s", __FUNCTION__);
389     DELQ(ss, es->es_subscription, struct stream_subscription *);
390     /* Remove from upper layers - close socket etc. */
391     (*ss->ss_fn)(h, 1, NULL, ss->ss_arg);
392     if (force){
393 	if (ss->ss_stream)
394 	    free(ss->ss_stream);
395 	if (ss->ss_xpath)
396 	    free(ss->ss_xpath);
397 	free(ss);
398     }
399     clicon_debug(1, "%s retval: 0", __FUNCTION__);
400     return 0;
401 }
402 
403 /*! Find stream callback given callback function and its (unique) argument
404  * @param[in]  es   Pointer to event stream
405  * @param[in]  fn   Stream callback
406  * @param[in]  arg  Argument - typically unique client handle
407  * @retval     ss   Event stream subscription structure
408  * @retval     NULL Not found
409  */
410 struct stream_subscription *
stream_ss_find(event_stream_t * es,stream_fn_t fn,void * arg)411 stream_ss_find(event_stream_t   *es,
412 	       stream_fn_t       fn,
413 	       void             *arg)
414 {
415     struct stream_subscription  *ss;
416 
417     if ((ss = es->es_subscription) != NULL)
418 	do {
419 	    if (fn == ss->ss_fn && arg == ss->ss_arg)
420 		return ss;
421 	    ss = NEXTQ(struct stream_subscription *, ss);
422 	} while (ss && ss != es->es_subscription);
423     return NULL;
424 }
425 
426 /*! Remove stream subscription identified with fn and arg in all streams
427  * @param[in] h       Clicon handle
428  * @param[in] fn      Stream callback
429  * @param[in] arg     Argument - typically unique client handle
430  * @see stream_ss_delete  For single stream
431  */
432 int
stream_ss_delete_all(clicon_handle h,stream_fn_t fn,void * arg)433 stream_ss_delete_all(clicon_handle     h,
434 		     stream_fn_t       fn,
435 		     void             *arg)
436 {
437     int                          retval = -1;
438     event_stream_t              *es;
439     struct stream_subscription  *ss;
440 
441     if ((es = clicon_stream(h)) != NULL){
442 	do {
443 	    if ((ss = stream_ss_find(es, fn, arg)) != NULL){
444 		if (stream_ss_rm(h, es, ss, 1) < 0)
445 		    goto done;
446 	    }
447 	    es = NEXTQ(struct event_stream *, es);
448 	} while (es && es != clicon_stream(h));
449     }
450     retval = 0;
451  done:
452     return retval;
453 }
454 
455 /*! Delete a single stream
456  * @see stream_ss_delete_all (merge with this?)
457  */
458 int
stream_ss_delete(clicon_handle h,char * name,stream_fn_t fn,void * arg)459 stream_ss_delete(clicon_handle     h,
460 		 char             *name,
461 		 stream_fn_t       fn,
462 		 void             *arg)
463 {
464     int                          retval = -1;
465     event_stream_t              *es;
466     struct stream_subscription  *ss;
467 
468     if ((es = clicon_stream(h)) != NULL){
469 	do {
470 	    if (strcmp(name, es->es_name)==0)
471 		if ((ss = stream_ss_find(es, fn, arg)) != NULL){
472 		    if (stream_ss_rm(h, es, ss, 0) < 0)
473 			goto done;
474 		}
475 	    es = NEXTQ(struct event_stream *, es);
476 	} while (es && es != clicon_stream(h));
477     }
478     retval = 0;
479  done:
480     return retval;
481 }
482 
483 /*! Stream notify event and distribute to all registered callbacks
484  * @param[in]  h       Clicon handle
485  * @param[in]  stream  Name of event stream. CLICON is predefined as LOG stream
486  * @param[in]  tv      Timestamp. Dont notify if subscription has stoptime<tv
487  * @param[in]  event   Notification as xml tree
488  * @retval  0  OK
489  * @retval -1  Error with clicon_err called
490  * @see stream_notify
491  * @see stream_ss_timeout where subscriptions are removed if stoptime<now
492  */
493 static int
stream_notify1(clicon_handle h,event_stream_t * es,struct timeval * tv,cxobj * xevent)494 stream_notify1(clicon_handle   h,
495 	       event_stream_t *es,
496 	       struct timeval *tv,
497 	       cxobj          *xevent)
498 {
499     int                         retval = -1;
500     struct stream_subscription *ss;
501 
502     clicon_debug(2, "%s", __FUNCTION__);
503     /* Go thru all subscriptions and find matches */
504     if ((ss = es->es_subscription) != NULL)
505 	do {
506 	    if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */
507 		timercmp(&ss->ss_stoptime, tv, <)){
508 		struct stream_subscription *ss1;
509 		ss1 = NEXTQ(struct stream_subscription *, ss);
510 		/* Signal to remove stream for upper levels */
511 		if (stream_ss_rm(h, es, ss, 1) < 0)
512 		    goto done;
513 		ss = ss1;
514 	    }
515 	    else{  /* xpath match */
516 		if (ss->ss_xpath == NULL ||
517 		    strlen(ss->ss_xpath)==0 ||
518 		    xpath_first(xevent, NULL, "%s", ss->ss_xpath) != NULL)
519 		    if ((*ss->ss_fn)(h, 0, xevent, ss->ss_arg) < 0)
520 			goto done;
521 		ss = NEXTQ(struct stream_subscription *, ss);
522 	    }
523 	} while (es->es_subscription && ss != es->es_subscription);
524     retval = 0;
525   done:
526     return retval;
527 }
528 
529 /*! Stream notify event and distribute to all registered callbacks
530  * @param[in]  h       Clicon handle
531  * @param[in]  stream  Name of event stream. CLICON is predefined as LOG stream
532  * @param[in]  event   Notification as format string according to printf(3)
533  * @retval  0  OK
534  * @retval -1  Error with clicon_err called
535  * @code
536  *  if (stream_notify(h, "NETCONF", "<event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0)
537  *    err;
538  * @endcode
539  * @see stream_notify1 Internal
540  */
541 int
stream_notify(clicon_handle h,char * stream,const char * event,...)542 stream_notify(clicon_handle h,
543 	      char         *stream,
544 	      const char   *event, ...)
545 {
546     int        retval = -1;
547     va_list    args;
548     int        len;
549     cxobj     *xev = NULL;
550     yang_stmt *yspec = NULL;
551     char      *str = NULL;
552     cbuf      *cb = NULL;
553     char       timestr[28];
554     struct timeval tv;
555     event_stream_t *es;
556 
557     clicon_debug(2, "%s", __FUNCTION__);
558     if ((es = stream_find(h, stream)) == NULL)
559 	goto ok;
560     va_start(args, event);
561     len = vsnprintf(NULL, 0, event, args) + 1;
562     va_end(args);
563     if ((str = malloc(len)) == NULL){
564 	clicon_err(OE_UNIX, errno, "malloc");
565 	goto done;
566     }
567     memset(str, 0, len);
568     va_start(args, event);
569     len = vsnprintf(str, len, event, args) + 1;
570     va_end(args);
571     if ((yspec = clicon_dbspec_yang(h)) == NULL){
572 	clicon_err(OE_YANG, 0, "No yang spec");
573 	goto done;
574     }
575     if ((cb = cbuf_new()) == NULL){
576 	clicon_err(OE_UNIX, errno, "cbuf_new");
577 	goto done;
578     }
579     gettimeofday(&tv, NULL);
580     if (time2str(tv, timestr, sizeof(timestr)) < 0){
581 	clicon_err(OE_UNIX, errno, "time2str");
582 	goto done;
583     }
584     /* From RFC5277 */
585     cprintf(cb, "<notification xmlns=\"%s\"><eventTime>%s</eventTime>%s</notification>",
586 	    NOTIFICATION_RFC5277_NAMESPACE, timestr, str);
587     if (clixon_xml_parse_string(cbuf_get(cb), YB_MODULE, yspec, &xev, NULL) < 0)
588 	goto done;
589     if (xml_rootchild(xev, 0, &xev) < 0)
590 	goto done;
591     if (stream_notify1(h, es, &tv, xev) < 0)
592 	goto done;
593     if (es->es_replay_enabled){
594 	if (stream_replay_add(es, &tv, xev) < 0)
595 	    goto done;
596 	xev = NULL; /* xml stored in replay_add and should not be freed */
597     }
598  ok:
599     retval = 0;
600   done:
601     if (cb)
602 	cbuf_free(cb);
603     if (xev)
604 	xml_free(xev);
605     if (str)
606 	free(str);
607     return retval;
608 }
609 
610 /*! Backward compatible function
611  * @param[in]  h       Clicon handle
612  * @param[in]  stream  Name of event stream. CLICON is predefined as LOG stream
613  * @param[in]  xml     Notification as XML stream. Is copied.
614  * @retval  0  OK
615  * @retval -1  Error with clicon_err called
616  * @see  stream_notify  Should be merged with this
617  */
618 int
stream_notify_xml(clicon_handle h,char * stream,cxobj * xml)619 stream_notify_xml(clicon_handle h,
620 		  char         *stream,
621 		  cxobj        *xml)
622 {
623     int        retval = -1;
624     cxobj     *xev = NULL;
625     cxobj     *xml2; /* copy */
626     yang_stmt *yspec = NULL;
627     char      *str = NULL; /* XXX: never set */
628     cbuf      *cb = NULL;
629     char       timestr[28];
630     struct timeval tv;
631     event_stream_t *es;
632 
633     clicon_debug(2, "%s", __FUNCTION__);
634     if ((es = stream_find(h, stream)) == NULL)
635 	goto ok;
636     if ((yspec = clicon_dbspec_yang(h)) == NULL){
637 	clicon_err(OE_YANG, 0, "No yang spec");
638 	goto done;
639     }
640     if ((cb = cbuf_new()) == NULL){
641 	clicon_err(OE_UNIX, errno, "cbuf_new");
642 	goto done;
643     }
644     gettimeofday(&tv, NULL);
645     if (time2str(tv, timestr, sizeof(timestr)) < 0){
646 	clicon_err(OE_UNIX, errno, "time2str");
647 	goto done;
648     }
649     cprintf(cb, "<notification xmlns=\"%s\"><eventTime>%s</eventTime>NULL</notification>",
650 	    NOTIFICATION_RFC5277_NAMESPACE,
651 	    timestr); /* XXX str is always NULL */
652     if (clixon_xml_parse_string(cbuf_get(cb), YB_NONE, yspec, &xev, NULL) < 0)
653 	goto done;
654     if (xml_rootchild(xev, 0, &xev) < 0)
655 	goto done;
656     if ((xml2 = xml_dup(xml)) == NULL)
657 	goto done;
658     if (xml_addsub(xev, xml2) < 0)
659 	goto done;
660     if (stream_notify1(h, es, &tv, xev) < 0)
661 	goto done;
662     if (es->es_replay_enabled){
663 	if (stream_replay_add(es, &tv, xev) < 0)
664 	    goto done;
665 	xev = NULL; /* xml stored in replay_add and should not be freed */
666     }
667  ok:
668     retval = 0;
669   done:
670     if (cb)
671 	cbuf_free(cb);
672     if (xev)
673 	xml_free(xev);
674     if (str)
675 	free(str);
676     return retval;
677 }
678 
679 
680 /*! Replay a stream by sending notification messages
681  * @see RFC5277 Sec 2.1.1:
682  *  Start Time:
683          A parameter, <startTime>, used to trigger the replay feature
684          and indicate that the replay should start at the time
685          specified.  If <startTime> is not present, this is not a replay
686          subscription.  It is not valid to specify start times that are
687          later than the current time.  If the <startTime> specified is
688          earlier than the log can support, the replay will begin with
689          the earliest available notification.  This parameter is of type
690          dateTime and compliant to [RFC3339].  Implementations must
691          support time zones.
692 
693     Stop Time:
694          An optional parameter, <stopTime>, used with the optional
695          replay feature to indicate the newest notifications of
696          interest.  If <stopTime> is not present, the notifications will
697          continue until the subscription is terminated.  Must be used
698          with and be later than <startTime>.  Values of <stopTime> in
699          the future are valid.  This parameter is of type dateTime and
700          compliant to [RFC3339].  Implementations must support time
701          zones.
702 
703  * Assume no future sample timestamps.
704  */
705 static int
stream_replay_notify(clicon_handle h,event_stream_t * es,struct stream_subscription * ss)706 stream_replay_notify(clicon_handle               h,
707 		     event_stream_t             *es,
708 		     struct stream_subscription *ss)
709 {
710     int                   retval = -1;
711     struct stream_replay *r;
712 
713     /* If <startTime> is not present, this is not a replay */
714     if (!timerisset(&ss->ss_starttime))
715 	goto ok;
716     if (!es->es_replay_enabled)
717 	goto ok;
718     /* Get replay linked list */
719     if ((r = es->es_replay) == NULL)
720 	goto ok;
721     /* First loop to skip until start */
722     do {
723 	if (timercmp(&r->r_tv, &ss->ss_starttime, >=))
724 	    break;
725 	r = NEXTQ(struct stream_replay *, r);
726     } while (r && r!=es->es_replay);
727     if (r == NULL)
728 	goto ok; /* No samples to replay */
729     /* Then notify until stop */
730     do {
731 	if (timerisset(&ss->ss_stoptime) &&
732 	    timercmp(&r->r_tv, &ss->ss_stoptime, >))
733 	    break;
734 	if ((*ss->ss_fn)(h, 0, r->r_xml, ss->ss_arg) < 0)
735 	    goto done;
736 	r = NEXTQ(struct stream_replay *, r);
737     } while (r && r!=es->es_replay);
738  ok:
739     retval = 0;
740  done:
741     return retval;
742 }
743 
744 /*! Add replay sample to stream with timestamp
745  * @param[in] es   Stream
746  * @param[in] tv   Timestamp
747  * @param[in] xv   XML
748  */
749 int
stream_replay_add(event_stream_t * es,struct timeval * tv,cxobj * xv)750 stream_replay_add(event_stream_t *es,
751 		  struct timeval *tv,
752 		  cxobj          *xv)
753 {
754     int                   retval = -1;
755     struct stream_replay *new;
756 
757     if ((new = malloc(sizeof *new)) == NULL){
758 	clicon_err(OE_UNIX, errno, "malloc");
759 	goto done;
760     }
761     memset(new, 0, (sizeof *new));
762     new->r_tv = *tv;
763     new->r_xml = xv;
764     ADDQ(new, es->es_replay);
765     retval = 0;
766  done:
767     return retval;
768 }
769 
770 /* tmp struct for timeout callback containing clicon handle,
771  *  stream and subscription
772  */
773 struct replay_arg{
774     clicon_handle ra_h;
775     char         *ra_stream; /* Name of stream - malloced: free by cb */
776     stream_fn_t   ra_fn;  /* Stream callback */
777     void         *ra_arg; /*  Argument - typically unique client handle */
778 };
779 
780 /*! Timeout callback for replaying stream
781  * @param[in] fd   Ignore
782  * @param[in] arg  tmp struct including clicon handle, stream and subscription
783  */
784 static int
stream_replay_cb(int fd,void * arg)785 stream_replay_cb(int   fd,
786 		 void *arg)
787 {
788     int                         retval = -1;
789     struct replay_arg          *ra= (struct replay_arg*)arg;
790     event_stream_t             *es;
791     struct stream_subscription *ss;
792 
793     if (ra == NULL)
794 	goto ok;
795     if (ra->ra_stream == NULL)
796 	goto ok;
797     if ((es = stream_find(ra->ra_h, ra->ra_stream)) == NULL)
798 	goto ok;
799     if ((ss = stream_ss_find(es, ra->ra_fn, ra->ra_arg)) == NULL)
800 	goto ok;
801     if (stream_replay_notify(ra->ra_h, es, ss) < 0)
802 	goto done;
803  ok:
804     retval = 0;
805  done:
806     if (ra){
807 	if (ra->ra_stream)
808 	    free(ra->ra_stream);
809 	free(ra);
810     }
811     return retval;
812 }
813 
814 /*! Schedule stream replay to occur asap, eg "now"
815  *
816  * @param[in]  h       clicon handle
817  * @param[in]  stream  Name of stream
818  * @param[in] fn       Stream callback
819  * @param[in] arg      Argument - typically unique client handle
820  */
821 int
stream_replay_trigger(clicon_handle h,char * stream,stream_fn_t fn,void * arg)822 stream_replay_trigger(clicon_handle h,
823 		      char         *stream,
824 		      stream_fn_t   fn,
825 		      void         *arg)
826 {
827     int retval = -1;
828     struct timeval now;
829     struct replay_arg *ra;
830 
831     if ((ra = malloc(sizeof(*ra))) == NULL){
832 	clicon_err(OE_UNIX, errno, "malloc");
833 	goto done;
834     }
835     memset(ra, 0, sizeof(*ra));
836     ra->ra_h = h;
837     if ((ra->ra_stream = strdup(stream)) == NULL){
838 	clicon_err(OE_UNIX, errno, "strdup");
839 	goto done;
840     }
841     ra->ra_fn = fn;
842     ra->ra_arg = arg;
843     gettimeofday(&now, NULL);
844     if (clixon_event_reg_timeout(now, stream_replay_cb, ra,
845 			  "create-subscribtion stream replay") < 0)
846 	goto done;
847     retval = 0;
848  done:
849     return retval;
850 }
851 
852 #ifdef CLIXON_PUBLISH_STREAMS
853 /* SSE support using Nginx Nchan. This code needs to be enabled at configure
854  * time using: --enable-publish configure option
855  * It uses CURL and autoconf needs to set that dependency
856  */
857 
858 #include <curl/curl.h>
859 
/*
 * Types (curl)
 */
/* Growable buffer that accumulates data handed to curl_get_cb */
struct curlbuf{
    size_t b_len; /* Number of data bytes in b_buf, not counting the NUL */
    char  *b_buf; /* Malloced buffer, NUL-terminated after every append, or NULL */
};

/*! Curl write callback: append received data to a curlbuf
 *
 * For the asynchronous case. I think we must handle the case where many of
 * these come in before we can handle them in the upper-level polling routine.
 * Therefore, we append new data to the userdata buffer using realloc.
 * @param[in]     ptr      Received data, size*nmemb bytes
 * @param[in]     size     Size of one data item
 * @param[in]     nmemb    Number of data items
 * @param[in,out] userdata Pointer to struct curlbuf to append to
 * @retval        len      Number of bytes appended (size*nmemb)
 * @retval        0        Size overflow or out of memory; the buffer is unchanged
 */
static size_t
curl_get_cb(void *ptr,
            size_t size,
            size_t nmemb,
            void *userdata)
{
    struct curlbuf *buf = (struct curlbuf *)userdata;
    size_t          len;
    char           *tmp;

    /* Refuse sizes that cannot be represented instead of wrapping around */
    if (nmemb != 0 && size > SIZE_MAX / nmemb)
        return 0;
    len = size*nmemb;
    if (len > SIZE_MAX - buf->b_len - 1)
        return 0;
    /* Keep the old buffer reachable if realloc fails so it can be freed */
    if ((tmp = realloc(buf->b_buf, buf->b_len+len+1)) == NULL)
        return 0;
    buf->b_buf = tmp;
    memcpy(buf->b_buf+buf->b_len, ptr, len);
    buf->b_len += len;
    buf->b_buf[buf->b_len] = '\0';
    return len;
}
890 
891 /*! Send a curl POST request
892  * @retval  -1   fatal error
893  * @retval   0   expect set but did not expected return or other non-fatal error
894  * @retval   1   ok
895  * Note: curl_easy_perform blocks
896  * Note: New handle is created every time, the handle can be re-used for better TCP performance
897  * @see same function (url_post) in grideye_curl.c
898  */
899 static int
url_post(char * url,char * postfields,char ** getdata)900 url_post(char *url,
901 	 char *postfields,
902 	 char **getdata)
903 {
904     int            retval = -1;
905     CURL          *curl = NULL;
906     char          *err = NULL;
907     struct curlbuf cb = {0, };
908     CURLcode       errcode;
909 
910     /* Try it with  curl -X PUT -d '*/
911     clicon_debug(1, "%s:  curl -X POST -d '%s' %s",
912 	__FUNCTION__, postfields, url);
913     /* Set up curl for doing the communication with the controller */
914     if ((curl = curl_easy_init()) == NULL) {
915 	clicon_debug(1, "curl_easy_init");
916 	goto done;
917     }
918     if ((err = malloc(CURL_ERROR_SIZE)) == NULL) {
919 	clicon_debug(1, "%s: malloc", __FUNCTION__);
920 	goto done;
921     }
922     curl_easy_setopt(curl, CURLOPT_URL, url);
923     curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_get_cb);
924     curl_easy_setopt(curl, CURLOPT_WRITEDATA, &cb);
925     curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, err);
926     curl_easy_setopt(curl, CURLOPT_POST, 1);
927     curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postfields);
928     curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(postfields));
929 
930     if (clicon_debug_get())
931 	curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
932     if ((errcode = curl_easy_perform(curl)) != CURLE_OK){
933 	clicon_debug(1, "%s: curl: %s(%d)", __FUNCTION__, err, errcode);
934 	retval = 0;
935 	goto done;
936     }
937     if (getdata && cb.b_buf){
938 	*getdata = cb.b_buf;
939 	cb.b_buf = NULL;
940     }
941     retval = 1;
942   done:
943     if (err)
944 	free(err);
945     if (cb.b_buf)
946 	free(cb.b_buf);
947     if (curl)
948 	curl_easy_cleanup(curl);   /* cleanup */
949     return retval;
950 }
951 
952 /*! Stream callback for example stream notification
953  * Push via curl_post to publish stream event
954  * @param[in]  h     Clicon handle
955  * @param[in]  op    Operation: 0 OK, 1 Close
956  * @param[in]  event Event as XML
957  * @param[in]  arg   Extra argument provided in stream_ss_add
958  * @see stream_ss_add
959  */
960 static int
stream_publish_cb(clicon_handle h,int op,cxobj * event,void * arg)961 stream_publish_cb(clicon_handle h,
962 		  int           op,
963 		  cxobj        *event,
964 		  void         *arg)
965 {
966     int   retval = -1;
967     cbuf *u = NULL; /* stream pub (push) url */
968     cbuf *d = NULL; /* (XML) data to push */
969     char *pub_prefix;
970     char *result = NULL;
971     char *stream = (char*)arg;
972 
973     clicon_debug(1, "%s", __FUNCTION__);
974     if (op != 0)
975 	goto ok;
976     /* Create pub url */
977     if ((u = cbuf_new()) == NULL){
978 	clicon_err(OE_XML, errno, "cbuf_new");
979 	goto done;
980     }
981     if ((pub_prefix = clicon_option_str(h, "CLICON_STREAM_PUB")) == NULL){
982 	clicon_err(OE_CFG, ENOENT, "CLICON_STREAM_PUB not defined");
983 	goto done;
984     }
985     cprintf(u, "%s/%s", pub_prefix, stream);
986     /* Create XML data as string */
987     if ((d = cbuf_new()) == NULL){
988 	clicon_err(OE_XML, errno, "cbuf_new");
989 	goto done;
990     }
991     if (clicon_xml2cbuf(d, event, 0, 0, -1) < 0)
992 	goto done;
993     if (url_post(cbuf_get(u),     /* url+stream */
994 		 cbuf_get(d),     /* postfields */
995 		 &result) < 0)    /* result as xml */
996 	goto done;
997     if (result)
998 	clicon_debug(1, "%s: %s", __FUNCTION__, result);
999  ok:
1000     retval = 0;
1001  done:
1002     if (u)
1003 	cbuf_free(u);
1004     if (d)
1005 	cbuf_free(d);
1006     if (result)
1007 	free(result);
1008     return retval;
1009 }
1010 #endif /* CLIXON_PUBLISH_STREAMS */
1011 
1012 /*! Publish all streams on a pubsub channel, eg using SSE
1013  */
1014 int
stream_publish(clicon_handle h,char * stream)1015 stream_publish(clicon_handle h,
1016 	       char         *stream)
1017 {
1018 #ifdef CLIXON_PUBLISH_STREAMS
1019     int retval = -1;
1020 
1021     if (stream_ss_add(h, stream, NULL, NULL, NULL, stream_publish_cb, (void*)stream) < 0)
1022 	goto done;
1023     retval = 0;
1024  done:
1025     return retval;
1026 #else
1027    clicon_log(LOG_WARNING, "%s called but CLIXON_PUBLISH_STREAMS not enabled (enable with configure --enable-publish)", __FUNCTION__);
1028    clicon_log_init("xpath", LOG_WARNING, CLICON_LOG_STDERR);
1029    return 0;
1030 #endif
1031 }
1032 
/*! Initialize stream publishing
 *
 * With CLIXON_PUBLISH_STREAMS enabled this initializes libcurl globally,
 * otherwise it does nothing.
 * @retval     0       OK
 * @retval    -1       Error: curl_global_init failed
 * @see stream_publish_exit
 */
int
stream_publish_init(void)
{
#ifdef CLIXON_PUBLISH_STREAMS
    int      retval = -1;
    CURLcode rc;

    if ((rc = curl_global_init(CURL_GLOBAL_ALL)) != CURLE_OK){
	/* The failure reason is in the return code, errno is not set by curl */
	clicon_err(OE_PLUGIN, 0, "curl_global_init: %s", curl_easy_strerror(rc));
	goto done;
    }
    retval = 0;
 done:
    return retval;
#else
    return 0;
#endif
}
1050 
/*! Terminate stream publishing
 *
 * With CLIXON_PUBLISH_STREAMS enabled this calls curl_global_cleanup,
 * otherwise it does nothing.
 * @retval     0       Always
 * @see stream_publish_init
 */
int
stream_publish_exit(void)
{
#ifdef CLIXON_PUBLISH_STREAMS
    curl_global_cleanup();
#endif
    return 0;
}
1059