1 /**
2 * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3 *
4 * This file is part of Kamailio, a free SIP server.
5 *
6 * This file is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version
10 *
11 *
12 * This file is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 *
21 */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <string.h>
27 #include <fcntl.h>
28 #include <event.h>
29
30 #include "../../core/sr_module.h"
31 #include "../../core/route.h"
32 #include "../../core/route_struct.h"
33 #include "../../core/resolve.h"
34 #include "../../core/parser/parse_param.h"
35 #include "../../core/mem/mem.h"
36 #include "../../core/lvalue.h"
37
38 #include "netstring.h"
39 #include "janssonrpc.h"
40 #include "janssonrpc_request.h"
41 #include "janssonrpc_io.h"
42 #include "janssonrpc_srv.h"
43 #include "janssonrpc_server.h"
44 #include "janssonrpc_connect.h"
45
46 gen_lock_t* jsonrpc_server_group_lock = NULL;
47
48 /* where all the servers are stored */
49 jsonrpc_server_group_t** global_server_group = NULL;
50
51 /* used for debugging only */
print_server(jsonrpc_server_t * server)52 void print_server(jsonrpc_server_t* server)
53 {
54 INFO("\t----- server ------\n");
55 INFO("\t|pointer: %p\n", server);
56 INFO("\t|conn: %.*s\n", STR(server->conn));
57 INFO("\t|addr: %.*s\n", STR(server->addr));
58 switch (server->status) {
59 case JSONRPC_SERVER_CONNECTED:
60 INFO("\t|status: connected\n");
61 break;
62 case JSONRPC_SERVER_DISCONNECTED:
63 INFO("\t|status: disconnected\n");
64 break;
65 case JSONRPC_SERVER_FAILURE:
66 INFO("\t|status: failure\n");
67 break;
68 case JSONRPC_SERVER_CLOSING:
69 INFO("\t|status: closing\n");
70 break;
71 case JSONRPC_SERVER_RECONNECTING:
72 INFO("\t|status: reconnecting\n");
73 break;
74 default:
75 INFO("\t|status: invalid (%d)\n", server->status);
76 break;
77 }
78 INFO("\t|srv: %.*s\n", STR(server->srv));
79 INFO("\t|ttl: %d\n", server->ttl);
80 INFO("\t|port: %d\n", server->port);
81 INFO("\t|priority: %d\n", server->priority);
82 INFO("\t|weight: %d\n", server->weight);
83 INFO("\t|hwm: %d\n", server->hwm);
84 INFO("\t|req_count: %d\n", server->req_count);
85 if(server->added) {
86 INFO("\t|added: true\n");
87 } else {
88 INFO("\t|added: false\n");
89 }
90 INFO("\t-------------------\n");
91 }
92
93 /* used for debugging only */
print_group(jsonrpc_server_group_t ** group)94 void print_group(jsonrpc_server_group_t** group)
95 {
96 jsonrpc_server_group_t* grp = NULL;
97
98 INFO("group addr is %p\n", group);
99
100 if(group == NULL)
101 return;
102
103 for (grp=*group; grp != NULL; grp=grp->next) {
104 switch(grp->type) {
105 case CONN_GROUP:
106 INFO("Connection group: %.*s\n", STR(grp->conn));
107 print_group(&(grp->sub_group));
108 break;
109 case PRIORITY_GROUP:
110 INFO("Priority group: %d\n", grp->priority);
111 print_group(&(grp->sub_group));
112 break;
113 case WEIGHT_GROUP:
114 INFO("Weight group: %d\n", grp->weight);
115 print_server(grp->server);
116 break;
117 }
118 }
119 }
120
jsonrpc_parse_server(char * server_s,jsonrpc_server_group_t ** group_ptr)121 int jsonrpc_parse_server(char* server_s, jsonrpc_server_group_t **group_ptr)
122 {
123 if(group_ptr == NULL) {
124 ERR("Trying to add server to null group ptr\n");
125 return -1;
126 }
127
128 str s;
129 param_hooks_t phooks;
130 param_t* pit=NULL;
131 param_t* freeme=NULL;
132 str conn;
133 str addr;
134 addr.s = NULL;
135 str srv;
136 srv.s = NULL;
137
138 unsigned int priority = JSONRPC_DEFAULT_PRIORITY;
139 unsigned int weight = JSONRPC_DEFAULT_WEIGHT;
140 unsigned int hwm = JSONRPC_DEFAULT_HWM;
141 unsigned int port = 0;
142
143 s.s = server_s;
144 s.len = strlen(server_s);
145 if (s.s[s.len-1] == ';')
146 s.len--;
147
148 if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0) {
149 ERR("Failed parsing params value\n");
150 return -1;
151 }
152
153 freeme = pit;
154
155 for (; pit;pit=pit->next)
156 {
157 if PIT_MATCHES("conn") {
158 shm_str_dup(&conn, &pit->body);
159 CHECK_MALLOC(conn.s);
160
161 } else if PIT_MATCHES("srv") {
162 shm_str_dup(&srv, &pit->body);
163 CHECK_MALLOC(srv.s);
164
165 } else if PIT_MATCHES("addr") {
166 shm_str_dup(&addr, &pit->body);
167 CHECK_MALLOC(addr.s);
168
169 } else if PIT_MATCHES("port") {
170 port = atoi(pit->body.s);
171
172 } else if PIT_MATCHES("priority") {
173 priority = atoi(pit->body.s);
174
175 } else if PIT_MATCHES("weight") {
176 weight = atoi(pit->body.s);
177
178 } else if PIT_MATCHES("hwm") {
179 hwm = atoi(pit->body.s);
180
181 } else if PIT_MATCHES("proto") {
182 if(strncmp(pit->body.s, "tcp", sizeof("tcp")-1) != 0) {
183 ERR("Unsupported proto=%.*s. Only tcp is supported.\n",
184 STR(pit->body));
185 goto error;
186 }
187 } else {
188 ERR("Unrecognized parameter: %.*s\n", STR(pit->name));
189 goto error;
190 }
191
192 DEBUG("%.*s = %.*s\n", STR(pit->name), STR(pit->body));
193 }
194
195 if(conn.s == NULL) {
196 ERR("No conn defined! conn parameter is required.\n");
197 goto error;
198 }
199
200 if (srv.s != NULL) {
201 if (addr.s != NULL
202 || port != 0
203 || weight != JSONRPC_DEFAULT_WEIGHT
204 || priority != JSONRPC_DEFAULT_PRIORITY) {
205 ERR("addr, port, weight, and priority are not supported when using srv\n");
206 goto error;
207 }
208
209 if (jsonrpc_server_from_srv(conn, srv, hwm, group_ptr)<0) goto error;
210
211 } else {
212
213 if (addr.s == NULL || port == 0) {
214 ERR("no address/port defined\n");
215 goto error;
216 }
217
218 jsonrpc_server_t* server = create_server();
219 CHECK_MALLOC(server);
220
221 server->conn = conn;
222 server->addr = addr;
223 server->port = port;
224 server->priority = priority;
225 server->weight = weight;
226 server->hwm = hwm;
227
228 if(jsonrpc_add_server(server, group_ptr)<0) goto error;
229 }
230
231 //print_group(group_ptr); /* debug */
232
233 CHECK_AND_FREE(srv.s);
234 if (freeme) free_params(freeme);
235 return 0;
236
237 error:
238 CHECK_AND_FREE(srv.s);
239 if (freeme) free_params(freeme);
240 return -1;
241 }
242
jsonrpc_server_from_srv(str conn,str srv,unsigned int hwm,jsonrpc_server_group_t ** group_ptr)243 int jsonrpc_server_from_srv(str conn, str srv,
244 unsigned int hwm, jsonrpc_server_group_t** group_ptr)
245 {
246 struct rdata *l, *head;
247 struct srv_rdata *srv_record;
248 str name;
249 unsigned int ttl = jsonrpc_min_srv_ttl;
250
251 jsonrpc_server_t* server = NULL;
252
253 resolv_init();
254
255 head = get_record(srv.s, T_SRV, RES_AR);
256 if (head == NULL) {
257 ERR("No SRV record returned for %.*s\n", STR(srv));
258 goto error;
259 }
260 for (l=head; l; l=l->next) {
261 if (l->type != T_SRV)
262 continue;
263 srv_record = (struct srv_rdata*)l->rdata;
264 if (srv_record == NULL) {
265 ERR("BUG: null rdata\n");
266 goto error;
267 }
268
269 if (l->ttl < jsonrpc_min_srv_ttl) {
270 ttl = jsonrpc_min_srv_ttl;
271 } else {
272 ttl = l->ttl;
273 }
274
275 name.s = srv_record->name;
276 name.len = srv_record->name_len;
277
278 DBG("server %s\n", srv_record->name);
279
280 server = create_server();
281 CHECK_MALLOC(server);
282
283 shm_str_dup(&server->conn, &conn);
284 CHECK_MALLOC_GOTO(server->conn.s, error);
285
286 shm_str_dup(&server->addr, &name);
287 CHECK_MALLOC_GOTO(server->addr.s, error);
288
289 shm_str_dup(&server->srv, &srv);
290 CHECK_MALLOC_GOTO(server->srv.s, error);
291
292 server->port = srv_record->port;
293 server->priority = srv_record->priority;
294 server->weight = srv_record->weight;
295 server->ttl = ttl;
296 server->hwm = hwm;
297
298 if(jsonrpc_add_server(server, group_ptr)<0) goto error;
299 }
300
301 jsonrpc_srv_t* new_srv = create_srv(srv, conn, ttl);
302 addto_srv_list(new_srv, &global_srv_list);
303
304 free_rdata_list(head);
305
306 return 0;
307 error:
308 CHECK_AND_FREE(server);
309 if (head) free_rdata_list(head);
310
311 return -1;
312 }
313
create_server_group(server_group_t type,jsonrpc_server_group_t ** grp)314 int create_server_group(server_group_t type, jsonrpc_server_group_t** grp)
315 {
316 if(grp == NULL) {
317 ERR("Trying to dereference null group pointer\n");
318 return -1;
319 }
320
321 jsonrpc_server_group_t* new_grp =
322 shm_malloc(sizeof(jsonrpc_server_group_t));
323 CHECK_MALLOC(new_grp);
324
325 switch(type) {
326 case CONN_GROUP:
327 DEBUG("Creating new connection group\n");
328 new_grp->conn.s = NULL;
329 new_grp->conn.len = 0;
330 break;
331 case PRIORITY_GROUP:
332 DEBUG("Creating new priority group\n");
333 new_grp->priority = JSONRPC_DEFAULT_PRIORITY;
334 break;
335 case WEIGHT_GROUP:
336 DEBUG("Creating new weight group\n");
337 new_grp->server = NULL;
338 new_grp->weight = JSONRPC_DEFAULT_WEIGHT;
339 break;
340 }
341
342 new_grp->next = NULL;
343 new_grp->sub_group = NULL;
344 new_grp->type = type;
345 *grp = new_grp;
346 return 0;
347 }
348
free_server_group(jsonrpc_server_group_t ** grp)349 void free_server_group(jsonrpc_server_group_t** grp)
350 {
351 if(grp == NULL)
352 return;
353
354 jsonrpc_server_group_t* next = NULL;
355 jsonrpc_server_group_t* cgroup = NULL;
356 jsonrpc_server_group_t* pgroup = NULL;
357 jsonrpc_server_group_t* wgroup = NULL;
358
359 cgroup=*grp;
360 while(cgroup!=NULL) {
361 pgroup=cgroup->sub_group;
362 while(pgroup!=NULL) {
363 wgroup=pgroup->sub_group;
364 while(wgroup!=NULL) {
365 next = wgroup->next;
366 CHECK_AND_FREE(wgroup);
367 wgroup = next;
368 }
369 next = pgroup->next;
370 CHECK_AND_FREE(pgroup);
371 pgroup = next;
372 }
373 next = cgroup->next;
374 CHECK_AND_FREE(cgroup->conn.s);
375 CHECK_AND_FREE(cgroup);
376 cgroup = next;
377 }
378 }
379
insert_server_group(jsonrpc_server_group_t * new_grp,jsonrpc_server_group_t ** parent)380 int insert_server_group(jsonrpc_server_group_t* new_grp,
381 jsonrpc_server_group_t** parent)
382 {
383 if(parent == NULL) {
384 ERR("Trying to insert into NULL group\n");
385 return -1;
386 }
387
388 jsonrpc_server_group_t* head = *parent;
389
390 if (head == NULL) {
391 *parent = new_grp;
392 } else {
393 if (new_grp->type != head->type) {
394 ERR("Inserting group (%d) into the wrong type of list (%d)\n",
395 new_grp->type, head->type);
396 return -1;
397 }
398
399 jsonrpc_server_group_t* current = head;
400 jsonrpc_server_group_t** prev = parent;
401
402 while (1) {
403 if(new_grp->type == PRIORITY_GROUP
404 && new_grp->priority < current->priority) {
405 /* Priority groups are organized in ascending order.*/
406 new_grp->next = current;
407 *prev = new_grp;
408 break;
409 } else if (new_grp->type == WEIGHT_GROUP ) {
410 /* Weight groups are special in how they are organized in order
411 * to facilitate load balancing and weighted random selection.
412 *
413 * The weight in the head of a weight group list represents
414 * the total weight of the list. Subsequent nodes represent the
415 * remaining total.
416 *
417 * In order to achieve this, the weight to be inserted is added
418 * to each node that is passed before insertion.
419 *
420 * Weight groups are organized in descending order.
421 *
422 * The actual weight of a node can be found in its server.
423 * */
424 if(new_grp->server == NULL) {
425 ERR("Trying to insert an empty weight group.\n");
426 return -1;
427 }
428 if(new_grp->server->weight != new_grp->weight) {
429 ERR("Weight of the new node (%d) doesn't match its server (%d). This is a bug. Please report this to the maintainer.\n",
430 new_grp->server->weight, new_grp->weight);
431 return -1;
432 }
433 if(new_grp->weight > current->server->weight) {
434 new_grp->weight += current->weight;
435 new_grp->next = current;
436 *prev = new_grp;
437 break;
438 } else {
439 current->weight += new_grp->weight;
440 }
441 }
442
443 if(current->next == NULL) {
444 current->next = new_grp;
445 break;
446 }
447 prev = &((*prev)->next); // This is madness. Madness? THIS IS POINTERS!
448 current = current->next;
449 }
450 }
451 return 0;
452 }
453
server_group_size(jsonrpc_server_group_t * grp)454 unsigned int server_group_size(jsonrpc_server_group_t* grp)
455 {
456 unsigned int size = 0;
457 for(;grp != NULL; grp=grp->next) {
458 size++;
459 }
460 return size;
461 }
462
create_server()463 jsonrpc_server_t* create_server()
464 {
465 jsonrpc_server_t* server = shm_malloc(sizeof(jsonrpc_server_t));
466 CHECK_MALLOC_NULL(server);
467 memset(server, 0, sizeof(jsonrpc_server_t));
468
469 server->priority = JSONRPC_DEFAULT_PRIORITY;
470 server->weight = JSONRPC_DEFAULT_WEIGHT;
471 server->status = JSONRPC_SERVER_DISCONNECTED;
472
473 return server;
474 }
475
free_server(jsonrpc_server_t * server)476 void free_server(jsonrpc_server_t* server)
477 {
478 if(!server)
479 return;
480
481 CHECK_AND_FREE(server->conn.s);
482 CHECK_AND_FREE(server->addr.s);
483 CHECK_AND_FREE(server->srv.s);
484
485 if ((server->buffer)!=NULL) free_netstring(server->buffer);
486 memset(server, 0, sizeof(jsonrpc_server_t));
487 shm_free(server);
488 server = NULL;
489 }
490
server_eq(jsonrpc_server_t * a,jsonrpc_server_t * b)491 int server_eq(jsonrpc_server_t* a, jsonrpc_server_t* b)
492 {
493 if(!a || !b)
494 return 0;
495
496 if(!STR_EQ(a->conn, b->conn)) return 0;
497 if(!STR_EQ(a->srv, b->srv)) return 0;
498 if(!STR_EQ(a->addr, b->addr)) return 0;
499 if(a->port != b->port) return 0;
500 if(a->priority != b->priority) return 0;
501 if(a->weight != b->weight) return 0;
502
503 return 1;
504 }
505
jsonrpc_add_server(jsonrpc_server_t * server,jsonrpc_server_group_t ** group_ptr)506 int jsonrpc_add_server(jsonrpc_server_t* server, jsonrpc_server_group_t** group_ptr)
507 {
508 jsonrpc_server_group_t* conn_grp = NULL;
509 jsonrpc_server_group_t* priority_grp = NULL;
510 jsonrpc_server_group_t* weight_grp = NULL;
511
512 if(group_ptr == NULL) {
513 ERR("Trying to add server to null group\n");
514 return -1;
515 }
516
517 if(create_server_group(WEIGHT_GROUP, &weight_grp) < 0) goto error;
518
519 weight_grp->weight = server->weight;
520 weight_grp->server = server;
521
522 /* find conn group */
523 for (conn_grp=*group_ptr; conn_grp != NULL; conn_grp=conn_grp->next) {
524 if (strncmp(conn_grp->conn.s, server->conn.s, server->conn.len) == 0)
525 break;
526 }
527
528 if (conn_grp == NULL) {
529 if(create_server_group(CONN_GROUP, &conn_grp) < 0) goto error;
530 if(create_server_group(PRIORITY_GROUP, &priority_grp) < 0) goto error;
531
532 priority_grp->priority = server->priority;
533 priority_grp->sub_group = weight_grp;
534
535 shm_str_dup(&conn_grp->conn, &server->conn);
536 CHECK_MALLOC_GOTO(conn_grp->conn.s, error);
537
538 conn_grp->sub_group = priority_grp;
539 if(insert_server_group(conn_grp, group_ptr) < 0) goto error;
540 goto success;
541 }
542
543 /* find priority group */
544 for (priority_grp=conn_grp->sub_group;
545 priority_grp != NULL;
546 priority_grp=priority_grp->next) {
547 if (priority_grp->priority == server->priority) break;
548 }
549
550 if (priority_grp == NULL) {
551 if(create_server_group(PRIORITY_GROUP, &priority_grp) < 0) goto error;
552
553 priority_grp->priority = server->priority;
554 priority_grp->sub_group = weight_grp;
555
556 if(insert_server_group(priority_grp, &(conn_grp->sub_group)) < 0) goto error;
557 goto success;
558 }
559
560 if(insert_server_group(weight_grp, &(priority_grp->sub_group)) < 0) goto error;
561
562 success:
563 return 0;
564 error:
565 ERR("Failed to add server: %s, %s, %d\n",
566 server->conn.s, server->addr.s, server->port);
567 CHECK_AND_FREE(conn_grp);
568 CHECK_AND_FREE(priority_grp);
569 CHECK_AND_FREE(weight_grp);
570 CHECK_AND_FREE(server);
571 return -1;
572 }
573
addto_server_list(jsonrpc_server_t * server,server_list_t ** list)574 void addto_server_list(jsonrpc_server_t* server, server_list_t** list)
575 {
576 server_list_t* new_node = (server_list_t*)pkg_malloc(sizeof(server_list_t));
577 CHECK_MALLOC_VOID(new_node);
578
579 new_node->server = server;
580 new_node->next = NULL;
581
582 if (*list == NULL) {
583 *list = new_node;
584 return;
585 }
586
587 server_list_t* node = *list;
588 for(; node->next!=NULL; node=node->next);
589
590 node->next = new_node;
591 }
592
free_server_list(server_list_t * list)593 void free_server_list(server_list_t* list)
594 {
595 if (!list)
596 return;
597
598 server_list_t* node = NULL;
599 server_list_t* next = NULL;
600 for(node=list; node!=NULL; node=next)
601 {
602 next = node->next;
603 pkg_free(node);
604 }
605 }
606
close_server(jsonrpc_server_t * server)607 void close_server(jsonrpc_server_t* server)
608 {
609 if(!server)
610 return;
611
612 INFO("Closing server %.*s:%d for conn %.*s.\n",
613 STR(server->addr), server->port, STR(server->conn));
614 force_disconnect(server);
615
616 free_server(server);
617 }
618
619