1 /*
2  * presence module- presence server implementation
3  *
4  * Copyright (C) 2006 Voice Sistem S.R.L.
5  *
6  * This file is part of Kamailio, a free SIP server.
7  *
8  * Kamailio is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 2 of the License, or
11  * (at your option) any later version
12  *
13  * Kamailio is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21  *
22  */
23 
24 /*! \file
25  * \brief Kamailio presence module :: Notification with SIP NOTIFY
26  * \ingroup presence
27  */
28 
29 
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <libxml/parser.h>
34 
35 #include "../../core/trim.h"
36 #include "../../core/ut.h"
37 #include "../../core/globals.h"
38 #include "../../core/str.h"
39 #include "../../lib/srdb1/db.h"
40 #include "../../lib/srdb1/db_val.h"
41 #include "../../core/hashes.h"
42 #include "../../core/socket_info.h"
43 #include "../../modules/tm/tm_load.h"
44 #include "../pua/hash.h"
45 #include "presentity.h"
46 #include "presence.h"
47 #include "notify.h"
48 #include "utils_func.h"
49 #include "../../core/receive.h"
50 
51 #define ALLOC_SIZE 3000
52 #define MAX_FORWARD 70
53 
54 int goto_on_notify_reply = -1;
55 
56 extern int pres_local_log_level;
57 extern int pres_local_log_facility;
58 extern subs_t *_pres_subs_last_sub;
59 extern int _pres_subs_mode;
60 
61 c_back_param *shm_dup_cbparam(subs_t *);
62 void free_cbparam(c_back_param *cb_param);
63 
64 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps);
65 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
66 int add_watcher_list(subs_t *s, watcher_t *watchers);
67 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
68 		str event, int STATE_FLAG);
69 void free_watcher_list(watcher_t *watchers);
70 
71 str str_to_user_col = str_init("to_user");
72 str str_username_col = str_init("username");
73 str str_domain_col = str_init("domain");
74 str str_body_col = str_init("body");
75 str str_to_domain_col = str_init("to_domain");
76 str str_from_user_col = str_init("from_user");
77 str str_from_domain_col = str_init("from_domain");
78 str str_watcher_username_col = str_init("watcher_username");
79 str str_watcher_domain_col = str_init("watcher_domain");
80 str str_event_id_col = str_init("event_id");
81 str str_event_col = str_init("event");
82 str str_etag_col = str_init("etag");
83 str str_ruid_col = str_init("ruid");
84 str str_from_tag_col = str_init("from_tag");
85 str str_to_tag_col = str_init("to_tag");
86 str str_callid_col = str_init("callid");
87 str str_local_cseq_col = str_init("local_cseq");
88 str str_remote_cseq_col = str_init("remote_cseq");
89 str str_record_route_col = str_init("record_route");
90 str str_contact_col = str_init("contact");
91 str str_expires_col = str_init("expires");
92 str str_status_col = str_init("status");
93 str str_reason_col = str_init("reason");
94 str str_socket_info_col = str_init("socket_info");
95 str str_local_contact_col = str_init("local_contact");
96 str str_version_col = str_init("version");
97 str str_presentity_uri_col = str_init("presentity_uri");
98 str str_inserted_time_col = str_init("inserted_time");
99 str str_received_time_col = str_init("received_time");
100 str str_id_col = str_init("id");
101 str str_sender_col = str_init("sender");
102 str str_updated_col = str_init("updated");
103 str str_updated_winfo_col = str_init("updated_winfo");
104 str str_priority_col = str_init("priority");
105 str str_flags_col = str_init("flags");
106 str str_user_agent_col = str_init("user_agent");
107 
108 int subset = 0;
109 
get_status_str(int status_flag)110 char *get_status_str(int status_flag)
111 {
112 	switch(status_flag) {
113 		case ACTIVE_STATUS:
114 			return "active";
115 		case PENDING_STATUS:
116 			return "pending";
117 		case TERMINATED_STATUS:
118 			return "terminated";
119 		case WAITING_STATUS:
120 			return "waiting";
121 	}
122 	return NULL;
123 }
124 
printf_subs(subs_t * subs)125 void printf_subs(subs_t *subs)
126 {
127 	LM_DBG("pres_uri: %.*s\n", subs->pres_uri.len, subs->pres_uri.s);
128 	LM_DBG("watcher_user@watcher_domain: %.*s@%.*s\n", subs->watcher_user.len,
129 			subs->watcher_user.s, subs->watcher_domain.len,
130 			subs->watcher_domain.s);
131 	LM_DBG("to_user@to_domain: %.*s@%.*s\n", subs->to_user.len, subs->to_user.s,
132 			subs->to_domain.len, subs->to_domain.s);
133 	LM_DBG("from_user@from_domain: %.*s@%.*s\n", subs->from_user.len,
134 			subs->from_user.s, subs->from_domain.len, subs->from_domain.s);
135 	LM_DBG("callid/from_tag/to_tag: %.*s/%.*s/%.*s\n", subs->callid.len,
136 			subs->callid.s, subs->from_tag.len, subs->from_tag.s,
137 			subs->to_tag.len, subs->to_tag.s);
138 	LM_DBG("local_cseq/remote_cseq: %u/%u\n", subs->local_cseq,
139 			subs->remote_cseq);
140 	LM_DBG("local_contact/contact: %.*s/%.*s\n", subs->local_contact.len,
141 			subs->local_contact.s, subs->contact.len, subs->contact.s);
142 	LM_DBG("record_route: %.*s\n", subs->record_route.len,
143 			subs->record_route.s);
144 	LM_DBG("sockinfo_str: %.*s\n", subs->sockinfo_str.len,
145 			subs->sockinfo_str.s);
146 
147 	LM_DBG("event: %.*s\n", subs->event->name.len, subs->event->name.s);
148 	LM_DBG("status: %s\n", get_status_str(subs->status));
149 	LM_DBG("reason: %.*s\n", subs->reason.len, subs->reason.s);
150 	LM_DBG("version: %u\n", subs->version);
151 	LM_DBG("expires: %u\n", subs->expires);
152 
153 	LM_DBG("updated/updated_winfo: %d/%d\n", subs->updated,
154 			subs->updated_winfo);
155 }
156 
build_str_hdr(subs_t * subs,int is_body,str * hdr)157 int build_str_hdr(subs_t *subs, int is_body, str *hdr)
158 {
159 	pres_ev_t *event = subs->event;
160 	str expires = {0, 0};
161 	str status = {0, 0};
162 	char *p;
163 	str trans = {";transport=", 11};
164 
165 	if(hdr == NULL) {
166 		LM_ERR("bad parameter\n");
167 		return -1;
168 	}
169 	expires.s = int2str(subs->expires, &expires.len);
170 
171 	status.s = get_status_str(subs->status);
172 	if(status.s == NULL) {
173 		LM_ERR("bad status %d\n", subs->status);
174 		return -1;
175 	}
176 	status.len = strlen(status.s);
177 
178 	hdr->len =
179 			18 /*Max-Forwards:  + val*/ + CRLF_LEN
180 			+ 7 /*Event: */ + subs->event->name.len
181 			+ 4 /*;id=*/ + subs->event_id.len + CRLF_LEN
182 			+ 10 /*Contact: <*/ + subs->local_contact.len
183 			+ 1 /*>*/ + 15 /*";transport=xxxx"*/ + CRLF_LEN
184 			+ 20 /*Subscription-State: */ + status.len
185 			+ 10 /*reason/expires params*/
186 			+ (subs->reason.len > expires.len ? subs->reason.len : expires.len)
187 			+ CRLF_LEN
188 			+ (is_body ? (14 /*Content-Type: */ + subs->event->content_type.len
189 								 + CRLF_LEN)
190 					   : 0)
191 			+ 1;
192 
193 	hdr->s = (char *)pkg_malloc(hdr->len);
194 	if(hdr->s == NULL) {
195 		LM_ERR("no more pkg memory\n");
196 		return -1;
197 	}
198 
199 	p = hdr->s;
200 	p += sprintf(p, "Max-Forwards: %d\r\n", MAX_FORWARD);
201 
202 	p += sprintf(p, "Event: %.*s", event->name.len, event->name.s);
203 	if(subs->event_id.len && subs->event_id.s) {
204 		p += sprintf(p, ";id=%.*s", subs->event_id.len, subs->event_id.s);
205 	}
206 	memcpy(p, CRLF, CRLF_LEN);
207 	p += CRLF_LEN;
208 
209 	p += sprintf(p, "Contact: <%.*s", subs->local_contact.len,
210 			subs->local_contact.s);
211 	if(subs->sockinfo_str.s != NULL
212 			&& str_search(&subs->local_contact, &trans) == 0) {
213 		/* fix me */
214 		switch(subs->sockinfo_str.s[0]) {
215 			case 's':
216 			case 'S':
217 				memcpy(p, ";transport=sctp", 15);
218 				p += 15;
219 				break;
220 			case 't':
221 			case 'T':
222 				switch(subs->sockinfo_str.s[1]) {
223 					case 'c':
224 					case 'C':
225 						memcpy(p, ";transport=tcp", 14);
226 						p += 14;
227 						break;
228 					case 'l':
229 					case 'L':
230 						memcpy(p, ";transport=tls", 14);
231 						p += 14;
232 						break;
233 				}
234 				break;
235 		}
236 	}
237 	*p = '>';
238 	p++;
239 	memcpy(p, CRLF, CRLF_LEN);
240 	p += CRLF_LEN;
241 
242 	p += sprintf(p, "Subscription-State: %.*s", status.len, status.s);
243 
244 	if(subs->status == TERMINATED_STATUS) {
245 		LM_DBG("state = terminated\n");
246 		p += sprintf(p, ";reason=%.*s", subs->reason.len, subs->reason.s);
247 	} else {
248 		p += sprintf(p, ";expires=%.*s", expires.len, expires.s);
249 	}
250 	memcpy(p, CRLF, CRLF_LEN);
251 	p += CRLF_LEN;
252 
253 	if(is_body) {
254 		p += sprintf(p, "Content-Type: %.*s\r\n", event->content_type.len,
255 				event->content_type.s);
256 	}
257 
258 	*p = '\0';
259 	hdr->len = p - hdr->s;
260 
261 	return 0;
262 }
263 
get_wi_subs_db(subs_t * subs,watcher_t * watchers)264 int get_wi_subs_db(subs_t *subs, watcher_t *watchers)
265 {
266 	subs_t sb;
267 	db_key_t query_cols[3];
268 	db_op_t query_ops[3];
269 	db_val_t query_vals[3];
270 	db_key_t result_cols[5];
271 	db1_res_t *result = NULL;
272 	db_row_t *row = NULL;
273 	db_val_t *row_vals = NULL;
274 	int n_result_cols = 0;
275 	int n_query_cols = 0;
276 	int i;
277 	int status_col, watcher_user_col, watcher_domain_col, callid_col;
278 
279 	query_cols[n_query_cols] = &str_presentity_uri_col;
280 	query_ops[n_query_cols] = OP_EQ;
281 	query_vals[n_query_cols].type = DB1_STR;
282 	query_vals[n_query_cols].nul = 0;
283 	query_vals[n_query_cols].val.str_val = subs->pres_uri;
284 	n_query_cols++;
285 
286 	query_cols[n_query_cols] = &str_event_col;
287 	query_ops[n_query_cols] = OP_EQ;
288 	query_vals[n_query_cols].type = DB1_STR;
289 	query_vals[n_query_cols].nul = 0;
290 	query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
291 	n_query_cols++;
292 
293 	query_cols[n_query_cols] = &str_expires_col;
294 	query_ops[n_query_cols] = OP_GT;
295 	query_vals[n_query_cols].type = DB1_INT;
296 	query_vals[n_query_cols].nul = 0;
297 	query_vals[n_query_cols].val.int_val = (int)time(NULL) + pres_expires_offset;
298 	n_query_cols++;
299 
300 	result_cols[status_col = n_result_cols++] = &str_status_col;
301 	result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
302 	result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
303 	result_cols[callid_col = n_result_cols++] = &str_callid_col;
304 
305 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
306 		LM_ERR("in use_table\n");
307 		goto error;
308 	}
309 
310 	if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
311 			   n_query_cols, n_result_cols, 0, &result)
312 			< 0) {
313 		LM_ERR("querying active_watchers db table\n");
314 		goto error;
315 	}
316 
317 	if(result == NULL) {
318 		goto error;
319 	}
320 
321 	if(result->n <= 0) {
322 		LM_DBG("The query in db table for active subscription"
323 			   " returned no result\n");
324 		pa_dbf.free_result(pa_db, result);
325 		return 0;
326 	}
327 
328 	for(i = 0; i < result->n; i++) {
329 		row = &result->rows[i];
330 		row_vals = ROW_VALUES(row);
331 
332 		sb.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
333 		sb.watcher_user.len = strlen(sb.watcher_user.s);
334 
335 		sb.watcher_domain.s =
336 				(char *)row_vals[watcher_domain_col].val.string_val;
337 		sb.watcher_domain.len = strlen(sb.watcher_domain.s);
338 
339 		sb.callid.s = (char *)row_vals[callid_col].val.string_val;
340 		sb.callid.len = strlen(sb.callid.s);
341 
342 		sb.event = subs->event->wipeer;
343 		sb.status = row_vals[status_col].val.int_val;
344 
345 		if(add_watcher_list(&sb, watchers) < 0)
346 			goto error;
347 	}
348 
349 	pa_dbf.free_result(pa_db, result);
350 	return 0;
351 
352 error:
353 	if(result)
354 		pa_dbf.free_result(pa_db, result);
355 	return -1;
356 }
357 
get_wi_notify_body(subs_t * subs,subs_t * watcher_subs)358 str *get_wi_notify_body(subs_t *subs, subs_t *watcher_subs)
359 {
360 	str *notify_body = NULL;
361 	char *version_str;
362 	watcher_t *watchers = NULL;
363 	int len = 0;
364 	unsigned int hash_code;
365 	subs_t *s = NULL;
366 	int state = FULL_STATE_FLAG;
367 	unsigned int now = (int)time(NULL);
368 
369 	hash_code = 0;
370 	version_str = int2str(subs->version, &len);
371 	if(version_str == NULL) {
372 		LM_ERR("converting int to str\n ");
373 		goto error;
374 	}
375 
376 	watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
377 	if(watchers == NULL) {
378 		ERR_MEM(PKG_MEM_STR);
379 	}
380 	memset(watchers, 0, sizeof(watcher_t));
381 
382 	if(watcher_subs != NULL) {
383 		if(add_watcher_list(watcher_subs, watchers) < 0)
384 			goto error;
385 		state = PARTIAL_STATE_FLAG;
386 
387 		goto done;
388 	}
389 
390 	if(pres_subs_dbmode == DB_ONLY) {
391 		if(get_wi_subs_db(subs, watchers) < 0) {
392 			LM_ERR("getting watchers from database\n");
393 			goto error;
394 		}
395 	} else {
396 		hash_code = core_case_hash(
397 				&subs->pres_uri, &subs->event->wipeer->name, shtable_size);
398 		lock_get(&subs_htable[hash_code].lock);
399 		s = subs_htable[hash_code].entries;
400 		while(s->next) {
401 			s = s->next;
402 
403 			if(s->expires < now) {
404 				LM_DBG("expired record\n");
405 				continue;
406 			}
407 
408 			if(s->event == subs->event->wipeer
409 					&& s->pres_uri.len == subs->pres_uri.len
410 					&& presence_sip_uri_match(&s->pres_uri, &subs->pres_uri)
411 							   == 0) {
412 				if(add_watcher_list(s, watchers) < 0) {
413 					lock_release(&subs_htable[hash_code].lock);
414 					goto error;
415 				}
416 			}
417 		}
418 		lock_release(&subs_htable[hash_code].lock);
419 
420 		if(add_waiting_watchers(
421 				   watchers, subs->pres_uri, subs->event->wipeer->name)
422 				< 0) {
423 			LM_ERR("failed to add waiting watchers\n");
424 			goto error;
425 		}
426 	}
427 
428 done:
429 	notify_body = create_winfo_xml(watchers, version_str, subs->pres_uri,
430 			subs->event->wipeer->name, state);
431 	if(notify_body == NULL) {
432 		LM_ERR("in function create_winfo_xml\n");
433 		goto error;
434 	}
435 	free_watcher_list(watchers);
436 	return notify_body;
437 
438 error:
439 	free_watcher_list(watchers);
440 	return NULL;
441 }
442 
free_watcher_list(watcher_t * watchers)443 void free_watcher_list(watcher_t *watchers)
444 {
445 	watcher_t *w;
446 	while(watchers) {
447 		w = watchers;
448 		if(w->uri.s != NULL)
449 			pkg_free(w->uri.s);
450 		if(w->id.s != NULL)
451 			pkg_free(w->id.s);
452 		watchers = watchers->next;
453 		pkg_free(w);
454 	}
455 
456 	watchers = NULL;
457 }
458 
add_watcher_list(subs_t * s,watcher_t * watchers)459 int add_watcher_list(subs_t *s, watcher_t *watchers)
460 {
461 	watcher_t *w;
462 
463 	w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
464 	if(w == NULL) {
465 		LM_ERR("No more private memory\n");
466 		return -1;
467 	}
468 	w->status = s->status;
469 	if(uandd_to_uri(s->watcher_user, s->watcher_domain, &w->uri) < 0) {
470 		LM_ERR("failed to create uri\n");
471 		goto error;
472 	}
473 	w->id.s = (char *)pkg_malloc(s->callid.len + 1);
474 	if(w->id.s == NULL) {
475 		LM_ERR("no more memory\n");
476 		goto error;
477 	}
478 	memcpy(w->id.s, s->callid.s, s->callid.len);
479 	w->id.len = s->callid.len;
480 	w->id.s[w->id.len] = '\0';
481 
482 	w->next = watchers->next;
483 	watchers->next = w;
484 
485 	return 0;
486 
487 error:
488 	if(w) {
489 		if(w->uri.s)
490 			pkg_free(w->uri.s);
491 		pkg_free(w);
492 	}
493 	return -1;
494 }
495 
build_empty_bla_body(str pres_uri)496 str *build_empty_bla_body(str pres_uri)
497 {
498 	xmlDocPtr doc;
499 	xmlNodePtr node;
500 	xmlAttrPtr attr;
501 	str *body = NULL;
502 	char *text;
503 	int len;
504 	char *entity = NULL;
505 
506 	doc = xmlNewDoc(BAD_CAST "1.0");
507 	if(doc == NULL) {
508 		LM_ERR("failed to construct xml document\n");
509 		return NULL;
510 	}
511 
512 	node = xmlNewNode(NULL, BAD_CAST "dialog-info");
513 	if(node == NULL) {
514 		LM_ERR("failed to initialize node\n");
515 		goto error;
516 	}
517 	xmlDocSetRootElement(doc, node);
518 
519 	attr = xmlNewProp(node, BAD_CAST "xmlns",
520 			BAD_CAST "urn:ietf:params:xml:ns:dialog-info");
521 	if(attr == NULL) {
522 		LM_ERR("failed to initialize node attribute\n");
523 		goto error;
524 	}
525 	attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
526 	if(attr == NULL) {
527 		LM_ERR("failed to initialize node attribute\n");
528 		goto error;
529 	}
530 
531 	attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
532 	if(attr == NULL) {
533 		LM_ERR("failed to initialize node attribute\n");
534 		goto error;
535 	}
536 
537 	entity = (char *)pkg_malloc(pres_uri.len + 1);
538 	if(entity == NULL) {
539 		LM_ERR("no more memory\n");
540 		goto error;
541 	}
542 	memcpy(entity, pres_uri.s, pres_uri.len);
543 	entity[pres_uri.len] = '\0';
544 
545 	attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
546 	if(attr == NULL) {
547 		LM_ERR("failed to initialize node attribute\n");
548 		pkg_free(entity);
549 		goto error;
550 	}
551 
552 	body = (str *)pkg_malloc(sizeof(str));
553 	if(body == NULL) {
554 		LM_ERR("no more private memory");
555 		pkg_free(entity);
556 		goto error;
557 	}
558 
559 	xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&text, &len, 1);
560 	body->s = (char *)pkg_malloc(len);
561 	if(body->s == NULL) {
562 		LM_ERR("no more private memory");
563 		pkg_free(body);
564 		pkg_free(entity);
565 		goto error;
566 	}
567 	memcpy(body->s, text, len);
568 	body->len = len;
569 
570 
571 	pkg_free(entity);
572 	xmlFreeDoc(doc);
573 	xmlFree(text);
574 
575 	return body;
576 
577 error:
578 	xmlFreeDoc(doc);
579 	return NULL;
580 }
581 
ps_db_get_p_notify_body(str pres_uri,pres_ev_t * event,str * etag,str * contact)582 str *ps_db_get_p_notify_body(str pres_uri, pres_ev_t *event, str *etag,
583 		str *contact)
584 {
585 	db_key_t query_cols[4];
586 	db_val_t query_vals[4];
587 	db_op_t query_ops[4];
588 	db_key_t result_cols[3];
589 	db1_res_t *result = NULL;
590 	int body_col, etag_col = 0, sender_col;
591 	str **body_array = NULL;
592 	str *notify_body = NULL;
593 	db_row_t *row = NULL;
594 	db_val_t *row_vals;
595 	int n_result_cols = 0;
596 	int n_query_cols = 0;
597 	int i, n = 0, len;
598 	int build_off_n = -1;
599 	str etags;
600 	str *body;
601 	int size = 0;
602 	struct sip_uri uri;
603 	unsigned int hash_code;
604 	str sender;
605 	static str query_str;
606 
607 	if(parse_uri(pres_uri.s, pres_uri.len, &uri) < 0) {
608 		LM_ERR("while parsing uri\n");
609 		return NULL;
610 	}
611 
612 	/* if in db_only mode, get the presentity information from database - skip htable search */
613 	if(publ_cache_mode == PS_PCACHE_HYBRID) {
614 		/* search in hash table if any record exists */
615 		hash_code = core_case_hash(&pres_uri, NULL, phtable_size);
616 		if(search_phtable(&pres_uri, event->evp->type, hash_code) == NULL) {
617 			LM_DBG("No record exists in hash_table\n");
618 
619 			/* for pidf manipulation */
620 			if(event->agg_nbody) {
621 				notify_body =
622 						event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
623 				if(notify_body)
624 					goto done;
625 			}
626 			return NULL;
627 		}
628 	}
629 
630 	query_cols[n_query_cols] = &str_domain_col;
631 	query_vals[n_query_cols].type = DB1_STR;
632 	query_vals[n_query_cols].nul = 0;
633 	query_vals[n_query_cols].val.str_val = uri.host;
634 	query_ops[n_query_cols] = OP_EQ;
635 	n_query_cols++;
636 
637 	query_cols[n_query_cols] = &str_username_col;
638 	query_vals[n_query_cols].type = DB1_STR;
639 	query_vals[n_query_cols].nul = 0;
640 	query_vals[n_query_cols].val.str_val = uri.user;
641 	query_ops[n_query_cols] = OP_EQ;
642 	n_query_cols++;
643 
644 	query_cols[n_query_cols] = &str_event_col;
645 	query_vals[n_query_cols].type = DB1_STR;
646 	query_vals[n_query_cols].nul = 0;
647 	query_vals[n_query_cols].val.str_val = event->name;
648 	query_ops[n_query_cols] = OP_EQ;
649 	n_query_cols++;
650 
651 	if(pres_startup_mode == 1) {
652 		query_cols[n_query_cols] = &str_expires_col;
653 		query_vals[n_query_cols].type = DB1_INT;
654 		query_vals[n_query_cols].nul = 0;
655 		query_vals[n_query_cols].val.int_val = (int)time(NULL);
656 		query_ops[n_query_cols] = OP_GT;
657 		n_query_cols++;
658 	}
659 
660 	result_cols[body_col = n_result_cols++] = &str_body_col;
661 	result_cols[etag_col = n_result_cols++] = &str_etag_col;
662 	result_cols[sender_col = n_result_cols++] = &str_sender_col;
663 
664 	if(pa_dbf.use_table(pa_db, &presentity_table) < 0) {
665 		LM_ERR("in use_table\n");
666 		return NULL;
667 	}
668 
669 	if(pres_retrieve_order == 1) {
670 		query_str = pres_retrieve_order_by;
671 	} else {
672 		query_str = str_received_time_col;
673 	}
674 	if(pres_startup_mode == 1) {
675 		if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
676 				   n_query_cols, n_result_cols, &query_str, &result)
677 				< 0) {
678 			LM_ERR("failed to query %.*s table\n", presentity_table.len,
679 					presentity_table.s);
680 			if(result)
681 				pa_dbf.free_result(pa_db, result);
682 			return NULL;
683 		}
684 	} else {
685 		if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
686 				   n_query_cols, n_result_cols, &query_str, &result)
687 				< 0) {
688 			LM_ERR("failed to query %.*s table\n", presentity_table.len,
689 					presentity_table.s);
690 			if(result)
691 				pa_dbf.free_result(pa_db, result);
692 			return NULL;
693 		}
694 	}
695 
696 	if(result == NULL)
697 		return NULL;
698 
699 	if(result->n <= 0) {
700 		LM_DBG("The query returned no result\n[username]= %.*s"
701 			   "\t[domain]= %.*s\t[event]= %.*s\n",
702 				uri.user.len, uri.user.s, uri.host.len, uri.host.s,
703 				event->name.len, event->name.s);
704 
705 		pa_dbf.free_result(pa_db, result);
706 		result = NULL;
707 
708 		if(event->agg_nbody) {
709 			notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
710 			if(notify_body)
711 				goto done;
712 		}
713 		return NULL;
714 	} else {
715 		n = result->n;
716 		if(event->agg_nbody == NULL) {
717 			LM_DBG("Event does not require aggregation\n");
718 			row = &result->rows[n - 1];
719 			row_vals = ROW_VALUES(row);
720 
721 			/* if event BLA - check if sender is the same as contact */
722 			/* if so, send an empty dialog info document */
723 			if(EVENT_DIALOG_SLA(event->evp) && contact) {
724 				sender.s = (char *)row_vals[sender_col].val.string_val;
725 				if(sender.s == NULL || strlen(sender.s) == 0)
726 					goto after_sender_check;
727 				sender.len = strlen(sender.s);
728 
729 				if(sender.len == contact->len
730 						&& presence_sip_uri_match(&sender, contact) == 0) {
731 					notify_body = build_empty_bla_body(pres_uri);
732 					pa_dbf.free_result(pa_db, result);
733 					return notify_body;
734 				}
735 			}
736 
737 		after_sender_check:
738 			if(row_vals[body_col].val.string_val == NULL) {
739 				LM_ERR("NULL notify body record\n");
740 				goto error;
741 			}
742 			len = strlen(row_vals[body_col].val.string_val);
743 			if(len == 0) {
744 				LM_ERR("Empty notify body record\n");
745 				goto error;
746 			}
747 			notify_body = (str *)pkg_malloc(sizeof(str));
748 			if(notify_body == NULL) {
749 				ERR_MEM(PKG_MEM_STR);
750 			}
751 			memset(notify_body, 0, sizeof(str));
752 			notify_body->s = (char *)pkg_malloc(len * sizeof(char));
753 			if(notify_body->s == NULL) {
754 				pkg_free(notify_body);
755 				ERR_MEM(PKG_MEM_STR);
756 			}
757 			memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
758 			notify_body->len = len;
759 			pa_dbf.free_result(pa_db, result);
760 
761 			return notify_body;
762 		}
763 
764 		LM_DBG("Event requires aggregation\n");
765 
766 		body_array = (str **)pkg_malloc((n + 2) * sizeof(str *));
767 		if(body_array == NULL) {
768 			ERR_MEM(PKG_MEM_STR);
769 		}
770 		memset(body_array, 0, (n + 2) * sizeof(str *));
771 
772 		if(etag != NULL) {
773 			LM_DBG("searched etag = %.*s len= %d\n", etag->len, etag->s,
774 					etag->len);
775 			LM_DBG("etag not NULL\n");
776 			for(i = 0; i < n; i++) {
777 				row = &result->rows[i];
778 				row_vals = ROW_VALUES(row);
779 				etags.s = (char *)row_vals[etag_col].val.string_val;
780 				etags.len = strlen(etags.s);
781 
782 				LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
783 				if((etags.len == etag->len)
784 						&& (strncmp(etags.s, etag->s, etags.len) == 0)) {
785 					LM_DBG("found etag\n");
786 					build_off_n = i;
787 				}
788 				len = strlen((char *)row_vals[body_col].val.string_val);
789 				if(len == 0) {
790 					LM_ERR("Empty notify body record\n");
791 					goto error;
792 				}
793 
794 				size = sizeof(str) + len * sizeof(char);
795 				body = (str *)pkg_malloc(size);
796 				if(body == NULL) {
797 					ERR_MEM(PKG_MEM_STR);
798 				}
799 				memset(body, 0, size);
800 				size = sizeof(str);
801 				body->s = (char *)body + size;
802 				memcpy(body->s, (char *)row_vals[body_col].val.string_val, len);
803 				body->len = len;
804 
805 				body_array[i] = body;
806 			}
807 		} else {
808 			for(i = 0; i < n; i++) {
809 				row = &result->rows[i];
810 				row_vals = ROW_VALUES(row);
811 
812 				len = strlen((char *)row_vals[body_col].val.string_val);
813 				if(len == 0) {
814 					LM_ERR("Empty notify body record\n");
815 					goto error;
816 				}
817 
818 				size = sizeof(str) + len * sizeof(char);
819 				body = (str *)pkg_malloc(size);
820 				if(body == NULL) {
821 					ERR_MEM(PKG_MEM_STR);
822 				}
823 				memset(body, 0, size);
824 				size = sizeof(str);
825 				body->s = (char *)body + size;
826 				memcpy(body->s, row_vals[body_col].val.string_val, len);
827 				body->len = len;
828 
829 				body_array[i] = body;
830 			}
831 		}
832 		pa_dbf.free_result(pa_db, result);
833 		result = NULL;
834 
835 		notify_body = event->agg_nbody(
836 				&uri.user, &uri.host, body_array, n, build_off_n);
837 	}
838 
839 done:
840 	if(body_array != NULL) {
841 		for(i = 0; i < n; i++) {
842 			if(body_array[i])
843 				pkg_free(body_array[i]);
844 		}
845 		pkg_free(body_array);
846 	}
847 	return notify_body;
848 
849 error:
850 	if(result != NULL)
851 		pa_dbf.free_result(pa_db, result);
852 
853 	if(body_array != NULL) {
854 		for(i = 0; i < n; i++) {
855 			if(body_array[i])
856 				pkg_free(body_array[i]);
857 			else
858 				break;
859 		}
860 
861 		pkg_free(body_array);
862 	}
863 	return NULL;
864 }
865 
ps_cache_get_p_notify_body(str pres_uri,pres_ev_t * event,str * etag,str * contact)866 str *ps_cache_get_p_notify_body(str pres_uri, pres_ev_t *event, str *etag,
867 		str *contact)
868 {
869 	sip_uri_t uri;
870 	ps_presentity_t ptm;
871 	ps_presentity_t *pti;
872 	ps_presentity_t *ptlist = NULL;
873 	int n = 0;
874 	int i = 0;
875 	str **body_array = NULL;
876 	str *notify_body = NULL;
877 	str *body;
878 	int size = 0;
879 	int build_off_n = -1;
880 
881 	if(parse_uri(pres_uri.s, pres_uri.len, &uri) < 0) {
882 		LM_ERR("while parsing uri\n");
883 		return NULL;
884 	}
885 	memset(&ptm, 0, sizeof(ps_presentity_t));
886 
887 	ptm.user = uri.user;
888 	ptm.domain = uri.host;
889 	ptm.event = event->name;
890 	if(pres_startup_mode == 1) {
891 		ptm.expires = (int)time(NULL);
892 	}
893 
894 	ptlist = ps_ptable_search(&ptm, 1, pres_retrieve_order);
895 
896 	if(ptlist == NULL) {
897 		LM_DBG("the query returned no result\n[username]= %.*s"
898 			   "\t[domain]= %.*s\t[event]= %.*s\n",
899 				uri.user.len, uri.user.s, uri.host.len, uri.host.s,
900 				event->name.len, event->name.s);
901 
902 		if(event->agg_nbody) {
903 			notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
904 			if(notify_body) {
905 				goto done;
906 			}
907 		}
908 		return NULL;
909 	}
910 
911 	if(event->agg_nbody == NULL) {
912 		LM_DBG("event does not require aggregation\n");
913 		pti = ptlist;
914 		while(pti->next) {
915 			pti = pti->next;
916 		}
917 
918 		/* if event BLA - check if sender is the same as contact */
919 		/* if so, send an empty dialog info document */
920 		if(EVENT_DIALOG_SLA(event->evp) && contact) {
921 			if(pti->sender.s == NULL || pti->sender.len <= 0) {
922 				LM_DBG("no sender address\n");
923 				goto after_sender_check;
924 			}
925 
926 			if(pti->sender.len == contact->len
927 					&& presence_sip_uri_match(&pti->sender, contact) == 0) {
928 				notify_body = build_empty_bla_body(pres_uri);
929 				ps_presentity_list_free(ptlist, 1);
930 				return notify_body;
931 			}
932 		}
933 
934 	after_sender_check:
935 		if(pti->body.s == NULL || pti->body.len <= 0) {
936 			LM_ERR("NULL notify body record\n");
937 			goto error;
938 		}
939 
940 		notify_body = (str *)pkg_malloc(sizeof(str));
941 		if(notify_body == NULL) {
942 			ERR_MEM(PKG_MEM_STR);
943 		}
944 		memset(notify_body, 0, sizeof(str));
945 		notify_body->s = (char *)pkg_malloc((pti->body.len+1) * sizeof(char));
946 		if(notify_body->s == NULL) {
947 			pkg_free(notify_body);
948 			ERR_MEM(PKG_MEM_STR);
949 		}
950 		memcpy(notify_body->s, pti->body.s, pti->body.len);
951 		notify_body->len = pti->body.len;
952 		ps_presentity_list_free(ptlist, 1);
953 
954 		return notify_body;
955 	}
956 
957 	LM_DBG("event requires aggregation\n");
958 
959 	n = 0;
960 	pti = ptlist;
961 	while(pti) {
962 		n++;
963 		pti = pti->next;
964 	}
965 	body_array = (str **)pkg_malloc((n + 2) * sizeof(str *));
966 	if(body_array == NULL) {
967 		ERR_MEM(PKG_MEM_STR);
968 	}
969 	memset(body_array, 0, (n + 2) * sizeof(str *));
970 
971 	if(etag != NULL) {
972 		LM_DBG("searched etag = %.*s len= %d\n", etag->len, etag->s,
973 				etag->len);
974 		LM_DBG("etag not NULL\n");
975 		pti = ptlist;
976 		i = 0;
977 		while(pti) {
978 			LM_DBG("etag = %.*s len= %d\n", pti->etag.len, pti->etag.s,
979 					pti->etag.len);
980 			if((pti->etag.len == etag->len)
981 					&& (strncmp(pti->etag.s, etag->s, pti->etag.len) == 0)) {
982 				LM_DBG("found etag\n");
983 				build_off_n = i;
984 			}
985 			if(pti->body.s == NULL || pti->body.len <= 0) {
986 				LM_ERR("Empty notify body record\n");
987 				goto error;
988 			}
989 
990 			size = sizeof(str) + (pti->body.len +1) * sizeof(char);
991 			body = (str *)pkg_malloc(size);
992 			if(body == NULL) {
993 				ERR_MEM(PKG_MEM_STR);
994 			}
995 			memset(body, 0, size);
996 			size = sizeof(str);
997 			body->s = (char *)body + size;
998 			memcpy(body->s, pti->body.s, pti->body.len);
999 			body->len = pti->body.len;
1000 
1001 			body_array[i] = body;
1002 			i++;
1003 			pti = pti->next;
1004 		}
1005 	} else {
1006 		pti = ptlist;
1007 		i = 0;
1008 		while(pti) {
1009 			if(pti->body.s == NULL || pti->body.len <= 0) {
1010 				LM_ERR("Empty notify body record\n");
1011 				goto error;
1012 			}
1013 
1014 			size = sizeof(str) + (pti->body.len+1) * sizeof(char);
1015 			body = (str *)pkg_malloc(size);
1016 			if(body == NULL) {
1017 				ERR_MEM(PKG_MEM_STR);
1018 			}
1019 			memset(body, 0, size);
1020 			size = sizeof(str);
1021 			body->s = (char *)body + size;
1022 			memcpy(body->s, pti->body.s, pti->body.len);
1023 			body->len = pti->body.len;
1024 
1025 			body_array[i] = body;
1026 			i++;
1027 			pti = pti->next;
1028 		}
1029 	}
1030 
1031 	ps_presentity_list_free(ptlist, 1);
1032 
1033 	notify_body = event->agg_nbody(
1034 			&uri.user, &uri.host, body_array, n, build_off_n);
1035 
1036 done:
1037 	if(body_array != NULL) {
1038 		for(i = 0; i < n; i++) {
1039 			if(body_array[i]) {
1040 				pkg_free(body_array[i]);
1041 			}
1042 		}
1043 		pkg_free(body_array);
1044 	}
1045 	return notify_body;
1046 
1047 error:
1048 	if(ptlist != NULL) {
1049 		ps_presentity_list_free(ptlist, 1);
1050 	}
1051 
1052 	if(body_array != NULL) {
1053 		for(i = 0; i < n; i++) {
1054 			if(body_array[i])
1055 				pkg_free(body_array[i]);
1056 			else
1057 				break;
1058 		}
1059 
1060 		pkg_free(body_array);
1061 	}
1062 	return NULL;
1063 }
1064 
get_p_notify_body(str pres_uri,pres_ev_t * event,str * etag,str * contact)1065 str *get_p_notify_body(str pres_uri, pres_ev_t *event, str *etag, str *contact)
1066 {
1067 	if(publ_cache_mode == PS_PCACHE_RECORD) {
1068 		return ps_cache_get_p_notify_body(pres_uri, event, etag, contact);
1069 	} else {
1070 		return ps_db_get_p_notify_body(pres_uri, event, etag, contact);
1071 	}
1072 }
1073 
free_notify_body(str * body,pres_ev_t * ev)1074 void free_notify_body(str *body, pres_ev_t *ev)
1075 {
1076 	if(body != NULL) {
1077 		if(body->s != NULL) {
1078 			if(ev->type & WINFO_TYPE)
1079 				xmlFree(body->s);
1080 			else if(ev->agg_nbody == NULL && ev->apply_auth_nbody == NULL)
1081 				pkg_free(body->s);
1082 			else
1083 				ev->free_body(body->s);
1084 		}
1085 		pkg_free(body);
1086 	}
1087 }
1088 
ps_free_tm_dlg(dlg_t * td)1089 static int ps_free_tm_dlg(dlg_t *td)
1090 {
1091 	if(td) {
1092 		if(td->loc_uri.s)
1093 			pkg_free(td->loc_uri.s);
1094 		if(td->rem_uri.s)
1095 			pkg_free(td->rem_uri.s);
1096 
1097 		if(td->route_set)
1098 			free_rr(&td->route_set);
1099 		pkg_free(td);
1100 	}
1101 	return 0;
1102 }
1103 
ps_build_dlg_t(subs_t * subs)1104 dlg_t *ps_build_dlg_t(subs_t *subs)
1105 {
1106 	dlg_t *td = NULL;
1107 	int found_contact = 1;
1108 
1109 	td = (dlg_t *)pkg_malloc(sizeof(dlg_t));
1110 	if(td == NULL) {
1111 		ERR_MEM(PKG_MEM_STR);
1112 	}
1113 	memset(td, 0, sizeof(dlg_t));
1114 
1115 	td->loc_seq.value = subs->local_cseq;
1116 	td->loc_seq.is_set = 1;
1117 
1118 	td->id.call_id = subs->callid;
1119 	td->id.rem_tag = subs->from_tag;
1120 	td->id.loc_tag = subs->to_tag;
1121 
1122 	uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
1123 	if(td->loc_uri.s == NULL) {
1124 		LM_ERR("while creating uri\n");
1125 		goto error;
1126 	}
1127 
1128 	if(subs->contact.len == 0 || subs->contact.s == NULL) {
1129 		found_contact = 0;
1130 	} else {
1131 		LM_DBG("CONTACT = %.*s\n", subs->contact.len, subs->contact.s);
1132 		td->rem_target = subs->contact;
1133 	}
1134 
1135 	uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
1136 	if(td->rem_uri.s == NULL) {
1137 		LM_ERR("while creating uri\n");
1138 		goto error;
1139 	}
1140 
1141 	if(found_contact == 0) {
1142 		td->rem_target = td->rem_uri;
1143 	}
1144 	if(subs->record_route.s && subs->record_route.len) {
1145 		if(parse_rr_body(
1146 				   subs->record_route.s, subs->record_route.len, &td->route_set)
1147 				< 0) {
1148 			LM_ERR("in function parse_rr_body\n");
1149 			goto error;
1150 		}
1151 	}
1152 	td->state = DLG_CONFIRMED;
1153 
1154 	if(subs->sockinfo_str.len) {
1155 		int port, proto;
1156 		str host;
1157 		char *tmp;
1158 		if((tmp = as_asciiz(&subs->sockinfo_str)) == NULL) {
1159 			LM_ERR("no pkg memory left\n");
1160 			goto error;
1161 		}
1162 		if(parse_phostport(tmp, &host.s, &host.len, &port, &proto)) {
1163 			LM_ERR("bad sockinfo string\n");
1164 			pkg_free(tmp);
1165 			goto error;
1166 		}
1167 		td->send_sock = grep_sock_info(
1168 				&host, (unsigned short)port, (unsigned short)proto);
1169 		pkg_free(tmp);
1170 	}
1171 
1172 	return td;
1173 
1174 error:
1175 	ps_free_tm_dlg(td);
1176 	return NULL;
1177 }
1178 
get_subs_db(str * pres_uri,pres_ev_t * event,str * sender,subs_t ** s_array,int * n)1179 int get_subs_db(
1180 		str *pres_uri, pres_ev_t *event, str *sender, subs_t **s_array, int *n)
1181 {
1182 	db_key_t query_cols[7];
1183 	db_op_t query_ops[7];
1184 	db_val_t query_vals[7];
1185 	db_key_t result_cols[21];
1186 	int n_result_cols = 0, n_query_cols = 0;
1187 	db_row_t *row;
1188 	db_val_t *row_vals;
1189 	db1_res_t *result = NULL;
1190 	int from_user_col, from_domain_col, from_tag_col;
1191 	int to_user_col, to_domain_col, to_tag_col;
1192 	int expires_col = 0, callid_col, cseq_col, i, reason_col;
1193 	int version_col = 0, record_route_col = 0, contact_col = 0;
1194 	int sockinfo_col = 0, local_contact_col = 0, event_id_col = 0;
1195 	int watcher_user_col = 0, watcher_domain_col = 0;
1196 	int flags_col = 0, user_agent_col = 0;
1197 	subs_t s, *s_new;
1198 	int inc = 0;
1199 
1200 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
1201 		LM_ERR("in use_table\n");
1202 		return -1;
1203 	}
1204 
1205 	LM_DBG("querying database table = active_watchers\n");
1206 	query_cols[n_query_cols] = &str_presentity_uri_col;
1207 	query_ops[n_query_cols] = OP_EQ;
1208 	query_vals[n_query_cols].type = DB1_STR;
1209 	query_vals[n_query_cols].nul = 0;
1210 	query_vals[n_query_cols].val.str_val = *pres_uri;
1211 	n_query_cols++;
1212 
1213 	query_cols[n_query_cols] = &str_event_col;
1214 	query_ops[n_query_cols] = OP_EQ;
1215 	query_vals[n_query_cols].type = DB1_STR;
1216 	query_vals[n_query_cols].nul = 0;
1217 	query_vals[n_query_cols].val.str_val = event->name;
1218 	n_query_cols++;
1219 
1220 	query_cols[n_query_cols] = &str_status_col;
1221 	query_ops[n_query_cols] = OP_EQ;
1222 	query_vals[n_query_cols].type = DB1_INT;
1223 	query_vals[n_query_cols].nul = 0;
1224 	query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
1225 	n_query_cols++;
1226 
1227 	query_cols[n_query_cols] = &str_contact_col;
1228 	query_ops[n_query_cols] = OP_NEQ;
1229 	query_vals[n_query_cols].type = DB1_STR;
1230 	query_vals[n_query_cols].nul = 0;
1231 	if(sender) {
1232 		LM_DBG("Do not send Notify to:[uri]= %.*s\n", sender->len, sender->s);
1233 		query_vals[n_query_cols].val.str_val = *sender;
1234 	} else {
1235 		query_vals[n_query_cols].val.str_val.s = "";
1236 		query_vals[n_query_cols].val.str_val.len = 0;
1237 	}
1238 	n_query_cols++;
1239 
1240 	result_cols[to_user_col = n_result_cols++] = &str_to_user_col;
1241 	result_cols[to_domain_col = n_result_cols++] = &str_to_domain_col;
1242 	result_cols[from_user_col = n_result_cols++] = &str_from_user_col;
1243 	result_cols[from_domain_col = n_result_cols++] = &str_from_domain_col;
1244 	result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
1245 	result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
1246 	result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
1247 	result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
1248 	result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
1249 	result_cols[callid_col = n_result_cols++] = &str_callid_col;
1250 	result_cols[cseq_col = n_result_cols++] = &str_local_cseq_col;
1251 	result_cols[record_route_col = n_result_cols++] = &str_record_route_col;
1252 	result_cols[contact_col = n_result_cols++] = &str_contact_col;
1253 	result_cols[expires_col = n_result_cols++] = &str_expires_col;
1254 	result_cols[reason_col = n_result_cols++] = &str_reason_col;
1255 	result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
1256 	result_cols[local_contact_col = n_result_cols++] = &str_local_contact_col;
1257 	result_cols[version_col = n_result_cols++] = &str_version_col;
1258 	result_cols[flags_col = n_result_cols++] = &str_flags_col;
1259 	result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
1260 
1261 	if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
1262 			   n_query_cols, n_result_cols, 0, &result)
1263 			< 0) {
1264 		LM_ERR("while querying database\n");
1265 		if(result) {
1266 			pa_dbf.free_result(pa_db, result);
1267 		}
1268 		return -1;
1269 	}
1270 
1271 	if(result == NULL)
1272 		return -1;
1273 
1274 	if(result->n <= 0) {
1275 		LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
1276 			   " returned no result\n",
1277 				pres_uri->len, pres_uri->s, event->name.len, event->name.s);
1278 		pa_dbf.free_result(pa_db, result);
1279 		return 0;
1280 	}
1281 	LM_DBG("found %d dialogs\n", result->n);
1282 
1283 	for(i = 0; i < result->n; i++) {
1284 		row = &result->rows[i];
1285 		row_vals = ROW_VALUES(row);
1286 
1287 		if(row_vals[reason_col].val.string_val) {
1288 			if(strlen(row_vals[reason_col].val.string_val) != 0)
1289 				continue;
1290 		}
1291 
1292 		//	s.reason.len= strlen(s.reason.s);
1293 
1294 		memset(&s, 0, sizeof(subs_t));
1295 		s.status = ACTIVE_STATUS;
1296 
1297 		s.pres_uri = *pres_uri;
1298 		s.to_user.s = (char *)row_vals[to_user_col].val.string_val;
1299 		s.to_user.len = strlen(s.to_user.s);
1300 
1301 		s.to_domain.s = (char *)row_vals[to_domain_col].val.string_val;
1302 		s.to_domain.len = strlen(s.to_domain.s);
1303 
1304 		s.from_user.s = (char *)row_vals[from_user_col].val.string_val;
1305 		s.from_user.len = strlen(s.from_user.s);
1306 
1307 		s.from_domain.s = (char *)row_vals[from_domain_col].val.string_val;
1308 		s.from_domain.len = strlen(s.from_domain.s);
1309 
1310 		s.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
1311 		s.watcher_user.len = strlen(s.watcher_user.s);
1312 
1313 		s.watcher_domain.s =
1314 				(char *)row_vals[watcher_domain_col].val.string_val;
1315 		s.watcher_domain.len = strlen(s.watcher_domain.s);
1316 
1317 		s.event_id.s = (char *)row_vals[event_id_col].val.string_val;
1318 		s.event_id.len = (s.event_id.s) ? strlen(s.event_id.s) : 0;
1319 
1320 		s.to_tag.s = (char *)row_vals[to_tag_col].val.string_val;
1321 		s.to_tag.len = strlen(s.to_tag.s);
1322 
1323 		s.from_tag.s = (char *)row_vals[from_tag_col].val.string_val;
1324 		s.from_tag.len = strlen(s.from_tag.s);
1325 
1326 		s.callid.s = (char *)row_vals[callid_col].val.string_val;
1327 		s.callid.len = strlen(s.callid.s);
1328 
1329 		s.record_route.s = (char *)row_vals[record_route_col].val.string_val;
1330 		s.record_route.len = (s.record_route.s) ? strlen(s.record_route.s) : 0;
1331 
1332 		s.contact.s = (char *)row_vals[contact_col].val.string_val;
1333 		s.contact.len = strlen(s.contact.s);
1334 
1335 		s.sockinfo_str.s = (char *)row_vals[sockinfo_col].val.string_val;
1336 		s.sockinfo_str.len = s.sockinfo_str.s ? strlen(s.sockinfo_str.s) : 0;
1337 
1338 		s.local_contact.s = (char *)row_vals[local_contact_col].val.string_val;
1339 		s.local_contact.len = s.local_contact.s ? strlen(s.local_contact.s) : 0;
1340 
1341 		s.event = event;
1342 		s.local_cseq = row_vals[cseq_col].val.int_val + 1;
1343 		if(row_vals[expires_col].val.int_val < (int)time(NULL) + pres_expires_offset)
1344 			s.expires = 0;
1345 		else
1346 			s.expires = row_vals[expires_col].val.int_val - (int)time(NULL);
1347 		s.version = row_vals[version_col].val.int_val + 1;
1348 		s.flags = row_vals[flags_col].val.int_val;
1349 		s.user_agent.s = (char *)row_vals[user_agent_col].val.string_val;
1350 		s.user_agent.len = (s.user_agent.s) ? strlen(s.user_agent.s) : 0;
1351 
1352 		s_new = mem_copy_subs(&s, PKG_MEM_TYPE);
1353 		if(s_new == NULL) {
1354 			LM_ERR("while copying subs_t structure\n");
1355 			goto error;
1356 		}
1357 		s_new->next = (*s_array);
1358 		(*s_array) = s_new;
1359 		printf_subs(s_new);
1360 		inc++;
1361 	}
1362 	pa_dbf.free_result(pa_db, result);
1363 	*n = inc;
1364 
1365 	return 0;
1366 
1367 error:
1368 	if(result)
1369 		pa_dbf.free_result(pa_db, result);
1370 
1371 	return -1;
1372 }
1373 
get_subs_dialog(str * pres_uri,pres_ev_t * event,str * sender)1374 subs_t *get_subs_dialog(str *pres_uri, pres_ev_t *event, str *sender)
1375 {
1376 	unsigned int hash_code;
1377 	subs_t *s = NULL, *s_new;
1378 	subs_t *s_array = NULL;
1379 	int n = 0;
1380 
1381 	/* if pres_subs_dbmode!=DB_ONLY, should take the subscriptions from the
1382 		hashtable only in DB_ONLY mode should take all dialogs from db */
1383 
1384 	if(pres_subs_dbmode == DB_ONLY) {
1385 		if(get_subs_db(pres_uri, event, sender, &s_array, &n) < 0) {
1386 			LM_ERR("getting dialogs from database\n");
1387 			goto error;
1388 		}
1389 	} else {
1390 		hash_code = core_case_hash(pres_uri, &event->name, shtable_size);
1391 
1392 		lock_get(&subs_htable[hash_code].lock);
1393 
1394 		s = subs_htable[hash_code].entries;
1395 
1396 		while(s->next) {
1397 			s = s->next;
1398 
1399 			printf_subs(s);
1400 
1401 			if(s->expires < (int)time(NULL)) {
1402 				LM_DBG("expired subs\n");
1403 				continue;
1404 			}
1405 
1406 			if((!(s->status == ACTIVE_STATUS && s->reason.len == 0
1407 					   && s->event == event && s->pres_uri.len == pres_uri->len
1408 					   && presence_sip_uri_match(&s->pres_uri, pres_uri) == 0))
1409 					|| (sender && sender->len == s->contact.len
1410 							   && presence_sip_uri_match(sender, &s->contact)
1411 										  == 0))
1412 				continue;
1413 
1414 			s_new = mem_copy_subs(s, PKG_MEM_TYPE);
1415 			if(s_new == NULL) {
1416 				LM_ERR("copying subs_t structure\n");
1417 				lock_release(&subs_htable[hash_code].lock);
1418 				goto error;
1419 			}
1420 			s_new->expires -= (int)time(NULL);
1421 			s_new->next = s_array;
1422 			s_array = s_new;
1423 		}
1424 		lock_release(&subs_htable[hash_code].lock);
1425 	}
1426 
1427 	return s_array;
1428 
1429 error:
1430 	free_subs_list(s_array, PKG_MEM_TYPE, 0);
1431 	return NULL;
1432 }
1433 
publ_notify(presentity_t * p,str pres_uri,str * body,str * offline_etag,str * rules_doc)1434 int publ_notify(presentity_t *p, str pres_uri, str *body, str *offline_etag,
1435 		str *rules_doc)
1436 {
1437 	str *notify_body = NULL;
1438 	subs_t *subs_array = NULL, *s = NULL;
1439 	int ret_code = -1;
1440 
1441 	subs_array = get_subs_dialog(&pres_uri, p->event, p->sender);
1442 	if(subs_array == NULL) {
1443 		LM_DBG("Could not find subs_dialog\n");
1444 		ret_code = 0;
1445 		goto done;
1446 	}
1447 
1448 	/* if the event does not require aggregation - we have the final body */
1449 	if(p->event->agg_nbody) {
1450 		notify_body = get_p_notify_body(pres_uri, p->event, offline_etag, NULL);
1451 		if(notify_body == NULL) {
1452 			LM_DBG("Could not get the notify_body\n");
1453 			/* goto error; */
1454 		}
1455 	}
1456 
1457 	s = subs_array;
1458 	while(s) {
1459 		s->auth_rules_doc = rules_doc;
1460 
1461 		if(notify(s, NULL, notify_body ? notify_body : body, 0,
1462 				   p->event->aux_body_processing)
1463 				< 0) {
1464 			LM_ERR("Could not send notify for %.*s\n", p->event->name.len,
1465 					p->event->name.s);
1466 		}
1467 
1468 		s = s->next;
1469 	}
1470 	ret_code = 0;
1471 
1472 done:
1473 	free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1474 	free_notify_body(notify_body, p->event);
1475 	return ret_code;
1476 }
1477 
publ_notify_notifier(str pres_uri,pres_ev_t * event)1478 int publ_notify_notifier(str pres_uri, pres_ev_t *event)
1479 {
1480 	db_key_t query_cols[2], result_cols[3];
1481 	db_val_t query_vals[2], *values;
1482 	db_row_t *rows;
1483 	db1_res_t *result = NULL;
1484 	int n_query_cols = 0, n_result_cols = 0;
1485 	int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
1486 	int i;
1487 	int ret = -1;
1488 	subs_t subs;
1489 	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
1490 
1491 	if(pa_db == NULL) {
1492 		LM_ERR("null database connection\n");
1493 		goto error;
1494 	}
1495 
1496 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
1497 		LM_ERR("use table failed\n");
1498 		goto error;
1499 	}
1500 
1501 	query_cols[n_query_cols] = &str_presentity_uri_col;
1502 	query_vals[n_query_cols].type = DB1_STR;
1503 	query_vals[n_query_cols].nul = 0;
1504 	query_vals[n_query_cols].val.str_val = pres_uri;
1505 	n_query_cols++;
1506 
1507 	query_cols[n_query_cols] = &str_event_col;
1508 	query_vals[n_query_cols].type = DB1_STR;
1509 	query_vals[n_query_cols].nul = 0;
1510 	query_vals[n_query_cols].val.str_val = event->name;
1511 	n_query_cols++;
1512 
1513 	result_cols[r_callid_col = n_result_cols++] = &str_callid_col;
1514 	result_cols[r_to_tag_col = n_result_cols++] = &str_to_tag_col;
1515 	result_cols[r_from_tag_col = n_result_cols++] = &str_from_tag_col;
1516 
1517 	if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
1518 			   n_result_cols, 0, &result)
1519 			< 0) {
1520 		LM_ERR("Can't query db\n");
1521 		goto error;
1522 	}
1523 
1524 	if(result == NULL) {
1525 		LM_ERR("bad result\n");
1526 		goto error;
1527 	}
1528 
1529 	rows = RES_ROWS(result);
1530 	for(i = 0; i < RES_ROW_N(result); i++) {
1531 		values = ROW_VALUES(&rows[i]);
1532 
1533 		subs.callid.s = (char *)VAL_STRING(&values[r_callid_col]);
1534 		subs.callid.len = strlen(subs.callid.s);
1535 		subs.to_tag.s = (char *)VAL_STRING(&values[r_to_tag_col]);
1536 		subs.to_tag.len = strlen(subs.to_tag.s);
1537 		subs.from_tag.s = (char *)VAL_STRING(&values[r_from_tag_col]);
1538 		subs.from_tag.len = strlen(subs.from_tag.s);
1539 
1540 		set_updated(&subs);
1541 	}
1542 
1543 	ret = RES_ROW_N(result);
1544 
1545 error:
1546 	if(result)
1547 		pa_dbf.free_result(pa_db, result);
1548 
1549 	return ret;
1550 }
1551 
query_db_notify(str * pres_uri,pres_ev_t * event,subs_t * watcher_subs)1552 int query_db_notify(str *pres_uri, pres_ev_t *event, subs_t *watcher_subs)
1553 {
1554 	subs_t *subs_array = NULL, *s = NULL;
1555 	str *notify_body = NULL, *aux_body = NULL;
1556 	int ret_code = -1;
1557 
1558 	subs_array = get_subs_dialog(pres_uri, event, NULL);
1559 	if(subs_array == NULL) {
1560 		LM_DBG("Could not get subscription dialog\n");
1561 		ret_code = 1;
1562 		goto done;
1563 	}
1564 
1565 	s = subs_array;
1566 
1567 	if(pres_notifier_processes > 0) {
1568 		while(s) {
1569 			set_updated(s);
1570 			s = s->next;
1571 		}
1572 	} else {
1573 		if(event->type & PUBL_TYPE)
1574 			notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
1575 
1576 		while(s) {
1577 
1578 			if(event->aux_body_processing) {
1579 				aux_body = event->aux_body_processing(s, notify_body);
1580 			}
1581 
1582 			if(notify(s, watcher_subs, aux_body ? aux_body : notify_body, 0, 0)
1583 					< 0) {
1584 				LM_ERR("Could not send notify for [event]=%.*s\n",
1585 						event->name.len, event->name.s);
1586 				goto done;
1587 			}
1588 
1589 			if(aux_body != NULL) {
1590 				if(aux_body->s) {
1591 					event->aux_free_body(aux_body->s);
1592 				}
1593 				pkg_free(aux_body);
1594 			}
1595 			s = s->next;
1596 		}
1597 	}
1598 
1599 	ret_code = 1;
1600 
1601 done:
1602 	free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1603 	free_notify_body(notify_body, event);
1604 
1605 	return ret_code;
1606 }
1607 
send_notify_request(subs_t * subs,subs_t * watcher_subs,str * n_body,int force_null_body)1608 int send_notify_request(
1609 		subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body)
1610 {
1611 	dlg_t *td = NULL;
1612 	str met = {"NOTIFY", 6};
1613 	str str_hdr = {0, 0};
1614 	str *notify_body = NULL;
1615 	int result = 0;
1616 	subs_t *cb_param = NULL;
1617 	str *final_body = NULL;
1618 	uac_req_t uac_r;
1619 	str *aux_body = NULL;
1620 	subs_t *backup_subs = NULL;
1621 
1622 	LM_DBG("dialog info:\n");
1623 	printf_subs(subs);
1624 
1625 	/* getting the status of the subscription */
1626 
1627 	if(force_null_body) {
1628 		goto jump_over_body;
1629 	}
1630 
1631 	if(n_body != NULL && subs->status == ACTIVE_STATUS) {
1632 		if(subs->event->req_auth) {
1633 
1634 			if(subs->auth_rules_doc && subs->event->apply_auth_nbody) {
1635 				if(subs->event->apply_auth_nbody(n_body, subs, &notify_body)
1636 						< 0) {
1637 					LM_ERR("in function apply_auth_nbody\n");
1638 					goto error;
1639 				}
1640 			}
1641 			if(notify_body == NULL)
1642 				notify_body = n_body;
1643 		} else
1644 			notify_body = n_body;
1645 	} else {
1646 		if(subs->status == TERMINATED_STATUS
1647 				|| subs->status == PENDING_STATUS) {
1648 			LM_DBG("state terminated or pending- notify body NULL\n");
1649 			notify_body = NULL;
1650 		} else {
1651 			if(subs->event->type & WINFO_TYPE) {
1652 				notify_body = get_wi_notify_body(subs, watcher_subs);
1653 				if(notify_body == NULL) {
1654 					LM_DBG("Could not get notify_body\n");
1655 					goto error;
1656 				}
1657 			} else {
1658 				notify_body = get_p_notify_body(subs->pres_uri, subs->event,
1659 						NULL, (subs->contact.s) ? &subs->contact : NULL);
1660 				if(notify_body == NULL || notify_body->s == NULL) {
1661 					LM_DBG("Could not get the notify_body\n");
1662 				} else {
1663 					/* call aux_body_processing if exists */
1664 					if(subs->event->aux_body_processing) {
1665 						aux_body = subs->event->aux_body_processing(
1666 								subs, notify_body);
1667 						if(aux_body) {
1668 							free_notify_body(notify_body, subs->event);
1669 							notify_body = aux_body;
1670 						}
1671 					}
1672 
1673 					/* apply authorization rules if exists */
1674 					if(subs->event->req_auth) {
1675 						if(subs->auth_rules_doc && subs->event->apply_auth_nbody
1676 								&& subs->event->apply_auth_nbody(
1677 										   notify_body, subs, &final_body)
1678 										   < 0) {
1679 							LM_ERR("in function apply_auth\n");
1680 							goto error;
1681 						}
1682 						if(final_body) {
1683 							xmlFree(notify_body->s);
1684 							pkg_free(notify_body);
1685 							notify_body = final_body;
1686 						}
1687 					}
1688 				}
1689 			}
1690 		}
1691 	}
1692 
1693 jump_over_body:
1694 
1695 	if(subs->expires <= 0) {
1696 		subs->expires = 0;
1697 		subs->status = TERMINATED_STATUS;
1698 		subs->reason.s = "timeout";
1699 		subs->reason.len = 7;
1700 	}
1701 
1702 	/* build extra headers */
1703 	if(build_str_hdr(subs, notify_body ? 1 : 0, &str_hdr) < 0) {
1704 		LM_ERR("while building headers\n");
1705 		goto error;
1706 	}
1707 	LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
1708 
1709 	/* construct the dlg_t structure */
1710 	td = ps_build_dlg_t(subs);
1711 	if(td == NULL) {
1712 		LM_ERR("while building dlg_t structure\n");
1713 		goto error;
1714 	}
1715 
1716 	LM_DBG("expires %d status %d\n", subs->expires, subs->status);
1717 	cb_param = mem_copy_subs(subs, SHM_MEM_TYPE);
1718 
1719 	if(_pres_subs_mode==1) {
1720 		backup_subs = _pres_subs_last_sub;
1721 		_pres_subs_last_sub = subs;
1722 	}
1723 
1724 	set_uac_req(&uac_r, &met, &str_hdr, notify_body, td, TMCB_LOCAL_COMPLETED,
1725 			p_tm_callback, (void *)cb_param);
1726 	result = tmb.t_request_within(&uac_r);
1727 	if(_pres_subs_mode==1) {
1728 		_pres_subs_last_sub = backup_subs;
1729 	}
1730 	if(result < 0) {
1731 		LM_ERR("in function tmb.t_request_within\n");
1732 		if(cb_param)
1733 			shm_free(cb_param);
1734 		goto error;
1735 	}
1736 
1737 	LM_GEN2(pres_local_log_facility, pres_local_log_level,
1738 			"NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s : %.*s\n",
1739 			td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
1740 			td->hooks.next_hop->s, td->loc_uri.len, td->loc_uri.s,
1741 			subs->event->name.len, subs->event->name.s, subs->callid.len,
1742 			subs->callid.s);
1743 
1744 	ps_free_tm_dlg(td);
1745 
1746 	if(str_hdr.s)
1747 		pkg_free(str_hdr.s);
1748 
1749 	if((int)(long)n_body != (int)(long)notify_body)
1750 		free_notify_body(notify_body, subs->event);
1751 
1752 	return 0;
1753 
1754 error:
1755 	ps_free_tm_dlg(td);
1756 	if(str_hdr.s != NULL)
1757 		pkg_free(str_hdr.s);
1758 	if((int)(long)n_body != (int)(long)notify_body) {
1759 		if(notify_body != NULL) {
1760 			if(notify_body->s != NULL) {
1761 				if(subs->event->type & WINFO_TYPE)
1762 					xmlFree(notify_body->s);
1763 				else if(subs->event->apply_auth_nbody == NULL
1764 						&& subs->event->agg_nbody == NULL)
1765 					pkg_free(notify_body->s);
1766 				else
1767 					subs->event->free_body(notify_body->s);
1768 			}
1769 			pkg_free(notify_body);
1770 		}
1771 	}
1772 	return -1;
1773 }
1774 
1775 
notify(subs_t * subs,subs_t * watcher_subs,str * n_body,int force_null_body,aux_body_processing_t * aux_body_processing)1776 int notify(subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body,
1777 		aux_body_processing_t *aux_body_processing)
1778 {
1779 
1780 	str *aux_body = NULL;
1781 
1782 	/* update first in hash table and the send Notify */
1783 	if(subs->expires != 0 && subs->status != TERMINATED_STATUS) {
1784 		unsigned int hash_code;
1785 		hash_code = core_case_hash(
1786 				&subs->pres_uri, &subs->event->name, shtable_size);
1787 
1788 		/* if subscriptions are held also in memory, update the subscription hashtable */
1789 		if(pres_subs_dbmode != DB_ONLY) {
1790 			if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0) {
1791 				/* subscriptions are held only in memory, and hashtable update failed */
1792 				LM_ERR("updating subscription record in hash table\n");
1793 				return -1;
1794 			}
1795 		}
1796 		/* if DB_ONLY mode or WRITE_THROUGH update in database */
1797 		if(subs->recv_event != PRES_SUBSCRIBE_RECV
1798 				&& ((pres_subs_dbmode == DB_ONLY && pres_notifier_processes == 0)
1799 						   || pres_subs_dbmode == WRITE_THROUGH)) {
1800 			LM_DBG("updating subscription to database\n");
1801 			if(update_subs_db(subs, LOCAL_TYPE) < 0) {
1802 				LM_ERR("updating subscription in database\n");
1803 				return -1;
1804 			}
1805 		}
1806 	}
1807 
1808 	if(subs->reason.s && subs->status == ACTIVE_STATUS && subs->reason.len == 12
1809 			&& strncmp(subs->reason.s, "polite-block", 12) == 0) {
1810 		force_null_body = 1;
1811 	}
1812 
1813 	if(!force_null_body && aux_body_processing) {
1814 		aux_body = aux_body_processing(subs, n_body);
1815 	}
1816 
1817 	if(send_notify_request(subs, watcher_subs, aux_body ? aux_body : n_body,
1818 			   force_null_body)
1819 			< 0) {
1820 		LM_ERR("sending Notify not successful\n");
1821 		if(aux_body != NULL) {
1822 			if(aux_body->s) {
1823 				subs->event->aux_free_body(aux_body->s);
1824 			}
1825 			pkg_free(aux_body);
1826 		}
1827 		return -1;
1828 	}
1829 
1830 	if(aux_body != NULL) {
1831 		if(aux_body->s) {
1832 			subs->event->aux_free_body(aux_body->s);
1833 		}
1834 		pkg_free(aux_body);
1835 	}
1836 	return 0;
1837 }
1838 
1839 static sip_msg_t *_pres_subs_notify_reply_msg = NULL;
1840 static int _pres_subs_notify_reply_code = 0;
1841 
pv_parse_notify_reply_var_name(pv_spec_p sp,str * in)1842 int pv_parse_notify_reply_var_name(pv_spec_p sp, str *in)
1843 {
1844 	pv_spec_t *pv = NULL;
1845 	if(in->s == NULL || in->len <= 0)
1846 		return -1;
1847 	pv = (pv_spec_t *)pkg_malloc(sizeof(pv_spec_t));
1848 	if(pv == NULL)
1849 		return -1;
1850 	memset(pv, 0, sizeof(pv_spec_t));
1851 	if(pv_parse_spec(in, pv) == NULL)
1852 		goto error;
1853 	sp->pvp.pvn.u.dname = (void *)pv;
1854 	sp->pvp.pvn.type = PV_NAME_PVAR;
1855 	return 0;
1856 
1857 error:
1858 	LM_ERR("invalid pv name [%.*s]\n", in->len, in->s);
1859 	if(pv != NULL)
1860 		pkg_free(pv);
1861 	return -1;
1862 }
1863 
pv_get_notify_reply(struct sip_msg * msg,pv_param_t * param,pv_value_t * res)1864 int pv_get_notify_reply(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
1865 {
1866 	pv_spec_t *pv = NULL;
1867 
1868 	if(msg == NULL)
1869 		return 1;
1870 
1871 	pv = (pv_spec_t *)param->pvn.u.dname;
1872 	if(pv == NULL)
1873 		return pv_get_null(msg, param, res);
1874 
1875 	return pv_get_spec_value(_pres_subs_notify_reply_msg, pv, res);
1876 }
1877 
1878 #define FAKED_SIP_408_MSG_FORMAT                                  \
1879 	"SIP/2.0 408 TIMEOUT\r\nVia: SIP/2.0/UDP 127.0.0.1\r\nFrom: " \
1880 	"invalid;\r\nTo: invalid\r\nCall-ID: invalid\r\nCSeq: 1 "     \
1881 	"TIMEOUT\r\nContent-Length: 0\r\n\r\n"
1882 static sip_msg_t *_faked_msg = NULL;
1883 
faked_msg()1884 sip_msg_t *faked_msg()
1885 {
1886 	if(_faked_msg == NULL) {
1887 		_faked_msg = pkg_malloc(sizeof(sip_msg_t));
1888 		if(likely(build_sip_msg_from_buf(_faked_msg, FAKED_SIP_408_MSG_FORMAT,
1889 						  strlen(FAKED_SIP_408_MSG_FORMAT), inc_msg_no())
1890 				   < 0)) {
1891 			LM_ERR("failed to parse msg buffer\n");
1892 			return NULL;
1893 		}
1894 	}
1895 	return _faked_msg;
1896 }
1897 
run_notify_reply_event(struct cell * t,struct tmcb_params * ps)1898 void run_notify_reply_event(struct cell *t, struct tmcb_params *ps)
1899 {
1900 	int backup_route_type;
1901 	subs_t *backup_subs = NULL;
1902 	sip_msg_t msg;
1903 
1904 	if(goto_on_notify_reply == -1)
1905 		return;
1906 
1907 	if(likely(build_sip_msg_from_buf(&msg, t->uac->request.buffer,
1908 					  t->uac->request.buffer_len, inc_msg_no())
1909 			   < 0)) {
1910 		LM_ERR("failed to parse msg buffer\n");
1911 		return;
1912 	}
1913 
1914 	_pres_subs_notify_reply_code = ps->code;
1915 	if(ps->code == 408 || ps->rpl == NULL) {
1916 		_pres_subs_notify_reply_msg = faked_msg();
1917 	} else {
1918 		_pres_subs_notify_reply_msg = ps->rpl;
1919 	}
1920 
1921 	if(_pres_subs_mode==1) {
1922 		backup_subs = _pres_subs_last_sub;
1923 		_pres_subs_last_sub = mem_copy_subs((subs_t *)(*ps->param), PKG_MEM_TYPE);
1924 	}
1925 
1926 	backup_route_type = get_route_type();
1927 	set_route_type(LOCAL_ROUTE);
1928 	run_top_route(event_rt.rlist[goto_on_notify_reply], &msg, 0);
1929 	set_route_type(backup_route_type);
1930 
1931 	_pres_subs_notify_reply_msg = NULL;
1932 	_pres_subs_notify_reply_code = 0;
1933 	if(_pres_subs_mode==1) {
1934 		pkg_free(_pres_subs_last_sub);
1935 		_pres_subs_last_sub = backup_subs;
1936 	}
1937 	free_sip_msg(&msg);
1938 }
1939 
pres_get_delete_sub(void)1940 int pres_get_delete_sub(void)
1941 {
1942 	sr_xavp_t *vavp = NULL;
1943 	str vname = str_init("delete_subscription");
1944 
1945 	if(pres_xavp_cfg.s == NULL || pres_xavp_cfg.len <= 0) {
1946 		return 0;
1947 	}
1948 
1949 	vavp = xavp_get_child_with_ival(&pres_xavp_cfg, &vname);
1950 	if(vavp != NULL) {
1951 		return (int)vavp->val.v.i;
1952 	}
1953 
1954 	return 0;
1955 }
1956 
p_tm_callback(struct cell * t,int type,struct tmcb_params * ps)1957 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
1958 {
1959 	subs_t *subs;
1960 
1961 	if(ps->param == NULL || *ps->param == NULL) {
1962 		LM_ERR("weird shit happening\n");
1963 		if(ps->param != NULL && *ps->param != NULL)
1964 			shm_free((subs_t *)(*ps->param));
1965 		return;
1966 	}
1967 
1968 	subs = (subs_t *)(*ps->param);
1969 	LM_DBG("completed with status %d [to_tag:%.*s]\n", ps->code,
1970 			subs->to_tag.len, subs->to_tag.s);
1971 
1972 	run_notify_reply_event(t, ps);
1973 
1974 	if(ps->code == 404 || ps->code == 481
1975 			|| (ps->code == 408 && pres_timeout_rm_subs
1976 					   && subs->status != TERMINATED_STATUS)
1977 			|| pres_get_delete_sub()) {
1978 		delete_subs(&subs->pres_uri, &subs->event->name, &subs->to_tag,
1979 				&subs->from_tag, &subs->callid);
1980 	}
1981 
1982 	shm_free(subs);
1983 }
1984 
free_cbparam(c_back_param * cb_param)1985 void free_cbparam(c_back_param *cb_param)
1986 {
1987 	if(cb_param != NULL)
1988 		shm_free(cb_param);
1989 }
1990 
shm_dup_cbparam(subs_t * subs)1991 c_back_param *shm_dup_cbparam(subs_t *subs)
1992 {
1993 	int size;
1994 	c_back_param *cb_param = NULL;
1995 
1996 	size = sizeof(c_back_param) + subs->pres_uri.len + subs->event->name.len
1997 		   + subs->to_tag.len + subs->from_tag.len + subs->callid.len;
1998 
1999 	cb_param = (c_back_param *)shm_malloc(size);
2000 	LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len, subs->event->name.len,
2001 			subs->to_tag.len);
2002 	if(cb_param == NULL) {
2003 		LM_ERR("no more shared memory\n");
2004 		return NULL;
2005 	}
2006 	memset(cb_param, 0, size);
2007 
2008 	cb_param->pres_uri.s = (char *)cb_param + sizeof(c_back_param);
2009 	memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
2010 	cb_param->pres_uri.len = subs->pres_uri.len;
2011 	cb_param->ev_name.s =
2012 			(char *)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
2013 	memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
2014 	cb_param->ev_name.len = subs->event->name.len;
2015 	cb_param->to_tag.s = (char *)(cb_param->ev_name.s) + cb_param->ev_name.len;
2016 	memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
2017 	cb_param->to_tag.len = subs->to_tag.len;
2018 
2019 	cb_param->from_tag.s = (char *)(cb_param->to_tag.s) + cb_param->to_tag.len;
2020 	memcpy(cb_param->from_tag.s, subs->from_tag.s, subs->from_tag.len);
2021 	cb_param->from_tag.len = subs->from_tag.len;
2022 
2023 	cb_param->callid.s =
2024 			(char *)(cb_param->from_tag.s) + cb_param->from_tag.len;
2025 	memcpy(cb_param->callid.s, subs->callid.s, subs->callid.len);
2026 	cb_param->callid.len = subs->callid.len;
2027 
2028 	return cb_param;
2029 }
2030 
2031 
create_winfo_xml(watcher_t * watchers,char * version,str resource,str event,int STATE_FLAG)2032 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
2033 		str event, int STATE_FLAG)
2034 {
2035 	xmlDocPtr doc = NULL;
2036 	xmlNodePtr root_node = NULL, node = NULL;
2037 	xmlNodePtr w_list_node = NULL;
2038 	char content[200];
2039 	str *body = NULL;
2040 	char *res = NULL;
2041 	watcher_t *w;
2042 
2043 	LIBXML_TEST_VERSION;
2044 
2045 	doc = xmlNewDoc(BAD_CAST "1.0");
2046 	root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
2047 	xmlDocSetRootElement(doc, root_node);
2048 
2049 	xmlNewProp(root_node, BAD_CAST "xmlns",
2050 			BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
2051 	xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version);
2052 
2053 	if(STATE_FLAG & FULL_STATE_FLAG) {
2054 		if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL) {
2055 			LM_ERR("while adding new attribute\n");
2056 			goto error;
2057 		}
2058 	} else {
2059 		if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "partial")
2060 				== NULL) {
2061 			LM_ERR("while adding new attribute\n");
2062 			goto error;
2063 		}
2064 	}
2065 
2066 	w_list_node = xmlNewChild(root_node, NULL, BAD_CAST "watcher-list", NULL);
2067 	if(w_list_node == NULL) {
2068 		LM_ERR("while adding child\n");
2069 		goto error;
2070 	}
2071 	res = (char *)pkg_malloc(MAX_unsigned(resource.len, event.len) + 1);
2072 	if(res == NULL) {
2073 		ERR_MEM(PKG_MEM_STR);
2074 	}
2075 	memcpy(res, resource.s, resource.len);
2076 	res[resource.len] = '\0';
2077 	xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
2078 	memcpy(res, event.s, event.len);
2079 	res[event.len] = '\0';
2080 	xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
2081 	pkg_free(res);
2082 
2083 
2084 	w = watchers->next;
2085 	while(w) {
2086 		strncpy(content, w->uri.s, w->uri.len);
2087 		content[w->uri.len] = '\0';
2088 		node = xmlNewChild(
2089 				w_list_node, NULL, BAD_CAST "watcher", BAD_CAST content);
2090 		if(node == NULL) {
2091 			LM_ERR("while adding child\n");
2092 			goto error;
2093 		}
2094 		if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s) == NULL) {
2095 			LM_ERR("while adding new attribute\n");
2096 			goto error;
2097 		}
2098 
2099 		if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe") == NULL) {
2100 			LM_ERR("while adding new attribute\n");
2101 			goto error;
2102 		}
2103 
2104 		if(xmlNewProp(
2105 				   node, BAD_CAST "status", BAD_CAST get_status_str(w->status))
2106 				== NULL) {
2107 			LM_ERR("while adding new attribute\n");
2108 			goto error;
2109 		}
2110 		w = w->next;
2111 	}
2112 	body = (str *)pkg_malloc(sizeof(str));
2113 	if(body == NULL) {
2114 		ERR_MEM(PKG_MEM_STR);
2115 	}
2116 	memset(body, 0, sizeof(str));
2117 
2118 	xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&body->s, &body->len, 1);
2119 
2120 	xmlFreeDoc(doc);
2121 
2122 	xmlCleanupParser();
2123 
2124 	xmlMemoryDump();
2125 
2126 	return body;
2127 
2128 error:
2129 	if(doc)
2130 		xmlFreeDoc(doc);
2131 	return NULL;
2132 }
2133 
watcher_found_in_list(watcher_t * watchers,str wuri)2134 int watcher_found_in_list(watcher_t *watchers, str wuri)
2135 {
2136 	watcher_t *w;
2137 
2138 	w = watchers->next;
2139 
2140 	while(w) {
2141 		if(w->uri.len == wuri.len
2142 				&& presence_sip_uri_match(&w->uri, &wuri) == 0)
2143 			return 1;
2144 		w = w->next;
2145 	}
2146 
2147 	return 0;
2148 }
2149 
add_waiting_watchers(watcher_t * watchers,str pres_uri,str event)2150 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
2151 {
2152 	watcher_t *w;
2153 	db_key_t query_cols[3];
2154 	db_val_t query_vals[3];
2155 	db_key_t result_cols[2];
2156 	db1_res_t *result = NULL;
2157 	db_row_t *row = NULL;
2158 	db_val_t *row_vals;
2159 	int n_result_cols = 0;
2160 	int n_query_cols = 0;
2161 	int wuser_col, wdomain_col;
2162 	str wuser, wdomain, wuri;
2163 	int i;
2164 
2165 	/* select from watchers table the users that have subscribed
2166 	 * to the presentity and have status pending */
2167 
2168 	query_cols[n_query_cols] = &str_presentity_uri_col;
2169 	query_vals[n_query_cols].type = DB1_STR;
2170 	query_vals[n_query_cols].nul = 0;
2171 	query_vals[n_query_cols].val.str_val = pres_uri;
2172 	n_query_cols++;
2173 
2174 	query_cols[n_query_cols] = &str_event_col;
2175 	query_vals[n_query_cols].type = DB1_STR;
2176 	query_vals[n_query_cols].nul = 0;
2177 	query_vals[n_query_cols].val.str_val = event;
2178 	n_query_cols++;
2179 
2180 	query_cols[n_query_cols] = &str_status_col;
2181 	query_vals[n_query_cols].type = DB1_INT;
2182 	query_vals[n_query_cols].nul = 0;
2183 	query_vals[n_query_cols].val.int_val = PENDING_STATUS;
2184 	n_query_cols++;
2185 
2186 	result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2187 	result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2188 
2189 	if(pa_dbf.use_table(pa_db, &watchers_table) < 0) {
2190 		LM_ERR("sql use table 'watchers_table' failed\n");
2191 		return -1;
2192 	}
2193 
2194 	if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2195 			   n_result_cols, 0, &result)
2196 			< 0) {
2197 		LM_ERR("failed to query %.*s table\n", watchers_table.len,
2198 				watchers_table.s);
2199 		if(result)
2200 			pa_dbf.free_result(pa_db, result);
2201 		return -1;
2202 	}
2203 
2204 	if(result == NULL) {
2205 		LM_ERR("mysql query failed - null result\n");
2206 		return -1;
2207 	}
2208 
2209 	if(result->n <= 0) {
2210 		LM_DBG("The query returned no result\n");
2211 		pa_dbf.free_result(pa_db, result);
2212 		return 0;
2213 	}
2214 
2215 	for(i = 0; i < result->n; i++) {
2216 		row = &result->rows[i];
2217 		row_vals = ROW_VALUES(row);
2218 
2219 		wuser.s = (char *)row_vals[wuser_col].val.string_val;
2220 		wuser.len = strlen(wuser.s);
2221 
2222 		wdomain.s = (char *)row_vals[wdomain_col].val.string_val;
2223 		wdomain.len = strlen(wdomain.s);
2224 
2225 		if(uandd_to_uri(wuser, wdomain, &wuri) < 0) {
2226 			LM_ERR("creating uri from username and domain\n");
2227 			goto error;
2228 		}
2229 
2230 		if(watcher_found_in_list(watchers, wuri)) {
2231 			pkg_free(wuri.s);
2232 			continue;
2233 		}
2234 
2235 		w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2236 		if(w == NULL) {
2237 			pkg_free(wuri.s);
2238 			ERR_MEM(PKG_MEM_STR);
2239 		}
2240 		memset(w, 0, sizeof(watcher_t));
2241 
2242 		w->status = WAITING_STATUS;
2243 		w->uri = wuri;
2244 		w->id.s = (char *)pkg_malloc(w->uri.len * 2 + 1);
2245 		if(w->id.s == NULL) {
2246 			pkg_free(w->uri.s);
2247 			pkg_free(w);
2248 			ERR_MEM(PKG_MEM_STR);
2249 		}
2250 
2251 		to64frombits((unsigned char *)w->id.s, (const unsigned char *)w->uri.s,
2252 				w->uri.len);
2253 		w->id.len = strlen(w->id.s);
2254 		w->event = event;
2255 
2256 		w->next = watchers->next;
2257 		watchers->next = w;
2258 	}
2259 
2260 	pa_dbf.free_result(pa_db, result);
2261 	return 0;
2262 
2263 error:
2264 	if(result)
2265 		pa_dbf.free_result(pa_db, result);
2266 	return -1;
2267 }
2268 
2269 #define EXTRACT_STRING(strng, chars)                       \
2270 	do {                                                   \
2271 		strng.s = (char *)chars;                           \
2272 		strng.len = strng.s == NULL ? 0 : strlen(strng.s); \
2273 	} while(0);
2274 
unset_watchers_updated_winfo(str * pres_uri)2275 static int unset_watchers_updated_winfo(str *pres_uri)
2276 {
2277 	db_key_t query_cols[3], result_cols[1], update_cols[1];
2278 	db_val_t query_vals[3], update_vals[1];
2279 	db_op_t query_ops[2];
2280 	db1_res_t *result = NULL;
2281 	int n_query_cols = 0;
2282 	int ret = -1;
2283 	str winfo = str_init("presence.winfo");
2284 	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2285 
2286 	/* If this is the only presence.winfo dialog awaiting
2287 	   update for this presentity reset all of the watchers
2288 	   updated_winfo fields. */
2289 
2290 	query_cols[n_query_cols] = &str_presentity_uri_col;
2291 	query_vals[n_query_cols].type = DB1_STR;
2292 	query_vals[n_query_cols].nul = 0;
2293 	query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2294 	query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2295 	n_query_cols++;
2296 
2297 	query_cols[n_query_cols] = &str_event_col;
2298 	query_vals[n_query_cols].type = DB1_STR;
2299 	query_vals[n_query_cols].nul = 0;
2300 	query_vals[n_query_cols].val.str_val = winfo;
2301 	n_query_cols++;
2302 
2303 	query_cols[n_query_cols] = &str_updated_col;
2304 	query_vals[n_query_cols].type = DB1_INT;
2305 	query_vals[n_query_cols].nul = 0;
2306 	query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2307 	n_query_cols++;
2308 
2309 	result_cols[0] = &str_id_col;
2310 
2311 	update_cols[0] = &str_updated_winfo_col;
2312 	update_vals[0].type = DB1_INT;
2313 	update_vals[0].nul = 0;
2314 	update_vals[0].val.int_val = NO_UPDATE_TYPE;
2315 
2316 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2317 		LM_ERR("use table failed\n");
2318 		goto error;
2319 	}
2320 
2321 	if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols, 1,
2322 			   0, &result)
2323 			< 0) {
2324 		LM_ERR("in sql query\n");
2325 		goto error;
2326 	}
2327 
2328 	if(result == NULL) {
2329 		LM_ERR("bad result\n");
2330 		goto error;
2331 	}
2332 
2333 	if(RES_ROW_N(result) <= 0) {
2334 		query_ops[0] = OP_EQ;
2335 		query_ops[1] = OP_NEQ;
2336 
2337 		if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2338 				   update_vals, 2, 1)
2339 				< 0) {
2340 			LM_ERR("in sql query\n");
2341 			goto error;
2342 		}
2343 
2344 		if(pa_dbf.affected_rows)
2345 			ret = pa_dbf.affected_rows(pa_db);
2346 		else
2347 			ret = 0;
2348 	} else
2349 		ret = 0;
2350 
2351 error:
2352 	if(result)
2353 		pa_dbf.free_result(pa_db, result);
2354 	return ret;
2355 }
2356 
dialogs_awaiting_update(str * pres_uri,str event)2357 static int dialogs_awaiting_update(str *pres_uri, str event)
2358 {
2359 	db_key_t query_cols[3], result_cols[1];
2360 	db_val_t query_vals[3];
2361 	db_op_t query_ops[3];
2362 	db1_res_t *result = NULL;
2363 	int n_query_cols = 0;
2364 	int ret = -1;
2365 	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2366 
2367 	query_cols[n_query_cols] = &str_presentity_uri_col;
2368 	query_vals[n_query_cols].type = DB1_STR;
2369 	query_vals[n_query_cols].nul = 0;
2370 	query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2371 	query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2372 	query_ops[n_query_cols] = OP_EQ;
2373 	n_query_cols++;
2374 
2375 	query_cols[n_query_cols] = &str_event_col;
2376 	query_vals[n_query_cols].type = DB1_STR;
2377 	query_vals[n_query_cols].nul = 0;
2378 	query_vals[n_query_cols].val.str_val = event;
2379 	query_ops[n_query_cols] = OP_EQ;
2380 	n_query_cols++;
2381 
2382 	query_cols[n_query_cols] = &str_updated_col;
2383 	query_vals[n_query_cols].type = DB1_INT;
2384 	query_vals[n_query_cols].nul = 0;
2385 	query_vals[n_query_cols].val.int_val = NO_UPDATE_TYPE;
2386 	query_ops[n_query_cols] = OP_NEQ;
2387 	n_query_cols++;
2388 
2389 	result_cols[0] = &str_id_col;
2390 
2391 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2392 		LM_ERR("use table failed\n");
2393 		goto error;
2394 	}
2395 
2396 	if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2397 			   n_query_cols, 1, 0, &result)
2398 			< 0) {
2399 		LM_ERR("in sql query\n");
2400 		goto error;
2401 	}
2402 
2403 	if(result == NULL) {
2404 		LM_ERR("bad result\n");
2405 		goto error;
2406 	} else
2407 		ret = RES_ROW_N(result);
2408 
2409 error:
2410 	if(result)
2411 		pa_dbf.free_result(pa_db, result);
2412 	return ret;
2413 }
2414 
set_wipeer_subs_updated(str * pres_uri,pres_ev_t * event,int full)2415 int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
2416 {
2417 	db_key_t query_cols[3], result_cols[3], update_cols[2];
2418 	db_val_t query_vals[3], update_vals[2], *values;
2419 	db_row_t *rows;
2420 	db1_res_t *result = NULL;
2421 	int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2422 	int callid_col, from_tag_col, to_tag_col;
2423 	int i, ret = -1, count;
2424 	str callid, from_tag, to_tag;
2425 	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2426 
2427 	query_cols[n_query_cols] = &str_presentity_uri_col;
2428 	query_vals[n_query_cols].type = DB1_STR;
2429 	query_vals[n_query_cols].nul = 0;
2430 	query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2431 	query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2432 	n_query_cols++;
2433 
2434 	query_cols[n_query_cols] = &str_event_col;
2435 	query_vals[n_query_cols].type = DB1_STR;
2436 	query_vals[n_query_cols].nul = 0;
2437 	query_vals[n_query_cols].val.str_val = event->name;
2438 	n_query_cols++;
2439 
2440 	result_cols[callid_col = n_result_cols++] = &str_callid_col;
2441 	result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2442 	result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2443 
2444 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2445 		LM_ERR("use table failed\n");
2446 		goto error;
2447 	}
2448 
2449 	if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2450 			   n_result_cols, 0, &result)
2451 			< 0) {
2452 		LM_ERR("in sql query\n");
2453 		goto error;
2454 	}
2455 
2456 	if(result == NULL) {
2457 		LM_ERR("bad result\n");
2458 		goto error;
2459 	}
2460 
2461 	if(RES_ROW_N(result) <= 0) {
2462 		ret = 0;
2463 		goto done;
2464 	}
2465 
2466 	rows = RES_ROWS(result);
2467 	count = RES_ROW_N(result);
2468 	for(i = 0; i < RES_ROW_N(result); i++) {
2469 		values = ROW_VALUES(&rows[i]);
2470 
2471 		EXTRACT_STRING(callid, VAL_STRING(&values[callid_col]));
2472 		EXTRACT_STRING(from_tag, VAL_STRING(&values[from_tag_col]));
2473 		EXTRACT_STRING(to_tag, VAL_STRING(&values[to_tag_col]));
2474 
2475 		n_query_cols = 0;
2476 		n_update_cols = 0;
2477 
2478 		query_cols[n_query_cols] = &str_callid_col;
2479 		query_vals[n_query_cols].type = DB1_STR;
2480 		query_vals[n_query_cols].nul = 0;
2481 		query_vals[n_query_cols].val.str_val = callid;
2482 		n_query_cols++;
2483 
2484 		query_cols[n_query_cols] = &str_to_tag_col;
2485 		query_vals[n_query_cols].type = DB1_STR;
2486 		query_vals[n_query_cols].nul = 0;
2487 		query_vals[n_query_cols].val.str_val = to_tag;
2488 		n_query_cols++;
2489 
2490 		query_cols[n_query_cols] = &str_from_tag_col;
2491 		query_vals[n_query_cols].type = DB1_STR;
2492 		query_vals[n_query_cols].nul = 0;
2493 		query_vals[n_query_cols].val.str_val = from_tag;
2494 		n_query_cols++;
2495 
2496 		update_cols[n_update_cols] = &str_updated_col;
2497 		update_vals[n_update_cols].type = DB1_INT;
2498 		update_vals[n_update_cols].nul = 0;
2499 		update_vals[n_update_cols].val.int_val =
2500 				core_case_hash(&callid, &from_tag, 0)
2501 				% (pres_waitn_time * pres_notifier_poll_rate
2502 						  * pres_notifier_processes);
2503 		n_update_cols++;
2504 
2505 		if(full) {
2506 			update_cols[n_update_cols] = &str_updated_winfo_col;
2507 			update_vals[n_update_cols].type = DB1_INT;
2508 			update_vals[n_update_cols].nul = 0;
2509 			update_vals[n_update_cols].val.int_val = UPDATED_TYPE;
2510 			n_update_cols++;
2511 		}
2512 
2513 		if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
2514 				   update_vals, n_query_cols, n_update_cols)
2515 				< 0) {
2516 			LM_ERR("in sql query\n");
2517 			goto error;
2518 		}
2519 
2520 		if(pa_dbf.affected_rows)
2521 			if(pa_dbf.affected_rows(pa_db) == 0)
2522 				count--;
2523 	}
2524 
2525 	ret = count;
2526 
2527 done:
2528 error:
2529 	if(result)
2530 		pa_dbf.free_result(pa_db, result);
2531 
2532 	return ret;
2533 }
2534 
set_updated(subs_t * sub)2535 int set_updated(subs_t *sub)
2536 {
2537 	db_key_t query_cols[3], update_cols[1];
2538 	db_val_t query_vals[3], update_vals[1];
2539 	int n_query_cols = 0;
2540 
2541 	query_cols[n_query_cols] = &str_callid_col;
2542 	query_vals[n_query_cols].type = DB1_STR;
2543 	query_vals[n_query_cols].nul = 0;
2544 	query_vals[n_query_cols].val.str_val = sub->callid;
2545 	n_query_cols++;
2546 
2547 	query_cols[n_query_cols] = &str_to_tag_col;
2548 	query_vals[n_query_cols].type = DB1_STR;
2549 	query_vals[n_query_cols].nul = 0;
2550 	query_vals[n_query_cols].val.str_val = sub->to_tag;
2551 	n_query_cols++;
2552 
2553 	query_cols[n_query_cols] = &str_from_tag_col;
2554 	query_vals[n_query_cols].type = DB1_STR;
2555 	query_vals[n_query_cols].nul = 0;
2556 	query_vals[n_query_cols].val.str_val = sub->from_tag;
2557 	n_query_cols++;
2558 
2559 	update_cols[0] = &str_updated_col;
2560 	update_vals[0].type = DB1_INT;
2561 	update_vals[0].nul = 0;
2562 	update_vals[0].val.int_val = core_case_hash(&sub->callid, &sub->from_tag, 0)
2563 								 % (pres_waitn_time * pres_notifier_poll_rate
2564 										   * pres_notifier_processes);
2565 
2566 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2567 		LM_ERR("use table failed\n");
2568 		return -1;
2569 	}
2570 
2571 	if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols, update_vals,
2572 			   n_query_cols, 1)
2573 			< 0) {
2574 		LM_ERR("in sql query\n");
2575 		return -1;
2576 	}
2577 
2578 	if(pa_dbf.affected_rows)
2579 		return pa_dbf.affected_rows(pa_db);
2580 	else
2581 		return 0;
2582 }
2583 
build_watchers_list(subs_t * sub)2584 static watcher_t *build_watchers_list(subs_t *sub)
2585 {
2586 	db_key_t query_cols[3], result_cols[4];
2587 	db_val_t query_vals[3], *values;
2588 	db_row_t *rows;
2589 	db1_res_t *result = NULL;
2590 	int n_query_cols = 0, n_result_cols = 0;
2591 	int wuser_col, wdomain_col, callid_col, status_col;
2592 	int i;
2593 	subs_t sb;
2594 	watcher_t *watchers = NULL;
2595 
2596 	watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2597 	if(watchers == NULL) {
2598 		ERR_MEM(PKG_MEM_STR);
2599 	}
2600 	memset(watchers, 0, sizeof(watcher_t));
2601 
2602 	query_cols[n_query_cols] = &str_presentity_uri_col;
2603 	query_vals[n_query_cols].type = DB1_STR;
2604 	query_vals[n_query_cols].nul = 0;
2605 	query_vals[n_query_cols].val.str_val = sub->pres_uri;
2606 	n_query_cols++;
2607 
2608 	query_cols[n_query_cols] = &str_event_col;
2609 	query_vals[n_query_cols].type = DB1_STR;
2610 	query_vals[n_query_cols].nul = 0;
2611 	query_vals[n_query_cols].val.str_val = sub->event->wipeer->name;
2612 	n_query_cols++;
2613 
2614 	query_cols[n_query_cols] = &str_updated_winfo_col;
2615 	query_vals[n_query_cols].type = DB1_INT;
2616 	query_vals[n_query_cols].nul = 0;
2617 	query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2618 	n_query_cols++;
2619 
2620 	result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2621 	result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2622 	result_cols[callid_col = n_result_cols++] = &str_callid_col;
2623 	result_cols[status_col = n_result_cols++] = &str_status_col;
2624 
2625 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2626 		LM_ERR("use table failed\n");
2627 		goto error;
2628 	}
2629 
2630 	if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2631 			   n_result_cols, 0, &result)
2632 			< 0) {
2633 		LM_ERR("in sql query\n");
2634 		goto error;
2635 	}
2636 
2637 	if(result == NULL) {
2638 		LM_ERR("bad result\n");
2639 		goto error;
2640 	}
2641 
2642 	if(RES_ROW_N(result) <= 0)
2643 		goto done;
2644 
2645 	rows = RES_ROWS(result);
2646 	for(i = 0; i < RES_ROW_N(result); i++) {
2647 		memset(&sb, 0, sizeof(subs_t));
2648 		values = ROW_VALUES(&rows[i]);
2649 
2650 		EXTRACT_STRING(sb.watcher_user, VAL_STRING(&values[wuser_col]));
2651 		EXTRACT_STRING(sb.watcher_domain, VAL_STRING(&values[wdomain_col]));
2652 		EXTRACT_STRING(sb.callid, VAL_STRING(&values[callid_col]));
2653 		sb.status = VAL_INT(&values[status_col]);
2654 
2655 		sb.event = sub->event->wipeer;
2656 
2657 		if(add_watcher_list(&sb, watchers) < 0)
2658 			goto error;
2659 	}
2660 
2661 done:
2662 	pa_dbf.free_result(pa_db, result);
2663 	return watchers;
2664 
2665 error:
2666 	if(result)
2667 		pa_dbf.free_result(pa_db, result);
2668 	free_watcher_list(watchers);
2669 	return NULL;
2670 }
2671 
cleanup_missing_dialog(subs_t * sub)2672 static int cleanup_missing_dialog(subs_t *sub)
2673 {
2674 	int ret = -1, num_other_watchers = 0;
2675 
2676 	if(sub->event->type & WINFO_TYPE) {
2677 		if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2678 			LM_ERR("resetting updated_winfo flags\n");
2679 			goto error;
2680 		}
2681 	} else if(sub->event->type & PUBL_TYPE) {
2682 		if((num_other_watchers = dialogs_awaiting_update(
2683 					&sub->pres_uri, sub->event->name))
2684 				< 0) {
2685 			LM_ERR("checking watchers\n");
2686 			goto error;
2687 		} else if(num_other_watchers == 0) {
2688 			if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2689 				LM_ERR("deleting presentity\n");
2690 				goto error;
2691 			}
2692 		}
2693 	}
2694 
2695 	ret = 0;
2696 
2697 error:
2698 	return ret;
2699 }
2700 
notifier_notify(subs_t * sub,int * updated,int * end_transaction)2701 static int notifier_notify(subs_t *sub, int *updated, int *end_transaction)
2702 {
2703 	str *nbody = NULL;
2704 	watcher_t *watchers = NULL;
2705 	int ret = 0, attempt_delete_presentities = 0;
2706 
2707 	*updated = 0;
2708 
2709 	/* Terminating dialog NOTIFY */
2710 	if(sub->expires == 0 || sub->status == TERMINATED_STATUS) {
2711 		sub->status = TERMINATED_STATUS;
2712 
2713 		if(sub->event->type & WINFO_TYPE) {
2714 			if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2715 				LM_WARN("resetting updated_winfo flags\n");
2716 
2717 				if(pa_dbf.abort_transaction) {
2718 					if(pa_dbf.abort_transaction(pa_db) < 0) {
2719 						LM_ERR("in abort_transaction\n");
2720 						goto error;
2721 					}
2722 				}
2723 				*end_transaction = 0;
2724 
2725 				/* Make sure this gets tried again next time */
2726 				*updated = 1;
2727 				goto done;
2728 			}
2729 		} else {
2730 			str winfo = str_init("presence.winfo");
2731 			int num_other_watchers, num_winfos;
2732 
2733 			if(sub->event->type & PUBL_TYPE) {
2734 				if((num_other_watchers = dialogs_awaiting_update(
2735 							&sub->pres_uri, sub->event->name))
2736 						< 0) {
2737 					LM_ERR("checking watchers\n");
2738 					goto error;
2739 				} else if(num_other_watchers == 0)
2740 					attempt_delete_presentities = 1;
2741 			}
2742 
2743 			if(sub->event->wipeer) {
2744 				if((num_winfos = dialogs_awaiting_update(&sub->pres_uri, winfo))
2745 						< 0) {
2746 					LM_ERR("checking winfos\n");
2747 					goto error;
2748 				}
2749 
2750 				if(sub->updated_winfo == UPDATED_TYPE && num_winfos > 0) {
2751 					*updated = 1;
2752 					goto done;
2753 				}
2754 			}
2755 		}
2756 	} else /* Non-terminating dialog */
2757 	{
2758 		if(sub->event->type & WINFO_TYPE) /* presence.winfo dialog */
2759 		{
2760 			if(sub->updated_winfo == NO_UPDATE_TYPE) {
2761 				/* Partial notify if
2762 				   updated_winfo == NO_UPDATE_TYPE */
2763 				int len = 0;
2764 				char *version_str = int2str(sub->version, &len);
2765 				if(version_str == NULL) {
2766 					LM_ERR("converting int to str\n");
2767 					goto error;
2768 				}
2769 
2770 				watchers = build_watchers_list(sub);
2771 				if(watchers == NULL) {
2772 					LM_ERR("in build_watchers_list\n");
2773 					goto error;
2774 				}
2775 
2776 				nbody = create_winfo_xml(watchers, version_str, sub->pres_uri,
2777 						sub->event->wipeer->name, PARTIAL_STATE_FLAG);
2778 				if(nbody == NULL) {
2779 					LM_ERR("in create_winfo_xml\n");
2780 					goto error;
2781 				}
2782 
2783 			} else /* Full presence.winfo NOTIFY */
2784 				sub->updated_winfo = NO_UPDATE_TYPE;
2785 
2786 			if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2787 				LM_WARN("resetting updated_winfo flags\n");
2788 
2789 				if(pa_dbf.abort_transaction) {
2790 					if(pa_dbf.abort_transaction(pa_db) < 0) {
2791 						LM_ERR("in abort_transaction\n");
2792 						goto error;
2793 					}
2794 				}
2795 				*end_transaction = 0;
2796 
2797 				/* Make sure this gets tried again next time */
2798 				*updated = 1;
2799 				goto done;
2800 			}
2801 
2802 		} else if(sub->event->type & PUBL_TYPE) {
2803 			int num_other_watchers;
2804 
2805 			if((num_other_watchers = dialogs_awaiting_update(
2806 						&sub->pres_uri, sub->event->name))
2807 					< 0) {
2808 				LM_ERR("checking watchers\n");
2809 				goto error;
2810 			} else if(num_other_watchers == 0)
2811 				attempt_delete_presentities = 1;
2812 		} else if(!pres_send_fast_notify)
2813 			goto done;
2814 	}
2815 
2816 	if(notify(sub, NULL, nbody, 0, 0) < 0) {
2817 		LM_ERR("could not send notify\n");
2818 		goto error;
2819 	}
2820 
2821 	ret = 1;
2822 
2823 done:
2824 	if(attempt_delete_presentities) {
2825 		if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2826 			LM_ERR("deleting presentity\n");
2827 			goto error;
2828 		}
2829 	}
2830 
2831 	free_notify_body(nbody, sub->event);
2832 	free_watcher_list(watchers);
2833 
2834 	return ret;
2835 
2836 error:
2837 	free_notify_body(nbody, sub->event);
2838 	free_watcher_list(watchers);
2839 
2840 	if(pa_dbf.abort_transaction) {
2841 		if(pa_dbf.abort_transaction(pa_db) < 0)
2842 			LM_ERR("in abort_transaction\n");
2843 	}
2844 	*end_transaction = 0;
2845 
2846 	return -1;
2847 }
2848 
process_dialogs(int round,int presence_winfo)2849 int process_dialogs(int round, int presence_winfo)
2850 {
2851 	db_key_t query_cols[3], result_cols[20], update_cols[4];
2852 	db_val_t query_vals[3], update_vals[4], *values, *dvalues;
2853 	db_op_t query_ops[2];
2854 	db_row_t *rows, *drows;
2855 	db1_res_t *dialog_list = NULL, *dialog = NULL;
2856 	int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2857 	int callid_col, to_tag_col, from_tag_col;
2858 	int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
2859 	int wuser_col, wdomain_col, sockinfo_col, lcontact_col, contact_col;
2860 	int rroute_col, event_id_col, reason_col, event_col, lcseq_col;
2861 	int rcseq_col, status_col, version_col, updated_winfo_col, expires_col;
2862 	int flags_col, user_agent_col;
2863 	int i, notify_sent = 0, cached_updated_winfo, ret = -1;
2864 	int end_transaction = 0;
2865 	subs_t sub;
2866 	str ev_sname, winfo = str_init("presence.winfo");
2867 	int now = (int)time(NULL);
2868 	int updated = 0;
2869 	db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2870 
2871 	query_cols[n_query_cols] = &str_updated_col;
2872 	query_vals[n_query_cols].type = DB1_INT;
2873 	query_vals[n_query_cols].nul = 0;
2874 	query_vals[n_query_cols].val.int_val = round;
2875 	query_ops[n_query_cols] = OP_EQ;
2876 	n_query_cols++;
2877 
2878 	query_cols[n_query_cols] = &str_event_col;
2879 	query_vals[n_query_cols].type = DB1_STR;
2880 	query_vals[n_query_cols].nul = 0;
2881 	query_vals[n_query_cols].val.str_val = winfo;
2882 	query_ops[n_query_cols] = presence_winfo ? OP_EQ : OP_NEQ;
2883 	n_query_cols++;
2884 
2885 	result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
2886 	result_cols[callid_col = n_result_cols++] = &str_callid_col;
2887 	result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2888 	result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2889 	result_cols[event_col = n_result_cols++] = &str_event_col;
2890 
2891 	update_cols[n_update_cols] = &str_updated_col;
2892 	update_vals[n_update_cols].type = DB1_INT;
2893 	update_vals[n_update_cols].nul = 0;
2894 	update_vals[n_update_cols].val.int_val = NO_UPDATE_TYPE;
2895 	n_update_cols++;
2896 
2897 	if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2898 		LM_ERR("use table failed\n");
2899 		goto error;
2900 	}
2901 
2902 	if(pa_dbf.start_transaction) {
2903 		if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
2904 			LM_ERR("in start_transaction\n");
2905 			goto error;
2906 		}
2907 	}
2908 
2909 	/* Step 1: Find active_watchers that require notification */
2910 	if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2911 			   n_query_cols, n_result_cols, 0, &dialog_list)
2912 			< 0) {
2913 		LM_ERR("in sql query\n");
2914 		goto error;
2915 	}
2916 	if(dialog_list == NULL) {
2917 		LM_ERR("bad result\n");
2918 		goto error;
2919 	}
2920 
2921 	if(dialog_list->n <= 0)
2922 		goto done;
2923 
2924 	/* Step 2: Update the records so they are not notified again */
2925 	if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2926 			   update_vals, n_query_cols, n_update_cols)
2927 			< 0) {
2928 		LM_ERR("in sql update\n");
2929 		goto error;
2930 	}
2931 
2932 	if(pa_dbf.end_transaction) {
2933 		if(pa_dbf.end_transaction(pa_db) < 0) {
2934 			LM_ERR("in end_transaction\n");
2935 			goto error;
2936 		}
2937 	}
2938 
2939 	/* Step 3: Notify each watcher we found */
2940 	rows = RES_ROWS(dialog_list);
2941 	for(i = 0; i < RES_ROW_N(dialog_list); i++) {
2942 		n_query_cols = 0;
2943 		n_result_cols = 0;
2944 		n_update_cols = 0;
2945 		memset(&sub, 0, sizeof(subs_t));
2946 		values = ROW_VALUES(&rows[i]);
2947 
2948 		EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
2949 		EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
2950 		EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
2951 		EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
2952 		EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
2953 		sub.event = contains_event(&ev_sname, NULL);
2954 		if(sub.event == NULL) {
2955 			LM_ERR("event not found and set to NULL\n");
2956 			goto delete_dialog;
2957 		}
2958 
2959 		query_cols[n_query_cols] = &str_callid_col;
2960 		query_vals[n_query_cols].type = DB1_STR;
2961 		query_vals[n_query_cols].nul = 0;
2962 		query_vals[n_query_cols].val.str_val = sub.callid;
2963 		n_query_cols++;
2964 
2965 		query_cols[n_query_cols] = &str_to_tag_col;
2966 		query_vals[n_query_cols].type = DB1_STR;
2967 		query_vals[n_query_cols].nul = 0;
2968 		query_vals[n_query_cols].val.str_val = sub.to_tag;
2969 		n_query_cols++;
2970 
2971 		query_cols[n_query_cols] = &str_from_tag_col;
2972 		query_vals[n_query_cols].type = DB1_STR;
2973 		query_vals[n_query_cols].nul = 0;
2974 		query_vals[n_query_cols].val.str_val = sub.from_tag;
2975 		n_query_cols++;
2976 
2977 		result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
2978 		result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
2979 		result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
2980 		result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
2981 		result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2982 		result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2983 		result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
2984 		result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
2985 		result_cols[contact_col = n_result_cols++] = &str_contact_col;
2986 		result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
2987 		result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
2988 		result_cols[reason_col = n_result_cols++] = &str_reason_col;
2989 		result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
2990 		result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
2991 		result_cols[status_col = n_result_cols++] = &str_status_col;
2992 		result_cols[version_col = n_result_cols++] = &str_version_col;
2993 		result_cols[updated_winfo_col = n_result_cols++] =
2994 				&str_updated_winfo_col;
2995 		result_cols[expires_col = n_result_cols++] = &str_expires_col;
2996 		result_cols[flags_col = n_result_cols++] = &str_flags_col;
2997 		result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
2998 
2999 		/* Need to redo this here as we might have switched to the
3000 		   presentity table during a previous iteration. */
3001 		if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
3002 			LM_ERR("use table failed\n");
3003 			goto error;
3004 		}
3005 
3006 		if(pa_dbf.start_transaction) {
3007 			if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
3008 				LM_ERR("in start_transaction\n");
3009 				goto error;
3010 			}
3011 		}
3012 		end_transaction = 1;
3013 
3014 		if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
3015 				   n_result_cols, 0, &dialog)
3016 				< 0) {
3017 			LM_ERR("in sql query\n");
3018 			goto error;
3019 		}
3020 
3021 		if(dialog == NULL) {
3022 			LM_ERR("bad result\n");
3023 			goto error;
3024 		}
3025 
3026 		if(dialog->n <= 0) {
3027 			LM_INFO("record not found - this may be observed in multi-server "
3028 					"systems\n");
3029 			if(cleanup_missing_dialog(&sub) < 0)
3030 				LM_ERR("cleaning up after missing record\n");
3031 			goto next_dialog;
3032 		}
3033 
3034 		if(dialog->n > 1) {
3035 			LM_ERR("multiple records found for %.*s, ci : %.*s, tt : %.*s, ft "
3036 				   ": %.*s, ev : %.*s\n",
3037 					sub.pres_uri.len, sub.pres_uri.s, sub.callid.len,
3038 					sub.callid.s, sub.to_tag.len, sub.to_tag.s,
3039 					sub.from_tag.len, sub.from_tag.s, ev_sname.len, ev_sname.s);
3040 			goto delete_dialog;
3041 		}
3042 
3043 		drows = RES_ROWS(dialog);
3044 		dvalues = ROW_VALUES(drows);
3045 
3046 		EXTRACT_STRING(sub.to_user, VAL_STRING(&dvalues[tuser_col]));
3047 		EXTRACT_STRING(sub.to_domain, VAL_STRING(&dvalues[tdomain_col]));
3048 		EXTRACT_STRING(sub.from_user, VAL_STRING(&dvalues[fuser_col]));
3049 		EXTRACT_STRING(sub.from_domain, VAL_STRING(&dvalues[fdomain_col]));
3050 		EXTRACT_STRING(sub.watcher_user, VAL_STRING(&dvalues[wuser_col]));
3051 		EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&dvalues[wdomain_col]));
3052 		EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&dvalues[sockinfo_col]));
3053 		EXTRACT_STRING(sub.local_contact, VAL_STRING(&dvalues[lcontact_col]));
3054 		EXTRACT_STRING(sub.contact, VAL_STRING(&dvalues[contact_col]));
3055 		EXTRACT_STRING(sub.record_route, VAL_STRING(&dvalues[rroute_col]));
3056 		EXTRACT_STRING(sub.event_id, VAL_STRING(&dvalues[event_id_col]));
3057 		EXTRACT_STRING(sub.reason, VAL_STRING(&dvalues[reason_col]));
3058 		EXTRACT_STRING(sub.user_agent, VAL_STRING(&dvalues[user_agent_col]));
3059 
3060 		sub.local_cseq = VAL_INT(&dvalues[lcseq_col]) + 1;
3061 		sub.remote_cseq = VAL_INT(&dvalues[rcseq_col]);
3062 		sub.status = VAL_INT(&dvalues[status_col]);
3063 		sub.version = VAL_INT(&dvalues[version_col]) + 1;
3064 		cached_updated_winfo = sub.updated_winfo =
3065 				VAL_INT(&dvalues[updated_winfo_col]);
3066 
3067 		if(VAL_INT(&dvalues[expires_col]) > now + pres_expires_offset)
3068 			sub.expires = VAL_INT(&dvalues[expires_col]) - now;
3069 		else
3070 			sub.expires = 0;
3071 		sub.flags = VAL_INT(&dvalues[flags_col]);
3072 
3073 		sub.updated = round;
3074 
3075 		if((notify_sent = notifier_notify(&sub, &updated, &end_transaction))
3076 				< 0) {
3077 			LM_ERR("sending NOTIFY request\n");
3078 
3079 			if(cleanup_missing_dialog(&sub) < 0)
3080 				LM_ERR("cleaning up after error sending NOTIFY"
3081 					   "request\n");
3082 
3083 			/* remove the dialog and continue */
3084 			goto delete_dialog;
3085 		}
3086 
3087 		if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
3088 			LM_ERR("use table failed\n");
3089 			goto error;
3090 		}
3091 
3092 		if((sub.expires > 0 && sub.status != TERMINATED_STATUS) || updated) {
3093 			if(sub.updated_winfo != cached_updated_winfo) {
3094 				update_cols[n_update_cols] = &str_updated_winfo_col;
3095 				update_vals[n_update_cols].type = DB1_INT;
3096 				update_vals[n_update_cols].nul = 0;
3097 				update_vals[n_update_cols].val.int_val = sub.updated_winfo;
3098 				n_update_cols++;
3099 			}
3100 
3101 			if(updated) {
3102 				update_cols[n_update_cols] = &str_updated_col;
3103 				update_vals[n_update_cols].type = DB1_INT;
3104 				update_vals[n_update_cols].nul = 0;
3105 				update_vals[n_update_cols].val.int_val = round;
3106 				n_update_cols++;
3107 			}
3108 
3109 			if(notify_sent) {
3110 				update_cols[n_update_cols] = &str_local_cseq_col;
3111 				update_vals[n_update_cols].type = DB1_INT;
3112 				update_vals[n_update_cols].nul = 0;
3113 				update_vals[n_update_cols].val.int_val = sub.local_cseq;
3114 				n_update_cols++;
3115 
3116 				update_cols[n_update_cols] = &str_version_col;
3117 				update_vals[n_update_cols].type = DB1_INT;
3118 				update_vals[n_update_cols].nul = 0;
3119 				update_vals[n_update_cols].val.int_val = sub.version;
3120 				n_update_cols++;
3121 			}
3122 
3123 			if(n_update_cols > 0) {
3124 				if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
3125 						   update_vals, n_query_cols, n_update_cols)
3126 						< 0) {
3127 					LM_ERR("in sql update\n");
3128 					goto error;
3129 				}
3130 			}
3131 
3132 		} else if(notify_sent) {
3133 		delete_dialog:
3134 			if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
3135 				LM_ERR("use table failed\n");
3136 				goto error;
3137 			}
3138 
3139 			if(pa_dbf.delete(pa_db, query_cols, 0, query_vals, n_query_cols)
3140 					< 0) {
3141 				LM_ERR("in sql delete");
3142 				goto error;
3143 			}
3144 		}
3145 
3146 	next_dialog:
3147 		if(pa_dbf.end_transaction && end_transaction) {
3148 			if(pa_dbf.end_transaction(pa_db) < 0) {
3149 				LM_ERR("in end_transaction\n");
3150 				goto error;
3151 			}
3152 		}
3153 
3154 		pa_dbf.free_result(pa_db, dialog);
3155 		dialog = NULL;
3156 	}
3157 
3158 done:
3159 	ret = 0;
3160 error:
3161 	if(dialog_list)
3162 		pa_dbf.free_result(pa_db, dialog_list);
3163 	if(dialog)
3164 		pa_dbf.free_result(pa_db, dialog);
3165 
3166 	if(pa_dbf.abort_transaction) {
3167 		if(pa_dbf.abort_transaction(pa_db) < 0)
3168 			LM_ERR("in abort_transaction\n");
3169 	}
3170 
3171 	return ret;
3172 }
3173 
pres_timer_send_notify(unsigned int ticks,void * param)3174 void pres_timer_send_notify(unsigned int ticks, void *param)
3175 {
3176 	int process_num = *((int *)param);
3177 	int round =
3178 			subset + (pres_waitn_time * pres_notifier_poll_rate * process_num);
3179 
3180 	if(++subset > (pres_waitn_time * pres_notifier_poll_rate) - 1)
3181 		subset = 0;
3182 
3183 	if(process_dialogs(round, 0) < 0) {
3184 		LM_ERR("Handling non presence.winfo dialogs\n");
3185 		return;
3186 	}
3187 	if(process_dialogs(round, 1) < 0) {
3188 		LM_ERR("Handling presence.winfo dialogs\n");
3189 		return;
3190 	}
3191 }
3192