1 /*
2 *
3 ***** BEGIN LICENSE BLOCK *****
4
5 Copyright (C) 2009-2019 Olof Hagsand and Benny Holmgren
6
7 This file is part of CLIXON.
8
9 Licensed under the Apache License, Version 2.0 (the "License");
10 you may not use this file except in compliance with the License.
11 You may obtain a copy of the License at
12
13 http://www.apache.org/licenses/LICENSE-2.0
14
15 Unless required by applicable law or agreed to in writing, software
16 distributed under the License is distributed on an "AS IS" BASIS,
17 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 See the License for the specific language governing permissions and
19 limitations under the License.
20
21 Alternatively, the contents of this file may be used under the terms of
22 the GNU General Public License Version 3 or later (the "GPL"),
23 in which case the provisions of the GPL are applicable instead
24 of those above. If you wish to allow use of your version of this file only
25 under the terms of the GPL, and not to allow others to
26 use your version of this file under the terms of Apache License version 2,
27 indicate your decision by deleting the provisions above and replace them with
28 the notice and other provisions required by the GPL. If you do not delete
29 the provisions above, a recipient may use your version of this file under
30 the terms of any one of the Apache License version 2 or the GPL.
31
32 ***** END LICENSE BLOCK *****
33
34 * Event notification streams according to RFC5277
35 * The stream implementation has three parts:
36 * 1) Base stream handling: stream_find/register/delete_all/get_xml
37 * 2) Stream subscription handling (stream_ss_add/delete/timeout, stream_notify, etc
38 * 3) Stream replay: stream_replay/_add
39 * 4) nginx/nchan publish code (use --enable-publish config option)
40 *
41 *
42 * +---------------+ 1 arg
43 * | client_entry | <----------------- +---------------+
44 * +---------------+ +-->| subscription |
45 * / +---------------+
46 * +---------------+ * +---------------+
47 * | clicon_handle |--------->| event_stream |
48 * +---------------+ +---------------+
49 * \ * +---------------+
50 * +-->| replay |
51 * +---------------+
52
53 */
54
55 #ifdef HAVE_CONFIG_H
56 #include "clixon_config.h" /* generated by config & autoconf */
57 #endif
58
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <string.h>
62 #include <errno.h>
63 #include <inttypes.h>
64 #include <syslog.h>
65 #include <sys/time.h>
66
67 /* cligen */
68 #include <cligen/cligen.h>
69
70 /* clicon */
71 #include "clixon_queue.h"
72 #include "clixon_err.h"
73 #include "clixon_log.h"
74 #include "clixon_event.h"
75 #include "clixon_string.h"
76 #include "clixon_hash.h"
77 #include "clixon_handle.h"
78 #include "clixon_yang.h"
79 #include "clixon_xml.h"
80 #include "clixon_xml_io.h"
81 #include "clixon_options.h"
82 #include "clixon_data.h"
83 #include "clixon_xpath_ctx.h"
84 #include "clixon_xpath.h"
85 #include "clixon_stream.h"
86
87 /* Go through and timeout subscription timers [s] */
88 #define STREAM_TIMER_TIMEOUT_S 5
89
90 /*! Find an event notification stream given name
91 * @param[in] h Clicon handle
92 * @param[in] name Name of stream
93 * @retval es Event notification stream structure
94 * @retval NULL Not found
95 */
96 event_stream_t *
stream_find(clicon_handle h,const char * name)97 stream_find(clicon_handle h,
98 const char *name)
99 {
100 event_stream_t *es0;
101 event_stream_t *es = NULL;
102
103 es0 = clicon_stream(h);
104 if ((es = es0) != NULL)
105 do {
106 if (strcmp(name, es->es_name)==0)
107 return es;
108 es = NEXTQ(struct event_stream *, es);
109 } while (es && es != es0);
110 return NULL;
111 }
112
113 /*! Add notification event stream
114 * @param[in] h Clicon handle
115 * @param[in] name Name of stream
116 * @param[in] description Description of stream
117 * @param[in] replay_enabled Set if replay possible in stream
118 * @param[in] retention For replay buffer how much relative to save
119 */
120 int
stream_add(clicon_handle h,const char * name,const char * description,const int replay_enabled,struct timeval * retention)121 stream_add(clicon_handle h,
122 const char *name,
123 const char *description,
124 const int replay_enabled,
125 struct timeval *retention)
126 {
127 int retval = -1;
128 event_stream_t *es;
129
130 if ((es = stream_find(h, name)) != NULL)
131 goto ok;
132 if ((es = malloc(sizeof(event_stream_t))) == NULL){
133 clicon_err(OE_XML, errno, "malloc");
134 goto done;
135 }
136 memset(es, 0, sizeof(event_stream_t));
137 if ((es->es_name = strdup(name)) == NULL){
138 clicon_err(OE_XML, errno, "strdup");
139 goto done;
140 }
141 if ((es->es_description = strdup(description)) == NULL){
142 clicon_err(OE_XML, errno, "strdup");
143 goto done;
144 }
145 es->es_replay_enabled = replay_enabled;
146 if (retention)
147 es->es_retention = *retention;
148 clicon_stream_append(h, es);
149 ok:
150 retval = 0;
151 done:
152 return retval;
153 }
154
155 /*! Delete complete notification event stream list (not just single stream)
156 * @param[in] h Clicon handle
157 * @param[in] force Force deletion of
158 */
159 int
stream_delete_all(clicon_handle h,int force)160 stream_delete_all(clicon_handle h,
161 int force)
162 {
163 struct stream_replay *r;
164 struct stream_subscription *ss;
165 event_stream_t *es;
166 event_stream_t *head = clicon_stream(h);
167
168 while ((es = clicon_stream(h)) != NULL){
169 DELQ(es, head, event_stream_t *);
170 clicon_stream_set(h, head);
171 if (es->es_name)
172 free(es->es_name);
173 if (es->es_description)
174 free(es->es_description);
175 while ((ss = es->es_subscription) != NULL)
176 stream_ss_rm(h, es, ss, force); /* XXX in some cases leaks memory due to DONT clause in stream_ss_rm() */
177 while ((r = es->es_replay) != NULL){
178 DELQ(r, es->es_replay, struct stream_replay *);
179 if (r->r_xml)
180 xml_free(r->r_xml);
181 free(r);
182 }
183 free(es);
184 }
185 return 0;
186 }
187
188 /*! Return stream definition state in XML supporting RFC 8040 and RFC5277
189 * @param[in] h Clicon handle
190 * @param[in] access If set, include access/location
191 * @param[out] cb Output buffer containing XML on exit
192 * @retval 0 OK
193 * @retval -1 Error
194 */
195 int
stream_get_xml(clicon_handle h,int access,cbuf * cb)196 stream_get_xml(clicon_handle h,
197 int access,
198 cbuf *cb)
199 {
200 event_stream_t *es = NULL;
201 char *url_prefix;
202 char *stream_path;
203
204 cprintf(cb, "<streams>");
205 if ((es = clicon_stream(h)) != NULL){
206 do {
207 cprintf(cb, "<stream>");
208 cprintf(cb, "<name>%s</name>", es->es_name);
209 if (es->es_description)
210 cprintf(cb, "<description>%s</description>", es->es_description);
211 cprintf(cb, "<replay-support>%s</replay-support>",
212 es->es_replay_enabled?"true":"false");
213 if (access){
214 cprintf(cb, "<access>");
215 cprintf(cb, "<encoding>xml</encoding>");
216 url_prefix = clicon_option_str(h, "CLICON_STREAM_URL");
217 stream_path = clicon_option_str(h, "CLICON_STREAM_PATH");
218 cprintf(cb, "<location>%s/%s/%s</location>",
219 url_prefix, stream_path, es->es_name);
220 cprintf(cb, "</access>");
221 }
222 cprintf(cb, "</stream>");
223 es = NEXTQ(struct event_stream *, es);
224 } while (es && es != clicon_stream(h));
225 }
226 cprintf(cb, "</streams>");
227 return 0;
228 }
229
230 /*! Check all stream subscription stop timers, set up new timer
231 * @param[in] fd No-op
232 * @param[in] arg Clicon handle
233 * @note format is given by clixon_event_reg_timeout callback function (fd not needed)
234 */
235 int
stream_timer_setup(int fd,void * arg)236 stream_timer_setup(int fd,
237 void *arg)
238 {
239 int retval = -1;
240 clicon_handle h = (clicon_handle)arg;
241 struct timeval now;
242 struct timeval t;
243 struct timeval t1 = {STREAM_TIMER_TIMEOUT_S, 0};
244 struct timeval tret;
245 event_stream_t *es;
246 struct stream_subscription *ss;
247 struct stream_subscription *ss1;
248 struct stream_replay *r;
249 struct stream_replay *r1;
250
251 clicon_debug(2, "%s", __FUNCTION__);
252 /* Go thru callbacks and see if any have timed out, if so remove them
253 * Could also be done by a separate timer.
254 */
255 gettimeofday(&now, NULL);
256 /* For all event streams:
257 * 1) Go through subscriptions, if stop-time and its past, remove it
258 * XXX: but client may not be closed
259 * 2) Go throughreplay buffer and remove entries with passed retention time
260 */
261 if ((es = clicon_stream(h)) != NULL){
262 do {
263 /* 1) Go through subscriptions, if stop-time and its past, remove it */
264 if ((ss = es->es_subscription) != NULL)
265 do {
266 if (timerisset(&ss->ss_stoptime) && timercmp(&ss->ss_stoptime, &now, <)){
267 ss1 = NEXTQ(struct stream_subscription *, ss);
268 /* Signal to remove stream for upper levels */
269 if (stream_ss_rm(h, es, ss, 0) < 0)
270 goto done;
271 ss = ss1;
272 }
273 else
274 ss = NEXTQ(struct stream_subscription *, ss);
275 } while (ss && ss != es->es_subscription);
276 /* 2) Go throughreplay buffer and remove entries with passed retention time */
277 if (timerisset(&es->es_retention) &&
278 (r = es->es_replay) != NULL){
279 timersub(&now, &es->es_retention, &tret);
280 do {
281 if (timercmp(&r->r_tv, &tret, <)){
282 r1 = NEXTQ(struct stream_replay *, r);
283 DELQ(r, es->es_replay, struct stream_replay *);
284 if (r->r_xml)
285 xml_free(r->r_xml);
286 free(r);
287 r = r1;
288 }
289 else
290 r = NEXTQ(struct stream_replay *, r);
291 } while (r && r!=es->es_replay);
292 }
293 es = NEXTQ(struct event_stream *, es);
294 } while (es && es != clicon_stream(h));
295 }
296 /* Initiate new timer */
297 timeradd(&now, &t1, &t);
298 if (clixon_event_reg_timeout(t,
299 stream_timer_setup, /* this function */
300 h, /* clicon handle */
301 "stream timer setup") < 0)
302 goto done;
303 retval = 0;
304 done:
305 return retval;
306 }
307
308 #ifdef NYI
309 /*! Delete single notification event stream
310 * XXX notused
311 */
312 int
stream_del()313 stream_del()
314 {
315 return 0;
316 }
317 #endif
318
319 /*! Add an event notification callback to a stream given a callback function
320 * @param[in] h Clicon handle
321 * @param[in] stream Name of stream
322 * @param[in] xpath Filter selector - xpath
323 * @param[in] startime If set, Make a replay
324 * @param[in] stoptime If set, dont continue past this time
325 * @param[in] fn Callback when event occurs
326 * @param[in] arg Argument to use with callback. Also handle when deleting
327 * @retval 0 OK
328 * @retval -1 Error, ie no such stream
329 */
330 struct stream_subscription *
stream_ss_add(clicon_handle h,char * stream,char * xpath,struct timeval * starttime,struct timeval * stoptime,stream_fn_t fn,void * arg)331 stream_ss_add(clicon_handle h,
332 char *stream,
333 char *xpath,
334 struct timeval *starttime,
335 struct timeval *stoptime,
336 stream_fn_t fn,
337 void *arg)
338 {
339 event_stream_t *es;
340 struct stream_subscription *ss = NULL;
341
342 clicon_debug(1, "%s", __FUNCTION__);
343 if ((es = stream_find(h, stream)) == NULL){
344 clicon_err(OE_CFG, ENOENT, "Stream %s not found", stream);
345 goto done;
346 }
347 if ((ss = malloc(sizeof(*ss))) == NULL){
348 clicon_err(OE_CFG, errno, "malloc");
349 goto done;
350 }
351 memset(ss, 0, sizeof(*ss));
352 if ((ss->ss_stream = strdup(stream)) == NULL){
353 clicon_err(OE_CFG, errno, "strdup");
354 goto done;
355 }
356 if (stoptime)
357 ss->ss_stoptime = *stoptime;
358 if (starttime)
359 ss->ss_starttime = *starttime;
360 if (xpath && (ss->ss_xpath = strdup(xpath)) == NULL){
361 clicon_err(OE_CFG, errno, "strdup");
362 goto done;
363 }
364 ss->ss_fn = fn;
365 ss->ss_arg = arg;
366 ADDQ(ss, es->es_subscription);
367 return ss;
368 done:
369 if (ss)
370 free(ss);
371 return NULL;
372 }
373
374 /*! Delete event stream subscription to a stream given a callback and arg
375 * @param[in] h Clicon handle
376 * @param[in] stream Name of stream or NULL for all streams
377 * @param[in] fn Callback when event occurs
378 * @param[in] arg Argument to use with callback. Also handle when deleting
379 * @retval 0 OK
380 * @retval -1 Error
381 */
382 int
stream_ss_rm(clicon_handle h,event_stream_t * es,struct stream_subscription * ss,int force)383 stream_ss_rm(clicon_handle h,
384 event_stream_t *es,
385 struct stream_subscription *ss,
386 int force)
387 {
388 clicon_debug(1, "%s", __FUNCTION__);
389 DELQ(ss, es->es_subscription, struct stream_subscription *);
390 /* Remove from upper layers - close socket etc. */
391 (*ss->ss_fn)(h, 1, NULL, ss->ss_arg);
392 if (force){
393 if (ss->ss_stream)
394 free(ss->ss_stream);
395 if (ss->ss_xpath)
396 free(ss->ss_xpath);
397 free(ss);
398 }
399 clicon_debug(1, "%s retval: 0", __FUNCTION__);
400 return 0;
401 }
402
403 /*! Find stream callback given callback function and its (unique) argument
404 * @param[in] es Pointer to event stream
405 * @param[in] fn Stream callback
406 * @param[in] arg Argument - typically unique client handle
407 * @retval ss Event stream subscription structure
408 * @retval NULL Not found
409 */
410 struct stream_subscription *
stream_ss_find(event_stream_t * es,stream_fn_t fn,void * arg)411 stream_ss_find(event_stream_t *es,
412 stream_fn_t fn,
413 void *arg)
414 {
415 struct stream_subscription *ss;
416
417 if ((ss = es->es_subscription) != NULL)
418 do {
419 if (fn == ss->ss_fn && arg == ss->ss_arg)
420 return ss;
421 ss = NEXTQ(struct stream_subscription *, ss);
422 } while (ss && ss != es->es_subscription);
423 return NULL;
424 }
425
426 /*! Remove stream subscription identified with fn and arg in all streams
427 * @param[in] h Clicon handle
428 * @param[in] fn Stream callback
429 * @param[in] arg Argument - typically unique client handle
430 * @see stream_ss_delete For single stream
431 */
432 int
stream_ss_delete_all(clicon_handle h,stream_fn_t fn,void * arg)433 stream_ss_delete_all(clicon_handle h,
434 stream_fn_t fn,
435 void *arg)
436 {
437 int retval = -1;
438 event_stream_t *es;
439 struct stream_subscription *ss;
440
441 if ((es = clicon_stream(h)) != NULL){
442 do {
443 if ((ss = stream_ss_find(es, fn, arg)) != NULL){
444 if (stream_ss_rm(h, es, ss, 1) < 0)
445 goto done;
446 }
447 es = NEXTQ(struct event_stream *, es);
448 } while (es && es != clicon_stream(h));
449 }
450 retval = 0;
451 done:
452 return retval;
453 }
454
455 /*! Delete a single stream
456 * @see stream_ss_delete_all (merge with this?)
457 */
458 int
stream_ss_delete(clicon_handle h,char * name,stream_fn_t fn,void * arg)459 stream_ss_delete(clicon_handle h,
460 char *name,
461 stream_fn_t fn,
462 void *arg)
463 {
464 int retval = -1;
465 event_stream_t *es;
466 struct stream_subscription *ss;
467
468 if ((es = clicon_stream(h)) != NULL){
469 do {
470 if (strcmp(name, es->es_name)==0)
471 if ((ss = stream_ss_find(es, fn, arg)) != NULL){
472 if (stream_ss_rm(h, es, ss, 0) < 0)
473 goto done;
474 }
475 es = NEXTQ(struct event_stream *, es);
476 } while (es && es != clicon_stream(h));
477 }
478 retval = 0;
479 done:
480 return retval;
481 }
482
483 /*! Stream notify event and distribute to all registered callbacks
484 * @param[in] h Clicon handle
485 * @param[in] stream Name of event stream. CLICON is predefined as LOG stream
486 * @param[in] tv Timestamp. Dont notify if subscription has stoptime<tv
487 * @param[in] event Notification as xml tree
488 * @retval 0 OK
489 * @retval -1 Error with clicon_err called
490 * @see stream_notify
491 * @see stream_ss_timeout where subscriptions are removed if stoptime<now
492 */
493 static int
stream_notify1(clicon_handle h,event_stream_t * es,struct timeval * tv,cxobj * xevent)494 stream_notify1(clicon_handle h,
495 event_stream_t *es,
496 struct timeval *tv,
497 cxobj *xevent)
498 {
499 int retval = -1;
500 struct stream_subscription *ss;
501
502 clicon_debug(2, "%s", __FUNCTION__);
503 /* Go thru all subscriptions and find matches */
504 if ((ss = es->es_subscription) != NULL)
505 do {
506 if (timerisset(&ss->ss_stoptime) && /* stoptime has passed */
507 timercmp(&ss->ss_stoptime, tv, <)){
508 struct stream_subscription *ss1;
509 ss1 = NEXTQ(struct stream_subscription *, ss);
510 /* Signal to remove stream for upper levels */
511 if (stream_ss_rm(h, es, ss, 1) < 0)
512 goto done;
513 ss = ss1;
514 }
515 else{ /* xpath match */
516 if (ss->ss_xpath == NULL ||
517 strlen(ss->ss_xpath)==0 ||
518 xpath_first(xevent, NULL, "%s", ss->ss_xpath) != NULL)
519 if ((*ss->ss_fn)(h, 0, xevent, ss->ss_arg) < 0)
520 goto done;
521 ss = NEXTQ(struct stream_subscription *, ss);
522 }
523 } while (es->es_subscription && ss != es->es_subscription);
524 retval = 0;
525 done:
526 return retval;
527 }
528
529 /*! Stream notify event and distribute to all registered callbacks
530 * @param[in] h Clicon handle
531 * @param[in] stream Name of event stream. CLICON is predefined as LOG stream
532 * @param[in] event Notification as format string according to printf(3)
533 * @retval 0 OK
534 * @retval -1 Error with clicon_err called
535 * @code
536 * if (stream_notify(h, "NETCONF", "<event><event-class>fault</event-class><reportingEntity><card>Ethernet0</card></reportingEntity><severity>major</severity></event>") < 0)
537 * err;
538 * @endcode
539 * @see stream_notify1 Internal
540 */
541 int
stream_notify(clicon_handle h,char * stream,const char * event,...)542 stream_notify(clicon_handle h,
543 char *stream,
544 const char *event, ...)
545 {
546 int retval = -1;
547 va_list args;
548 int len;
549 cxobj *xev = NULL;
550 yang_stmt *yspec = NULL;
551 char *str = NULL;
552 cbuf *cb = NULL;
553 char timestr[28];
554 struct timeval tv;
555 event_stream_t *es;
556
557 clicon_debug(2, "%s", __FUNCTION__);
558 if ((es = stream_find(h, stream)) == NULL)
559 goto ok;
560 va_start(args, event);
561 len = vsnprintf(NULL, 0, event, args) + 1;
562 va_end(args);
563 if ((str = malloc(len)) == NULL){
564 clicon_err(OE_UNIX, errno, "malloc");
565 goto done;
566 }
567 memset(str, 0, len);
568 va_start(args, event);
569 len = vsnprintf(str, len, event, args) + 1;
570 va_end(args);
571 if ((yspec = clicon_dbspec_yang(h)) == NULL){
572 clicon_err(OE_YANG, 0, "No yang spec");
573 goto done;
574 }
575 if ((cb = cbuf_new()) == NULL){
576 clicon_err(OE_UNIX, errno, "cbuf_new");
577 goto done;
578 }
579 gettimeofday(&tv, NULL);
580 if (time2str(tv, timestr, sizeof(timestr)) < 0){
581 clicon_err(OE_UNIX, errno, "time2str");
582 goto done;
583 }
584 /* From RFC5277 */
585 cprintf(cb, "<notification xmlns=\"%s\"><eventTime>%s</eventTime>%s</notification>",
586 NOTIFICATION_RFC5277_NAMESPACE, timestr, str);
587 if (clixon_xml_parse_string(cbuf_get(cb), YB_MODULE, yspec, &xev, NULL) < 0)
588 goto done;
589 if (xml_rootchild(xev, 0, &xev) < 0)
590 goto done;
591 if (stream_notify1(h, es, &tv, xev) < 0)
592 goto done;
593 if (es->es_replay_enabled){
594 if (stream_replay_add(es, &tv, xev) < 0)
595 goto done;
596 xev = NULL; /* xml stored in replay_add and should not be freed */
597 }
598 ok:
599 retval = 0;
600 done:
601 if (cb)
602 cbuf_free(cb);
603 if (xev)
604 xml_free(xev);
605 if (str)
606 free(str);
607 return retval;
608 }
609
610 /*! Backward compatible function
611 * @param[in] h Clicon handle
612 * @param[in] stream Name of event stream. CLICON is predefined as LOG stream
613 * @param[in] xml Notification as XML stream. Is copied.
614 * @retval 0 OK
615 * @retval -1 Error with clicon_err called
616 * @see stream_notify Should be merged with this
617 */
618 int
stream_notify_xml(clicon_handle h,char * stream,cxobj * xml)619 stream_notify_xml(clicon_handle h,
620 char *stream,
621 cxobj *xml)
622 {
623 int retval = -1;
624 cxobj *xev = NULL;
625 cxobj *xml2; /* copy */
626 yang_stmt *yspec = NULL;
627 char *str = NULL; /* XXX: never set */
628 cbuf *cb = NULL;
629 char timestr[28];
630 struct timeval tv;
631 event_stream_t *es;
632
633 clicon_debug(2, "%s", __FUNCTION__);
634 if ((es = stream_find(h, stream)) == NULL)
635 goto ok;
636 if ((yspec = clicon_dbspec_yang(h)) == NULL){
637 clicon_err(OE_YANG, 0, "No yang spec");
638 goto done;
639 }
640 if ((cb = cbuf_new()) == NULL){
641 clicon_err(OE_UNIX, errno, "cbuf_new");
642 goto done;
643 }
644 gettimeofday(&tv, NULL);
645 if (time2str(tv, timestr, sizeof(timestr)) < 0){
646 clicon_err(OE_UNIX, errno, "time2str");
647 goto done;
648 }
649 cprintf(cb, "<notification xmlns=\"%s\"><eventTime>%s</eventTime>NULL</notification>",
650 NOTIFICATION_RFC5277_NAMESPACE,
651 timestr); /* XXX str is always NULL */
652 if (clixon_xml_parse_string(cbuf_get(cb), YB_NONE, yspec, &xev, NULL) < 0)
653 goto done;
654 if (xml_rootchild(xev, 0, &xev) < 0)
655 goto done;
656 if ((xml2 = xml_dup(xml)) == NULL)
657 goto done;
658 if (xml_addsub(xev, xml2) < 0)
659 goto done;
660 if (stream_notify1(h, es, &tv, xev) < 0)
661 goto done;
662 if (es->es_replay_enabled){
663 if (stream_replay_add(es, &tv, xev) < 0)
664 goto done;
665 xev = NULL; /* xml stored in replay_add and should not be freed */
666 }
667 ok:
668 retval = 0;
669 done:
670 if (cb)
671 cbuf_free(cb);
672 if (xev)
673 xml_free(xev);
674 if (str)
675 free(str);
676 return retval;
677 }
678
679
680 /*! Replay a stream by sending notification messages
681 * @see RFC5277 Sec 2.1.1:
682 * Start Time:
683 A parameter, <startTime>, used to trigger the replay feature
684 and indicate that the replay should start at the time
685 specified. If <startTime> is not present, this is not a replay
686 subscription. It is not valid to specify start times that are
687 later than the current time. If the <startTime> specified is
688 earlier than the log can support, the replay will begin with
689 the earliest available notification. This parameter is of type
690 dateTime and compliant to [RFC3339]. Implementations must
691 support time zones.
692
693 Stop Time:
694 An optional parameter, <stopTime>, used with the optional
695 replay feature to indicate the newest notifications of
696 interest. If <stopTime> is not present, the notifications will
697 continue until the subscription is terminated. Must be used
698 with and be later than <startTime>. Values of <stopTime> in
699 the future are valid. This parameter is of type dateTime and
700 compliant to [RFC3339]. Implementations must support time
701 zones.
702
703 * Assume no future sample timestamps.
704 */
705 static int
stream_replay_notify(clicon_handle h,event_stream_t * es,struct stream_subscription * ss)706 stream_replay_notify(clicon_handle h,
707 event_stream_t *es,
708 struct stream_subscription *ss)
709 {
710 int retval = -1;
711 struct stream_replay *r;
712
713 /* If <startTime> is not present, this is not a replay */
714 if (!timerisset(&ss->ss_starttime))
715 goto ok;
716 if (!es->es_replay_enabled)
717 goto ok;
718 /* Get replay linked list */
719 if ((r = es->es_replay) == NULL)
720 goto ok;
721 /* First loop to skip until start */
722 do {
723 if (timercmp(&r->r_tv, &ss->ss_starttime, >=))
724 break;
725 r = NEXTQ(struct stream_replay *, r);
726 } while (r && r!=es->es_replay);
727 if (r == NULL)
728 goto ok; /* No samples to replay */
729 /* Then notify until stop */
730 do {
731 if (timerisset(&ss->ss_stoptime) &&
732 timercmp(&r->r_tv, &ss->ss_stoptime, >))
733 break;
734 if ((*ss->ss_fn)(h, 0, r->r_xml, ss->ss_arg) < 0)
735 goto done;
736 r = NEXTQ(struct stream_replay *, r);
737 } while (r && r!=es->es_replay);
738 ok:
739 retval = 0;
740 done:
741 return retval;
742 }
743
744 /*! Add replay sample to stream with timestamp
745 * @param[in] es Stream
746 * @param[in] tv Timestamp
747 * @param[in] xv XML
748 */
749 int
stream_replay_add(event_stream_t * es,struct timeval * tv,cxobj * xv)750 stream_replay_add(event_stream_t *es,
751 struct timeval *tv,
752 cxobj *xv)
753 {
754 int retval = -1;
755 struct stream_replay *new;
756
757 if ((new = malloc(sizeof *new)) == NULL){
758 clicon_err(OE_UNIX, errno, "malloc");
759 goto done;
760 }
761 memset(new, 0, (sizeof *new));
762 new->r_tv = *tv;
763 new->r_xml = xv;
764 ADDQ(new, es->es_replay);
765 retval = 0;
766 done:
767 return retval;
768 }
769
770 /* tmp struct for timeout callback containing clicon handle,
771 * stream and subscription
772 */
773 struct replay_arg{
774 clicon_handle ra_h;
775 char *ra_stream; /* Name of stream - malloced: free by cb */
776 stream_fn_t ra_fn; /* Stream callback */
777 void *ra_arg; /* Argument - typically unique client handle */
778 };
779
780 /*! Timeout callback for replaying stream
781 * @param[in] fd Ignore
782 * @param[in] arg tmp struct including clicon handle, stream and subscription
783 */
784 static int
stream_replay_cb(int fd,void * arg)785 stream_replay_cb(int fd,
786 void *arg)
787 {
788 int retval = -1;
789 struct replay_arg *ra= (struct replay_arg*)arg;
790 event_stream_t *es;
791 struct stream_subscription *ss;
792
793 if (ra == NULL)
794 goto ok;
795 if (ra->ra_stream == NULL)
796 goto ok;
797 if ((es = stream_find(ra->ra_h, ra->ra_stream)) == NULL)
798 goto ok;
799 if ((ss = stream_ss_find(es, ra->ra_fn, ra->ra_arg)) == NULL)
800 goto ok;
801 if (stream_replay_notify(ra->ra_h, es, ss) < 0)
802 goto done;
803 ok:
804 retval = 0;
805 done:
806 if (ra){
807 if (ra->ra_stream)
808 free(ra->ra_stream);
809 free(ra);
810 }
811 return retval;
812 }
813
814 /*! Schedule stream replay to occur asap, eg "now"
815 *
816 * @param[in] h clicon handle
817 * @param[in] stream Name of stream
818 * @param[in] fn Stream callback
819 * @param[in] arg Argument - typically unique client handle
820 */
821 int
stream_replay_trigger(clicon_handle h,char * stream,stream_fn_t fn,void * arg)822 stream_replay_trigger(clicon_handle h,
823 char *stream,
824 stream_fn_t fn,
825 void *arg)
826 {
827 int retval = -1;
828 struct timeval now;
829 struct replay_arg *ra;
830
831 if ((ra = malloc(sizeof(*ra))) == NULL){
832 clicon_err(OE_UNIX, errno, "malloc");
833 goto done;
834 }
835 memset(ra, 0, sizeof(*ra));
836 ra->ra_h = h;
837 if ((ra->ra_stream = strdup(stream)) == NULL){
838 clicon_err(OE_UNIX, errno, "strdup");
839 goto done;
840 }
841 ra->ra_fn = fn;
842 ra->ra_arg = arg;
843 gettimeofday(&now, NULL);
844 if (clixon_event_reg_timeout(now, stream_replay_cb, ra,
845 "create-subscribtion stream replay") < 0)
846 goto done;
847 retval = 0;
848 done:
849 return retval;
850 }
851
852 #ifdef CLIXON_PUBLISH_STREAMS
853 /* SSE support using Nginx Nchan. This code needs to be enabled at configure
854 * time using: --enable-publish configure option
855 * It uses CURL and autoconf needs to set that dependency
856 */
857
858 #include <curl/curl.h>
859
860 /*
861 * Types (curl)
862 */
863 struct curlbuf{
864 size_t b_len;
865 char *b_buf;
866 };
867
868 /*
869 * For the asynchronous case. I think we must handle the case where of many of these
870 * come in before we can handle them in the upper-level polling routine.
871 * realloc. Therefore, we append new data to the userdata buffer.
872 */
873 static size_t
curl_get_cb(void * ptr,size_t size,size_t nmemb,void * userdata)874 curl_get_cb(void *ptr,
875 size_t size,
876 size_t nmemb,
877 void *userdata)
878 {
879 struct curlbuf *buf = (struct curlbuf *)userdata;
880 int len;
881
882 len = size*nmemb;
883 if ((buf->b_buf = realloc(buf->b_buf, buf->b_len+len+1)) == NULL)
884 return 0;
885 memcpy(buf->b_buf+buf->b_len, ptr, len);
886 buf->b_len += len;
887 buf->b_buf[buf->b_len] = '\0';
888 return len;
889 }
890
891 /*! Send a curl POST request
892 * @retval -1 fatal error
893 * @retval 0 expect set but did not expected return or other non-fatal error
894 * @retval 1 ok
895 * Note: curl_easy_perform blocks
896 * Note: New handle is created every time, the handle can be re-used for better TCP performance
897 * @see same function (url_post) in grideye_curl.c
898 */
899 static int
url_post(char * url,char * postfields,char ** getdata)900 url_post(char *url,
901 char *postfields,
902 char **getdata)
903 {
904 int retval = -1;
905 CURL *curl = NULL;
906 char *err = NULL;
907 struct curlbuf cb = {0, };
908 CURLcode errcode;
909
910 /* Try it with curl -X PUT -d '*/
911 clicon_debug(1, "%s: curl -X POST -d '%s' %s",
912 __FUNCTION__, postfields, url);
913 /* Set up curl for doing the communication with the controller */
914 if ((curl = curl_easy_init()) == NULL) {
915 clicon_debug(1, "curl_easy_init");
916 goto done;
917 }
918 if ((err = malloc(CURL_ERROR_SIZE)) == NULL) {
919 clicon_debug(1, "%s: malloc", __FUNCTION__);
920 goto done;
921 }
922 curl_easy_setopt(curl, CURLOPT_URL, url);
923 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_get_cb);
924 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &cb);
925 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, err);
926 curl_easy_setopt(curl, CURLOPT_POST, 1);
927 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postfields);
928 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, strlen(postfields));
929
930 if (clicon_debug_get())
931 curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
932 if ((errcode = curl_easy_perform(curl)) != CURLE_OK){
933 clicon_debug(1, "%s: curl: %s(%d)", __FUNCTION__, err, errcode);
934 retval = 0;
935 goto done;
936 }
937 if (getdata && cb.b_buf){
938 *getdata = cb.b_buf;
939 cb.b_buf = NULL;
940 }
941 retval = 1;
942 done:
943 if (err)
944 free(err);
945 if (cb.b_buf)
946 free(cb.b_buf);
947 if (curl)
948 curl_easy_cleanup(curl); /* cleanup */
949 return retval;
950 }
951
952 /*! Stream callback for example stream notification
953 * Push via curl_post to publish stream event
954 * @param[in] h Clicon handle
955 * @param[in] op Operation: 0 OK, 1 Close
956 * @param[in] event Event as XML
957 * @param[in] arg Extra argument provided in stream_ss_add
958 * @see stream_ss_add
959 */
960 static int
stream_publish_cb(clicon_handle h,int op,cxobj * event,void * arg)961 stream_publish_cb(clicon_handle h,
962 int op,
963 cxobj *event,
964 void *arg)
965 {
966 int retval = -1;
967 cbuf *u = NULL; /* stream pub (push) url */
968 cbuf *d = NULL; /* (XML) data to push */
969 char *pub_prefix;
970 char *result = NULL;
971 char *stream = (char*)arg;
972
973 clicon_debug(1, "%s", __FUNCTION__);
974 if (op != 0)
975 goto ok;
976 /* Create pub url */
977 if ((u = cbuf_new()) == NULL){
978 clicon_err(OE_XML, errno, "cbuf_new");
979 goto done;
980 }
981 if ((pub_prefix = clicon_option_str(h, "CLICON_STREAM_PUB")) == NULL){
982 clicon_err(OE_CFG, ENOENT, "CLICON_STREAM_PUB not defined");
983 goto done;
984 }
985 cprintf(u, "%s/%s", pub_prefix, stream);
986 /* Create XML data as string */
987 if ((d = cbuf_new()) == NULL){
988 clicon_err(OE_XML, errno, "cbuf_new");
989 goto done;
990 }
991 if (clicon_xml2cbuf(d, event, 0, 0, -1) < 0)
992 goto done;
993 if (url_post(cbuf_get(u), /* url+stream */
994 cbuf_get(d), /* postfields */
995 &result) < 0) /* result as xml */
996 goto done;
997 if (result)
998 clicon_debug(1, "%s: %s", __FUNCTION__, result);
999 ok:
1000 retval = 0;
1001 done:
1002 if (u)
1003 cbuf_free(u);
1004 if (d)
1005 cbuf_free(d);
1006 if (result)
1007 free(result);
1008 return retval;
1009 }
1010 #endif /* CLIXON_PUBLISH_STREAMS */
1011
1012 /*! Publish all streams on a pubsub channel, eg using SSE
1013 */
1014 int
stream_publish(clicon_handle h,char * stream)1015 stream_publish(clicon_handle h,
1016 char *stream)
1017 {
1018 #ifdef CLIXON_PUBLISH_STREAMS
1019 int retval = -1;
1020
1021 if (stream_ss_add(h, stream, NULL, NULL, NULL, stream_publish_cb, (void*)stream) < 0)
1022 goto done;
1023 retval = 0;
1024 done:
1025 return retval;
1026 #else
1027 clicon_log(LOG_WARNING, "%s called but CLIXON_PUBLISH_STREAMS not enabled (enable with configure --enable-publish)", __FUNCTION__);
1028 clicon_log_init("xpath", LOG_WARNING, CLICON_LOG_STDERR);
1029 return 0;
1030 #endif
1031 }
1032
1033 int
stream_publish_init()1034 stream_publish_init()
1035 {
1036 #ifdef CLIXON_PUBLISH_STREAMS
1037 int retval = -1;
1038
1039 if (curl_global_init(CURL_GLOBAL_ALL) != 0){
1040 clicon_err(OE_PLUGIN, errno, "curl_global_init");
1041 goto done;
1042 }
1043 retval = 0;
1044 done:
1045 return retval;
1046 #else
1047 return 0;
1048 #endif
1049 }
1050
1051 int
stream_publish_exit()1052 stream_publish_exit()
1053 {
1054 #ifdef CLIXON_PUBLISH_STREAMS
1055 curl_global_cleanup();
1056 #endif
1057 return 0;
1058 }
1059