1 /**
2 *
3 * Copyright (C) 2018 Charles Chance (Sipcentric Ltd)
4 *
5 * This file is part of Kamailio, a free SIP server.
6 *
7 * Kamailio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version
11 *
12 * Kamailio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 */
22
23 #include "presence.h"
24 #include "presence_dmq.h"
25
26 static str pres_dmq_content_type = str_init("application/json");
27 static str pres_dmq_200_rpl = str_init("OK");
28 static str pres_dmq_400_rpl = str_init("Bad Request");
29 static str pres_dmq_500_rpl = str_init("Server Internal Error");
30
31 static int *pres_dmq_proc_init = 0;
32 static int *pres_dmq_recv = 0;
33
34 dmq_api_t pres_dmqb;
35 dmq_peer_t *pres_dmq_peer = NULL;
36 dmq_resp_cback_t pres_dmq_resp_callback = {&pres_dmq_resp_callback_f, 0};
37
38 int pres_dmq_send_all_presentities();
39 int pres_dmq_request_sync();
40
41 /**
42 * @brief add notification peer
43 */
pres_dmq_initialize()44 int pres_dmq_initialize()
45 {
46 dmq_peer_t not_peer;
47
48 /* load the DMQ API */
49 if(dmq_load_api(&pres_dmqb) != 0) {
50 LM_ERR("cannot load dmq api\n");
51 return -1;
52 } else {
53 LM_DBG("loaded dmq api\n");
54 }
55
56 not_peer.callback = pres_dmq_handle_msg;
57 not_peer.init_callback = pres_dmq_request_sync;
58 not_peer.description.s = "presence";
59 not_peer.description.len = 8;
60 not_peer.peer_id.s = "presence";
61 not_peer.peer_id.len = 8;
62 pres_dmq_peer = pres_dmqb.register_dmq_peer(¬_peer);
63 if(!pres_dmq_peer) {
64 LM_ERR("error in register_dmq_peer\n");
65 goto error;
66 } else {
67 LM_DBG("dmq peer registered\n");
68 }
69 return 0;
70 error:
71 return -1;
72 }
73
pres_dmq_init_proc()74 static int pres_dmq_init_proc()
75 {
76 // TODO: tidy up
77
78 if(!pres_dmq_proc_init) {
79 LM_DBG("Initializing pres_dmq_proc_init for pid (%d)\n", my_pid());
80 pres_dmq_proc_init = (int *)pkg_malloc(sizeof(int));
81 if(!pres_dmq_proc_init) {
82 LM_ERR("no more pkg memory\n");
83 return -1;
84 }
85 *pres_dmq_proc_init = 0;
86 }
87
88 if(!pres_dmq_recv) {
89 LM_DBG("Initializing pres_dmq_recv for pid (%d)\n", my_pid());
90 pres_dmq_recv = (int *)pkg_malloc(sizeof(int));
91 if(!pres_dmq_recv) {
92 LM_ERR("no more pkg memory\n");
93 return -1;
94 }
95 *pres_dmq_recv = 0;
96 }
97
98 if(pres_sruid.pid == 0) {
99 LM_DBG("Initializing pres_sruid for pid (%d)\n", my_pid());
100 if(sruid_init(&pres_sruid, '-', "pres", SRUID_INC) < 0) {
101 return -1;
102 }
103 }
104
105 if(publ_cache_mode==PS_PCACHE_RECORD && pres_subs_dbmode==NO_DB) {
106 goto finish;
107 }
108
109 if(!pa_db) {
110 LM_DBG("Initializing presence DB connection for pid (%d)\n", my_pid());
111
112 if(pa_dbf.init == 0) {
113 LM_ERR("database not bound\n");
114 return -1;
115 }
116
117 /* Do not pool the connections where possible when running notifier
118 * processes. */
119 if(pres_notifier_processes > 0 && pa_dbf.init2)
120 pa_db = pa_dbf.init2(&pres_db_url, DB_POOLING_NONE);
121 else
122 pa_db = pa_dbf.init(&pres_db_url);
123
124 if(!pa_db) {
125 LM_ERR("dmq_worker_init: unsuccessful database connection\n");
126 return -1;
127 }
128 }
129
130 finish:
131 *pres_dmq_proc_init = 1;
132
133 LM_DBG("process initialization complete\n");
134
135 return 0;
136 }
137
/**
 * @brief send a body over DMQ on behalf of the presence peer
 *
 * The message is sent with the "application/json" content type and
 * pres_dmq_resp_callback as response callback.
 *
 * @param body payload to send
 * @param node destination node; when NULL the body is broadcast through
 *             the bcast_message() binding instead
 * @return 0 on success, -1 if the presence peer is not registered or the
 *         DMQ binding reports a failure
 */
int pres_dmq_send(str *body, dmq_node_t *node)
{
	int ret;

	if(!pres_dmq_peer) {
		LM_ERR("pres_dmq_peer is null!\n");
		return -1;
	}

	if(node) {
		LM_DBG("sending dmq message ...\n");
		ret = pres_dmqb.send_message(pres_dmq_peer, body, node,
				&pres_dmq_resp_callback, 1, &pres_dmq_content_type);
	} else {
		LM_DBG("sending dmq broadcast...\n");
		ret = pres_dmqb.bcast_message(pres_dmq_peer, body, 0,
				&pres_dmq_resp_callback, 1, &pres_dmq_content_type);
	}

	/* do not report success when the DMQ module could not send */
	if(ret < 0) {
		LM_ERR("failed to send dmq message\n");
		return -1;
	}

	return 0;
}
155
156 /**
157 * @brief extract presentity from json object
158 */
pres_parse_json_presentity(srjson_t * in)159 presentity_t *pres_parse_json_presentity(srjson_t *in)
160 {
161
162 int p_expires = 0, p_recv = 0;
163 str p_domain = STR_NULL, p_user = STR_NULL, p_etag = STR_NULL,
164 p_sender = STR_NULL, p_event_str = STR_NULL;
165 srjson_t *p_it;
166 pres_ev_t *p_event = NULL;
167 presentity_t *presentity = NULL;
168
169 LM_DBG("extracting presentity\n");
170
171 for(p_it = in->child; p_it; p_it = p_it->next) {
172 if(strcmp(p_it->string, "domain") == 0) {
173 p_domain.s = p_it->valuestring;
174 p_domain.len = strlen(p_it->valuestring);
175 } else if(strcmp(p_it->string, "user") == 0) {
176 p_user.s = p_it->valuestring;
177 p_user.len = strlen(p_it->valuestring);
178 } else if(strcmp(p_it->string, "etag") == 0) {
179 p_etag.s = p_it->valuestring;
180 p_etag.len = strlen(p_it->valuestring);
181 } else if(strcmp(p_it->string, "expires") == 0) {
182 p_expires = SRJSON_GET_INT(p_it);
183 } else if(strcmp(p_it->string, "recv") == 0) {
184 p_recv = SRJSON_GET_INT(p_it);
185 } else if(strcmp(p_it->string, "sender") == 0) {
186 p_sender.s = p_it->valuestring;
187 p_sender.len = strlen(p_it->valuestring);
188 } else if(strcmp(p_it->string, "event") == 0) {
189 p_event_str.s = p_it->valuestring;
190 p_event_str.len = strlen(p_it->valuestring);
191 p_event = contains_event(&p_event_str, 0);
192 if(!p_event) {
193 LM_ERR("unsupported event %s\n", p_it->valuestring);
194 return NULL;
195 }
196 } else {
197 LM_ERR("unrecognized field in json object\n");
198 return NULL;
199 }
200 }
201
202 if(!p_event) {
203 LM_ERR("presence event not found\n");
204 return NULL;
205 }
206
207 LM_DBG("building presentity from domain: %.*s, user: %.*s, expires: %d, "
208 "event: %.*s, etag: %.*s, sender: %.*s",
209 p_domain.len, p_domain.s, p_user.len, p_user.s, p_expires,
210 p_event->name.len, p_event->name.s, p_etag.len, p_etag.s,
211 p_sender.len, p_sender.s);
212
213 presentity = new_presentity(
214 &p_domain, &p_user, p_expires, p_event, &p_etag, &p_sender);
215
216 if(!presentity)
217 return NULL;
218
219 if(p_recv > 0)
220 presentity->received_time = p_recv;
221
222 return presentity;
223 }
224
225 /**
226 * @brief presence dmq callback
227 */
pres_dmq_handle_msg(struct sip_msg * msg,peer_reponse_t * resp,dmq_node_t * node)228 int pres_dmq_handle_msg(
229 struct sip_msg *msg, peer_reponse_t *resp, dmq_node_t *node)
230 {
231 int content_length = 0, t_new = 0, sent_reply = 0;
232 str cur_etag = STR_NULL, body = STR_NULL, p_body = STR_NULL,
233 ruid = STR_NULL;
234 char *sphere = NULL;
235 srjson_doc_t jdoc;
236 srjson_t *it = NULL;
237 presentity_t *presentity = NULL;
238
239 pres_dmq_action_t action = PRES_DMQ_NONE;
240
241 /* received dmq message */
242 LM_DBG("dmq message received\n");
243
244 if(!pres_dmq_proc_init && pres_dmq_init_proc() < 0) {
245 return 0;
246 }
247
248 *pres_dmq_recv = 1;
249 srjson_InitDoc(&jdoc, NULL);
250
251 if(!msg->content_length) {
252 LM_ERR("no content length header found\n");
253 goto invalid;
254 }
255 content_length = get_content_length(msg);
256 if(!content_length) {
257 LM_DBG("content length is 0\n");
258 goto invalid;
259 }
260
261 body.s = get_body(msg);
262 body.len = content_length;
263
264 if(!body.s) {
265 LM_ERR("unable to get body\n");
266 goto error;
267 }
268
269 /* parse body */
270 LM_DBG("body: %.*s\n", body.len, body.s);
271
272 jdoc.buf = body;
273
274 if(jdoc.root == NULL) {
275 jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
276 if(jdoc.root == NULL) {
277 LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
278 goto invalid;
279 }
280 }
281
282 /* iterate over keys */
283 for(it = jdoc.root->child; it; it = it->next) {
284 LM_DBG("found field: %s\n", it->string);
285 if(strcmp(it->string, "action") == 0) {
286 action = SRJSON_GET_INT(it);
287 } else if(strcmp(it->string, "presentity") == 0) {
288 presentity = pres_parse_json_presentity(it);
289 if(!presentity) {
290 LM_ERR("failed to construct presentity from json\n");
291 goto invalid;
292 }
293 } else if(strcmp(it->string, "t_new") == 0) {
294 t_new = SRJSON_GET_INT(it);
295 } else if(strcmp(it->string, "cur_etag") == 0) {
296 cur_etag.s = it->valuestring;
297 cur_etag.len = strlen(it->valuestring);
298 } else if(strcmp(it->string, "sphere") == 0) {
299 sphere = it->valuestring;
300 } else if(strcmp(it->string, "ruid") == 0) {
301 ruid.s = it->valuestring;
302 ruid.len = strlen(it->valuestring);
303 } else if(strcmp(it->string, "body") == 0) {
304 p_body.s = it->valuestring;
305 p_body.len = strlen(it->valuestring);
306 } else {
307 LM_ERR("unrecognized field in json object\n");
308 goto invalid;
309 }
310 }
311
312 switch(action) {
313 case PRES_DMQ_UPDATE_PRESENTITY:
314 if(presentity == NULL
315 || update_presentity(NULL, presentity, &p_body, t_new,
316 &sent_reply, sphere, &cur_etag, &ruid, 0)
317 < 0) {
318 goto error;
319 }
320 break;
321 case PRES_DMQ_SYNC:
322 case PRES_DMQ_NONE:
323 break;
324 }
325
326 resp->reason = pres_dmq_200_rpl;
327 resp->resp_code = 200;
328 goto cleanup;
329
330 invalid:
331 resp->reason = pres_dmq_400_rpl;
332 resp->resp_code = 400;
333 goto cleanup;
334
335 error:
336 resp->reason = pres_dmq_500_rpl;
337 resp->resp_code = 500;
338
339 cleanup:
340 *pres_dmq_recv = 0;
341 srjson_DestroyDoc(&jdoc);
342 if(presentity)
343 pkg_free(presentity);
344
345 return 0;
346 }
347
348
pres_dmq_request_sync()349 int pres_dmq_request_sync()
350 {
351 srjson_doc_t jdoc;
352
353 LM_DBG("requesting sync from dmq peers\n");
354
355 srjson_InitDoc(&jdoc, NULL);
356
357 jdoc.root = srjson_CreateObject(&jdoc);
358 if(jdoc.root == NULL) {
359 LM_ERR("cannot create json root\n");
360 goto error;
361 }
362
363 srjson_AddNumberToObject(&jdoc, jdoc.root, "action", PRES_DMQ_SYNC);
364 jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
365 if(jdoc.buf.s == NULL) {
366 LM_ERR("unable to serialize data\n");
367 goto error;
368 }
369 jdoc.buf.len = strlen(jdoc.buf.s);
370 LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
371 if(pres_dmq_send(&jdoc.buf, 0) != 0) {
372 goto error;
373 }
374
375 jdoc.free_fn(jdoc.buf.s);
376 jdoc.buf.s = NULL;
377 srjson_DestroyDoc(&jdoc);
378 return 0;
379
380 error:
381 if(jdoc.buf.s != NULL) {
382 jdoc.free_fn(jdoc.buf.s);
383 jdoc.buf.s = NULL;
384 }
385 srjson_DestroyDoc(&jdoc);
386 return -1;
387 }
388
389
/**
 * @brief serialize a presentity update to json and send it over DMQ
 *
 * The json document carries "action" (PRES_DMQ_UPDATE_PRESENTITY), a
 * "presentity" object (domain, user, etag, expires, recv, event and, when
 * set, sender), "t_new" and, when the respective argument is not NULL,
 * "cur_etag", "sphere", "ruid" and "body".
 *
 * Nothing is sent while *pres_dmq_recv is set, i.e. while
 * pres_dmq_handle_msg() is processing a received message.
 *
 * @param presentity presentity to replicate; must have an event
 * @param body optional body, may be NULL
 * @param t_new value sent as "t_new"
 * @param cur_etag optional etag sent as "cur_etag", may be NULL
 * @param sphere optional NUL-terminated sphere, may be NULL
 * @param ruid optional ruid, may be NULL
 * @param node destination node, or NULL to broadcast
 * @return 0 on success or when replication is skipped, -1 on error
 */
int pres_dmq_replicate_presentity(presentity_t *presentity, str *body,
		int t_new, str *cur_etag, char *sphere, str *ruid, dmq_node_t *node)
{
	srjson_doc_t jdoc;
	srjson_t *p_json;

	if(presentity == NULL || presentity->event == NULL) {
		LM_ERR("invalid presentity\n");
		return -1;
	}

	/* cur_etag and ruid are optional below, so do not dereference them
	 * unconditionally for logging either */
	LM_DBG("replicating presentity record - old etag %.*s, new etag %.*s, ruid "
		   "%.*s\n",
			presentity->etag.len, presentity->etag.s,
			cur_etag ? cur_etag->len : 0, cur_etag ? cur_etag->s : "",
			ruid ? ruid->len : 0, ruid ? ruid->s : "");

	if(!pres_dmq_proc_init && pres_dmq_init_proc() < 0) {
		return -1;
	}

	if(*pres_dmq_recv) {
		return 0;
	}

	srjson_InitDoc(&jdoc, NULL);

	jdoc.root = srjson_CreateObject(&jdoc);
	if(jdoc.root == NULL) {
		LM_ERR("cannot create json root\n");
		goto error;
	}

	// action
	srjson_AddNumberToObject(
			&jdoc, jdoc.root, "action", PRES_DMQ_UPDATE_PRESENTITY);
	// presentity
	p_json = srjson_CreateObject(&jdoc);
	if(p_json == NULL) {
		LM_ERR("cannot create json presentity object\n");
		goto error;
	}
	srjson_AddStrToObject(&jdoc, p_json, "domain", presentity->domain.s,
			presentity->domain.len);
	srjson_AddStrToObject(
			&jdoc, p_json, "user", presentity->user.s, presentity->user.len);
	srjson_AddStrToObject(
			&jdoc, p_json, "etag", presentity->etag.s, presentity->etag.len);
	srjson_AddNumberToObject(&jdoc, p_json, "expires", presentity->expires);
	srjson_AddNumberToObject(&jdoc, p_json, "recv", presentity->received_time);
	if(presentity->sender) {
		srjson_AddStrToObject(&jdoc, p_json, "sender", presentity->sender->s,
				presentity->sender->len);
	}
	srjson_AddStrToObject(&jdoc, p_json, "event", presentity->event->name.s,
			presentity->event->name.len);
	srjson_AddItemToObject(&jdoc, jdoc.root, "presentity", p_json);
	// t_new
	srjson_AddNumberToObject(&jdoc, jdoc.root, "t_new", t_new);
	// cur_etag
	if(cur_etag) {
		srjson_AddStrToObject(
				&jdoc, jdoc.root, "cur_etag", cur_etag->s, cur_etag->len);
	}
	// sphere
	if(sphere) {
		srjson_AddStringToObject(&jdoc, jdoc.root, "sphere", sphere);
	}
	// ruid
	if(ruid) {
		srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ruid->s, ruid->len);
	}
	// body
	if(body) {
		srjson_AddStrToObject(&jdoc, jdoc.root, "body", body->s, body->len);
	}

	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
	if(jdoc.buf.s == NULL) {
		LM_ERR("unable to serialize data\n");
		goto error;
	}
	jdoc.buf.len = strlen(jdoc.buf.s);
	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
	if(pres_dmq_send(&jdoc.buf, node) != 0) {
		goto error;
	}

	jdoc.free_fn(jdoc.buf.s);
	jdoc.buf.s = NULL;
	srjson_DestroyDoc(&jdoc);
	return 0;

error:
	if(jdoc.buf.s != NULL) {
		jdoc.free_fn(jdoc.buf.s);
		jdoc.buf.s = NULL;
	}
	srjson_DestroyDoc(&jdoc);
	return -1;
}
482
483
/**
 * @brief send all presentities to a DMQ node - not implemented yet
 *
 * @param dmq_node currently unused
 * @return always 0
 */
int pres_dmq_send_all_presentities(dmq_node_t *dmq_node)
{
	/* TODO: implement send all presentities */
	(void)dmq_node;

	return 0;
}
490
491
492 /**
493 * @brief dmq response callback
494 */
pres_dmq_resp_callback_f(struct sip_msg * msg,int code,dmq_node_t * node,void * param)495 int pres_dmq_resp_callback_f(
496 struct sip_msg *msg, int code, dmq_node_t *node, void *param)
497 {
498 LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
499 return 0;
500 }
501