1 /*
2 * This file is part of the Sofia-SIP package
3 *
4 * Copyright (C) 2005 Nokia Corporation.
5 *
6 * Contact: Pekka Pessi <pekka.pessi@nokia.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public License
10 * as published by the Free Software Foundation; either version 2.1 of
11 * the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
21 * 02110-1301 USA
22 *
23 */
24
25 /**@internal @file nea_server.c
26 * @brief Nokia Event API - event notifier implementation.
27 *
28 * @author Pekka Pessi <Pekka.Pessi@nokia.com>
29 * @author Martti Mela <Martti.Mela@nokia.com>
30 *
31 * @date Created: Wed Feb 14 18:37:04 EET 2001 ppessi
32 */
33
34 #include "config.h"
35
36 #include <sofia-sip/sip.h>
37 #include <sofia-sip/sip_header.h>
38 #include <sofia-sip/sip_util.h>
39 #include <sofia-sip/sip_status.h>
40 #include <sofia-sip/su_tagarg.h>
41
42 #include "nea_debug.h"
43
/*
 * NONE is a non-NULL sentinel pointer. nea_server_create() uses it as the
 * initial value of its local leg variable, so that "NEATAG_DIALOG() was not
 * given" can be told apart from an explicitly supplied NULL leg.
 */
#ifndef _MSC_VER
#define NONE ((void *)- 1)
#else
#define NONE ((void *)(INT_PTR)- 1)
#endif

/*
 * Context ("magic") type selections, defined before <sofia-sip/nea.h> is
 * included. Presumably the su and nta headers use them to type the context
 * pointers handed to callbacks -- TODO confirm against those headers.
 */
#define SU_ROOT_MAGIC_T struct nea_server_s
#define SU_MSG_ARG_T tagi_t

#define NTA_AGENT_MAGIC_T struct nea_server_s
#define NTA_LEG_MAGIC_T struct nea_sub_s
#define NTA_INCOMING_MAGIC_T struct nea_sub_s
#define NTA_OUTGOING_MAGIC_T struct nea_sub_s
57
58 #include <sofia-sip/nea.h>
59 #include <sofia-sip/htable.h>
60
61 #include <stddef.h>
62 #include <stdlib.h>
63 #include <string.h>
64 #include <stdio.h>
65 #include <stdarg.h>
66 #include <assert.h>
67 #include <limits.h>
68
/** Number of primary views (with different MIME type or content).
 *
 * The ev_views array in struct nea_event_s has NEA_VIEW_MAX + 1 slots and
 * nea_view_update() refuses to fill the last one, so the array always ends
 * with a NULL entry.
 */
#define NEA_VIEW_MAX (8)
71
/** @internal Server object, created for every notifier.
 *
 * Allocated by nea_server_create() with su_home_new(); everything the server
 * owns is allocated from nes_home.
 */
struct nea_server_s {
  su_home_t                 nes_home[1];    /**< Memory home of the server */
  su_root_t                *nes_root;       /**< Root given to nea_server_create() */
  su_timer_t               *nes_timer;      /**< Timer running nes_event_timer() */

  nta_agent_t              *nes_agent;      /**< NTA agent given to nea_server_create() */
  nta_leg_t                *nes_leg;        /**< Leg bound to nea_server_callback() */

  nea_sub_t                *nes_subscribers; /**< Head of the subscriber list */

  sip_require_t            *nes_require;    /**< Features required from subscribers */

  sip_time_t                nes_min_expires; /**< Minimum subscription duration */
  sip_time_t                nes_expires;     /**< Default subscription duration */
  sip_time_t                nes_max_expires; /**< Maximum subscription duration */

  int                       nes_max_subs;   /**< Maximum number of subscriptions */
  unsigned                  nes_throttle;   /**< Default throttle */
  unsigned                  nes_min_throttle; /**< Minimum throttle */
  unsigned                  nes_eventlist:1; /**< Eventlist only */
  /** Set while nea_server_shutdown() walks the subscribers; makes
   * nea_server_destroy() and nea_server_flush() defer their work. */
  unsigned                  nes_in_callback : 1;
  /** nea_server_destroy() was called while nes_in_callback was set. */
  unsigned                  nes_pending_destroy : 1;
  /** nea_server_flush() deferred removing at least one subscriber. */
  unsigned                  nes_pending_flush : 1;
  unsigned                  nes_202_before_notify:1;

  /** Nesting count of nea_server_notify() calls walking the subscriber list. */
  unsigned                  nes_in_list;

  unsigned                  nes_throttled;  /**< Throttled notifications? */

  char const               *nes_server;     /**< Server header string */

  sip_contact_t            *nes_eventity_uri; /**< Contact (target) of the event server */
  sip_allow_events_t       *nes_allow_events; /**< Events supported by the eventity */

  sip_allow_t              *nes_allow_methods; /**< Initialized to "SUBSCRIBE" */

  nea_new_event_f          *nes_callback;   /**< Application callback */
  nea_smagic_t             *nes_context;    /**< Context passed to nes_callback */

  /** Events.
   * Each subscriber will be added to one of these. */
  nea_event_t              *nes_events;
};
117
118
/** @internal Supported events and their subscribers */
struct nea_event_s {
  nea_event_t              *ev_next;        /**< Next event of the server */
  nea_event_t             **ev_prev;        /**< Link pointing to this event */

  nea_watcher_f            *ev_callback;    /**< Watcher callback (NOTE(review): usage not visible here) */
  nea_emagic_t             *ev_magic;       /**< Context for ev_callback (presumably) */

  unsigned                  ev_throttle;    /**< Default throttle */
  unsigned                  ev_min_throttle; /**< Minimum throttle */
  unsigned                  ev_eventlist:1; /**< Eventlist is supported */
  unsigned                  ev_reliable:1;  /**< Keep all notifications */
  unsigned                  :0;

  /** Sequence number of first unsent update.
   * Reset to UINT_MAX at the start of nea_server_notify(); queued
   * notifications not newer than this are released by nea_view_dequeue(). */
  unsigned                  ev_throttling;
  /** Sequence number for updates; incremented by nea_view_update() for every
   * non-fake update. */
  unsigned                  ev_updated;
  sip_require_t            *ev_require;     /**< Required features */
  sip_supported_t          *ev_supported;   /**< Supported features */

  sip_event_t              *ev_event;       /**< Event header identifying this event */
  sip_accept_t const       *ev_default;     /**< Default content type */
  sip_accept_t const       *ev_accept;      /**< Supported content types */

  /** Primary views, one per content type, terminated by a NULL entry.
   * Further views of the same content type are chained through evv_next. */
  nea_event_view_t         *ev_views[NEA_VIEW_MAX + 1];
};
145
typedef struct nea_event_queue_s nea_event_queue_t;

/** @internal Object representing particular view of event */
struct nea_event_view_s
{
  nea_event_view_t *evv_next;      /**< Next view with the same content type */
  nea_event_view_t *evv_primary;   /**< Backpointer to the primary view */

  nea_evmagic_t    *evv_magic;     /**< Application context, see nea_view_magic() */

  unsigned          evv_throttle;  /**< Default throttle */
  unsigned          evv_min_throttle; /**< Minimum throttle */
  unsigned          evv_fake:1;    /**< This is "fake" (ie. default) view */
  unsigned          evv_private:1; /**< This is private view */
  unsigned          evv_reliable:1; /**< Keep all notifications */
  unsigned:0;

  /** @internal Queued notification.
   *
   * evv_head[0] holds the current content of the view. nea_view_queue()
   * inserts copies of superseded heads right after it through evq_next.
   */
  struct nea_event_queue_s
  {
    nea_event_queue_t  *evq_next;      /**< Next (older) queued notification */
    unsigned            evq_updated;   /**< Internal sequence number of this content */
    unsigned            evq_version;   /**< Application-provided version */
    sip_content_type_t *evq_content_type; /**< MIME type of the content */
    sip_payload_t      *evq_payload;   /**< The content itself */
  } evv_head[1];
};

/* Shorthands for the current notification (head of the queue) of a view */
#define evv_version evv_head->evq_version
#define evv_updated evv_head->evq_updated
#define evv_content_type evv_head->evq_content_type
#define evv_payload evv_head->evq_payload
178
179
/** @internal Subscription object.
 *
 * Created by nea_sub_create(), which links it to the head of the server's
 * subscriber list.
 */
struct nea_sub_s {
  nea_sub_t        *s_next;        /**< Next subscriber of the server */
  nea_sub_t       **s_prev;        /**< Link pointing to this subscriber */

  nta_leg_t        *s_leg;         /**< NTA leg of the subscription */
  nta_incoming_t   *s_irq;         /**< Incoming request, see nea_subnode_get_incoming() */
  nta_outgoing_t   *s_oreq;        /**< Outgoing request; non-NULL is treated as "in progress" */

  nea_server_t     *s_nes;         /**< Backpointer to the server */

  sip_contact_t    *s_local;       /**< Local contact */

  sip_from_t       *s_from;        /**< Reported as sn_remote in nea_subnode_t */
  sip_contact_t    *s_remote;      /**< Remote contact */
  /* sip_accept_t  *s_accept; */
  sip_event_t      *s_id;

  nea_event_t      *s_event;       /**< Event the subscriber belongs to */
  nea_event_view_t *s_view;        /**< View assigned with nea_sub_assign_view() */
  nea_state_t       s_state;       /**< Subscription state */

  char const       *s_extended;

  sip_content_type_t *s_content_type; /**< Content-Type of SUBSCRIBE body. */
  sip_payload_t    *s_payload;     /**< Body of SUBSCRIBE. */

  unsigned          s_reported :1 ; /**< Made watcher report upon un-SUBSCRIBE */

  /** While set, nea_server_pending_flush() does not destroy the subscriber. */
  unsigned          s_processing : 1;
  unsigned          s_rejected : 1;
  /** Destruction deferred by nea_server_flush(). */
  unsigned          s_pending_flush : 1;
  /** Marked on the first flush; the subscriber is removed on the second. */
  unsigned          s_garbage : 1;
  unsigned          s_fake : 1;    /**< Do not send real information to user */
  unsigned          s_eventlist : 1; /**< Subscriber supported eventlist */

  sip_time_t        s_subscribed;  /**< When first SUBSCRIBE was recv */
  sip_time_t        s_notified;    /**< When last notification was sent */
  sip_time_t        s_expires;     /**< Expiration time. */

  unsigned          s_version;     /**< Version number set by application */

  unsigned          s_latest;      /**< External version of latest payload */
  unsigned          s_updated;     /**< Internal version of latest payload */
  unsigned          s_throttle;    /**< Minimum time between notifications */
};
227
/* Prototypes of the file-local helpers */

/* Destroys the subscribers whose removal nea_server_flush() had to defer. */
static void nea_server_pending_flush(nea_server_t *nes);

/* Creates or updates an event view; shared by nea_server_update() and
 * nea_view_create(). */
static int nea_view_update(nea_server_t *nes,
                           nea_event_t *ev,
                           nea_event_view_t **evvp,
                           int private,
                           int fake,
                           tag_type_t tag,
                           tag_value_t value,
                           ...);

/* Subscriber life cycle */
static nea_sub_t *nea_sub_create(nea_server_t *nes);
static int nea_sub_is_removed(nea_sub_t const *s);
static void nea_sub_remove(nea_sub_t *s);
static void nea_sub_destroy(nea_sub_t *s);

/* Leg callback of the server. nea_server_create() registers the server
 * itself, cast to nea_sub_t pointer, as the leg context. */
static
int nea_server_callback(nea_sub_t *nes_as_sub,
                        nta_leg_t *leg,
                        nta_incoming_t *irq,
                        sip_t const *sip);

/* Request processing for an individual subscription */
static int nea_sub_process_incoming(nea_sub_t *s,
                                    nta_leg_t *leg,
                                    nta_incoming_t *irq,
                                    sip_t const *sip);

static int nea_sub_process_subscribe(nea_sub_t *s,
                                     nta_leg_t *leg,
                                     nta_incoming_t *irq,
                                     sip_t const *sip);

/* Notification of a single subscriber; its results are summed by
 * nea_server_notify(). */
static int nea_sub_notify(nea_server_t *nes,
                          nea_sub_t *s,
                          sip_time_t now,
                          tag_type_t tag, tag_value_t value, ...);

static int response_to_notify(nea_sub_t *s,
                              nta_outgoing_t *oreq,
                              sip_t const *sip);

/* Timer callback armed by nea_server_create(). */
static void nes_event_timer(nea_server_t *nes,
                            su_timer_t *timer,
                            su_timer_arg_t *arg);

/* Keeps or releases the previous content of a view after an update. */
static int nea_view_queue(nea_server_t *nes,
                          nea_event_view_t *evv,
                          nea_event_queue_t *evq);
279 /** Assign an event view to subscriber. */
280 su_inline
nea_sub_assign_view(nea_sub_t * s,nea_event_view_t * evv)281 void nea_sub_assign_view(nea_sub_t *s, nea_event_view_t *evv)
282 {
283 if (s->s_view != evv)
284 /* Make sure we send a notification */
285 s->s_updated = evv->evv_updated - 1;
286 s->s_view = evv;
287 s->s_throttle = evv->evv_throttle;
288 }
289
290 su_inline
nea_subnode_init(nea_subnode_t * sn,nea_sub_t * s,sip_time_t now)291 void nea_subnode_init(nea_subnode_t *sn, nea_sub_t *s, sip_time_t now)
292 {
293 sn->sn_state = s->s_state;
294 sn->sn_fake = s->s_fake;
295 sn->sn_subscriber = s;
296 sn->sn_event = s->s_event;
297 sn->sn_remote = s->s_from;
298 sn->sn_contact = s->s_remote;
299 sn->sn_content_type = s->s_content_type;
300 sn->sn_payload = s->s_payload;
301 if (s->s_expires != 0 && (int)(s->s_expires - now) > 0)
302 sn->sn_expires = s->s_expires - now;
303 else
304 sn->sn_expires = 0;
305 sn->sn_latest = s->s_latest;
306 sn->sn_throttle = s->s_throttle;
307 sn->sn_eventlist = s->s_eventlist;
308 sn->sn_version = s->s_version;
309 sn->sn_subscribed = now - s->s_subscribed;
310 sn->sn_notified = s->s_notified;
311 sn->sn_view = s->s_view;
312 }
313
314 /** Create an event server.
315 *
316 * The function nea_server_create() initializes an event server object and
317 * registers it with @b nta. An event server object takes care of all events
318 * for a particular URI (@em eventity).
319 *
320 * @param agent pointer to an @b nta agent object
321 * @param root pointer to an @b root object
322 *�@param url url of the server to be created
323 * @param max_subs maximum number of subscriptions
324 * @param callback authorization function,
325 * or @c NULL if no authorization is required
326 * @param context server context (pointer to application data)
327 * @param tag, value, ... optional list of tag parameters
328 *
329 * @TAGS
330 * The function nea_server_create() takes the following tag values as its
331 * arguments:
332 * <dl>
333 *
334 * <dt>SIPTAG_CONTACT() or SIPTAG_CONTACT_STR()
335 * <dd>The target address of the event server.
336 *
337 * <dt>SIPTAG_ALLOW_EVENTS()
338 * <dd>The initial list of events supported by eventity. This list is
339 * extended whenever a new event is created with nea_event_tcreate().
340 *
341 * <dt>SIPTAG_SERVER_STR()
342 * <dd>The @b Server header for the event server.
343 *
344 * <dt>NEATAG_MINSUB()
345 * <dd>Minimum duration of a subscription.
346 *
347 * <dt>NEATAG_THROTTLE()
348 * <dd>Default value for event throttle (by default, 5 seconds).
349 * Throttle determines the minimum interval betweeen notifications. Note
350 * that the notification indicating that the subscription has terminated
351 * will be sent regardless of throttle.
352 *
353 * The default throttle value is used if the subscriber does not include
354 * a throttle parameter in @ref sip_event "Event" header of SUBSCRIBE request.
355 *
356 * <dt>NEATAG_MINTHROTTLE()
357 * <dd>Minimum allowed throttle value (by default, 5 seconds).
358 *
359 * <dt>NEATAG_EVENTLIST()
360 * <dd>If true, the subscribers must support eventlists. If SIPTAG_REQUIRE()
361 * is given, it must contain the "eventlist" feature.
362 *
363 * <dt>NEATAG_DIALOG()
364 * <dd>Give an optional NTA destination leg to event server.
365 *
366 * <dt>SIPTAG_REQUIRE()/SIPTAG_REQUIRE_STR()
367 * <dd>The @b Require header for the event server. The subscribers must
368 * indicate support the specified features.
369 *
370 * </dl>
371 *
372 * @return
373 * The function nea_server_create() returns a pointer to an event server
374 * object, or @c NULL upon an error.
375 */
nea_server_create(nta_agent_t * agent,su_root_t * root,url_t const * url,int max_subs,nea_new_event_f * callback,nea_smagic_t * context,tag_type_t tag,tag_value_t value,...)376 nea_server_t *nea_server_create(nta_agent_t *agent,
377 su_root_t *root,
378 url_t const *url,
379 int max_subs,
380 nea_new_event_f *callback,
381 nea_smagic_t *context,
382 tag_type_t tag, tag_value_t value, ...)
383 {
384 nea_server_t *nes = NULL;
385 sip_contact_t const *contact = NULL;
386 sip_allow_events_t const *allow_events = NULL;
387 sip_require_t const *rq = NULL;
388 char const *contact_str = NULL;
389 char const *server_str = NULL;
390 char const *rq_str = NULL;
391 unsigned
392 min_expires = 15 * 60,
393 expires = NEA_DEFAULT_EXPIRES,
394 max_expires = 24 * 60 * 60;
395 nta_leg_t *leg = NONE;
396 unsigned throttle = 5, min_throttle = throttle;
397 int eventlist = 0;
398
399 {
400 ta_list ta;
401
402 ta_start(ta, tag, value);
403
404 tl_gets(ta_args(ta),
405 SIPTAG_CONTACT_REF(contact),
406 SIPTAG_CONTACT_STR_REF(contact_str),
407 SIPTAG_ALLOW_EVENTS_REF(allow_events),
408 SIPTAG_SERVER_STR_REF(server_str),
409 SIPTAG_REQUIRE_REF(rq),
410 SIPTAG_REQUIRE_STR_REF(rq_str),
411 NEATAG_MIN_EXPIRES_REF(min_expires),
412 NEATAG_EXPIRES_REF(expires),
413 NEATAG_MAX_EXPIRES_REF(max_expires),
414 NEATAG_DIALOG_REF(leg),
415 NEATAG_THROTTLE_REF(throttle),
416 NEATAG_MINTHROTTLE_REF(min_throttle),
417 NEATAG_EVENTLIST_REF(eventlist),
418 TAG_NULL());
419
420 ta_end(ta);
421 }
422
423 if (throttle < min_throttle)
424 throttle = min_throttle;
425
426 if (!url) {
427 SU_DEBUG_5(("nea_server_create(): invalid url\n" VA_NONE));
428 return NULL;
429 }
430
431 if (min_expires > expires || expires > max_expires) {
432 SU_DEBUG_5(("nea_server_create(): invalid expiration range\n" VA_NONE));
433 return NULL;
434 }
435
436 nes = su_home_new(sizeof(nea_server_t));
437
438 if (nes) {
439 su_home_t *home = nes->nes_home;
440
441 nes->nes_root = root;
442 nes->nes_agent = agent;
443
444 nes->nes_max_subs = max_subs;
445
446 nes->nes_min_expires = min_expires;
447 nes->nes_expires = expires;
448 nes->nes_max_expires = max_expires;
449
450 nes->nes_throttle = throttle;
451 nes->nes_min_throttle = min_throttle;
452
453 if (allow_events)
454 nes->nes_allow_events = sip_allow_events_dup(home, allow_events);
455 else
456 nes->nes_allow_events = sip_allow_events_make(home, "");
457
458 nes->nes_allow_methods = sip_allow_make(home, "SUBSCRIBE");
459
460 nes->nes_server =
461 su_sprintf(home, "%s%snea/" NEA_VERSION_STR " %s",
462 server_str ? server_str : "",
463 server_str ? " " : "",
464 nta_agent_version(agent));
465
466 if (contact)
467 nes->nes_eventity_uri = sip_contact_dup(home, contact);
468 else if (contact_str)
469 nes->nes_eventity_uri = sip_contact_make(home, contact_str);
470 else
471 nes->nes_eventity_uri = sip_contact_create(home, (url_string_t *)url, NULL);
472
473 if (leg != NONE) {
474 nes->nes_leg = leg;
475 if (leg != NULL)
476 nta_leg_bind(leg, nea_server_callback, (nea_sub_t*)nes);
477 } else {
478 nes->nes_leg = nta_leg_tcreate(agent,
479 nea_server_callback,
480 (nea_sub_t*)nes,
481 NTATAG_NO_DIALOG(1),
482 NTATAG_METHOD("SUBSCRIBE"),
483 URLTAG_URL(url),
484 TAG_END());
485 }
486
487 nes->nes_eventlist = eventlist; /* Every event is a list */
488 if (eventlist && rq == NULL && rq_str == NULL)
489 rq_str = "eventlist";
490
491 if (rq)
492 nes->nes_require = sip_require_dup(nes->nes_home, rq);
493 else if (rq_str)
494 nes->nes_require = sip_require_make(nes->nes_home, rq_str);
495
496 nes->nes_timer = su_timer_create(su_root_task(nes->nes_root),
497 nes->nes_min_throttle
498 ? 500L * nes->nes_min_throttle
499 : 500L);
500
501 if (nes->nes_allow_events &&
502 nes->nes_eventity_uri &&
503 (nes->nes_leg || leg == NULL) &&
504 nes->nes_timer) {
505 SU_DEBUG_5(("nea_server_create(%p): success\n", (void *)nes));
506 su_timer_set(nes->nes_timer, nes_event_timer, nes);
507
508 nes->nes_callback = callback;
509 nes->nes_context = context;
510 }
511 else {
512 SU_DEBUG_5(("nea_server_create(%p): failed\n", (void *)nes));
513 nea_server_destroy(nes), nes = NULL;
514 }
515 }
516
517 return nes;
518 }
519
520 /** Invoke the new event callback.
521 *
522 * The function nes_event_callback() calls the callback provided by the
523 * application using the notifier object.
524 *
525 * @param nes pointer to notifier object
526 * @param ev pointer to event view
527 * @param s pointer to subscription object
528 * @param sip pointer to subscribe request
529 *
530 * @return
531 * The function nes_event_callback() returns -1 if the notifier object
532 * has been destroyed by the callback function, 0 otherwise.
533 */
534 static
nes_new_event_callback(nea_server_t * nes,nea_event_t ** ev_p,nea_event_view_t ** view_p,nta_incoming_t * irq,sip_t const * sip)535 int nes_new_event_callback(nea_server_t *nes,
536 nea_event_t **ev_p,
537 nea_event_view_t **view_p,
538 nta_incoming_t *irq,
539 sip_t const *sip)
540 {
541 if (nes->nes_callback)
542 return nes->nes_callback(nes->nes_context, nes, ev_p, view_p, irq, sip);
543 else
544 return -1;
545 }
546
547 /** Shutdown event server.
548 */
nea_server_shutdown(nea_server_t * nes,int retry_after)549 int nea_server_shutdown(nea_server_t *nes,
550 int retry_after)
551 {
552 nea_sub_t *s;
553 // int status = 200;
554 int in_callback;
555
556 if (nes == NULL)
557 return 500;
558
559 if (nes->nes_in_callback) {
560 SU_DEBUG_5(("nea_server_shutdown(%p) while in callback\n", (void *)nes));
561 return 100;
562 }
563
564 SU_DEBUG_5(("nea_server_shutdown(%p)\n", (void *)nes));
565
566 in_callback = nes->nes_in_callback; nes->nes_in_callback = 1;
567
568 for (s = nes->nes_subscribers; s; s = s->s_next) {
569 if (s->s_state == nea_terminated)
570 continue;
571 if (s->s_pending_flush)
572 continue;
573 if (s->s_oreq == NULL)
574 nea_sub_auth(s, nea_terminated,
575 TAG_IF(retry_after, NEATAG_REASON("probation")),
576 TAG_IF(!retry_after, NEATAG_REASON("deactivated")),
577 TAG_IF(retry_after, NEATAG_RETRY_AFTER(retry_after)),
578 TAG_END());
579 //else
580 //status = 180;
581 }
582
583 nes->nes_in_callback = in_callback;
584
585 return 200;
586 }
587
/** Destroy an event server.
 *
 * Destroys the leg, all the subscribers and the timer of the server and
 * releases the reference to its memory home. If the server is in a callback,
 * the destruction is only marked pending and the function returns.
 *
 * @param nes pointer to the event server (may be NULL)
 */
void nea_server_destroy(nea_server_t *nes)
{
  nea_sub_t *first;

  if (!nes)
    return;

  if (nes->nes_in_callback) {
    SU_DEBUG_5(("nea_server_destroy(%p) while in callback\n", (void *)nes));
    nes->nes_pending_destroy = 1;
    return;
  }

  SU_DEBUG_5(("nea_server_destroy(%p)\n", (void *)nes));

  nta_leg_destroy(nes->nes_leg);
  nes->nes_leg = NULL;

  /* Keep destroying the current head of the list until the list is empty */
  for (first = nes->nes_subscribers; first; first = nes->nes_subscribers)
    nea_sub_destroy(first);

  su_timer_destroy(nes->nes_timer);
  nes->nes_timer = NULL;

  su_home_unref(nes->nes_home);
}
610
611 /* ----------------------------------------------------------------- */
612
613 /**Update server payload.
614 *
615 * A nea event server has typed content that is delivered to the
616 * subscribers. Different content types are each assigned a separate primary
617 * view. There can be also primary views with "fake" content, content
618 * delivered to politely blocked subscribers.
619 *
620 * In addition to primary views, there can be secondary views, views
621 * assigned to a single subscriber only.
622 *
623 * @TAGS
624 * The following tagged arguments are accepted:
625 * <dl>
626 *
627 * <dt>SIPTAG_PAYLOAD() or SIPTAG_PAYLOAD_STR()
628 * <dd>Updated event content.
629 *
630 * <dt>SIPTAG_CONTENT_TYPE() or SIPTAG_CONTENT_TYPE_STR().
631 * <dd>MIME type of the content.
632 *
633 * <dt>NEATAG_FAKE(fak)
634 * <dd>If @a fake is true, 'fake' view is updated.
635 *
636 * <dt>NEATAG_VIEW(view)
637 * <dd>If included in tagged arguments, @a view is * updated. Used when
638 * updating secondary view.
639 *
640 * <dt>NEATAG_VERSION(version)
641 * <dd>The application-provided @a version for
642 * event content. After updated content has been sent to subscriber, @a
643 * version is copied to subscriber information structure.
644 *
645 * <dt>NEATAG_EVMAGIC(context)
646 * <dd>Application-provided @a context pointer.
647 * The @a context pointer is returned by nea_view_magic() function.
648 *
649 * <dt>NEATAG_RELIABLE(reliable)
650 * <dd>The @a reliable flag determines how overlapping updates are handled.
651 * If @a reliable is true, all updates are delivered to the subscribers.
652 *
653 * <dt>NEATAG_THROTTLE(throttl)
654 * <dd>Default value for event throttle for updated event view. Throttle
655 * determines the minimum interval in seconds betweeen notifications. Note
656 * that the notification indicating that the subscription has terminated
657 * will be sent regardless of throttle.
658 *
659 * The default throttle value is used if the subscriber does not include
660 * a throttle parameter in @ref sip_event "Event" header of SUBSCRIBE request.
661 *
662 * <dt>NEATAG_MINTHROTTLE()
663 * <dd>Minimum allowed throttle value for updated event view.
664 *
665 * </dl>
666 *
667 * @retval -1 upon an error.
668 * @retval 0 if event document was not updated.
669 * @retval 1 if event document was updated.
670 */
nea_server_update(nea_server_t * nes,nea_event_t * ev,tag_type_t tag,tag_value_t value,...)671 int nea_server_update(nea_server_t *nes,
672 nea_event_t *ev,
673 tag_type_t tag,
674 tag_value_t value,
675 ...)
676 {
677 nea_event_view_t *evv = NULL;
678 int fake = 0, updated;
679
680 ta_list ta;
681
682 if (ev == NULL)
683 ev = nes->nes_events;
684
685 ta_start(ta, tag, value);
686
687 tl_gets(ta_args(ta),
688 NEATAG_FAKE_REF(fake),
689 NEATAG_VIEW_REF(evv),
690 TAG_NULL());
691
692 updated = nea_view_update(nes, ev, &evv, 0, fake, ta_tags(ta));
693
694 ta_end(ta);
695
696 return updated;
697 }
698
699 static
nea_view_update(nea_server_t * nes,nea_event_t * ev,nea_event_view_t ** evvp,int private,int fake,tag_type_t tag,tag_value_t value,...)700 int nea_view_update(nea_server_t *nes,
701 nea_event_t *ev,
702 nea_event_view_t **evvp,
703 int private,
704 int fake,
705 tag_type_t tag,
706 tag_value_t value,
707 ...)
708 {
709 ta_list ta;
710
711 su_home_t *home = nes->nes_home;
712
713 sip_content_type_t const *ct = NULL;
714 char const *cts = NULL, *pls = NULL;
715 sip_payload_t const *pl = NULL;
716 sip_payload_t *new_pl;
717 nea_event_view_t *evv, **eevv = &evv;
718 nea_event_view_t *primary = NULL, **primary_p = &primary;
719 unsigned version = UINT_MAX;
720 nea_evmagic_t *evmagic = NULL;
721 int reliable = ev->ev_reliable;
722 unsigned throttle = ev->ev_throttle;
723 unsigned min_throttle = ev->ev_min_throttle;
724
725 nea_event_queue_t evq[1] = {{ NULL }};
726
727 ta_start(ta, tag, value);
728
729 tl_gets(ta_args(ta),
730 SIPTAG_CONTENT_TYPE_REF(ct),
731 SIPTAG_CONTENT_TYPE_STR_REF(cts),
732 SIPTAG_PAYLOAD_REF(pl),
733 SIPTAG_PAYLOAD_STR_REF(pls),
734 NEATAG_VERSION_REF(version),
735 NEATAG_EVMAGIC_REF(evmagic),
736 NEATAG_RELIABLE_REF(reliable),
737 NEATAG_THROTTLE_REF(throttle),
738 NEATAG_MINTHROTTLE_REF(min_throttle),
739 TAG_NULL());
740
741 ta_end(ta);
742
743 if (min_throttle < throttle)
744 min_throttle = throttle;
745
746 if (ct == NULL && cts == NULL)
747 return -1;
748
749 if (ct)
750 cts = ct->c_type;
751
752 evv = *evvp;
753
754 if (!evv) {
755 int i;
756
757 /* Check if the payload type already exists */
758 for (i = 0; (evv = ev->ev_views[i]); i++)
759 if (su_casematch(cts, evv->evv_content_type->c_type))
760 break;
761
762 if (private && evv == NULL) /* No private view without primary view. */
763 return -1;
764
765 if (i == NEA_VIEW_MAX) /* Too many primary views. */
766 return -1;
767
768 primary_p = eevv = ev->ev_views + i;
769
770 /* Search for fakeness/eventlist/private view */
771 if (evv && (private || evv->evv_private || evv->evv_fake != (unsigned)fake)) {
772 for (eevv = &evv->evv_next; (evv = *eevv); eevv = &evv->evv_next) {
773 if (private || evv->evv_private)
774 continue;
775 if (evv->evv_fake == (unsigned)fake)
776 break;
777 }
778 }
779 }
780
781 /* New event view, allocate and link to chain */
782 if (!evv) {
783 sip_content_type_t *new_ct;
784
785 evv = su_zalloc(home, sizeof (*evv));
786 if (!evv)
787 return -1;
788
789 new_pl = pl ? sip_payload_dup(home, pl)
790 : sip_payload_make(home, pls);
791
792 new_ct = ct ? sip_content_type_dup(home, ct)
793 : sip_content_type_make(home, cts);
794
795 if ((!new_pl && pl) || !new_ct) {
796 su_free(home, evv); su_free(home, new_pl);
797 return -1;
798 }
799
800 *evvp = *eevv = evv;
801
802 evv->evv_primary = *primary_p;
803 evv->evv_private = private != 0;
804 evv->evv_fake = fake != 0;
805 evv->evv_reliable = reliable != 0;
806 evv->evv_magic = evmagic;
807 evv->evv_content_type = new_ct;
808 evv->evv_payload = new_pl;
809 evv->evv_throttle = throttle;
810 evv->evv_min_throttle = min_throttle;
811
812 assert(evv->evv_content_type);
813 }
814 else {
815 if (pl &&
816 evv->evv_payload &&
817 evv->evv_payload->pl_len == pl->pl_len &&
818 memcmp(evv->evv_payload->pl_data, pl->pl_data, pl->pl_len) == 0)
819 return 0;
820 if (!pl && pls && evv->evv_payload &&
821 evv->evv_payload->pl_len == strlen(pls) &&
822 memcmp(evv->evv_payload->pl_data, pls, evv->evv_payload->pl_len) == 0)
823 return 0;
824 if (!pl && !pls && !evv->evv_payload)
825 return 0;
826
827 *evq = *evv->evv_head;
828
829 new_pl = pl ? sip_payload_dup(home, pl) : sip_payload_make(home, pls);
830
831 if (!new_pl && (pl || pls))
832 return -1;
833
834 evv->evv_payload = new_pl;
835 }
836
837 if (version != UINT_MAX)
838 evv->evv_version = version;
839
840 if (!fake)
841 evv->evv_updated = ++ev->ev_updated;
842
843 if (evq->evq_content_type)
844 nea_view_queue(nes, evv, evq);
845
846 SU_DEBUG_7(("nea_server_update(%p): %s (%s)\n", (void *)nes,
847 ev->ev_event->o_type, (evv->evv_content_type ? evv->evv_content_type->c_type : "N/A")));
848
849 return 1;
850 }
851
nea_view_create(nea_server_t * nes,nea_event_t * ev,nea_evmagic_t * magic,tag_type_t tag,tag_value_t value,...)852 nea_event_view_t *nea_view_create(nea_server_t *nes,
853 nea_event_t *ev,
854 nea_evmagic_t *magic,
855 tag_type_t tag,
856 tag_value_t value,
857 ...)
858 {
859 nea_event_view_t *evv = NULL;
860 ta_list ta;
861
862 if (ev == NULL)
863 return NULL;
864
865 ta_start(ta, tag, value);
866
867 nea_view_update(nes, ev, &evv, 1, 0, ta_tags(ta));
868
869 ta_end(ta);
870
871 return evv;
872 }
873
/** Destroy a private event view.
 *
 * Unlinks the private view @a evv from the chain of its primary view, moves
 * the subscribers using it back to the primary view and releases the view
 * with its content type and payload. Nothing is done unless @a evv is a
 * private view.
 *
 * @param nes pointer to the event server
 * @param evv private view to be destroyed
 */
void nea_view_destroy(nea_server_t *nes, nea_event_view_t *evv)
{
  nea_event_view_t **link;
  nea_sub_t *sub;

  if (!nes || !evv)
    return;
  if (!evv->evv_private)
    return;

  assert(evv->evv_primary && evv != evv->evv_primary);

  /* Find the link pointing to the view and bypass the view */
  link = &evv->evv_primary->evv_next;
  while (*link != NULL && *link != evv)
    link = &(*link)->evv_next;
  if (*link == evv)
    *link = evv->evv_next;

  /* Subscribers using the view fall back to the primary view */
  for (sub = nes->nes_subscribers; sub != NULL; sub = sub->s_next) {
    if (sub->s_view == evv)
      nea_sub_assign_view(sub, evv->evv_primary);
  }

  su_free(nes->nes_home, evv->evv_content_type);
  su_free(nes->nes_home, evv->evv_payload);
  su_free(nes->nes_home, evv);
}
898
nea_view_magic(nea_event_view_t const * evv)899 nea_evmagic_t *nea_view_magic(nea_event_view_t const *evv)
900 {
901 return evv ? evv->evv_magic : NULL;
902 }
903
/** Set the application context of the event view; no-op if @a evv is NULL. */
void nea_view_set_magic(nea_event_view_t *evv, nea_evmagic_t *magic)
{
  if (evv == NULL)
    return;

  evv->evv_magic = magic;
}
909
/** Get the version of the current content of the view, 0 if @a evv is NULL. */
unsigned nea_view_version(nea_event_view_t const *evv)
{
  if (evv == NULL)
    return 0;

  return evv->evv_version;
}
914
915 /** Get primary, non-fake event view for given content type */
nea_event_view(nea_event_t * ev,char const * content_type)916 nea_event_view_t *nea_event_view(nea_event_t *ev, char const *content_type)
917 {
918 int i;
919 nea_event_view_t *evv;
920
921 /* Check if the payload type already exists */
922 for (i = 0; ev->ev_views[i]; i++)
923 if (su_casematch(content_type, ev->ev_views[i]->evv_content_type->c_type))
924 break;
925
926 for (evv = ev->ev_views[i]; evv; evv = evv->evv_next)
927 if (!evv->evv_fake)
928 return evv;
929
930 return ev->ev_views[i];
931 }
932
933 /** Get the content type for event view */
nea_view_content_type(nea_event_view_t const * evv)934 sip_content_type_t const *nea_view_content_type(nea_event_view_t const *evv)
935 {
936 return evv ? evv->evv_content_type : NULL;
937 }
938
939
940 /** Queue an old notification if needed. */
941 static
nea_view_queue(nea_server_t * nes,nea_event_view_t * evv,nea_event_queue_t * evq)942 int nea_view_queue(nea_server_t *nes,
943 nea_event_view_t *evv,
944 nea_event_queue_t *evq)
945 {
946 nea_sub_t *s = NULL;
947
948 assert(nes && evv && evq);
949
950 if (evv->evv_reliable)
951 for (s = nes->nes_subscribers; s; s = s->s_next) {
952 if (s->s_view != evv)
953 continue;
954 if (s->s_updated > evq->evq_updated)
955 continue;
956 if (s->s_updated == evq->evq_updated && s->s_oreq == NULL)
957 continue;
958 break; /* This */
959 }
960
961 if (s) {
962 nea_event_queue_t *evq0 = su_alloc(nes->nes_home, sizeof *evq);
963
964 if (evq0 == NULL)
965 return -1;
966
967 *evq0 = *evq, evq = evq0;
968
969 /* evq should be copy of old head but with changed payload */
970 assert(evq->evq_next == evv->evv_head->evq_next);
971
972 evv->evv_head->evq_next = evq; /* insert to the queue */
973
974 return 0;
975 }
976
977 su_free(nes->nes_home, (void *)evq->evq_payload);
978
979 return 0;
980 }
981
982 /** Remove old unneeded notifications. */
983 static
nea_view_dequeue(nea_server_t * nes,nea_event_t * ev)984 int nea_view_dequeue(nea_server_t *nes,
985 nea_event_t *ev)
986 {
987 int i;
988 nea_event_view_t *evv;
989 nea_event_queue_t **prev, *evq;;
990
991 assert(nes && ev);
992
993 for (i = 0; ev->ev_views[i]; i++) {
994 for (evv = ev->ev_views[i]; evv; evv = evv->evv_next) {
995 if (!evv->evv_reliable)
996 continue;
997
998 for (prev = &evv->evv_head->evq_next; *prev; prev = &(*prev)->evq_next)
999 if (ev->ev_throttling >= (*prev)->evq_updated)
1000 break;
1001
1002 /* Free from evq onwards */
1003 for (evq = *prev; evq; evq = *prev) {
1004 *prev = evq->evq_next;
1005 su_free(nes->nes_home, evq->evq_payload);
1006 su_free(nes->nes_home, evq);
1007 }
1008 }
1009 }
1010
1011 return 0;
1012 }
1013
1014 /* ----------------------------------------------------------------- */
1015
1016 /** Notify watchers.
1017 *
1018 * @return
1019 * The function nea_server_notify() returns number of subscribers that the
1020 * notification could be sent, or -1 upon an error.
1021 */
nea_server_notify(nea_server_t * nes,nea_event_t * ev)1022 int nea_server_notify(nea_server_t *nes, nea_event_t *ev)
1023 {
1024 sip_time_t now = sip_now();
1025 nea_sub_t *s;
1026 int notified = 0, throttled = nes->nes_throttled;
1027
1028 SU_DEBUG_7(("nea_server_notify(%p): %s\n", (void *)nes,
1029 ev ? ev->ev_event->o_type: ""));
1030
1031 ++nes->nes_in_list;
1032
1033 nes->nes_throttled = 0;
1034
1035 if (ev == NULL)
1036 for (ev = nes->nes_events; ev; ev = ev->ev_next)
1037 ev->ev_throttling = UINT_MAX;
1038 else
1039 ev->ev_throttling = UINT_MAX;
1040
1041 for (s = nes->nes_subscribers; s; s = s->s_next) {
1042 if ((ev == NULL || ev == s->s_event) && s->s_state != nea_terminated) {
1043 notified += nea_sub_notify(nes, s, now, TAG_END());
1044 }
1045 }
1046
1047 if (throttled) {
1048 /* Dequeue throttled updates */
1049 if (ev == NULL)
1050 for (ev = nes->nes_events; ev; ev = ev->ev_next) {
1051 nea_view_dequeue(nes, ev);
1052 SU_DEBUG_3(("nea_server(): notified %u, throttling at %u\n",
1053 notified, ev->ev_throttling));
1054 }
1055 else {
1056 SU_DEBUG_3(("nea_server(): notified %u, throttling at %u\n",
1057 notified, ev->ev_throttling));
1058 nea_view_dequeue(nes, ev);
1059 }
1060 }
1061
1062 if (--nes->nes_in_list == 0 && nes->nes_pending_flush)
1063 nea_server_pending_flush(nes);
1064
1065 return notified;
1066 }
1067
1068
1069 /* ----------------------------------------------------------------- */
/** Flush terminated and expired subscriptions.
 *
 * Goes through the subscribers of @a event (all subscribers if @a event is
 * NULL) that are terminated or have expired. On the first flush such a
 * subscriber is only marked as garbage; on a later flush it is destroyed, or
 * its destruction is marked pending if the server is in a callback or its
 * subscriber list is in use.
 *
 * @param nes   pointer to the event server (may be NULL)
 * @param event event whose subscribers are flushed, or NULL
 */
void nea_server_flush(nea_server_t *nes, nea_event_t *event)
{
  nea_sub_t *sub, **link;
  sip_time_t now;

  if (!nes)
    return;

  now = sip_now();

  link = &nes->nes_subscribers;

  while ((sub = *link) != NULL) {
    int const selected = (event == NULL || sub->s_event == event);
    int const dead = (sub->s_state == nea_terminated || sub->s_expires < now);

    if (selected && dead) {
      /** On first flush, mark as garbage, remove on second flush */
      if (!sub->s_garbage) {
        sub->s_garbage = 1;
      }
      else if (nes->nes_in_callback || nes->nes_in_list) {
        nes->nes_pending_flush = 1;
        sub->s_pending_flush = 1;
      }
      else {
        /* The link is re-read after the destroy instead of being advanced */
        nea_sub_destroy(sub);
        continue;
      }
    }

    link = &sub->s_next;
  }
}
1098
1099
1100 /* ----------------------------------------------------------------- */
1101 static
nea_server_pending_flush(nea_server_t * nes)1102 void nea_server_pending_flush(nea_server_t *nes)
1103 {
1104 nea_sub_t **ss;
1105
1106 for (ss = &nes->nes_subscribers; *ss;) {
1107 if ((*ss)->s_pending_flush && !(*ss)->s_processing) {
1108 nea_sub_destroy(*ss);
1109 } else {
1110 ss = &((*ss)->s_next);
1111 }
1112 }
1113
1114 nes->nes_pending_flush = 0;
1115 }
1116
1117 /* ----------------------------------------------------------------- */
nea_sub_create(nea_server_t * nes)1118 nea_sub_t *nea_sub_create(nea_server_t *nes)
1119 {
1120 nea_sub_t *s;
1121
1122 assert(nes);
1123
1124 s = su_zalloc(nes->nes_home, sizeof (*s));
1125
1126 if (s) {
1127 s->s_nes = nes;
1128 if ((s->s_next = nes->nes_subscribers))
1129 s->s_next->s_prev = &s->s_next;
1130 s->s_prev = &nes->nes_subscribers;
1131 nes->nes_subscribers = s;
1132
1133 /* Copy default values */
1134 s->s_throttle = nes->nes_throttle;
1135 }
1136
1137 return s;
1138 }
1139
1140 /* ----------------------------------------------------------------- */
nea_subnode_get_incoming(nea_subnode_t * sn)1141 nta_incoming_t *nea_subnode_get_incoming(nea_subnode_t *sn)
1142 {
1143 assert(sn);
1144
1145 if (sn->sn_subscriber) {
1146 return sn->sn_subscriber->s_irq;
1147 }
1148 return NULL;
1149 }
1150
1151 /* ----------------------------------------------------------------- */
/** Unlink a subscriber from the subscriber list.
 *
 * After the call both link fields of @a s are NULL, which is what
 * nea_sub_is_removed() tests.  A NULL @a s is ignored.
 */
void nea_sub_remove(nea_sub_t *s)
{
  nea_sub_t *next;

  if (s == NULL)
    return;

  assert(s->s_prev);

  /* Make the predecessor's link skip s, and fix the successor's back link */
  next = s->s_next;
  *s->s_prev = next;
  if (next != NULL)
    next->s_prev = s->s_prev;

  s->s_prev = NULL;
  s->s_next = NULL;
}
1164
1165 /* ----------------------------------------------------------------- */
1166 /**Check if subscriber has been removed from list */
nea_sub_is_removed(nea_sub_t const * s)1167 static int nea_sub_is_removed(nea_sub_t const *s)
1168 {
1169 return s->s_prev == NULL;
1170 }
1171
1172 /* ----------------------------------------------------------------- */
/** Destroy a subscriber object.
 *
 * Unlinks the subscriber from the list (if still linked), destroys its
 * outstanding NOTIFY transaction and its dialog leg, and releases the
 * subscriber together with the headers it owns.  A NULL @a s is ignored.
 *
 * Besides the local and remote contacts and the From header, the event
 * id, content type and payload stored by nea_sub_process_subscribe() are
 * released too; they are duplicated from the same home and were
 * previously left allocated until the whole home was destroyed.
 *
 * @param s pointer to the subscriber object
 */
void nea_sub_destroy(nea_sub_t *s)
{
  if (s) {
    nea_sub_t *del = s;
    su_home_t *home = del->s_nes->nes_home;

    if (!nea_sub_is_removed(del))
      nea_sub_remove(del);

    del->s_event = NULL;

    su_free(home, del->s_local), del->s_local = NULL;
    su_free(home, del->s_remote), del->s_remote = NULL;

    /* Copies made from the SUBSCRIBE request */
    su_free(home, del->s_id), del->s_id = NULL;
    su_free(home, del->s_content_type), del->s_content_type = NULL;
    su_free(home, del->s_payload), del->s_payload = NULL;

    if (del->s_oreq)
      nta_outgoing_destroy(del->s_oreq), del->s_oreq = NULL;
    if (del->s_leg)
      nta_leg_destroy(del->s_leg), del->s_leg = NULL;
    if (del->s_from)
      su_free(home, del->s_from), del->s_from = NULL;

    su_free(home, del);
  }
}
1197
1198 /** Create a new event.
1199 *
1200 * The function nea_event_create() creates a new event for the event server.
1201 */
nea_event_create(nea_server_t * nes,nea_watcher_f * callback,nea_emagic_t * context,char const * name,char const * subname,char const * default_content_type,char const * accept)1202 nea_event_t *nea_event_create(nea_server_t *nes,
1203 nea_watcher_f *callback,
1204 nea_emagic_t *context,
1205 char const *name,
1206 char const *subname,
1207 char const *default_content_type,
1208 char const *accept)
1209 {
1210 return nea_event_tcreate(nes, callback, context,
1211 name, subname,
1212 SIPTAG_CONTENT_TYPE_STR(default_content_type),
1213 SIPTAG_ACCEPT_STR(accept),
1214 TAG_END());
1215 }
1216
1217 /** Create a new event (or subevent) with tags */
nea_event_tcreate(nea_server_t * nes,nea_watcher_f * callback,nea_emagic_t * context,char const * name,char const * subname,tag_type_t tag,tag_value_t value,...)1218 nea_event_t *nea_event_tcreate(nea_server_t *nes,
1219 nea_watcher_f *callback,
1220 nea_emagic_t *context,
1221 char const *name,
1222 char const *subname,
1223 tag_type_t tag, tag_value_t value, ...)
1224 {
1225 nea_event_t *ev, **pev;
1226 size_t len = strlen(name);
1227 ta_list ta;
1228
1229 if (nes == NULL || callback == NULL || name == NULL)
1230 return NULL;
1231
1232 /* Find a matching event */
1233 if (subname == NULL) {
1234 for (pev = &nes->nes_events; (ev = *pev); pev = &(*pev)->ev_next) {
1235 if (strcmp(ev->ev_event->o_type, name) != 0)
1236 continue;
1237 SU_DEBUG_5(("nea_event_create(): already event %s\n", name));
1238 return NULL;
1239 }
1240 }
1241 else {
1242 for (pev = &nes->nes_events; (ev = *pev); pev = &(*pev)->ev_next) {
1243 if (strncmp(ev->ev_event->o_type, name, len) != 0 ||
1244 ev->ev_event->o_type[len] != '.' ||
1245 strcmp(subname, ev->ev_event->o_type + len + 1) != 0)
1246 continue;
1247 SU_DEBUG_5(("nea_event_create(): already event %s.%s\n", name, subname));
1248 return NULL;
1249 }
1250 }
1251
1252 ta_start(ta, tag, value);
1253
1254 ev = su_zalloc(nes->nes_home, sizeof (*ev));
1255
1256 if (ev) {
1257 int reliable = 0;
1258 sip_content_type_t const *ct = NULL;
1259 sip_accept_t const *ac = NULL;
1260 sip_supported_t const *k = NULL;
1261 sip_require_t const *rq = NULL;
1262 char const *ct_str = NULL, *ac_str = NULL, *k_str = NULL, *rq_str = NULL;
1263
1264 unsigned throttle = nes->nes_throttle, min_throttle = nes->nes_min_throttle;
1265 int eventlist = nes->nes_eventlist;
1266
1267 tl_gets(ta_args(ta),
1268 NEATAG_RELIABLE_REF(reliable),
1269 NEATAG_THROTTLE_REF(throttle),
1270 NEATAG_MINTHROTTLE_REF(min_throttle),
1271 NEATAG_EVENTLIST_REF(eventlist),
1272 SIPTAG_CONTENT_TYPE_REF(ct),
1273 SIPTAG_CONTENT_TYPE_STR_REF(ct_str),
1274 SIPTAG_ACCEPT_REF(ac),
1275 SIPTAG_ACCEPT_STR_REF(ac_str),
1276 SIPTAG_SUPPORTED_REF(k),
1277 SIPTAG_SUPPORTED_STR_REF(k_str),
1278 SIPTAG_REQUIRE_REF(rq),
1279 SIPTAG_REQUIRE_STR_REF(rq_str),
1280 TAG_END());
1281
1282 ev->ev_callback = callback;
1283 ev->ev_magic = context;
1284 ev->ev_event = sip_event_format(nes->nes_home, "%s%s%s",
1285 name,
1286 subname ? "." : "",
1287 subname ? subname : "");
1288
1289 ev->ev_reliable = reliable != 0;
1290 ev->ev_throttle = throttle;
1291 ev->ev_min_throttle = min_throttle;
1292 ev->ev_eventlist = eventlist;
1293
1294 if (eventlist && rq == NULL && rq_str == NULL)
1295 rq_str = "eventlist";
1296
1297 if (rq)
1298 ev->ev_require = sip_require_dup(nes->nes_home, rq);
1299 else if (rq_str)
1300 ev->ev_require = sip_require_make(nes->nes_home, rq_str);
1301
1302 if (ev->ev_event) {
1303 #define sip_allow_events_find(k, i) sip_params_find(k->k_items, i)
1304 if (!sip_allow_events_find(nes->nes_allow_events,
1305 ev->ev_event->o_type))
1306 sip_allow_events_add(nes->nes_home, nes->nes_allow_events,
1307 ev->ev_event->o_type);
1308 }
1309
1310 if (ct)
1311 ev->ev_default = sip_accept_make(nes->nes_home, ct->c_type);
1312 else
1313 ev->ev_default = sip_accept_make(nes->nes_home, ct_str);
1314
1315 if (ac == NULL && ac_str == NULL)
1316 ac_str = ct ? ct->c_type : ct_str;
1317
1318 if (ac)
1319 ev->ev_accept = sip_accept_dup(nes->nes_home, ac);
1320 else
1321 ev->ev_accept = sip_accept_make(nes->nes_home, ac_str ? ac_str : "");
1322
1323 if (k)
1324 ev->ev_supported = sip_supported_dup(nes->nes_home, k);
1325 else if (k_str)
1326 ev->ev_supported = sip_supported_make(nes->nes_home, k_str);
1327
1328 ev->ev_prev = pev;
1329 *pev = ev;
1330 }
1331
1332 ta_end(ta);
1333
1334 return ev;
1335 }
1336
1337
1338 /* ----------------------------------------------------------------- */
1339 /** Return magic context bound to nea_event.
1340 *
1341 * The function returns the magic context bound to the event.
1342 *
1343 * @param ev pointer to event object
1344 *
1345 * @return
1346 * The function nea_emagic_get() returns the magic context
1347 * bound to the event.
1348 */
nea_emagic_get(nea_event_t * ev)1349 nea_emagic_t *nea_emagic_get(nea_event_t *ev)
1350 {
1351 assert(ev);
1352
1353 return ev->ev_magic;
1354 }
1355
1356
1357 /* ----------------------------------------------------------------- */
1358 /** Get named event */
nea_event_get(nea_server_t const * nes,char const * e)1359 nea_event_t *nea_event_get(nea_server_t const *nes, char const *e)
1360 {
1361 nea_event_t *ev = NULL;
1362
1363 for (ev = nes->nes_events; ev; ev = ev->ev_next)
1364 if (e == NULL || strcmp(ev->ev_event->o_type, e) == 0)
1365 break;
1366
1367 return ev;
1368 }
1369
1370 /* ----------------------------------------------------------------- */
nea_sub_get_request(nea_sub_t * sub)1371 nta_incoming_t *nea_sub_get_request(nea_sub_t *sub)
1372 {
1373 assert(sub);
1374
1375 return sub->s_irq;
1376 }
1377
1378 /** Invoke the event callback.
1379 *
1380 * The function nes_watcher_callback() calls the callback provided by the
1381 * application using the notifier object.
1382 *
1383 * @param nes pointer to notifier object
1384 * @param ev pointer to event view
1385 * @param s pointer to subscription object
1386 * @param sip pointer to subscribe request
1387 *
1388 * @return
1389 * The function nes_watcher_callback() returns -1 if the notifier object
1390 * has been destroyed by the callback function, 0 otherwise.
1391 */
1392 static
nes_watcher_callback(nea_server_t * nes,nea_event_t * ev,nea_sub_t * s,sip_t const * sip,sip_time_t now)1393 int nes_watcher_callback(nea_server_t *nes,
1394 nea_event_t *ev,
1395 nea_sub_t *s,
1396 sip_t const *sip,
1397 sip_time_t now)
1398 {
1399 if (!nes->nes_in_callback) {
1400 nes->nes_in_callback = 1;
1401 if (ev->ev_callback && !s->s_reported) {
1402 nea_subnode_t sn[1];
1403
1404 nea_subnode_init(sn, s, now);
1405
1406 if (sn->sn_expires == 0 || sn->sn_state == nea_terminated)
1407 s->s_reported = 1;
1408
1409 ev->ev_callback(nes, ev->ev_magic, ev, sn, sip);
1410 }
1411 nes->nes_in_callback = 0;
1412
1413 if (nes->nes_in_list)
1414 return 0;
1415
1416 if (nes->nes_pending_destroy) {
1417 nea_server_destroy(nes);
1418 return -2;
1419 }
1420
1421 if (sip == NULL && nes->nes_pending_flush) {
1422 int flushed = s->s_pending_flush;
1423 nea_server_pending_flush(nes);
1424 if (flushed)
1425 return -1;
1426 }
1427 }
1428
1429 return 0;
1430 }
1431
1432 /* ----------------------------------------------------------------- */
1433
#if 0
/* NOTE: everything up to the matching #endif is compiled out. */
/** Process incoming SUBSCRIBE message.
 *
 * The function nea_server_add() is called when the notifier receives a
 * SUBSCRIBE request without existing event dialog.
 *
 * It creates a subscriber, stores copies of the From header and of the
 * local contact in it, and creates a dialog leg that delivers further
 * requests to nea_sub_process_incoming().
 *
 * @param nes pointer to notifier
 * @param local_target optional contact header
 *        (the nes_eventity_uri of the notifier is used if NULL)
 * @param msg pointer to request message (not used by the body)
 * @param sip pointer to SIP view to request message
 *
 * @return
 * The function nea_server_add() returns 0 if successful, -1 upon an
 * error.
 *
 */
int nea_server_add(nea_server_t *nes,
                   sip_contact_t const *local_target,
                   msg_t *msg, sip_t *sip)
{
  su_home_t *home = nes->nes_home;
  nea_sub_t *s = NULL;
  url_t target[1];

  /* NOTE(review): nea_sub_create() can return NULL; s is not checked */
  s = nea_sub_create(nes);

  s->s_from = sip_from_dup(home, sip->sip_from);

  if (local_target == NULL)
    local_target = nes->nes_eventity_uri;

  s->s_local = sip_contact_dup(nes->nes_home, local_target);

  *target = *local_target->m_url;

  s->s_leg = nta_leg_tcreate(nes->nes_agent, nea_sub_process_incoming, s,
                             SIPTAG_CALL_ID(sip->sip_call_id),
                             SIPTAG_FROM(sip->sip_to), /* local address */
                             SIPTAG_TO(sip->sip_from), /* remote address */
                             URLTAG_URL(target),
                             TAG_END());

  if (s->s_local && s->s_leg) {
    nta_leg_tag(s->s_leg, NULL);
    return 0;
  }
  else {
    /* Either the contact copy or the leg is missing: undo everything */
    nea_sub_destroy(s);
    return -1;
  }
}
#endif
1486
1487 static
nea_server_callback(nea_sub_t * nes_as_sub,nta_leg_t * leg,nta_incoming_t * irq,sip_t const * sip)1488 int nea_server_callback(nea_sub_t *nes_as_sub,
1489 nta_leg_t *leg,
1490 nta_incoming_t *irq,
1491 sip_t const *sip)
1492 {
1493 return nea_server_add_irq((nea_server_t *)nes_as_sub, leg, NULL, irq, sip);
1494 }
1495
1496 /** Process incoming request */
nea_server_add_irq(nea_server_t * nes,nta_leg_t * leg,sip_contact_t const * local_target,nta_incoming_t * irq,sip_t const * sip)1497 int nea_server_add_irq(nea_server_t *nes,
1498 nta_leg_t *leg,
1499 sip_contact_t const *local_target,
1500 nta_incoming_t *irq,
1501 sip_t const *sip)
1502 {
1503 nea_sub_t *s = nea_sub_create(nes);
1504 if (s == NULL)
1505 return 500;
1506
1507 s->s_from = sip_from_dup(nes->nes_home, sip->sip_from);
1508
1509 if (local_target == NULL)
1510 local_target = nes->nes_eventity_uri;
1511
1512 s->s_local = sip_contact_dup(nes->nes_home, local_target);
1513
1514 if (leg == NULL || leg == nes->nes_leg) {
1515 url_t target[1];
1516
1517 *target = *local_target->m_url;
1518
1519 s->s_leg = nta_leg_tcreate(nes->nes_agent, nea_sub_process_incoming, s,
1520 SIPTAG_FROM(sip->sip_to),
1521 SIPTAG_TO(sip->sip_from),
1522 SIPTAG_CALL_ID(sip->sip_call_id),
1523 URLTAG_URL((url_string_t *)target),
1524 TAG_NULL());
1525 }
1526 else {
1527 nta_leg_bind(s->s_leg = leg, nea_sub_process_incoming, s);
1528 }
1529
1530 if (s->s_leg) {
1531 if (sip->sip_to->a_tag == NULL) {
1532 nta_leg_tag(s->s_leg, NULL);
1533 nta_incoming_tag(irq, nta_leg_get_tag(s->s_leg));
1534 }
1535 nta_leg_server_route(s->s_leg, sip->sip_record_route, sip->sip_contact);
1536
1537 return nea_sub_process_incoming(s, s->s_leg, irq, sip);
1538 }
1539 else {
1540 nea_sub_destroy(s);
1541 return 500;
1542 }
1543 }
1544
1545
1546 /* ----------------------------------------------------------------- */
1547
1548 /**Process incoming transactions for event dialog.
1549 *
1550 * The nea_sub_process_incoming() processes the transactions for event
1551 * dialog. Currently, no other methods allowed beside SUBSCRIBE. The
1552 * SUBSCRIBE is processed by nea_sub_process_subscribe().
1553 *
1554 * @param s pointer to subscriber object
1555 * @param leg pointer to NTA dialog object
1556 * @param irq pointer to NTA server transaction
1557 * @param sip pointer to structure containing SIP headers of the request
1558 *
1559 * The nea_sub_process_incoming() returns 0 if successful, SIP error code
1560 * otherwise.
1561 */
nea_sub_process_incoming(nea_sub_t * s,nta_leg_t * leg,nta_incoming_t * irq,sip_t const * sip)1562 int nea_sub_process_incoming(nea_sub_t *s,
1563 nta_leg_t *leg,
1564 nta_incoming_t *irq,
1565 sip_t const *sip)
1566 {
1567 int retval;
1568
1569 s->s_processing = 1;
1570 s->s_irq = irq;
1571
1572 switch(sip->sip_request->rq_method) {
1573 case sip_method_subscribe:
1574 retval = nea_sub_process_subscribe(s, leg, irq, sip);
1575 break;
1576
1577 default:
1578 nta_incoming_treply(irq,
1579 retval = SIP_405_METHOD_NOT_ALLOWED,
1580 SIPTAG_ALLOW_STR("SUBSCRIBE"),
1581 TAG_END());
1582 retval = 405;
1583 }
1584
1585 s->s_processing = 0;
1586
1587 if (s->s_irq)
1588 nta_incoming_destroy(irq), s->s_irq = NULL;
1589
1590 if (s->s_pending_flush || s->s_state == nea_embryonic)
1591 nea_sub_destroy(s);
1592
1593 return retval;
1594 }
1595
1596
1597 /* ----------------------------------------------------------------- */
1598
1599 /**Process incoming SUBSCRIBE transactions for event dialog.
1600 *
1601 * The function nea_sub_process_subscribe() processes the SUBSCRIBE
1602 * transactions for (possible) event dialog.
1603 *
1604 * @param s pointer to subscriber object
1605 * @param leg pointer to NTA dialog object
1606 * @param irq pointer to NTA server transaction
1607 * @param sip pointer to structure containing SIP headers of the request
1608 *
1609 * @return
1610 * The function nea_sub_process_subscribe() returns 0 if successful, and a
1611 * SIP error code otherwise.
1612 */
nea_sub_process_subscribe(nea_sub_t * s,nta_leg_t * leg,nta_incoming_t * irq,sip_t const * sip)1613 int nea_sub_process_subscribe(nea_sub_t *s,
1614 nta_leg_t *leg,
1615 nta_incoming_t *irq,
1616 sip_t const *sip)
1617 {
1618 nea_server_t *nes = s->s_nes;
1619 su_home_t *home = nes->nes_home;
1620 nea_event_t *ev = NULL, *ev_maybe = NULL;
1621 nea_event_view_t *evv = NULL, *evv_maybe = NULL;
1622 sip_time_t delta = 0, now = sip_now();
1623 sip_expires_t expires[1] = { SIP_EXPIRES_INIT() };
1624 sip_unsupported_t *unsupported;
1625 sip_event_t const *o;
1626 sip_accept_t const *ac = NULL, *accept = NULL;
1627 sip_accept_t *a0 = NULL, *a, *a_next, **aa;
1628 sip_accept_t accept_default[1];
1629 unsigned proposed_throttle;
1630 char const *type, *throttle;
1631 int once, what, supported_eventlist, require_eventlist;
1632
1633 if (sip->sip_payload && !sip->sip_content_type) {
1634 nta_incoming_treply(irq, 400, "Missing Content-Type",
1635 SIPTAG_SERVER_STR(nes->nes_server),
1636 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1637 SIPTAG_ALLOW(nes->nes_allow_methods),
1638 TAG_NULL());
1639 return 0;
1640 }
1641
1642 if (sip->sip_expires &&
1643 sip->sip_expires->ex_delta > 0 &&
1644 sip->sip_expires->ex_delta < nes->nes_min_expires) {
1645 sip_min_expires_t me[1];
1646
1647 sip_min_expires_init(me);
1648
1649 me->me_delta = nes->nes_min_expires;
1650
1651 nta_incoming_treply(irq, 423, "Subscription Interval Too Small",
1652 SIPTAG_ACCEPT(accept),
1653 SIPTAG_MIN_EXPIRES(me),
1654 SIPTAG_SERVER_STR(nes->nes_server),
1655 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1656 SIPTAG_ALLOW(nes->nes_allow_methods),
1657 TAG_NULL());
1658 return 0;
1659 }
1660
1661 /* Check features */
1662 if (nes->nes_require) {
1663 unsupported = sip_has_unsupported2(nes->nes_home,
1664 sip->sip_supported,
1665 sip->sip_require,
1666 nes->nes_require);
1667
1668 if (unsupported) {
1669 nta_incoming_treply(irq, SIP_421_EXTENSION_REQUIRED,
1670 SIPTAG_REQUIRE(nes->nes_require),
1671 SIPTAG_UNSUPPORTED(unsupported),
1672 SIPTAG_SERVER_STR(nes->nes_server),
1673 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1674 SIPTAG_ALLOW(nes->nes_allow_methods),
1675 TAG_NULL());
1676 su_free(nes->nes_home, unsupported);
1677
1678 return 0;
1679 }
1680 }
1681
1682 supported_eventlist = sip_has_feature(sip->sip_supported, "eventlist");
1683 require_eventlist = sip_has_feature(sip->sip_require, "eventlist");
1684 supported_eventlist = supported_eventlist || require_eventlist;
1685
1686 if (s->s_id && (!sip->sip_event ||
1687 str0cmp(s->s_id->o_type, sip->sip_event->o_type) != 0 ||
1688 str0cmp(s->s_id->o_id, sip->sip_event->o_id))) {
1689 /* Multiple subscriptions per dialog are not supported. */
1690 return nta_incoming_treply(irq, 501,
1691 "Multiple subscriptions not implemented",
1692 SIPTAG_SERVER_STR(nes->nes_server),
1693 TAG_NULL());
1694 }
1695
1696 /* Check that subscriber asks for a supported event */
1697 for (once = 0; ev == NULL ;once++) {
1698 o = sip->sip_event;
1699
1700 /* Check that we have a matching event */
1701 if (o && o->o_type) {
1702 for (ev = nes->nes_events; ev; ev = ev->ev_next) {
1703 if (strcmp(o->o_type, ev->ev_event->o_type) == 0) {
1704 ev_maybe = ev;
1705
1706 if (ev->ev_eventlist) {
1707 if (supported_eventlist)
1708 break;
1709 } else {
1710 if (!supported_eventlist)
1711 break;
1712 }
1713 }
1714 }
1715 }
1716
1717 if (!ev && !require_eventlist)
1718 ev = ev_maybe;
1719
1720 if (ev || once)
1721 break;
1722
1723 /* Ask the application either to
1724 1) add a new event or assing us an event/payload (0),
1725 2) take care of transaction (positive), or
1726 3) drop request (negative).
1727 */
1728 if ((what = nes_new_event_callback(nes, &ev, &evv, irq, sip)) < 0)
1729 break;
1730 if (what > 0) {
1731 s->s_irq = NULL;
1732 return 0;
1733 }
1734 }
1735
1736 if (ev_maybe == NULL && ev == NULL) {
1737 nta_incoming_treply(irq, SIP_489_BAD_EVENT,
1738 SIPTAG_SERVER_STR(nes->nes_server),
1739 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1740 SIPTAG_ALLOW(nes->nes_allow_methods),
1741 NULL);
1742 return 0;
1743 } else if (ev == NULL) {
1744 ev = ev_maybe;
1745
1746 unsupported = sip_has_unsupported(nes->nes_home, ev->ev_supported,
1747 sip->sip_require);
1748
1749 nta_incoming_treply(irq, SIP_420_BAD_EXTENSION,
1750 SIPTAG_UNSUPPORTED(unsupported),
1751 SIPTAG_REQUIRE(ev->ev_require),
1752 SIPTAG_SUPPORTED(ev->ev_supported),
1753 SIPTAG_SERVER_STR(nes->nes_server),
1754 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1755 SIPTAG_ALLOW(nes->nes_allow_methods),
1756 TAG_NULL());
1757
1758 su_free(nes->nes_home, unsupported);
1759
1760 return 0;
1761 }
1762
1763 if (sip->sip_accept)
1764 accept = sip->sip_accept;
1765 else if (evv && evv->evv_content_type) {
1766 /* Generate accept header from event view specified by application */
1767 sip_accept_init(accept_default);
1768 accept_default->ac_type = evv->evv_content_type->c_type;
1769 accept_default->ac_subtype = evv->evv_content_type->c_subtype;
1770
1771 accept = a0;
1772 }
1773 else
1774 accept = ev->ev_default;
1775
1776 for (once = 0; evv == NULL ;once++) {
1777 /* If there are multiple accept values with different Q values,
1778 insertion sort by Q value */
1779 for (ac = accept->ac_next; ac; ac = ac->ac_next) {
1780 if (ac->ac_q != accept->ac_q) {
1781 if ((a0 = sip_accept_dup(home, accept))) {
1782 /* Sort the accept list by Q values */
1783 for (a = a0, accept = NULL; a; a = a_next) {
1784 a_next = a->ac_next;
1785
1786 for (aa = (sip_accept_t **)&accept;
1787 *aa && sip_q_value((*aa)->ac_q) >= sip_q_value(a->ac_q);
1788 aa = &(*aa)->ac_next)
1789 ;
1790
1791 a->ac_next = *aa; *aa = a; /* Insert */
1792 }
1793 }
1794
1795 break;
1796 }
1797 }
1798
1799 /* Check that subscriber asks for a supported content type */
1800 for (ac = accept; ac; ac = ac->ac_next) {
1801 int i;
1802
1803 if (ac->ac_type == NULL || ac->ac_subtype == NULL)
1804 continue;
1805
1806 /* Check all supported content types v. accept */
1807 for (i = 0; (evv = ev->ev_views[i]); i++) {
1808 assert(evv->evv_content_type && evv->evv_content_type->c_type);
1809
1810 if (strcmp(ac->ac_type, "*/*") == 0)
1811 break;
1812
1813 type = evv->evv_content_type->c_type;
1814
1815 if ((su_casematch(ac->ac_type, type)) ||
1816 (su_casematch(ac->ac_subtype, "*") &&
1817 su_casenmatch(ac->ac_type, type,
1818 ac->ac_subtype - ac->ac_type))) {
1819 if (evv_maybe == NULL)
1820 evv_maybe = evv;
1821 }
1822 }
1823
1824 if (evv) /* Found */
1825 break;
1826 }
1827
1828 /* Free the sorted Accept list */
1829 for (a = a0; a; a = a_next)
1830 a_next = a->ac_next, su_free(home, a);
1831
1832 if (!evv)
1833 evv = evv_maybe;
1834
1835 if (evv || once)
1836 break;
1837
1838 /* Ask the application either to
1839 1) add a new event view or assign us an event view (0),
1840 2) take care of transaction (positive), or
1841 3) drop request (negative).
1842 */
1843 if ((what = nes_new_event_callback(nes, &ev, &evv, irq, sip)) < 0)
1844 break;
1845 if (what > 0) {
1846 s->s_irq = NULL;
1847 return 0;
1848 }
1849 }
1850
1851 if (evv == NULL) {
1852 SU_DEBUG_3(("nea_server: event %s rejected %u %s\n",
1853 ev->ev_event->o_type, SIP_406_NOT_ACCEPTABLE));
1854
1855 /* There is no media acceptable to watcher */
1856 return nta_incoming_treply(irq, SIP_406_NOT_ACCEPTABLE,
1857 SIPTAG_ACCEPT(ev->ev_accept),
1858 SIPTAG_SERVER_STR(nes->nes_server),
1859 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1860 SIPTAG_ALLOW(nes->nes_allow_methods),
1861 TAG_NULL());
1862 }
1863
1864 /* Do not change private view */
1865 if (s->s_view && s->s_view->evv_primary == evv)
1866 evv = s->s_view;
1867
1868 /* Set throttle */
1869 if (sip->sip_event &&
1870 (throttle = sip_params_find(sip->sip_event->o_params, "throttle="))) {
1871 proposed_throttle = strtoul(throttle, NULL, 10);
1872
1873 if (proposed_throttle < evv->evv_min_throttle)
1874 proposed_throttle = evv->evv_min_throttle;
1875 } else
1876 proposed_throttle = evv->evv_throttle;
1877
1878 s->s_throttle = proposed_throttle;
1879
1880 /* Update route, store remote contact */
1881 nta_leg_server_route(leg, sip->sip_record_route, sip->sip_contact);
1882 su_free(home, s->s_remote);
1883 s->s_remote = sip_contact_dup(home, sip->sip_contact);
1884
1885 /* Store content-type and body */
1886 if (sip->sip_content_type) {
1887 su_free(home, s->s_content_type);
1888 s->s_content_type = sip_content_type_dup(home, sip->sip_content_type);
1889 su_free(home, s->s_payload);
1890 s->s_payload = sip_payload_dup(home, sip->sip_payload);
1891 }
1892
1893 /* Calculate expiration time for subscription */
1894 delta = sip_contact_expires(NULL, sip->sip_expires, sip->sip_date,
1895 nes->nes_expires, now);
1896 if (delta > nes->nes_max_expires)
1897 delta = nes->nes_max_expires;
1898 expires->ex_delta = delta;
1899
1900 if (s->s_subscribed == 0)
1901 s->s_subscribed = now;
1902 s->s_expires = now + delta;
1903 /* s->s_accept = sip_accept_dup(home, accept); */
1904 if (s->s_id == NULL)
1905 s->s_id = sip_event_dup(home, sip->sip_event);
1906 s->s_event = ev;
1907 s->s_eventlist = supported_eventlist;
1908 nea_sub_assign_view(s, evv);
1909 s->s_updated = evv->evv_updated - 1; /* Force notify */
1910
1911 if (nes->nes_202_before_notify) {
1912 nta_incoming_treply(irq, SIP_202_ACCEPTED,
1913 SIPTAG_SERVER_STR(nes->nes_server),
1914 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1915 SIPTAG_ALLOW(nes->nes_allow_methods),
1916 SIPTAG_REQUIRE(ev->ev_require),
1917 SIPTAG_SUPPORTED(ev->ev_supported),
1918 SIPTAG_EXPIRES(expires),
1919 SIPTAG_CONTACT(s->s_local),
1920 TAG_END());
1921 nta_incoming_destroy(irq), s->s_irq = irq = NULL;
1922 }
1923
1924 /* Callback for checking subscriber authorization */
1925 if (nes_watcher_callback(nes, ev, s, sip, now) < 0) {
1926 if (irq) {
1927 nta_incoming_treply(irq, SIP_503_SERVICE_UNAVAILABLE, TAG_END());
1928 nta_incoming_destroy(irq);
1929 }
1930 return -1;
1931 }
1932
1933
1934
1935 evv = s->s_view; /* Callback can change event view */
1936
1937 if (s->s_state == nea_embryonic)
1938 nea_sub_auth(s, nea_pending, NEATAG_FAKE(1), TAG_END());
1939
1940 if (s->s_updated != evv->evv_updated && !(irq && s->s_rejected))
1941 nea_sub_notify(nes, s, now, TAG_END());
1942
1943 if (irq) {
1944 if (s->s_rejected)
1945 nta_incoming_treply(irq, SIP_403_FORBIDDEN,
1946 SIPTAG_SERVER_STR(nes->nes_server),
1947 TAG_END());
1948 else if (s->s_state == nea_active)
1949 nta_incoming_treply(irq, SIP_200_OK,
1950 SIPTAG_REQUIRE(ev->ev_require),
1951 SIPTAG_SUPPORTED(ev->ev_supported),
1952 SIPTAG_EXPIRES(expires),
1953 SIPTAG_SERVER_STR(nes->nes_server),
1954 SIPTAG_CONTACT(s->s_local),
1955 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1956 SIPTAG_ALLOW(nes->nes_allow_methods),
1957 TAG_END());
1958 else
1959 nta_incoming_treply(irq, SIP_202_ACCEPTED,
1960 SIPTAG_REQUIRE(ev->ev_require),
1961 SIPTAG_SUPPORTED(ev->ev_supported),
1962 SIPTAG_EXPIRES(expires),
1963 SIPTAG_SERVER_STR(nes->nes_server),
1964 SIPTAG_ALLOW_EVENTS(nes->nes_allow_events),
1965 SIPTAG_ALLOW(nes->nes_allow_methods),
1966 SIPTAG_CONTACT(s->s_local),
1967 TAG_END());
1968 }
1969
1970 return 0;
1971 }
1972
1973 /* ----------------------------------------------------------------- */
/**Notify subscriber
 *
 * The function nea_sub_notify() sends a notification to the subscriber. The
 * event type is specified by subscriber event, payload type and payload in
 * the event view. The responses to the NOTIFY transaction are
 * processed by response_to_notify().
 *
 * Nothing is sent if the subscriber is marked for flushing, if a NOTIFY
 * is still outstanding for a subscriber that is not terminated, if the
 * subscriber is being throttled, or if the subscription is embryonic.
 *
 * @param nes pointer to the notifier object
 * @param s pointer to the subscription object
 * @param now current SIP time (if 0, no body is sent,
 *            but updated Subscription-State header only)
 * @param tag,value,... tag list; NEATAG_REASON(), NEATAG_FAKE() and
 *            NEATAG_RETRY_AFTER() are read from it, and the whole list is
 *            passed on to the NOTIFY request
 *
 * @return 1 if the NOTIFY transaction was created, 0 otherwise.
 */
int nea_sub_notify(nea_server_t *nes, nea_sub_t *s,
                   sip_time_t now,
                   tag_type_t tag, tag_value_t value, ...)
{
  int notified = 0;
  ta_list ta;
  /* With a nonzero time, an up-to-date subscriber is not notified at all */
  int suppress = now != 0;
  nea_event_t *ev = s->s_event;
  nea_state_t substate = s->s_state;

  if (s->s_pending_flush || (s->s_oreq && substate != nea_terminated)) {
    /* Cannot notify now: record the version this subscriber is stuck at */
    if (ev && ev->ev_throttling > s->s_updated)
      ev->ev_throttling = s->s_updated;
    return 0;
  }

  /* Only reached with an outstanding NOTIFY if the state is terminated */
  if (s->s_oreq)
    nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL;

  assert(s->s_view); assert(ev);

  if (suppress && s->s_view->evv_updated == s->s_updated)
    return 0;

  if (now == 0)
    now = sip_now();

  if (s->s_notified + s->s_throttle > now &&
      /* Do not throttle state termination notification */
      substate != nea_terminated &&
      (long)(s->s_expires - now) > 0) {
    if (ev->ev_throttling > s->s_updated && !s->s_fake)
      ev->ev_throttling = s->s_updated;
    nes->nes_throttled++;
    return 0;
  }

  ta_start(ta, tag, value);
  {
    sip_subscription_state_t ss[1];
    char expires[32];
    /* At most two parameters are set; the rest terminates the list */
    sip_param_t params[] = { NULL, NULL, NULL };
    char const *reason = NULL;
    int fake = 0;
    char reason_buf[64];
    unsigned retry_after = (unsigned)-1; /* (unsigned)-1 means "not given" */
    char retry_after_buf[64];
    int i = 0;
    nta_response_f *callback;
    nea_event_view_t *evv = s->s_view;
    nea_event_queue_t *evq, *n_evq;

    assert(ev);

    sip_subscription_state_init(ss);

    tl_gets(ta_args(ta),
            NEATAG_REASON_REF(reason),
            NEATAG_FAKE_REF(fake), /* XXX - semantics??? */
            NEATAG_RETRY_AFTER_REF(retry_after),
            TAG_END());

    /* Build the Subscription-State parameters */
    if (substate == nea_terminated) {
      if (reason)
        snprintf(reason_buf, sizeof(reason_buf),
                 "reason=%s", reason), params[i++] = reason_buf;
      if (retry_after != (unsigned)-1)
        snprintf(retry_after_buf, sizeof(retry_after_buf),
                 "retry-after=%u", retry_after), params[i++] = retry_after_buf;
    }
    else if ((long)(s->s_expires - now) <= 0) {
      /* The subscription has expired: terminate it with this NOTIFY */
      substate = nea_terminated;
      params[i++] = "reason=timeout";
    }
    else {
      snprintf(expires, sizeof(expires),
               "expires=%lu", (unsigned long)(s->s_expires - now));
      params[i++] = expires;
    }

    ss->ss_params = params;

    switch (substate) {
    case nea_extended: ss->ss_substate = s->s_extended; break;
    case nea_pending: ss->ss_substate = "pending"; break;
    case nea_active: ss->ss_substate = "active"; break;
    case nea_terminated: ss->ss_substate = "terminated"; break;
      /* Do not send notifys for embryonic subscriptions */
    case nea_embryonic:
      ta_end(ta);
      return 0;
    }

    /* Responses to the final NOTIFY are not processed */
    callback = substate != nea_terminated ? response_to_notify : NULL;

    /* Stop at the queue entry whose successor the subscriber has
       already been notified of (or at the last entry) */
    for (evq = evv->evv_head; evq->evq_next; evq = evq->evq_next) {
      if (evq->evq_next->evq_updated <= s->s_updated)
        break;
    }

    /* Leave out the body if the subscriber already has the latest version */
    suppress = (s->s_view->evv_updated == s->s_updated);

    /* Use the content of the primary view if the entry has no payload */
    n_evq = evq->evq_payload ? evq : evv->evv_primary->evv_head;

    s->s_oreq =
      nta_outgoing_tcreate(s->s_leg,
                           callback, s, NULL,
                           SIP_METHOD_NOTIFY, NULL,
                           SIPTAG_SUBSCRIPTION_STATE(ss),
                           SIPTAG_REQUIRE(ev->ev_require),
                           SIPTAG_SUPPORTED(ev->ev_supported),
                           SIPTAG_USER_AGENT_STR(nes->nes_server),
                           SIPTAG_CONTACT(s->s_local),
                           SIPTAG_EVENT(s->s_id),
                           TAG_IF(!suppress,
                                  SIPTAG_CONTENT_TYPE(n_evq->evq_content_type)),
                           TAG_IF(!suppress,
                                  SIPTAG_PAYLOAD(n_evq->evq_payload)),
                           ta_tags(ta));


    notified = s->s_oreq != 0;

    if (notified) {
      s->s_notified = now;
      s->s_state = substate; /* XXX - we need state for "waiting" */
      s->s_latest = evq->evq_version; /* Application version */
      s->s_updated = evq->evq_updated; /* Internal version */
      if (ev->ev_throttling > s->s_updated)
        ev->ev_throttling = s->s_updated;
    }

    if (callback == NULL) {
      /* Final NOTIFY: the transaction is not kept */
      nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL;
      /* Inform the application of a subscriber leaving the subscription. */
      nes_watcher_callback(nes, ev, s, NULL, now);
    }
  }
  ta_end(ta);

  return notified;
}
2130
2131 /* ----------------------------------------------------------------- */
2132 /**Process responses to the NOTIFY.
2133 *
2134 * The response_to_notify() processes the responses to the NOTIFY request.
2135 * If there was an error with delivering the NOTIFY, the subscription is
2136 * considered terminated.
2137 *
2138 * @param s pointer to subscription object
2139 */
response_to_notify(nea_sub_t * s,nta_outgoing_t * oreq,sip_t const * sip)2140 int response_to_notify(nea_sub_t *s,
2141 nta_outgoing_t *oreq,
2142 sip_t const *sip)
2143 {
2144 nea_server_t *nes = s->s_nes;
2145 int status = sip->sip_status->st_status;
2146 sip_time_t now = sip_now();
2147
2148 if (status < 200)
2149 return 0;
2150
2151 nta_outgoing_destroy(s->s_oreq), s->s_oreq = NULL;
2152
2153 if (status < 300) {
2154 if (s->s_view->evv_updated != s->s_updated) {
2155 if (s->s_notified + s->s_throttle <= now)
2156 nea_sub_notify(nes, s, now, TAG_END());
2157 else
2158 nes->nes_throttled++;
2159 }
2160 }
2161
2162 if (s->s_state == nea_terminated || status >= 300) {
2163 SU_DEBUG_5(("nea_server: removing subscriber " URL_PRINT_FORMAT "\n",
2164 URL_PRINT_ARGS(s->s_from->a_url)));
2165 /* Inform the application of a subscriber leaving the subscription. */
2166 nes_watcher_callback(nes, s->s_event, s, NULL, now);
2167 }
2168
2169 return 0;
2170 }
2171
2172 /* ----------------------------------------------------------------- */
2173
2174 /** Get number of active subscribers.
2175 *
2176 * The function nea_server_active() counts the number of active subscribers
2177 * watching the specified view. If the view is not specified (@a ev is @c
2178 * NULL), it counts the number of all subscribers.
2179 *
2180 * @param nes notifier
2181 * @param ev event
2182 *
2183 * The function nea_server_active() returns number of active subscribers.
2184 */
nea_server_active(nea_server_t * nes,nea_event_t const * ev)2185 int nea_server_active(nea_server_t *nes, nea_event_t const *ev)
2186 {
2187 int n = 0;
2188 nea_sub_t *s = NULL;
2189
2190 /* Count the number of subscribers watching this event */
2191 for (s = nes->nes_subscribers; s ; s = s->s_next)
2192 if (!s->s_pending_flush && s->s_state == nea_active
2193 && (ev == NULL || ev == s->s_event))
2194 n++;
2195
2196 return n;
2197 }
2198
2199 /** Get number of non-embryonic subscribers.
2200 *
2201 * The function nea_server_non_embryonic() counts the number of pending,
2202 * active or terminated subscribers watching the specified view. If the view
2203 * is not specified (@a ev is @c NULL), it counts the number of all
2204 * subscribers.
2205 *
2206 * @param nes notifier
2207 * @param ev event view
2208 *
2209 * The function nea_server_active() returns number of active subscribers.
2210 */
nea_server_non_embryonic(nea_server_t * nes,nea_event_t const * ev)2211 int nea_server_non_embryonic(nea_server_t *nes, nea_event_t const *ev)
2212 {
2213 int n = 0;
2214 nea_sub_t *s = NULL;
2215
2216 /* Count the number of subscribers watching this event */
2217 for (s = nes->nes_subscribers; s ; s = s->s_next)
2218 if (!s->s_pending_flush && s->s_state != nea_embryonic
2219 && (ev == NULL || ev == s->s_event))
2220 n++;
2221
2222 return n;
2223 }
2224
2225 /** Set application version number */
nea_sub_version(nea_sub_t * s,unsigned version)2226 int nea_sub_version(nea_sub_t *s, unsigned version)
2227 {
2228 if (s)
2229 return s->s_version = version;
2230 return 0;
2231 }
2232
2233 /** Authorize a subscription.
2234 *
2235 * Application can modify the subscription state and authorize the user.
2236 * The subscription state has following simple state diagram:
2237 *
2238 * @code
2239 * +---------------+ +------------------+
2240 * | | | |
2241 * +-----------+ | +---------+ V | +------------+ V +------------+
2242 * | embryonic |-+->| pending |--+-+->| authorized |--+->| terminated |
2243 * +-----------+ +---------+ +------------+ +------------+
2244 *
2245 * @endcode
2246 *
2247 * @TAGS
2248 * IF NEATAG_VIEW(view) is included in tagged arguments, @a view is assigned
2249 * to the subscriber and the content from the view is delivered to the
2250 * subscriber.
2251 *
2252 * If NEATAG_FAKE(1) is included in tags, content marked as 'fake' is
2253 * delivered to the subscriber.
2254 *
2255 * @retval 0 if successful
2256 * @retval -1 upon an error
2257 */
nea_sub_auth(nea_sub_t * s,nea_state_t state,tag_type_t tag,tag_value_t value,...)2258 int nea_sub_auth(nea_sub_t *s,
2259 nea_state_t state,
2260 tag_type_t tag, tag_value_t value, ...)
2261 {
2262
2263 ta_list ta;
2264 int retval, embryonic, rejected = 0;
2265 int fake = 0;
2266 char const *reason = NULL;
2267 nea_event_view_t *evv = NULL;
2268
2269 if (s == NULL)
2270 return -1;
2271 if (state == nea_embryonic)
2272 return -1;
2273 if (state < s->s_state)
2274 return -1;
2275
2276 ta_start(ta, tag, value);
2277
2278 embryonic = s->s_state == nea_embryonic;
2279
2280 s->s_state = state;
2281
2282 if (tl_gets(ta_args(ta), NEATAG_VIEW_REF(evv), TAG_END()) && evv) {
2283 nea_sub_assign_view(s, evv);
2284 }
2285 else {
2286 if (tl_gets(ta_args(ta), NEATAG_FAKE_REF(fake), TAG_END()))
2287 s->s_fake = fake;
2288
2289 if (s->s_view && s->s_view->evv_fake != s->s_fake) {
2290 for (evv = s->s_view->evv_primary; evv; evv = evv->evv_next)
2291 if (!evv->evv_private && evv->evv_fake == s->s_fake) {
2292 nea_sub_assign_view(s, evv);
2293 break;
2294 }
2295 }
2296 }
2297
2298 tl_gets(ta_args(ta), NEATAG_REASON_REF(reason), TAG_END());
2299
2300 rejected = su_casematch(reason, "rejected");
2301
2302 if (state == nea_terminated && embryonic && rejected && s->s_irq)
2303 retval = 0, s->s_rejected = 1;
2304 else
2305 retval = nea_sub_notify(s->s_nes, s, 0, ta_tags(ta));
2306
2307 ta_end(ta);
2308
2309 return retval;
2310 }
2311
2312 /** Obtain a list of subscribers */
nea_server_get_subscribers(nea_server_t * nes,nea_event_t const * ev)2313 nea_subnode_t const **nea_server_get_subscribers(nea_server_t *nes,
2314 nea_event_t const *ev)
2315 {
2316 nea_sub_t *s;
2317 nea_subnode_t **sn_list, *sn;
2318 int i, n;
2319 sip_time_t now = sip_now();
2320
2321 n = nea_server_non_embryonic(nes, ev);
2322 if (n == 0)
2323 return NULL;
2324
2325 sn_list = su_zalloc(nes->nes_home,
2326 (n + 1) * sizeof(sn) + n * sizeof(*sn));
2327 if (sn_list) {
2328 sn = (nea_subnode_t *)(sn_list + n + 1);
2329
2330 for (i = 0, s = nes->nes_subscribers; s; s = s->s_next) {
2331 if (!s->s_pending_flush && s->s_state != nea_embryonic
2332 && (ev == NULL || ev == s->s_event)) {
2333 assert(i < n);
2334 nea_subnode_init(sn, s, now);
2335 sn_list[i++] = sn++;
2336 }
2337 }
2338
2339 nes->nes_in_list++;
2340
2341 sn_list[i] = NULL;
2342 }
2343
2344 return (nea_subnode_t const **)sn_list;
2345 }
2346
2347 /** Free a list of subscriptions. */
nea_server_free_subscribers(nea_server_t * nes,nea_subnode_t const ** sn_list)2348 void nea_server_free_subscribers(nea_server_t *nes,
2349 nea_subnode_t const **sn_list)
2350 {
2351 if (sn_list) {
2352 su_free(nes->nes_home, (void *)sn_list);
2353 if (--nes->nes_in_list == 0 && nes->nes_pending_flush)
2354 nea_server_pending_flush(nes);
2355 }
2356 }
2357
2358 /* ----------------------------------------------------------------- */
/** Timer callback of the notifier.
 *
 * Sets the timer again with this function as its callback, then walks the
 * subscriber list and calls nea_sub_notify() for every subscription that
 * is not terminated and whose expiration time has been reached.  The
 * in-list counter is held during the walk; if it drops to zero afterwards
 * and a flush is pending, nea_server_pending_flush() is called.  Finally,
 * if any notifications have been throttled, nea_server_notify() is called
 * for all events.
 *
 * @param srvr  unused
 * @param timer timer that fired
 * @param arg   timer argument, used as the notifier object
 */
void nes_event_timer(nea_server_t *srvr,
		     su_timer_t *timer,
		     su_timer_arg_t *arg)
{
  nea_server_t *nes = (nea_server_t *) arg;
  sip_time_t now = sip_now();
  su_root_t *root = su_timer_root(timer);
  nea_sub_t *sub, *following;

  su_timer_set(timer, nes_event_timer, nes);

  nes->nes_in_list++;

  /* Notify and terminate expired subscriptions */
  sub = nes->nes_subscribers;
  while (sub) {
    /* Fetch the successor first, before notifying and yielding */
    following = sub->s_next;

    if (sub->s_state != nea_terminated && (int)(now - sub->s_expires) >= 0) {
      nea_sub_notify(nes, sub, now, TAG_END());
      /* Yield so we can handle received packets */
      su_root_yield(root);
    }

    sub = following;
  }

  nes->nes_in_list--;
  if (nes->nes_in_list == 0 && nes->nes_pending_flush)
    nea_server_pending_flush(nes);

  if (nes->nes_throttled)
    nea_server_notify(nes, NULL);
}
2392