1 /**
2  * Copyright (C) 2013 Flowroute LLC (flowroute.com)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * This file is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  *
12  * This file is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22 
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <string.h>
27 #include <fcntl.h>
28 #include <event.h>
29 
30 #include "../../core/sr_module.h"
31 #include "../../core/route.h"
32 #include "../../core/route_struct.h"
33 #include "../../core/resolve.h"
34 #include "../../core/parser/parse_param.h"
35 #include "../../core/mem/mem.h"
36 #include "../../core/lvalue.h"
37 
38 #include "netstring.h"
39 #include "janssonrpc.h"
40 #include "janssonrpc_request.h"
41 #include "janssonrpc_io.h"
42 #include "janssonrpc_srv.h"
43 #include "janssonrpc_server.h"
44 #include "janssonrpc_connect.h"
45 
46 gen_lock_t* jsonrpc_server_group_lock = NULL;
47 
48 /* where all the servers are stored */
49 jsonrpc_server_group_t** global_server_group = NULL;
50 
51 /* used for debugging only */
print_server(jsonrpc_server_t * server)52 void print_server(jsonrpc_server_t* server)
53 {
54 	INFO("\t----- server ------\n");
55 	INFO("\t|pointer: %p\n", server);
56 	INFO("\t|conn: %.*s\n", STR(server->conn));
57 	INFO("\t|addr: %.*s\n", STR(server->addr));
58 	switch (server->status) {
59 	case JSONRPC_SERVER_CONNECTED:
60 		INFO("\t|status: connected\n");
61 		break;
62 	case JSONRPC_SERVER_DISCONNECTED:
63 		INFO("\t|status: disconnected\n");
64 		break;
65 	case JSONRPC_SERVER_FAILURE:
66 		INFO("\t|status: failure\n");
67 		break;
68 	case JSONRPC_SERVER_CLOSING:
69 		INFO("\t|status: closing\n");
70 		break;
71 	case JSONRPC_SERVER_RECONNECTING:
72 		INFO("\t|status: reconnecting\n");
73 		break;
74 	default:
75 		INFO("\t|status: invalid (%d)\n", server->status);
76 		break;
77 	}
78 	INFO("\t|srv: %.*s\n", STR(server->srv));
79 	INFO("\t|ttl: %d\n", server->ttl);
80 	INFO("\t|port: %d\n", server->port);
81 	INFO("\t|priority: %d\n", server->priority);
82 	INFO("\t|weight: %d\n", server->weight);
83 	INFO("\t|hwm: %d\n", server->hwm);
84 	INFO("\t|req_count: %d\n", server->req_count);
85 	if(server->added) {
86 		INFO("\t|added: true\n");
87 	} else {
88 		INFO("\t|added: false\n");
89 	}
90 	INFO("\t-------------------\n");
91 }
92 
93 /* used for debugging only */
print_group(jsonrpc_server_group_t ** group)94 void print_group(jsonrpc_server_group_t** group)
95 {
96 	jsonrpc_server_group_t* grp = NULL;
97 
98 	INFO("group addr is %p\n", group);
99 
100 	if(group == NULL)
101 		return;
102 
103 	for (grp=*group; grp != NULL; grp=grp->next) {
104 		switch(grp->type) {
105 		case CONN_GROUP:
106 			INFO("Connection group: %.*s\n", STR(grp->conn));
107 			print_group(&(grp->sub_group));
108 			break;
109 		case PRIORITY_GROUP:
110 			INFO("Priority group: %d\n", grp->priority);
111 			print_group(&(grp->sub_group));
112 			break;
113 		case WEIGHT_GROUP:
114 			INFO("Weight group: %d\n", grp->weight);
115 			print_server(grp->server);
116 			break;
117 		}
118 	}
119 }
120 
jsonrpc_parse_server(char * server_s,jsonrpc_server_group_t ** group_ptr)121 int jsonrpc_parse_server(char* server_s, jsonrpc_server_group_t **group_ptr)
122 {
123 	if(group_ptr == NULL) {
124 		ERR("Trying to add server to null group ptr\n");
125 		return -1;
126 	}
127 
128 	str s;
129 	param_hooks_t phooks;
130 	param_t* pit=NULL;
131 	param_t* freeme=NULL;
132 	str conn;
133 	str addr;
134 	addr.s = NULL;
135 	str srv;
136 	srv.s = NULL;
137 
138 	unsigned int priority = JSONRPC_DEFAULT_PRIORITY;
139 	unsigned int weight = JSONRPC_DEFAULT_WEIGHT;
140 	unsigned int hwm = JSONRPC_DEFAULT_HWM;
141 	unsigned int port = 0;
142 
143 	s.s = server_s;
144 	s.len = strlen(server_s);
145 	if (s.s[s.len-1] == ';')
146 		s.len--;
147 
148 	if (parse_params(&s, CLASS_ANY, &phooks, &pit)<0) {
149 		ERR("Failed parsing params value\n");
150 		return -1;
151 	}
152 
153 	freeme = pit;
154 
155 	for (; pit;pit=pit->next)
156 	{
157 		if PIT_MATCHES("conn") {
158 			shm_str_dup(&conn, &pit->body);
159 			CHECK_MALLOC(conn.s);
160 
161 		} else if PIT_MATCHES("srv") {
162 			shm_str_dup(&srv, &pit->body);
163 			CHECK_MALLOC(srv.s);
164 
165 		} else if PIT_MATCHES("addr") {
166 			shm_str_dup(&addr, &pit->body);
167 			CHECK_MALLOC(addr.s);
168 
169 		} else if PIT_MATCHES("port") {
170 			port = atoi(pit->body.s);
171 
172 		} else if PIT_MATCHES("priority") {
173 			priority = atoi(pit->body.s);
174 
175 		} else if PIT_MATCHES("weight") {
176 			weight = atoi(pit->body.s);
177 
178 		} else if PIT_MATCHES("hwm") {
179 			hwm = atoi(pit->body.s);
180 
181 		} else if PIT_MATCHES("proto") {
182 			if(strncmp(pit->body.s, "tcp", sizeof("tcp")-1) != 0) {
183 				ERR("Unsupported proto=%.*s. Only tcp is supported.\n",
184 						STR(pit->body));
185 				goto error;
186 			}
187 		} else {
188 			ERR("Unrecognized parameter: %.*s\n", STR(pit->name));
189 			goto error;
190 		}
191 
192 		DEBUG("%.*s = %.*s\n", STR(pit->name), STR(pit->body));
193 	}
194 
195 	if(conn.s == NULL) {
196 		ERR("No conn defined! conn parameter is required.\n");
197 		goto error;
198 	}
199 
200 	if (srv.s != NULL) {
201 		if (addr.s != NULL
202 			|| port != 0
203 			|| weight != JSONRPC_DEFAULT_WEIGHT
204 			|| priority != JSONRPC_DEFAULT_PRIORITY) {
205 			ERR("addr, port, weight, and priority are not supported when using srv\n");
206 			goto error;
207 		}
208 
209 		if (jsonrpc_server_from_srv(conn, srv, hwm, group_ptr)<0) goto error;
210 
211 	} else {
212 
213 		if (addr.s == NULL || port == 0) {
214 			ERR("no address/port defined\n");
215 			goto error;
216 		}
217 
218 		jsonrpc_server_t* server = create_server();
219 		CHECK_MALLOC(server);
220 
221 		server->conn = conn;
222 		server->addr = addr;
223 		server->port = port;
224 		server->priority = priority;
225 		server->weight = weight;
226 		server->hwm = hwm;
227 
228 		if(jsonrpc_add_server(server, group_ptr)<0) goto error;
229 	}
230 
231 	//print_group(group_ptr); /* debug */
232 
233 	CHECK_AND_FREE(srv.s);
234 	if (freeme) free_params(freeme);
235 	return 0;
236 
237 error:
238 	CHECK_AND_FREE(srv.s);
239 	if (freeme) free_params(freeme);
240 	return -1;
241 }
242 
jsonrpc_server_from_srv(str conn,str srv,unsigned int hwm,jsonrpc_server_group_t ** group_ptr)243 int jsonrpc_server_from_srv(str conn, str srv,
244 		unsigned int hwm, jsonrpc_server_group_t** group_ptr)
245 {
246 	struct rdata *l, *head;
247 	struct srv_rdata *srv_record;
248 	str name;
249 	unsigned int ttl = jsonrpc_min_srv_ttl;
250 
251 	jsonrpc_server_t* server = NULL;
252 
253 	resolv_init();
254 
255 	head = get_record(srv.s, T_SRV, RES_AR);
256 	if (head == NULL) {
257 		ERR("No SRV record returned for %.*s\n", STR(srv));
258 		goto error;
259 	}
260 	for (l=head; l; l=l->next) {
261 		if (l->type != T_SRV)
262 			continue;
263 		srv_record = (struct srv_rdata*)l->rdata;
264 		if (srv_record == NULL) {
265 			ERR("BUG: null rdata\n");
266 			goto error;
267 		}
268 
269 		if (l->ttl < jsonrpc_min_srv_ttl) {
270 			ttl = jsonrpc_min_srv_ttl;
271 		} else {
272 			ttl = l->ttl;
273 		}
274 
275 		name.s = srv_record->name;
276 		name.len = srv_record->name_len;
277 
278 		DBG("server %s\n", srv_record->name);
279 
280 		server = create_server();
281 		CHECK_MALLOC(server);
282 
283 		shm_str_dup(&server->conn, &conn);
284 		CHECK_MALLOC_GOTO(server->conn.s, error);
285 
286 		shm_str_dup(&server->addr, &name);
287 		CHECK_MALLOC_GOTO(server->addr.s, error);
288 
289 		shm_str_dup(&server->srv, &srv);
290 		CHECK_MALLOC_GOTO(server->srv.s, error);
291 
292 		server->port = srv_record->port;
293 		server->priority = srv_record->priority;
294 		server->weight = srv_record->weight;
295 		server->ttl = ttl;
296 		server->hwm = hwm;
297 
298 		if(jsonrpc_add_server(server, group_ptr)<0) goto error;
299 	}
300 
301 	jsonrpc_srv_t* new_srv = create_srv(srv, conn, ttl);
302 	addto_srv_list(new_srv, &global_srv_list);
303 
304 	free_rdata_list(head);
305 
306 	return 0;
307 error:
308 	CHECK_AND_FREE(server);
309 	if (head) free_rdata_list(head);
310 
311 	return -1;
312 }
313 
create_server_group(server_group_t type,jsonrpc_server_group_t ** grp)314 int create_server_group(server_group_t type, jsonrpc_server_group_t** grp)
315 {
316 	if(grp == NULL) {
317 		ERR("Trying to dereference null group pointer\n");
318 		return -1;
319 	}
320 
321 	jsonrpc_server_group_t* new_grp =
322 		shm_malloc(sizeof(jsonrpc_server_group_t));
323 	CHECK_MALLOC(new_grp);
324 
325 	switch(type) {
326 	case CONN_GROUP:
327 		DEBUG("Creating new connection group\n");
328 		new_grp->conn.s = NULL;
329 		new_grp->conn.len = 0;
330 		break;
331 	case PRIORITY_GROUP:
332 		DEBUG("Creating new priority group\n");
333 		new_grp->priority = JSONRPC_DEFAULT_PRIORITY;
334 		break;
335 	case WEIGHT_GROUP:
336 		DEBUG("Creating new weight group\n");
337 		new_grp->server = NULL;
338 		new_grp->weight = JSONRPC_DEFAULT_WEIGHT;
339 		break;
340 	}
341 
342 	new_grp->next = NULL;
343 	new_grp->sub_group = NULL;
344 	new_grp->type = type;
345 	*grp = new_grp;
346 	return 0;
347 }
348 
free_server_group(jsonrpc_server_group_t ** grp)349 void free_server_group(jsonrpc_server_group_t** grp)
350 {
351 	if(grp == NULL)
352 		return;
353 
354 	jsonrpc_server_group_t* next = NULL;
355 	jsonrpc_server_group_t* cgroup = NULL;
356 	jsonrpc_server_group_t* pgroup = NULL;
357 	jsonrpc_server_group_t* wgroup = NULL;
358 
359 	cgroup=*grp;
360 	while(cgroup!=NULL) {
361 		pgroup=cgroup->sub_group;
362 		while(pgroup!=NULL) {
363 			wgroup=pgroup->sub_group;
364 			while(wgroup!=NULL) {
365 				next = wgroup->next;
366 				CHECK_AND_FREE(wgroup);
367 				wgroup = next;
368 			}
369 			next = pgroup->next;
370 			CHECK_AND_FREE(pgroup);
371 			pgroup = next;
372 		}
373 		next = cgroup->next;
374 		CHECK_AND_FREE(cgroup->conn.s);
375 		CHECK_AND_FREE(cgroup);
376 		cgroup = next;
377 	}
378 }
379 
insert_server_group(jsonrpc_server_group_t * new_grp,jsonrpc_server_group_t ** parent)380 int insert_server_group(jsonrpc_server_group_t* new_grp,
381 		jsonrpc_server_group_t** parent)
382 {
383 	if(parent == NULL) {
384 		ERR("Trying to insert into NULL group\n");
385 		return -1;
386 	}
387 
388 	jsonrpc_server_group_t* head = *parent;
389 
390 	if (head == NULL) {
391 		*parent = new_grp;
392 	} else {
393 		if (new_grp->type != head->type) {
394 			ERR("Inserting group (%d) into the wrong type of list (%d)\n",
395 				new_grp->type, head->type);
396 			return -1;
397 		}
398 
399 		jsonrpc_server_group_t* current = head;
400 		jsonrpc_server_group_t** prev = parent;
401 
402 		while (1) {
403 			if(new_grp->type == PRIORITY_GROUP
404 					&& new_grp->priority < current->priority) {
405 			 /* Priority groups are organized in ascending order.*/
406 				new_grp->next = current;
407 				*prev = new_grp;
408 				break;
409 			} else if (new_grp->type == WEIGHT_GROUP ) {
410 			/* Weight groups are special in how they are organized in order
411 		     * to facilitate load balancing and weighted random selection.
412 			 *
413 			 * The weight in the head of a weight group list represents
414 			 * the total weight of the list. Subsequent nodes represent the
415 			 * remaining total.
416 			 *
417 			 * In order to achieve this, the weight to be inserted is added
418 			 * to each node that is passed before insertion.
419 			 *
420 			 * Weight groups are organized in descending order.
421 			 *
422 			 * The actual weight of a node can be found in its server.
423 			 * */
424 				if(new_grp->server == NULL) {
425 					ERR("Trying to insert an empty weight group.\n");
426 					return -1;
427 				}
428 				if(new_grp->server->weight != new_grp->weight) {
429 					ERR("Weight of the new node (%d) doesn't match its server (%d). This is a bug. Please report this to the maintainer.\n",
430 							new_grp->server->weight, new_grp->weight);
431 					return -1;
432 				}
433 				if(new_grp->weight > current->server->weight) {
434 					new_grp->weight += current->weight;
435 					new_grp->next = current;
436 					*prev = new_grp;
437 					break;
438 				} else {
439 					current->weight += new_grp->weight;
440 				}
441 			}
442 
443 			if(current->next == NULL) {
444 				current->next = new_grp;
445 				break;
446 			}
447 			prev = &((*prev)->next); // This is madness. Madness? THIS IS POINTERS!
448 			current = current->next;
449 		}
450 	}
451 	return 0;
452 }
453 
server_group_size(jsonrpc_server_group_t * grp)454 unsigned int server_group_size(jsonrpc_server_group_t* grp)
455 {
456 	unsigned int size = 0;
457 	for(;grp != NULL; grp=grp->next) {
458 		size++;
459 	}
460 	return size;
461 }
462 
create_server()463 jsonrpc_server_t* create_server()
464 {
465 	jsonrpc_server_t* server = shm_malloc(sizeof(jsonrpc_server_t));
466 	CHECK_MALLOC_NULL(server);
467 	memset(server, 0, sizeof(jsonrpc_server_t));
468 
469 	server->priority = JSONRPC_DEFAULT_PRIORITY;
470 	server->weight = JSONRPC_DEFAULT_WEIGHT;
471 	server->status = JSONRPC_SERVER_DISCONNECTED;
472 
473 	return server;
474 }
475 
free_server(jsonrpc_server_t * server)476 void free_server(jsonrpc_server_t* server)
477 {
478 	if(!server)
479 		return;
480 
481 	CHECK_AND_FREE(server->conn.s);
482 	CHECK_AND_FREE(server->addr.s);
483 	CHECK_AND_FREE(server->srv.s);
484 
485 	if ((server->buffer)!=NULL) free_netstring(server->buffer);
486 	memset(server, 0, sizeof(jsonrpc_server_t));
487 	shm_free(server);
488 	server = NULL;
489 }
490 
server_eq(jsonrpc_server_t * a,jsonrpc_server_t * b)491 int server_eq(jsonrpc_server_t* a, jsonrpc_server_t* b)
492 {
493 	if(!a || !b)
494 		return 0;
495 
496 	if(!STR_EQ(a->conn, b->conn)) return 0;
497 	if(!STR_EQ(a->srv, b->srv)) return 0;
498 	if(!STR_EQ(a->addr, b->addr)) return 0;
499 	if(a->port != b->port) return 0;
500 	if(a->priority != b->priority) return 0;
501 	if(a->weight != b->weight) return 0;
502 
503 	return 1;
504 }
505 
jsonrpc_add_server(jsonrpc_server_t * server,jsonrpc_server_group_t ** group_ptr)506 int jsonrpc_add_server(jsonrpc_server_t* server, jsonrpc_server_group_t** group_ptr)
507 {
508 	jsonrpc_server_group_t* conn_grp = NULL;
509 	jsonrpc_server_group_t* priority_grp = NULL;
510 	jsonrpc_server_group_t* weight_grp = NULL;
511 
512 	if(group_ptr == NULL) {
513 		ERR("Trying to add server to null group\n");
514 		return -1;
515 	}
516 
517 	if(create_server_group(WEIGHT_GROUP, &weight_grp) < 0) goto error;
518 
519 	weight_grp->weight = server->weight;
520 	weight_grp->server = server;
521 
522 	/* find conn group */
523 	for (conn_grp=*group_ptr; conn_grp != NULL; conn_grp=conn_grp->next) {
524 		if (strncmp(conn_grp->conn.s, server->conn.s, server->conn.len) == 0)
525 			break;
526 	}
527 
528 	if (conn_grp == NULL) {
529 		if(create_server_group(CONN_GROUP, &conn_grp) < 0) goto error;
530 		if(create_server_group(PRIORITY_GROUP, &priority_grp) < 0) goto error;
531 
532 		priority_grp->priority = server->priority;
533 		priority_grp->sub_group = weight_grp;
534 
535 		shm_str_dup(&conn_grp->conn, &server->conn);
536 		CHECK_MALLOC_GOTO(conn_grp->conn.s, error);
537 
538 		conn_grp->sub_group = priority_grp;
539 		if(insert_server_group(conn_grp, group_ptr) < 0) goto error;
540 		goto success;
541 	}
542 
543 	/* find priority group */
544 	for (priority_grp=conn_grp->sub_group;
545 			priority_grp != NULL;
546 			priority_grp=priority_grp->next) {
547 		if (priority_grp->priority == server->priority) break;
548 	}
549 
550 	if (priority_grp == NULL) {
551 		if(create_server_group(PRIORITY_GROUP, &priority_grp) < 0) goto error;
552 
553 		priority_grp->priority = server->priority;
554 		priority_grp->sub_group = weight_grp;
555 
556 		if(insert_server_group(priority_grp, &(conn_grp->sub_group)) < 0) goto error;
557 		goto success;
558 	}
559 
560 	if(insert_server_group(weight_grp, &(priority_grp->sub_group)) < 0) goto error;
561 
562 success:
563 	return 0;
564 error:
565 	ERR("Failed to add server: %s, %s, %d\n",
566 			server->conn.s, server->addr.s, server->port);
567 	CHECK_AND_FREE(conn_grp);
568 	CHECK_AND_FREE(priority_grp);
569 	CHECK_AND_FREE(weight_grp);
570 	CHECK_AND_FREE(server);
571 	return -1;
572 }
573 
addto_server_list(jsonrpc_server_t * server,server_list_t ** list)574 void addto_server_list(jsonrpc_server_t* server, server_list_t** list)
575 {
576 	server_list_t* new_node = (server_list_t*)pkg_malloc(sizeof(server_list_t));
577 	CHECK_MALLOC_VOID(new_node);
578 
579 	new_node->server = server;
580 	new_node->next = NULL;
581 
582 	if (*list == NULL) {
583 		*list = new_node;
584 		return;
585 	}
586 
587 	server_list_t* node = *list;
588 	for(; node->next!=NULL; node=node->next);
589 
590 	node->next = new_node;
591 }
592 
free_server_list(server_list_t * list)593 void free_server_list(server_list_t* list)
594 {
595 	if (!list)
596 		return;
597 
598 	server_list_t* node = NULL;
599 	server_list_t* next = NULL;
600 	for(node=list; node!=NULL; node=next)
601 	{
602 		next = node->next;
603 		pkg_free(node);
604 	}
605 }
606 
close_server(jsonrpc_server_t * server)607 void close_server(jsonrpc_server_t* server)
608 {
609 	if(!server)
610 		return;
611 
612 	INFO("Closing server %.*s:%d for conn %.*s.\n",
613 			STR(server->addr), server->port, STR(server->conn));
614 	force_disconnect(server);
615 
616 	free_server(server);
617 }
618 
619