1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2015, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Tamas Cseke <cstomi.levlist@gmail.com>
27  * Christopher Rienzo <crienzo@grasshopper.com>
28  *
29  * mod_mongo.c -- API for MongoDB
30  *
31  */
32 #include <switch.h>
33 
34 #ifndef MAX
35 /* libbson will define MIN/MAX in a way that won't compile in FS */
36 #define MAX(a,b) (((a)>(b))?(a):(b))
37 #define MIN(a,b) (((a)<(b))?(a):(b))
38 #include <mongoc.h>
39 #undef MAX
40 #undef MIN
41 #else
42 #include <mongoc.h>
43 #endif
44 
/* Character separating the arguments of the mongo_* API commands */
#define DELIMITER ';'
/* Usage strings written back to the caller when an API command cannot be parsed */
#define FIND_ONE_SYNTAX  "mongo_find_one ns; query; fields; options"
#define FIND_N_SYNTAX "mongo_find_n ns; query; fields; options; n"
#define MAPREDUCE_SYNTAX "mongo_mapreduce ns; query"
49 
/* Forward declarations of the module entry points named in SWITCH_MODULE_DEFINITION below */
SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime);
/* Ties the load, shutdown and runtime entry points to the module name mod_mongo */
SWITCH_MODULE_DEFINITION(mod_mongo, mod_mongo_load, mod_mongo_shutdown, mod_mongo_runtime);
54 
55 static struct {
56 	int shutdown;
57 	const char *map;
58 	const char *reduce;
59 	const char *finalize;
60 	const char *conn_str;
61 	mongoc_client_pool_t *client_pool;
62 	const char *limit_database;
63 	const char *limit_collection;
64 	const char *limit_conn_str;
65 	int limit_cleanup_interval_sec;
66 	mongoc_client_pool_t *limit_client_pool;
67 	switch_mutex_t *mod_mongo_private_mutex;
68 	switch_thread_rwlock_t *limit_database_rwlock;
69 	switch_thread_rwlock_t *shutdown_rwlock;
70 } globals;
71 
72 /**
73  * resources acquired by this session
74  */
75 struct mod_mongo_private {
76 	switch_hash_t *resources;
77 	switch_mutex_t *mutex;
78 };
79 
80 /**
81  * @param query_options_str
82  * @return query options
83  */
parse_query_options(char * query_options_str)84 static int parse_query_options(char *query_options_str)
85 {
86 	int query_options = MONGOC_QUERY_NONE;
87 	if (strstr(query_options_str, "cursorTailable")) {
88 		query_options |= MONGOC_QUERY_TAILABLE_CURSOR;
89 	}
90 	if (strstr(query_options_str, "slaveOk")) {
91 		query_options |= MONGOC_QUERY_SLAVE_OK;
92 	}
93 	if (strstr(query_options_str, "oplogReplay")) {
94 		query_options |= MONGOC_QUERY_OPLOG_REPLAY;
95 	}
96 	if (strstr(query_options_str, "noCursorTimeout")) {
97 		query_options |= MONGOC_QUERY_NO_CURSOR_TIMEOUT;
98 	}
99 	if (strstr(query_options_str, "awaitData")) {
100 		query_options |= MONGOC_QUERY_AWAIT_DATA;
101 	}
102 	if (strstr(query_options_str, "exhaust")) {
103 		query_options |= MONGOC_QUERY_EXHAUST;
104 	}
105 	if (strstr(query_options_str, "partialResults")) {
106 		query_options |= MONGOC_QUERY_PARTIAL;
107 	}
108 	return query_options;
109 }
110 
111 /**
112  * @return a new connection to mongodb or NULL if error
113  */
get_connection(mongoc_client_pool_t * client_pool,const char * conn_str)114 static mongoc_client_t *get_connection(mongoc_client_pool_t *client_pool, const char *conn_str)
115 {
116 	mongoc_client_t *client = mongoc_client_pool_pop(client_pool);
117 	if (!client) {
118 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to get connection to: %s\n", conn_str);
119 		return NULL;
120 	}
121 	/* TODO auth */
122 	return client;
123 }
124 
125 /**
126  * Mark connection as finished
127  */
connection_done(mongoc_client_pool_t * client_pool,mongoc_client_t * conn)128 static void connection_done(mongoc_client_pool_t *client_pool, mongoc_client_t *conn)
129 {
130 	mongoc_client_pool_push(client_pool, conn);
131 }
132 
SWITCH_STANDARD_API(mod_mongo_mapreduce_function)133 SWITCH_STANDARD_API(mod_mongo_mapreduce_function)
134 {
135 	switch_status_t status = SWITCH_STATUS_SUCCESS;
136 	char *db = NULL, *collection = NULL, *json_query = NULL;
137 
138 	db = strdup(cmd);
139 	switch_assert(db != NULL);
140 
141 	if ((collection = strchr(db, '.'))) {
142 		*collection++ = '\0';
143 		if ((json_query = strchr(collection, DELIMITER))) {
144 			*json_query++ = '\0';
145 		}
146 	}
147 
148 	if (!zstr(db) && !zstr(collection) && !zstr(json_query)) {
149 		mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
150 		if (conn) {
151 			bson_error_t error;
152 			bson_t *query = bson_new_from_json((uint8_t *)json_query, strlen(json_query), &error);
153 			if (query) {
154 				bson_t out;
155 				bson_t cmd;
156 				bson_t child;
157 
158 				/* build command to send to mongodb */
159 				bson_init(&cmd);
160 				BSON_APPEND_UTF8(&cmd, "mapreduce", collection);
161 				if (!zstr(globals.map)) {
162 					BSON_APPEND_CODE(&cmd, "map", globals.map);
163 				}
164 				if (!zstr(globals.reduce)) {
165 					BSON_APPEND_CODE(&cmd, "reduce", globals.reduce);
166 				}
167 				if (!zstr(globals.finalize)) {
168 					BSON_APPEND_CODE(&cmd, "finalize", globals.finalize);
169 				}
170 				if (!bson_empty(query)) {
171 					BSON_APPEND_DOCUMENT(&cmd, "query", query);
172 				}
173 				bson_append_document_begin(&cmd, "out", strlen("out"), &child);
174 				BSON_APPEND_INT32(&child, "inline", 1);
175 				bson_append_document_end(&cmd, &child);
176 
177 				/* send command and get result */
178 				if (mongoc_client_command_simple(conn, db, &cmd, NULL /* read prefs */, &out, &error)) {
179 					char *json_result = bson_as_json(&out, NULL);
180 					stream->write_function(stream, "-OK\n%s\n", json_result);
181 					bson_free(json_result);
182 				} else {
183 					stream->write_function(stream, "-ERR\nmongo_run_command failed!\n");
184 				}
185 
186 				bson_destroy(query);
187 				bson_destroy(&cmd);
188 				bson_destroy(&out);
189 			} else {
190 				stream->write_function(stream, "-ERR\nfailed to parse query!\n");
191 			}
192 			connection_done(globals.client_pool, conn);
193 		} else {
194 			stream->write_function(stream, "-ERR\nfailed to get connection!\n");
195 		}
196 	} else {
197 		stream->write_function(stream, "-ERR\n%s\n", MAPREDUCE_SYNTAX);
198 	}
199 
200 	switch_safe_free(db);
201 
202 	return status;
203 }
204 
SWITCH_STANDARD_API(mod_mongo_find_n_function)205 SWITCH_STANDARD_API(mod_mongo_find_n_function)
206 {
207 	switch_status_t status = SWITCH_STATUS_SUCCESS;
208 	char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
209 	int query_options = 0;
210 	int n = 1;
211 
212 	db = strdup(cmd);
213 	switch_assert(db != NULL);
214 
215 	if ((collection = strchr(db, '.'))) {
216 		*collection++ = '\0';
217 		if ((json_query = strchr(collection, DELIMITER))) {
218 			*json_query++ = '\0';
219 			if ((json_fields = strchr(json_query, DELIMITER))) {
220 				*json_fields++ = '\0';
221 				if ((query_options_str = strchr(json_fields, DELIMITER))) {
222 					char *n_str;
223 					*query_options_str++ = '\0';
224 					if (!zstr(query_options_str)) {
225 						query_options = parse_query_options(query_options_str);
226 					}
227 					if ((n_str = strchr(query_options_str, DELIMITER))) {
228 						*n_str++ = '\0';
229 						if (switch_is_number(n_str)) {
230 							n = atoi(n_str);
231 							if (n < 1) {
232 								n = 1;
233 							}
234 						}
235 					}
236 				}
237 			}
238 		}
239 	}
240 
241 	if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) {
242 		bson_error_t error;
243 		mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
244 		if (conn) {
245 			mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection);
246 			if (col) {
247 				bson_t *query = bson_new_from_json((uint8_t *)json_query, strlen(json_query), &error);
248 				bson_t *fields = bson_new_from_json((uint8_t *)json_fields, strlen(json_fields), &error);
249 				if (query && fields) {
250 					/* send query */
251 					mongoc_cursor_t *cursor = mongoc_collection_find(col, query_options, 0, n, 0, query, fields, NULL);
252 					if (cursor && !mongoc_cursor_error(cursor, &error)) {
253 						/* get results from cursor */
254 						switch_stream_handle_t result_stream = { 0 };
255 						const bson_t *result;
256 						SWITCH_STANDARD_STREAM(result_stream);
257 
258 						if (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
259 							char *json_result;
260 							json_result = bson_as_json(result, NULL);
261 							result_stream.write_function(&result_stream, "%s", json_result);
262 							bson_free(json_result);
263 						}
264 						while (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
265 							char *json_result;
266 							json_result = bson_as_json(result, NULL);
267 							result_stream.write_function(&result_stream, ",%s", json_result);
268 							bson_free(json_result);
269 						}
270 						if (!mongoc_cursor_error(cursor, &error)) {
271 							stream->write_function(stream, "-OK\n[%s]", zstr((char *)result_stream.data) ? "" :(char *)result_stream.data);
272 						} else {
273 							stream->write_function(stream, "-ERR\nquery failed: %s", error.message);
274 						}
275 						switch_safe_free(result_stream.data);
276 					} else {
277 						stream->write_function(stream, "-ERR\nquery failed: %s", error.message);
278 					}
279 					if (cursor) {
280 						mongoc_cursor_destroy(cursor);
281 					}
282 				} else {
283 					stream->write_function(stream, "-ERR\nmissing query or fields!\n%s\n", FIND_ONE_SYNTAX);
284 				}
285 				if (query) {
286 					bson_destroy(query);
287 				}
288 				if (fields) {
289 					bson_destroy(fields);
290 				}
291 				mongoc_collection_destroy(col);
292 			} else {
293 				stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection);
294 			}
295 			connection_done(globals.client_pool, conn);
296 		} else {
297 			stream->write_function(stream, "-ERR\nfailed to get connection!\n");
298 		}
299 	} else {
300 		stream->write_function(stream, "-ERR\n%s\n", FIND_N_SYNTAX);
301 	}
302 
303 	switch_safe_free(db);
304 
305 	return status;
306 }
307 
SWITCH_STANDARD_API(mod_mongo_find_one_function)308 SWITCH_STANDARD_API(mod_mongo_find_one_function)
309 {
310 	switch_status_t status = SWITCH_STATUS_SUCCESS;
311 	char *db = NULL, *collection = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
312 	int query_options = 0;
313 
314 	db = strdup(cmd);
315 	switch_assert(db != NULL);
316 
317 	if ((collection = strchr(db, '.'))) {
318 		*collection++ = '\0';
319 		if ((json_query = strchr(collection, DELIMITER))) {
320 			*json_query++ = '\0';
321 			if ((json_fields = strchr(json_query, DELIMITER))) {
322 				*json_fields++ = '\0';
323 				if ((query_options_str = strchr(json_fields, DELIMITER))) {
324 					*query_options_str++ = '\0';
325 					if (!zstr(query_options_str)) {
326 						query_options = parse_query_options(query_options_str);
327 					}
328 				}
329 			}
330 		}
331 	}
332 
333 	if (!zstr(db) && !zstr(collection) && !zstr(json_query) && !zstr(json_fields)) {
334 		bson_error_t error;
335 		mongoc_client_t *conn = get_connection(globals.client_pool, globals.conn_str);
336 		if (conn) {
337 			mongoc_collection_t *col = mongoc_client_get_collection(conn, db, collection);
338 			if (col) {
339 				bson_t *query = bson_new_from_json((uint8_t *)json_query, strlen(json_query), &error);
340 				bson_t *fields = bson_new_from_json((uint8_t *)json_fields, strlen(json_fields), &error);
341 				if (query && fields) {
342 					/* send query */
343 					mongoc_cursor_t *cursor = mongoc_collection_find(col, query_options, 0, 1, 0, query, fields, NULL);
344 					if (cursor && !mongoc_cursor_error(cursor, &error)) {
345 						/* get result from cursor */
346 						const bson_t *result;
347 						if (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
348 							char *json_result;
349 							json_result = bson_as_json(result, NULL);
350 							stream->write_function(stream, "-OK\n%s\n", json_result);
351 							bson_free(json_result);
352 						} else if (mongoc_cursor_error(cursor, &error)) {
353 							stream->write_function(stream, "-ERR\nquery failed: %s\n", error.message);
354 						} else {
355 							/* empty set */
356 							stream->write_function(stream, "-OK\n{}\n");
357 						}
358 					} else {
359 						stream->write_function(stream, "-ERR\nquery failed!\n");
360 					}
361 					if (cursor) {
362 						mongoc_cursor_destroy(cursor);
363 					}
364 				} else {
365 					stream->write_function(stream, "-ERR\nmissing query or fields!\n%s\n", FIND_ONE_SYNTAX);
366 				}
367 				if (query) {
368 					bson_destroy(query);
369 				}
370 				if (fields) {
371 					bson_destroy(fields);
372 				}
373 				mongoc_collection_destroy(col);
374 			} else {
375 				stream->write_function(stream, "-ERR\nunknown collection: %s\n", collection);
376 			}
377 			connection_done(globals.client_pool, conn);
378 		} else {
379 			stream->write_function(stream, "-ERR\nfailed to get connection!\n");
380 		}
381 	} else {
382 		stream->write_function(stream, "-ERR\n%s\n", FIND_ONE_SYNTAX);
383 	}
384 
385 	switch_safe_free(db);
386 
387 	return status;
388 }
389 
390 /**
391  * Calculate resource count from BSON document
392  */
mod_mongo_get_count(switch_core_session_t * session,const char * key,const bson_t * b,int * new_val_ret,char ** resource_ret)393 static switch_status_t mod_mongo_get_count(switch_core_session_t *session, const char *key, const bson_t *b, int *new_val_ret, char **resource_ret)
394 {
395 	switch_status_t status = SWITCH_STATUS_SUCCESS;
396 	bson_iter_t iter;
397 	if (new_val_ret) {
398 		if (bson_iter_init_find(&iter, b, key) && BSON_ITER_HOLDS_INT32(&iter)) {
399 			*new_val_ret = bson_iter_int32(&iter);
400 		} else {
401 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to get resource count\n");
402 			status = SWITCH_STATUS_GENERR;
403 		}
404 	}
405 	if (resource_ret) {
406 		if (bson_iter_init_find(&iter, b, "_id") && BSON_ITER_HOLDS_UTF8(&iter)) {
407 			uint32_t len;
408 			const char *resource = bson_iter_utf8(&iter, &len);
409 			if (!zstr(resource)) {
410 				if (bson_utf8_validate(resource, len, 0)) {
411 					*resource_ret = strdup(resource);
412 				} else {
413 					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is not valid utf8\n");
414 					status = SWITCH_STATUS_GENERR;
415 				}
416 			} else {
417 				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name is empty string");
418 				status = SWITCH_STATUS_GENERR;
419 			}
420 		} else {
421 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Resource name not found\n");
422 			status = SWITCH_STATUS_GENERR;
423 		}
424 	}
425 	return status;
426 }
427 
428 /**
429  * Increment a resource by val
430  * @param session
431  * @param resource name of resource being incremented
432  * @param val number to increment resource by
433  * @param max maximum value of resource allowed
434  * @param new_val_ret new value of resource after increment completed
435  */
mod_mongo_increment(switch_core_session_t * session,const char * resource,int val,int max,int * new_val_ret)436 static switch_status_t mod_mongo_increment(switch_core_session_t *session, const char *resource, int val, int max, int *new_val_ret)
437 {
438 	switch_status_t status = SWITCH_STATUS_GENERR;
439 	mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
440 	if (conn) {
441 		mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
442 		if (col) {
443 			int upsert;
444 			bson_t *query, *update, reply;
445 			bson_error_t error;
446 
447 			/* construct update query - the counts are stored as:
448 			{ _id: "realm_resource", total: 29, "fs-1": 5, "fs-2": 10, "fs-3": 3, "fs-4": 11 }
449 			*/
450 			if (val >= 0) {
451 				if (max > 0) {
452 					/* increment if < max */
453 					query = BCON_NEW("_id", resource,
454 						"total", "{", "$lt", BCON_INT32(max), "}");
455 					upsert = 1; /* will fail with duplicate index key error if total condition is not satisfied */
456 				} else {
457 					/* increment, no restrictions */
458 					query = BCON_NEW("_id", resource);
459 					upsert = 1;
460 				}
461 			} else {
462 				/* don't allow decrement below 0, don't add fields that don't exist */
463 				query = BCON_NEW("_id", resource,
464 					"total", "{", "$gte", BCON_INT32(-val), "}",
465 					switch_core_get_switchname(), "{", "$gte", BCON_INT32(-val), "}");
466 				upsert = 0;
467 			}
468 			update = BCON_NEW("$inc", "{", "total", BCON_INT32(val), switch_core_get_switchname(), BCON_INT32(val), "}");
469 
470 			if (!mongoc_collection_find_and_modify(col, query, NULL, update, NULL, false, upsert, true, &reply, &error)) {
471 				if (max > 0 && error.code == 11000) {
472 					/* duplicate key index error - limit exceeded  */
473 					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s exceeds maximum rate of %d\n", resource, max);
474 					status = SWITCH_STATUS_FALSE;
475 				} else {
476 					switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: %s\n", resource, val, error.message);
477 					status = SWITCH_STATUS_GENERR;
478 				}
479 			} else if (new_val_ret) {
480 				status = mod_mongo_get_count(session, "total", &reply, new_val_ret, NULL);
481 			} else {
482 				status = SWITCH_STATUS_SUCCESS;
483 			}
484 
485 			bson_destroy(query);
486 			bson_destroy(update);
487 			bson_destroy(&reply);
488 			mongoc_collection_destroy(col);
489 		} else {
490 			switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Increment %s by %d failed: unable to get collection %s from database %s\n", resource, val, globals.limit_collection, globals.limit_database);
491 		}
492 		connection_done(globals.limit_client_pool, conn);
493 	}
494 	return status;
495 }
496 
497 /**
498  * Get resource usage
499  */
mod_mongo_get_usage(const char * resource,int * usage)500 static switch_status_t mod_mongo_get_usage(const char *resource, int *usage)
501 {
502 	switch_status_t status = SWITCH_STATUS_GENERR;
503 	mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
504 	if (conn) {
505 		mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
506 		if (col) {
507 			bson_t *query = BCON_NEW("_id", resource);
508 			bson_t *fields = BCON_NEW("total", BCON_INT32(1));
509 			bson_error_t error;
510 			mongoc_cursor_t *cursor = mongoc_collection_find(col, 0, 0, 1, 0, query, fields, NULL);
511 			if (cursor) {
512 				if (!mongoc_cursor_error(cursor, &error)) {
513 					/* get result from cursor */
514 					const bson_t *result;
515 					if (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
516 						status = mod_mongo_get_count(NULL, "total", result, usage, NULL);
517 					}
518 				}
519 				mongoc_cursor_destroy(cursor);
520 			}
521 			bson_destroy(query);
522 			bson_destroy(fields);
523 			mongoc_collection_destroy(col);
524 		} else {
525 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Get usage failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
526 		}
527 		connection_done(globals.limit_client_pool, conn);
528 	}
529 	return status;
530 }
531 
532 /**
533  * Clear all limits on this server
534  */
mod_mongo_reset(void)535 static switch_status_t mod_mongo_reset(void)
536 {
537 	switch_status_t status = SWITCH_STATUS_GENERR;
538 	mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
539 	if (conn) {
540 		mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
541 		if (col) {
542 			bson_t *query;
543 			//bson_t *fields;
544 			mongoc_cursor_t *cursor;
545 			bson_error_t error;
546 			query = BCON_NEW(switch_core_get_switchname(), "{", "$gt", BCON_INT32(0), "}");
547 			//fields = BCON_NEW(switch_core_get_switchname(), "1");
548 
549 			/* find all docs w/ this server and clear its counts */
550 			switch_thread_rwlock_wrlock(globals.limit_database_rwlock); /* prevent increments on this server */
551 			cursor = mongoc_collection_find(col, 0, 0, 0, 0, query, NULL, NULL);
552 			if (cursor) {
553 				if (!mongoc_cursor_error(cursor, &error)) {
554 					/* get result from cursor */
555 					const bson_t *result;
556 					char *resource = NULL;
557 					while (mongoc_cursor_more(cursor) && mongoc_cursor_next(cursor, &result)) {
558 						int count = 0;
559 						if ((status = mod_mongo_get_count(NULL, switch_core_get_switchname(), result, &count, &resource)) == SWITCH_STATUS_SUCCESS) {
560 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset %s, -%d\n", resource, count);
561 							if (count > 0 && !zstr(resource)) {
562 								/* decrement server counts from mongo */
563 								if ((status = mod_mongo_increment(NULL, resource, -count, 0, NULL)) == SWITCH_STATUS_GENERR) {
564 									switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset done - increment error\n");
565 									break;
566 								}
567 							}
568 							switch_safe_free(resource);
569 							resource = NULL;
570 						} else {
571 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Reset failed - get count\n");
572 							break;
573 						}
574 					}
575 					switch_safe_free(resource);
576 				} else {
577 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: %s\n", error.message);
578 				}
579 				mongoc_cursor_destroy(cursor);
580 			} else {
581 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: NULL cursor returned\n");
582 			}
583 			switch_thread_rwlock_unlock(globals.limit_database_rwlock);
584 
585 			bson_destroy(query);
586 			//bson_destroy(fields);
587 			mongoc_collection_destroy(col);
588 		} else {
589 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Reset failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
590 		}
591 		connection_done(globals.limit_client_pool, conn);
592 	}
593 	return status;
594 }
595 
596 /**
597  * Clean up all entries w/ resource count of 0
598  */
mod_mongo_cleanup(void)599 static switch_status_t mod_mongo_cleanup(void)
600 {
601 	switch_status_t status = SWITCH_STATUS_GENERR;
602 	mongoc_client_t *conn = get_connection(globals.limit_client_pool, globals.limit_conn_str);
603 	if (conn) {
604 		mongoc_collection_t *col = mongoc_client_get_collection(conn, globals.limit_database, globals.limit_collection);
605 		if (col) {
606 			bson_t *selector = BCON_NEW("total", BCON_INT32(0));
607 			bson_error_t error;
608 			if (mongoc_collection_remove(col, 0, selector, NULL, &error)) {
609 				status = SWITCH_STATUS_SUCCESS;
610 			} else {
611 				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: %s\n", error.message);
612 			}
613 			bson_destroy(selector);
614 			mongoc_collection_destroy(col);
615 		} else {
616 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Cleanup failed: unable to get collection %s from database %s\n", globals.limit_collection, globals.limit_database);
617 		}
618 		connection_done(globals.limit_client_pool, conn);
619 	}
620 	return status;
621 }
622 
623 /**
624  * @brief Enforces limit_mongo restrictions
625  * @param session current session
626  * @param realm limit realm
627  * @param id limit id
628  * @param max maximum count
629  * @param interval interval for rate limiting
630  * @return SWITCH_TRUE if the access is allowed, SWITCH_FALSE if it isn't
631  */
SWITCH_LIMIT_INCR(mod_mongo_limit_incr)632 SWITCH_LIMIT_INCR(mod_mongo_limit_incr)
633 {
634 	switch_status_t status = SWITCH_STATUS_FALSE;
635 	switch_channel_t *channel = switch_core_session_get_channel(session);
636 	const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource);
637 
638 	/* get session's resource tracking information */
639 	struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo");
640 	if (!pvt) {
641 		switch_mutex_lock(globals.mod_mongo_private_mutex); /* prevents concurrent alloc of mod_mongo_private */
642 		pvt = switch_channel_get_private(channel, "limit_mongo");
643 		if (!pvt) {
644 			pvt = (struct mod_mongo_private *) switch_core_session_alloc(session, sizeof(*pvt));
645 			switch_core_hash_init(&pvt->resources);
646 			switch_mutex_init(&pvt->mutex, SWITCH_MUTEX_UNNESTED, switch_core_session_get_pool(session));
647 			switch_channel_set_private(channel, "limit_mongo", pvt);
648 		}
649 		switch_mutex_unlock(globals.mod_mongo_private_mutex);
650 	}
651 
652 	switch_mutex_lock(pvt->mutex); /* prevents concurrent increment in session */
653 	switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */
654 
655 	/* check if resource is already incremented on this session */
656 	if (!switch_core_hash_find(pvt->resources, limit_id)) {
657 		/* increment resource usage */
658 		if ((status = mod_mongo_increment(session, limit_id, 1, max, NULL)) == SWITCH_STATUS_SUCCESS) {
659 			/* remember this resource was incremented */
660 			switch_core_hash_insert(pvt->resources, limit_id, limit_id);
661 		}
662 	} else {
663 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s already acquired\n", limit_id);
664 	}
665 
666 	switch_thread_rwlock_unlock(globals.limit_database_rwlock);
667 	switch_mutex_unlock(pvt->mutex);
668 
669 	return status;
670 }
671 
672 /**
673  * @brief Releases usage of a limit_mongo-controlled resource
674  */
SWITCH_LIMIT_RELEASE(mod_mongo_limit_release)675 SWITCH_LIMIT_RELEASE(mod_mongo_limit_release)
676 {
677 	switch_channel_t *channel = switch_core_session_get_channel(session);
678 	struct mod_mongo_private *pvt = switch_channel_get_private(channel, "limit_mongo");
679 	int status = SWITCH_STATUS_SUCCESS;
680 
681 	if (!pvt) {
682 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "No limit tracking data for channel\n");
683 		return SWITCH_STATUS_SUCCESS;
684 	}
685 
686 	switch_mutex_lock(pvt->mutex); /* prevents concurrent decrement in session */
687 	switch_thread_rwlock_rdlock(globals.limit_database_rwlock); /* prevent reset operation on this server */
688 
689 	/* no realm / resource = clear all resources */
690 	if (realm == NULL && resource == NULL) {
691 		/* clear all resources */
692 		switch_hash_index_t *hi = NULL;
693 		while ((hi = switch_core_hash_first_iter(pvt->resources, hi))) {
694 			void *p_val = NULL;
695 			const void *p_key;
696 			switch_ssize_t keylen;
697 			switch_core_hash_this(hi, &p_key, &keylen, &p_val);
698 			if (mod_mongo_increment(session, (const char *)p_key, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) {
699 				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", (const char *)p_key);
700 				status = SWITCH_STATUS_FALSE;
701 				break;
702 			} else {
703 				switch_core_hash_delete(pvt->resources, (const char *) p_key);
704 			}
705 		}
706 	} else if (!zstr(realm) && !zstr(resource)) {
707 		/* clear specific resource */
708 		const char *limit_id = switch_core_session_sprintf(session, "%s_%s", realm, resource);
709 		if (switch_core_hash_find(pvt->resources, limit_id)) {
710 			if (mod_mongo_increment(session, limit_id, -1, 0, NULL) != SWITCH_STATUS_SUCCESS) {
711 				switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Couldn't decrement %s\n", limit_id);
712 			} else {
713 				switch_core_hash_delete(pvt->resources, limit_id);
714 			}
715 		}
716 	} else {
717 		switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Missing either realm or resource to release\n");
718 	}
719 
720 	switch_thread_rwlock_unlock(globals.limit_database_rwlock);
721 	switch_mutex_unlock(pvt->mutex);
722 
723 	return status;
724 }
725 
SWITCH_LIMIT_USAGE(mod_mongo_limit_usage)726 SWITCH_LIMIT_USAGE(mod_mongo_limit_usage)
727 {
728 	char *limit_id = switch_mprintf("%s_%s", realm, resource);
729 	int usage = 0;
730 	mod_mongo_get_usage(limit_id, &usage);
731 	switch_safe_free(limit_id);
732 	return usage;
733 }
734 
SWITCH_LIMIT_RESET(mod_mongo_limit_reset)735 SWITCH_LIMIT_RESET(mod_mongo_limit_reset)
736 {
737 	return mod_mongo_reset();
738 }
739 
/**
 * @brief Limit backend status callback - not implemented for this backend
 * @return a strdup()ed "-ERR not supported" message
 */
SWITCH_LIMIT_STATUS(mod_mongo_limit_status)
{
	char *not_supported = strdup("-ERR not supported");
	return not_supported;
}
744 
do_config(switch_memory_pool_t * pool)745 static switch_status_t do_config(switch_memory_pool_t *pool)
746 {
747 	const char *cf = "mongo.conf";
748 	switch_xml_t cfg, xml, settings, param;
749 	switch_status_t status = SWITCH_STATUS_SUCCESS;
750 
751 	/* set defaults */
752 	globals.map = "";
753 	globals.reduce = "";
754 	globals.finalize = "";
755 	globals.conn_str = "";
756 	globals.client_pool = NULL;
757 	globals.limit_database = "limit";
758 	globals.limit_collection = "mod_mongo";
759 	globals.limit_conn_str = "";
760 	globals.limit_client_pool = NULL;
761 	globals.limit_cleanup_interval_sec = 300;
762 
763 	if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
764 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
765 		return SWITCH_STATUS_GENERR;
766 	}
767 
768 	if ((settings = switch_xml_child(cfg, "settings"))) {
769 		for (param = switch_xml_child(settings, "param"); param; param = param->next) {
770 			char *var = (char *) switch_xml_attr_soft(param, "name");
771 			char *val = (char *) switch_xml_attr_soft(param, "value");
772 
773 			if (!strcmp(var, "connection-string")) {
774 				if (zstr(val)) {
775 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing connection-string\n");
776 					status = SWITCH_STATUS_GENERR;
777 					goto done;
778 				} else {
779 					mongoc_uri_t *uri;
780 					globals.conn_str = switch_core_strdup(pool, val);
781 					uri = mongoc_uri_new(globals.conn_str);
782 					if (uri) {
783 						globals.client_pool = mongoc_client_pool_new(uri);
784 						mongoc_uri_destroy(uri);
785 						if (!globals.client_pool) {
786 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to pool for connection-string: %s\n", globals.conn_str);
787 							status = SWITCH_STATUS_GENERR;
788 							goto done;
789 						}
790 						if (!globals.limit_client_pool) {
791 							/* use connection-string for limit backend unless overriden by limit-connection-string */
792 							globals.limit_client_pool = globals.client_pool;
793 							globals.limit_conn_str = globals.conn_str;
794 						}
795 					} else {
796 						mongoc_uri_destroy(uri);
797 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid connection-string: %s\n", globals.conn_str);
798 						status = SWITCH_STATUS_GENERR;
799 						goto done;
800 					}
801 				}
802 			} else if (!strcmp(var, "limit-connection-string")) {
803 				if (zstr(val)) {
804 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-connection-string - using connection-string instead\n");
805 					continue;
806 				} else {
807 					mongoc_uri_t *uri;
808 					globals.limit_conn_str = switch_core_strdup(pool, val);
809 					uri = mongoc_uri_new(globals.limit_conn_str);
810 					if (uri) {
811 						globals.limit_client_pool = mongoc_client_pool_new(uri);
812 						mongoc_uri_destroy(uri);
813 						if (!globals.limit_client_pool) {
814 							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Failed to pool for limit-connection-string: %s\n", globals.limit_conn_str);
815 							status = SWITCH_STATUS_GENERR;
816 							goto done;
817 						}
818 					} else {
819 						mongoc_uri_destroy(uri);
820 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid limit-connection-string: %s\n", globals.limit_conn_str);
821 						status = SWITCH_STATUS_GENERR;
822 						goto done;
823 					}
824 				}
825 			} else if (!strcmp(var, "limit-database")) {
826 				if (zstr(val)) {
827 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-database - using '%s'\n", globals.limit_database);
828 				} else {
829 					globals.limit_database = switch_core_strdup(pool, val);
830 				}
831 			} else if (!strcmp(var, "limit-collection")) {
832 				if (zstr(val)) {
833 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing limit-collection - using '%s'\n", globals.limit_collection);
834 				} else {
835 					globals.limit_collection = switch_core_strdup(pool, val);
836 				}
837 			} else if (!strcmp(var, "limit-cleanup-interval-sec")) {
838 				if (zstr(val) || !switch_is_number(val)) {
839 					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "bad value of limit-cleanup-interval-sec\n");
840 				} else {
841 					int new_interval = atoi(val);
842 					if (new_interval >= 0) {
843 						globals.limit_cleanup_interval_sec = new_interval;
844 					} else {
845 						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "limit-cleanup-interval-sec must be >= 0\n");
846 					}
847 				}
848 			} else if (!strcmp(var, "map")) {
849 				if (!zstr(val)) {
850 					globals.map = switch_core_strdup(pool, val);
851 				}
852 			} else if (!strcmp(var, "reduce")) {
853 				if (!zstr(val)) {
854 					globals.reduce = switch_core_strdup(pool, val);
855 				}
856 			} else if (!strcmp(var, "finalize")) {
857 				if (!zstr(val)) {
858 					globals.finalize = switch_core_strdup(pool, val);
859 				}
860 			}
861 		}
862 	}
863 
864 	if (!globals.client_pool) {
865 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No mongodb connection pool configured!  Make sure connection-string is set\n");
866 		status = SWITCH_STATUS_GENERR;
867 	}
868 
869 done:
870 	switch_xml_free(xml);
871 
872 	return status;
873 }
874 
SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load)875 SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load)
876 {
877 	switch_api_interface_t *api_interface = NULL;
878 	switch_limit_interface_t *limit_interface = NULL;
879 
880 	*module_interface = switch_loadable_module_create_module_interface(pool, modname);
881 
882 	memset(&globals, 0, sizeof(globals));
883 
884 	if (do_config(pool) != SWITCH_STATUS_SUCCESS) {
885 		return SWITCH_STATUS_TERM;
886 	}
887 
888 	switch_mutex_init(&globals.mod_mongo_private_mutex, SWITCH_MUTEX_UNNESTED, pool);
889 	switch_thread_rwlock_create(&globals.limit_database_rwlock, pool);
890 	switch_thread_rwlock_create(&globals.shutdown_rwlock, pool);
891 
892 	/* clear all entries */
893 	mod_mongo_reset();
894 
895 	SWITCH_ADD_API(api_interface, "mongo_find_one", "findOne", mod_mongo_find_one_function, FIND_ONE_SYNTAX);
896 	SWITCH_ADD_API(api_interface, "mongo_find_n", "find", mod_mongo_find_n_function, FIND_N_SYNTAX);
897 	SWITCH_ADD_API(api_interface, "mongo_mapreduce", "Map/Reduce", mod_mongo_mapreduce_function, MAPREDUCE_SYNTAX);
898 
899 	SWITCH_ADD_LIMIT(limit_interface, "mongo", mod_mongo_limit_incr, mod_mongo_limit_release, mod_mongo_limit_usage, mod_mongo_limit_reset, mod_mongo_limit_status, NULL);
900 
901 	return SWITCH_STATUS_SUCCESS;
902 }
903 
904 /**
905  * Periodically cleanup mongo limit counters
906  */
SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime)907 SWITCH_MODULE_RUNTIME_FUNCTION(mod_mongo_runtime)
908 {
909 	switch_interval_time_t cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000);
910 	switch_thread_rwlock_rdlock(globals.shutdown_rwlock);
911 	while(!globals.shutdown && globals.limit_cleanup_interval_sec) {
912 		switch_micro_sleep(1 * 1000 * 1000);
913 		if (!globals.shutdown && switch_micro_time_now() > cleanup_time) {
914 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleanup\n");
915 			mod_mongo_cleanup();
916 			cleanup_time = switch_micro_time_now() + (globals.limit_cleanup_interval_sec * 1000 * 1000);
917 		}
918 	}
919 	switch_thread_rwlock_unlock(globals.shutdown_rwlock);
920 	return SWITCH_STATUS_TERM;
921 }
922 
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown)923 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown)
924 {
925 	globals.shutdown = 1;
926 	switch_thread_rwlock_wrlock(globals.shutdown_rwlock);
927 	switch_thread_rwlock_unlock(globals.shutdown_rwlock);
928 	if (globals.limit_client_pool && globals.limit_client_pool != globals.client_pool) {
929 		mongoc_client_pool_destroy(globals.limit_client_pool);
930 		globals.limit_client_pool = NULL;
931 	}
932 	if (globals.client_pool) {
933 		mongoc_client_pool_destroy(globals.client_pool);
934 		globals.client_pool = NULL;
935 	}
936 	if (globals.mod_mongo_private_mutex) {
937 		switch_mutex_destroy(globals.mod_mongo_private_mutex);
938 		globals.mod_mongo_private_mutex = NULL;
939 	}
940 	return SWITCH_STATUS_SUCCESS;
941 }
942 
943 /* For Emacs:
944  * Local Variables:
945  * mode:c
946  * indent-tabs-mode:t
947  * tab-width:4
948  * c-basic-offset:4
949  * End:
950  * For VIM:
951  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
952  */
953 
954