1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Based on mod_skel by
25 * Anthony Minessale II <anthm@freeswitch.org>
26 *
27 * Contributor(s):
28 *
29 * Daniel Bryars <danb@aeriandi.com>
30 * Tim Brown <tim.brown@aeriandi.com>
31 * Anthony Minessale II <anthm@freeswitch.org>
32 * William King <william.king@quentustech.com>
33 * Mike Jerris <mike@jerris.com>
34 *
35 * mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
36 *
37 */
38 
39 #include "mod_amqp.h"
40 
/**
 * Close the AMQP channel and connection held by a connection entry and
 * destroy the underlying connection state.
 *
 * The entry itself is left intact; only its `state` member is reset to NULL.
 * Calling this with a NULL entry, or with an entry whose state is already
 * NULL, is a no-op.
 *
 * @param connection  Connection entry to close (may be NULL).
 */
void mod_amqp_connection_close(mod_amqp_connection_t *connection)
{
	amqp_connection_state_t old_state;
	int status = 0;

	if (!connection) {
		return;
	}

	/* Detach the state from the entry before tearing it down, so the entry
	 * never exposes a pointer to a connection that is being destroyed. */
	old_state = connection->state;
	connection->state = NULL;

	if (old_state == NULL) {
		return;
	}

	/* Channel 1 is the only channel this module opens on a connection. */
	mod_amqp_log_if_amqp_error(amqp_channel_close(old_state, 1, AMQP_REPLY_SUCCESS), "Closing channel");
	mod_amqp_log_if_amqp_error(amqp_connection_close(old_state, AMQP_REPLY_SUCCESS), "Closing connection");

	status = amqp_destroy_connection(old_state);
	if (status) {
		/* amqp_error_string2() takes the status code exactly as returned by
		 * the library; negating it first selects the wrong message. */
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error destroying amqp connection: %s\n", amqp_error_string2(status));
	}
}
58 
mod_amqp_connection_open(mod_amqp_connection_t * connections,mod_amqp_connection_t ** active,char * profile_name,char * custom_attr)59 switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr)
60 {
61 	int channel_max = 0;
62 	int frame_max = 131072;
63 	amqp_table_t loginProperties;
64 	amqp_table_entry_t loginTableEntries[5];
65 	char hostname[64];
66 	int bHasHostname;
67 	char key_string[256] = {0};
68 	amqp_rpc_reply_t status;
69 	amqp_socket_t *socket = NULL;
70 	int amqp_status = -1;
71 	mod_amqp_connection_t *connection_attempt = NULL;
72 	amqp_connection_state_t newConnection = amqp_new_connection();
73 	amqp_connection_state_t oldConnection = NULL;
74 
75 	if (active && *active) {
76 		oldConnection = (*active)->state;
77 	}
78 
79 	/* Set up meta data for connection */
80 	bHasHostname = gethostname(hostname, sizeof(hostname)) == 0;
81 
82 	loginProperties.num_entries = sizeof(loginTableEntries)/sizeof(*loginTableEntries);
83 	loginProperties.entries = loginTableEntries;
84 
85 	snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr);
86 	loginTableEntries[0].key = amqp_cstring_bytes(key_string);
87 	loginTableEntries[0].value.kind = AMQP_FIELD_KIND_BYTES;
88 	loginTableEntries[0].value.value.bytes = amqp_cstring_bytes(bHasHostname ? hostname : "(unknown)");
89 
90 	snprintf(key_string, 256, "x_%s_ProcessDescription", custom_attr);
91 	loginTableEntries[1].key = amqp_cstring_bytes(key_string);
92 	loginTableEntries[1].value.kind = AMQP_FIELD_KIND_BYTES;
93 	loginTableEntries[1].value.value.bytes = amqp_cstring_bytes("FreeSwitch");
94 
95 	snprintf(key_string, 256, "x_%s_ProcessType", custom_attr);
96 	loginTableEntries[2].key = amqp_cstring_bytes(key_string);
97 	loginTableEntries[2].value.kind = AMQP_FIELD_KIND_BYTES;
98 	loginTableEntries[2].value.value.bytes = amqp_cstring_bytes("TAP");
99 
100 	snprintf(key_string, 256, "x_%s_ProcessBuildVersion", custom_attr);
101 	loginTableEntries[3].key = amqp_cstring_bytes(key_string);
102 	loginTableEntries[3].value.kind = AMQP_FIELD_KIND_BYTES;
103 	loginTableEntries[3].value.value.bytes = amqp_cstring_bytes(switch_version_full());
104 
105 	snprintf(key_string, 256, "x_%s_Liquid_ProcessBuildBornOn", custom_attr);
106 	loginTableEntries[4].key = amqp_cstring_bytes(key_string);
107 	loginTableEntries[4].value.kind = AMQP_FIELD_KIND_BYTES;
108 	loginTableEntries[4].value.value.bytes = amqp_cstring_bytes(__DATE__ " " __TIME__);
109 
110 	if (!(socket = amqp_tcp_socket_new(newConnection))) {
111 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create TCP socket\n");
112 		goto err;
113 	}
114 
115 	connection_attempt = connections;
116 	amqp_status = -1;
117 
118 	while (connection_attempt && amqp_status){
119 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] trying to connect to AMQP broker %s:%d\n",
120 						  profile_name, connection_attempt->hostname, connection_attempt->port);
121 
122 		if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))){
123 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n",
124 							  connection_attempt->hostname, connection_attempt->port,	amqp_status, amqp_error_string2(amqp_status));
125 			connection_attempt = connection_attempt->next;
126 		}
127 	}
128 
129 	if (active) {
130 		*active = connection_attempt;
131 	}
132 
133 	if (!connection_attempt) {
134 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", profile_name);
135 		goto err;
136 	}
137 
138 	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] opened socket connection to AMQP broker %s:%d\n",
139 					  profile_name, connection_attempt->hostname, connection_attempt->port);
140 
141 	/* We have a connection, now log in */
142 	status = amqp_login_with_properties(newConnection,
143 										connection_attempt->virtualhost,
144 										channel_max,
145 										frame_max,
146 										connection_attempt->heartbeat,
147 										&loginProperties,
148 										AMQP_SASL_METHOD_PLAIN,
149 										connection_attempt->username,
150 										connection_attempt->password);
151 
152 	if (mod_amqp_log_if_amqp_error(status, "Logging in")) {
153 		if (active) {
154 			mod_amqp_connection_close(*active);
155 			*active = NULL;
156 		}
157 		goto err;
158 	}
159 
160 	// Open a channel (1). This is fairly standard
161 	amqp_channel_open(newConnection, 1);
162 	if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(newConnection), "Opening channel")) {
163 		if (active) {
164 			mod_amqp_connection_close(*active);
165 			*active = NULL;
166 		}
167 		goto err;
168 	}
169 
170 	if (active) {
171 		(*active)->state = newConnection;
172 	}
173 
174 	if (oldConnection) {
175 		amqp_destroy_connection(oldConnection);
176 	}
177 
178 	return SWITCH_STATUS_SUCCESS;
179 
180 err:
181     if (newConnection) {
182         amqp_destroy_connection(newConnection);
183     }
184     return SWITCH_STATUS_GENERR;
185 }
186 
mod_amqp_connection_create(mod_amqp_connection_t ** conn,switch_xml_t cfg,switch_memory_pool_t * pool)187 switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool)
188 {
189 	mod_amqp_connection_t *new_con = switch_core_alloc(pool, sizeof(mod_amqp_connection_t));
190 	switch_xml_t param;
191 	char *name = (char *) switch_xml_attr_soft(cfg, "name");
192 	char *hostname = NULL, *virtualhost = NULL, *username = NULL, *password = NULL;
193 	unsigned int port = 0, heartbeat = 0;
194 
195 	if (zstr(name)) {
196 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", switch_xml_toxml(cfg, 1));
197 		return SWITCH_STATUS_GENERR;
198 	}
199 
200 	new_con->name = switch_core_strdup(pool, name);
201 	new_con->state = NULL;
202 	new_con->next = NULL;
203 
204 	for (param = switch_xml_child(cfg, "param"); param; param = param->next) {
205 		char *var = (char *) switch_xml_attr_soft(param, "name");
206 		char *val = (char *) switch_xml_attr_soft(param, "value");
207 
208 		if (!var) {
209 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param missing 'name' attribute\n", name);
210 			continue;
211 		}
212 
213 		if (!val) {
214 			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var);
215 			continue;
216 		}
217 
218 		if (!strncmp(var, "hostname", 8)) {
219 			hostname = switch_core_strdup(pool, val);
220 		} else if (!strncmp(var, "virtualhost", 11)) {
221 			virtualhost = switch_core_strdup(pool, val);
222 		} else if (!strncmp(var, "username", 8)) {
223 			username = switch_core_strdup(pool, val);
224 		} else if (!strncmp(var, "password", 8)) {
225 			password = switch_core_strdup(pool, val);
226 		} else if (!strncmp(var, "port", 4)) {
227 			int interval = atoi(val);
228 			if (interval && interval > 0) {
229 				port = interval;
230 			}
231 		} else if (!strncmp(var, "heartbeat", 9)) {
232 			int interval = atoi(val);
233 			if (interval && interval > 0) {
234 				heartbeat = interval;
235 			}
236 		}
237 	}
238 
239 	new_con->hostname = hostname ? hostname : "localhost";
240 	new_con->virtualhost = virtualhost ? virtualhost : "/";
241 	new_con->username = username ? username : "guest";
242 	new_con->password = password ? password : "guest";
243 	new_con->port = port ? port : 5672;
244 	new_con->heartbeat = heartbeat ? heartbeat : 0;
245 
246 	*conn = new_con;
247 	return SWITCH_STATUS_SUCCESS;
248 }
249 
/**
 * Close the connection referenced by `*conn` and reset the caller's pointer.
 *
 * Does nothing when `conn` or `*conn` is NULL.
 *
 * @param conn  Address of the connection pointer to close and clear.
 */
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn)
{
	if (!conn || !*conn) {
		return;
	}

	mod_amqp_connection_close(*conn);
	*conn = NULL;
}
257 
258 /* For Emacs:
259  * Local Variables:
260  * mode:c
261  * indent-tabs-mode:t
262  * tab-width:4
263  * c-basic-offset:4
264  * End:
265  * For VIM:
266  * vim:set softtabstop=4 shiftwidth=4 tabstop=4
267  */
268