1 /**
2 *
3 * Copyright (C) 2018 Charles Chance (Sipcentric Ltd)
4 *
5 * This file is part of Kamailio, a free SIP server.
6 *
7 * Kamailio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version
11 *
12 * Kamailio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
20 *
21 */
22 
23 #include "presence.h"
24 #include "presence_dmq.h"
25 
26 static str pres_dmq_content_type = str_init("application/json");
27 static str pres_dmq_200_rpl = str_init("OK");
28 static str pres_dmq_400_rpl = str_init("Bad Request");
29 static str pres_dmq_500_rpl = str_init("Server Internal Error");
30 
31 static int *pres_dmq_proc_init = 0;
32 static int *pres_dmq_recv = 0;
33 
34 dmq_api_t pres_dmqb;
35 dmq_peer_t *pres_dmq_peer = NULL;
36 dmq_resp_cback_t pres_dmq_resp_callback = {&pres_dmq_resp_callback_f, 0};
37 
38 int pres_dmq_send_all_presentities();
39 int pres_dmq_request_sync();
40 
41 /**
42 * @brief add notification peer
43 */
pres_dmq_initialize()44 int pres_dmq_initialize()
45 {
46 	dmq_peer_t not_peer;
47 
48 	/* load the DMQ API */
49 	if(dmq_load_api(&pres_dmqb) != 0) {
50 		LM_ERR("cannot load dmq api\n");
51 		return -1;
52 	} else {
53 		LM_DBG("loaded dmq api\n");
54 	}
55 
56 	not_peer.callback = pres_dmq_handle_msg;
57 	not_peer.init_callback = pres_dmq_request_sync;
58 	not_peer.description.s = "presence";
59 	not_peer.description.len = 8;
60 	not_peer.peer_id.s = "presence";
61 	not_peer.peer_id.len = 8;
62 	pres_dmq_peer = pres_dmqb.register_dmq_peer(&not_peer);
63 	if(!pres_dmq_peer) {
64 		LM_ERR("error in register_dmq_peer\n");
65 		goto error;
66 	} else {
67 		LM_DBG("dmq peer registered\n");
68 	}
69 	return 0;
70 error:
71 	return -1;
72 }
73 
pres_dmq_init_proc()74 static int pres_dmq_init_proc()
75 {
76 	// TODO: tidy up
77 
78 	if(!pres_dmq_proc_init) {
79 		LM_DBG("Initializing pres_dmq_proc_init for pid (%d)\n", my_pid());
80 		pres_dmq_proc_init = (int *)pkg_malloc(sizeof(int));
81 		if(!pres_dmq_proc_init) {
82 			LM_ERR("no more pkg memory\n");
83 			return -1;
84 		}
85 		*pres_dmq_proc_init = 0;
86 	}
87 
88 	if(!pres_dmq_recv) {
89 		LM_DBG("Initializing pres_dmq_recv for pid (%d)\n", my_pid());
90 		pres_dmq_recv = (int *)pkg_malloc(sizeof(int));
91 		if(!pres_dmq_recv) {
92 			LM_ERR("no more pkg memory\n");
93 			return -1;
94 		}
95 		*pres_dmq_recv = 0;
96 	}
97 
98 	if(pres_sruid.pid == 0) {
99 		LM_DBG("Initializing pres_sruid for pid (%d)\n", my_pid());
100 		if(sruid_init(&pres_sruid, '-', "pres", SRUID_INC) < 0) {
101 			return -1;
102 		}
103 	}
104 
105 	if(publ_cache_mode==PS_PCACHE_RECORD && pres_subs_dbmode==NO_DB) {
106 		goto finish;
107 	}
108 
109 	if(!pa_db) {
110 		LM_DBG("Initializing presence DB connection for pid (%d)\n", my_pid());
111 
112 		if(pa_dbf.init == 0) {
113 			LM_ERR("database not bound\n");
114 			return -1;
115 		}
116 
117 		/* Do not pool the connections where possible when running notifier
118 		* processes. */
119 		if(pres_notifier_processes > 0 && pa_dbf.init2)
120 			pa_db = pa_dbf.init2(&pres_db_url, DB_POOLING_NONE);
121 		else
122 			pa_db = pa_dbf.init(&pres_db_url);
123 
124 		if(!pa_db) {
125 			LM_ERR("dmq_worker_init: unsuccessful database connection\n");
126 			return -1;
127 		}
128 	}
129 
130 finish:
131 	*pres_dmq_proc_init = 1;
132 
133 	LM_DBG("process initialization complete\n");
134 
135 	return 0;
136 }
137 
/**
 * @brief Send a DMQ message to a single node or broadcast it.
 *
 * The results of the underlying send_message() / bcast_message() calls are
 * not inspected.
 *
 * @param body message body to send
 * @param node destination node; when NULL the message is broadcast instead
 * @return -1 when no DMQ peer has been registered, 0 otherwise.
 */
int pres_dmq_send(str *body, dmq_node_t *node)
{
	if(pres_dmq_peer == NULL) {
		LM_ERR("pres_dmq_peer is null!\n");
		return -1;
	}

	if(node == NULL) {
		/* no explicit destination: broadcast */
		LM_DBG("sending dmq broadcast...\n");
		pres_dmqb.bcast_message(pres_dmq_peer, body, 0, &pres_dmq_resp_callback,
				1, &pres_dmq_content_type);
		return 0;
	}

	LM_DBG("sending dmq message ...\n");
	pres_dmqb.send_message(pres_dmq_peer, body, node, &pres_dmq_resp_callback,
			1, &pres_dmq_content_type);
	return 0;
}
155 
156 /**
157  * @brief extract presentity from json object
158 */
pres_parse_json_presentity(srjson_t * in)159 presentity_t *pres_parse_json_presentity(srjson_t *in)
160 {
161 
162 	int p_expires = 0, p_recv = 0;
163 	str p_domain = STR_NULL, p_user = STR_NULL, p_etag = STR_NULL,
164 		p_sender = STR_NULL, p_event_str = STR_NULL;
165 	srjson_t *p_it;
166 	pres_ev_t *p_event = NULL;
167 	presentity_t *presentity = NULL;
168 
169 	LM_DBG("extracting presentity\n");
170 
171 	for(p_it = in->child; p_it; p_it = p_it->next) {
172 		if(strcmp(p_it->string, "domain") == 0) {
173 			p_domain.s = p_it->valuestring;
174 			p_domain.len = strlen(p_it->valuestring);
175 		} else if(strcmp(p_it->string, "user") == 0) {
176 			p_user.s = p_it->valuestring;
177 			p_user.len = strlen(p_it->valuestring);
178 		} else if(strcmp(p_it->string, "etag") == 0) {
179 			p_etag.s = p_it->valuestring;
180 			p_etag.len = strlen(p_it->valuestring);
181 		} else if(strcmp(p_it->string, "expires") == 0) {
182 			p_expires = SRJSON_GET_INT(p_it);
183 		} else if(strcmp(p_it->string, "recv") == 0) {
184 			p_recv = SRJSON_GET_INT(p_it);
185 		} else if(strcmp(p_it->string, "sender") == 0) {
186 			p_sender.s = p_it->valuestring;
187 			p_sender.len = strlen(p_it->valuestring);
188 		} else if(strcmp(p_it->string, "event") == 0) {
189 			p_event_str.s = p_it->valuestring;
190 			p_event_str.len = strlen(p_it->valuestring);
191 			p_event = contains_event(&p_event_str, 0);
192 			if(!p_event) {
193 				LM_ERR("unsupported event %s\n", p_it->valuestring);
194 				return NULL;
195 			}
196 		} else {
197 			LM_ERR("unrecognized field in json object\n");
198 			return NULL;
199 		}
200 	}
201 
202 	if(!p_event) {
203 		LM_ERR("presence event not found\n");
204 		return NULL;
205 	}
206 
207 	LM_DBG("building presentity from domain: %.*s, user: %.*s, expires: %d, "
208 		   "event: %.*s, etag: %.*s, sender: %.*s",
209 			p_domain.len, p_domain.s, p_user.len, p_user.s, p_expires,
210 			p_event->name.len, p_event->name.s, p_etag.len, p_etag.s,
211 			p_sender.len, p_sender.s);
212 
213 	presentity = new_presentity(
214 			&p_domain, &p_user, p_expires, p_event, &p_etag, &p_sender);
215 
216 	if(!presentity)
217 		return NULL;
218 
219 	if(p_recv > 0)
220 		presentity->received_time = p_recv;
221 
222 	return presentity;
223 }
224 
225 /**
226 * @brief presence dmq callback
227 */
pres_dmq_handle_msg(struct sip_msg * msg,peer_reponse_t * resp,dmq_node_t * node)228 int pres_dmq_handle_msg(
229 		struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *node)
230 {
231 	int content_length = 0, t_new = 0, sent_reply = 0;
232 	str cur_etag = STR_NULL, body = STR_NULL, p_body = STR_NULL,
233 		ruid = STR_NULL;
234 	char *sphere = NULL;
235 	srjson_doc_t jdoc;
236 	srjson_t *it = NULL;
237 	presentity_t *presentity = NULL;
238 
239 	pres_dmq_action_t action = PRES_DMQ_NONE;
240 
241 	/* received dmq message */
242 	LM_DBG("dmq message received\n");
243 
244 	if(!pres_dmq_proc_init && pres_dmq_init_proc() < 0) {
245 		return 0;
246 	}
247 
248 	*pres_dmq_recv = 1;
249 	srjson_InitDoc(&jdoc, NULL);
250 
251 	if(!msg->content_length) {
252 		LM_ERR("no content length header found\n");
253 		goto invalid;
254 	}
255 	content_length = get_content_length(msg);
256 	if(!content_length) {
257 		LM_DBG("content length is 0\n");
258 		goto invalid;
259 	}
260 
261 	body.s = get_body(msg);
262 	body.len = content_length;
263 
264 	if(!body.s) {
265 		LM_ERR("unable to get body\n");
266 		goto error;
267 	}
268 
269 	/* parse body */
270 	LM_DBG("body: %.*s\n", body.len, body.s);
271 
272 	jdoc.buf = body;
273 
274 	if(jdoc.root == NULL) {
275 		jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
276 		if(jdoc.root == NULL) {
277 			LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
278 			goto invalid;
279 		}
280 	}
281 
282 	/* iterate over keys */
283 	for(it = jdoc.root->child; it; it = it->next) {
284 		LM_DBG("found field: %s\n", it->string);
285 		if(strcmp(it->string, "action") == 0) {
286 			action = SRJSON_GET_INT(it);
287 		} else if(strcmp(it->string, "presentity") == 0) {
288 			presentity = pres_parse_json_presentity(it);
289 			if(!presentity) {
290 				LM_ERR("failed to construct presentity from json\n");
291 				goto invalid;
292 			}
293 		} else if(strcmp(it->string, "t_new") == 0) {
294 			t_new = SRJSON_GET_INT(it);
295 		} else if(strcmp(it->string, "cur_etag") == 0) {
296 			cur_etag.s = it->valuestring;
297 			cur_etag.len = strlen(it->valuestring);
298 		} else if(strcmp(it->string, "sphere") == 0) {
299 			sphere = it->valuestring;
300 		} else if(strcmp(it->string, "ruid") == 0) {
301 			ruid.s = it->valuestring;
302 			ruid.len = strlen(it->valuestring);
303 		} else if(strcmp(it->string, "body") == 0) {
304 			p_body.s = it->valuestring;
305 			p_body.len = strlen(it->valuestring);
306 		} else {
307 			LM_ERR("unrecognized field in json object\n");
308 			goto invalid;
309 		}
310 	}
311 
312 	switch(action) {
313 		case PRES_DMQ_UPDATE_PRESENTITY:
314 			if(presentity == NULL
315 					|| update_presentity(NULL, presentity, &p_body, t_new,
316 							   &sent_reply, sphere, &cur_etag, &ruid, 0)
317 							   < 0) {
318 				goto error;
319 			}
320 			break;
321 		case PRES_DMQ_SYNC:
322 		case PRES_DMQ_NONE:
323 			break;
324 	}
325 
326 	resp->reason = pres_dmq_200_rpl;
327 	resp->resp_code = 200;
328 	goto cleanup;
329 
330 invalid:
331 	resp->reason = pres_dmq_400_rpl;
332 	resp->resp_code = 400;
333 	goto cleanup;
334 
335 error:
336 	resp->reason = pres_dmq_500_rpl;
337 	resp->resp_code = 500;
338 
339 cleanup:
340 	*pres_dmq_recv = 0;
341 	srjson_DestroyDoc(&jdoc);
342 	if(presentity)
343 		pkg_free(presentity);
344 
345 	return 0;
346 }
347 
348 
pres_dmq_request_sync()349 int pres_dmq_request_sync()
350 {
351 	srjson_doc_t jdoc;
352 
353 	LM_DBG("requesting sync from dmq peers\n");
354 
355 	srjson_InitDoc(&jdoc, NULL);
356 
357 	jdoc.root = srjson_CreateObject(&jdoc);
358 	if(jdoc.root == NULL) {
359 		LM_ERR("cannot create json root\n");
360 		goto error;
361 	}
362 
363 	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", PRES_DMQ_SYNC);
364 	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
365 	if(jdoc.buf.s == NULL) {
366 		LM_ERR("unable to serialize data\n");
367 		goto error;
368 	}
369 	jdoc.buf.len = strlen(jdoc.buf.s);
370 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
371 	if(pres_dmq_send(&jdoc.buf, 0) != 0) {
372 		goto error;
373 	}
374 
375 	jdoc.free_fn(jdoc.buf.s);
376 	jdoc.buf.s = NULL;
377 	srjson_DestroyDoc(&jdoc);
378 	return 0;
379 
380 error:
381 	if(jdoc.buf.s != NULL) {
382 		jdoc.free_fn(jdoc.buf.s);
383 		jdoc.buf.s = NULL;
384 	}
385 	srjson_DestroyDoc(&jdoc);
386 	return -1;
387 }
388 
389 
/**
 * @brief Serialize a presentity update to JSON and send it over DMQ.
 *
 * The message carries "action" (PRES_DMQ_UPDATE_PRESENTITY), a "presentity"
 * object (domain, user, etag, expires, recv, optional sender, event), "t_new"
 * and, when the corresponding argument is not NULL, "cur_etag", "sphere",
 * "ruid" and "body".
 *
 * Nothing is sent while *pres_dmq_recv is set, i.e. when the update being
 * replicated was itself triggered by pres_dmq_handle_msg().
 *
 * @param presentity presentity to replicate (must not be NULL)
 * @param body       presence document, may be NULL
 * @param t_new      forwarded unchanged as "t_new"
 * @param cur_etag   forwarded as "cur_etag", may be NULL
 * @param sphere     forwarded as "sphere", may be NULL
 * @param ruid       forwarded as "ruid", may be NULL
 * @param node       destination node, or NULL to broadcast
 * @return 0 on success or when sending was skipped, -1 on error.
 */
int pres_dmq_replicate_presentity(presentity_t *presentity, str *body,
		int t_new, str *cur_etag, char *sphere, str *ruid, dmq_node_t *node)
{

	srjson_doc_t jdoc;
	srjson_t *p_json;

	/* cur_etag and ruid are optional (see the checks further down), so they
	 * must not be dereferenced unconditionally for logging */
	LM_DBG("replicating presentity record - old etag %.*s, new etag %.*s, ruid "
		   "%.*s\n",
			presentity->etag.len, presentity->etag.s,
			cur_etag ? cur_etag->len : 0, cur_etag ? cur_etag->s : "",
			ruid ? ruid->len : 0, ruid ? ruid->s : "");

	/* Test the flag value, not only the pointer: the pointer can be set
	 * while initialization is still incomplete, in which case pres_dmq_recv
	 * may be NULL. */
	if((!pres_dmq_proc_init || !*pres_dmq_proc_init)
			&& pres_dmq_init_proc() < 0) {
		return -1;
	}

	if(*pres_dmq_recv) {
		return 0;
	}

	srjson_InitDoc(&jdoc, NULL);

	jdoc.root = srjson_CreateObject(&jdoc);
	if(jdoc.root == NULL) {
		LM_ERR("cannot create json root\n");
		goto error;
	}

	// action
	srjson_AddNumberToObject(
			&jdoc, jdoc.root, "action", PRES_DMQ_UPDATE_PRESENTITY);
	// presentity
	p_json = srjson_CreateObject(&jdoc);
	if(p_json == NULL) {
		LM_ERR("cannot create json presentity object\n");
		goto error;
	}
	srjson_AddStrToObject(&jdoc, p_json, "domain", presentity->domain.s,
			presentity->domain.len);
	srjson_AddStrToObject(
			&jdoc, p_json, "user", presentity->user.s, presentity->user.len);
	srjson_AddStrToObject(
			&jdoc, p_json, "etag", presentity->etag.s, presentity->etag.len);
	srjson_AddNumberToObject(&jdoc, p_json, "expires", presentity->expires);
	srjson_AddNumberToObject(&jdoc, p_json, "recv", presentity->received_time);
	if(presentity->sender) {
		srjson_AddStrToObject(&jdoc, p_json, "sender", presentity->sender->s,
				presentity->sender->len);
	}
	srjson_AddStrToObject(&jdoc, p_json, "event", presentity->event->name.s,
			presentity->event->name.len);
	srjson_AddItemToObject(&jdoc, jdoc.root, "presentity", p_json);
	// t_new
	srjson_AddNumberToObject(&jdoc, jdoc.root, "t_new", t_new);
	// cur_etag
	if(cur_etag) {
		srjson_AddStrToObject(
				&jdoc, jdoc.root, "cur_etag", cur_etag->s, cur_etag->len);
	}
	// sphere
	if(sphere) {
		srjson_AddStringToObject(&jdoc, jdoc.root, "sphere", sphere);
	}
	// ruid
	if(ruid) {
		srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ruid->s, ruid->len);
	}
	// body
	if(body) {
		srjson_AddStrToObject(&jdoc, jdoc.root, "body", body->s, body->len);
	}

	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
	if(jdoc.buf.s == NULL) {
		LM_ERR("unable to serialize data\n");
		goto error;
	}
	jdoc.buf.len = strlen(jdoc.buf.s);
	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
	if(pres_dmq_send(&jdoc.buf, node) != 0) {
		goto error;
	}

	jdoc.free_fn(jdoc.buf.s);
	jdoc.buf.s = NULL;
	srjson_DestroyDoc(&jdoc);
	return 0;

error:
	if(jdoc.buf.s != NULL) {
		jdoc.free_fn(jdoc.buf.s);
		jdoc.buf.s = NULL;
	}
	srjson_DestroyDoc(&jdoc);
	return -1;
}
482 
483 
/**
 * @brief Placeholder for sending all presentities to a node.
 *
 * Not implemented yet: the argument is ignored and nothing is sent.
 *
 * @param dmq_node intended destination node (currently unused)
 * @return always 0.
 */
int pres_dmq_send_all_presentities(dmq_node_t *dmq_node)
{
	/* TODO: implement send all presentities */
	(void)dmq_node;

	return 0;
}
490 
491 
492 /**
493 * @brief dmq response callback
494 */
pres_dmq_resp_callback_f(struct sip_msg * msg,int code,dmq_node_t * node,void * param)495 int pres_dmq_resp_callback_f(
496 		struct sip_msg *msg, int code, dmq_node_t *node, void *param)
497 {
498 	LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
499 	return 0;
500 }
501