1 /*
2 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3 * Copyright (C) 2005-2012, Anthony Minessale II <anthm@freeswitch.org>
4 *
5 * Version: MPL 1.1
6 *
7 * The contents of this file are subject to the Mozilla Public License Version
8 * 1.1 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://www.mozilla.org/MPL/
11 *
12 * Software distributed under the License is distributed on an "AS IS" basis,
13 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14 * for the specific language governing rights and limitations under the
15 * License.
16 *
17 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18 *
19 * The Initial Developer of the Original Code is
20 * Anthony Minessale II <anthm@freeswitch.org>
21 * Portions created by the Initial Developer are Copyright (C)
22 * the Initial Developer. All Rights Reserved.
23 *
24 * Based on mod_skel by
25 * Anthony Minessale II <anthm@freeswitch.org>
26 *
27 * Contributor(s):
28 *
29 * Daniel Bryars <danb@aeriandi.com>
30 * Tim Brown <tim.brown@aeriandi.com>
31 * Anthony Minessale II <anthm@freeswitch.org>
32 * William King <william.king@quentustech.com>
33 * Mike Jerris <mike@jerris.com>
34 *
35 * mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker
36 *
37 */
38
39 #include "mod_amqp.h"
40
/*
 * Close and destroy the AMQP connection state held by `connection`.
 *
 * The state pointer is detached from the structure before any teardown call is
 * made, so the structure never exposes a state that is being torn down.
 * Channel 1 and the connection are closed (failures are only logged), then the
 * state is destroyed.
 *
 * A NULL `connection`, or one with no state, is a no-op.
 */
void mod_amqp_connection_close(mod_amqp_connection_t *connection)
{
	amqp_connection_state_t old_state = NULL;
	int status = 0;

	if (!connection) {
		return;
	}

	old_state = connection->state;
	connection->state = NULL;

	if (old_state == NULL) {
		return;
	}

	mod_amqp_log_if_amqp_error(amqp_channel_close(old_state, 1, AMQP_REPLY_SUCCESS), "Closing channel");
	mod_amqp_log_if_amqp_error(amqp_connection_close(old_state, AMQP_REPLY_SUCCESS), "Closing connection");

	if ((status = amqp_destroy_connection(old_state))) {
		/* amqp_error_string2() takes the status code exactly as returned; it must not be negated */
		const char *errstr = amqp_error_string2(status);
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error destroying amqp connection: %s\n", errstr);
	}
}
58
mod_amqp_connection_open(mod_amqp_connection_t * connections,mod_amqp_connection_t ** active,char * profile_name,char * custom_attr)59 switch_status_t mod_amqp_connection_open(mod_amqp_connection_t *connections, mod_amqp_connection_t **active, char *profile_name, char *custom_attr)
60 {
61 int channel_max = 0;
62 int frame_max = 131072;
63 amqp_table_t loginProperties;
64 amqp_table_entry_t loginTableEntries[5];
65 char hostname[64];
66 int bHasHostname;
67 char key_string[256] = {0};
68 amqp_rpc_reply_t status;
69 amqp_socket_t *socket = NULL;
70 int amqp_status = -1;
71 mod_amqp_connection_t *connection_attempt = NULL;
72 amqp_connection_state_t newConnection = amqp_new_connection();
73 amqp_connection_state_t oldConnection = NULL;
74
75 if (active && *active) {
76 oldConnection = (*active)->state;
77 }
78
79 /* Set up meta data for connection */
80 bHasHostname = gethostname(hostname, sizeof(hostname)) == 0;
81
82 loginProperties.num_entries = sizeof(loginTableEntries)/sizeof(*loginTableEntries);
83 loginProperties.entries = loginTableEntries;
84
85 snprintf(key_string, 256, "x_%s_HostMachineName", custom_attr);
86 loginTableEntries[0].key = amqp_cstring_bytes(key_string);
87 loginTableEntries[0].value.kind = AMQP_FIELD_KIND_BYTES;
88 loginTableEntries[0].value.value.bytes = amqp_cstring_bytes(bHasHostname ? hostname : "(unknown)");
89
90 snprintf(key_string, 256, "x_%s_ProcessDescription", custom_attr);
91 loginTableEntries[1].key = amqp_cstring_bytes(key_string);
92 loginTableEntries[1].value.kind = AMQP_FIELD_KIND_BYTES;
93 loginTableEntries[1].value.value.bytes = amqp_cstring_bytes("FreeSwitch");
94
95 snprintf(key_string, 256, "x_%s_ProcessType", custom_attr);
96 loginTableEntries[2].key = amqp_cstring_bytes(key_string);
97 loginTableEntries[2].value.kind = AMQP_FIELD_KIND_BYTES;
98 loginTableEntries[2].value.value.bytes = amqp_cstring_bytes("TAP");
99
100 snprintf(key_string, 256, "x_%s_ProcessBuildVersion", custom_attr);
101 loginTableEntries[3].key = amqp_cstring_bytes(key_string);
102 loginTableEntries[3].value.kind = AMQP_FIELD_KIND_BYTES;
103 loginTableEntries[3].value.value.bytes = amqp_cstring_bytes(switch_version_full());
104
105 snprintf(key_string, 256, "x_%s_Liquid_ProcessBuildBornOn", custom_attr);
106 loginTableEntries[4].key = amqp_cstring_bytes(key_string);
107 loginTableEntries[4].value.kind = AMQP_FIELD_KIND_BYTES;
108 loginTableEntries[4].value.value.bytes = amqp_cstring_bytes(__DATE__ " " __TIME__);
109
110 if (!(socket = amqp_tcp_socket_new(newConnection))) {
111 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not create TCP socket\n");
112 goto err;
113 }
114
115 connection_attempt = connections;
116 amqp_status = -1;
117
118 while (connection_attempt && amqp_status){
119 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] trying to connect to AMQP broker %s:%d\n",
120 profile_name, connection_attempt->hostname, connection_attempt->port);
121
122 if ((amqp_status = amqp_socket_open(socket, connection_attempt->hostname, connection_attempt->port))){
123 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Could not open socket connection to AMQP broker %s:%d status(%d) %s\n",
124 connection_attempt->hostname, connection_attempt->port, amqp_status, amqp_error_string2(amqp_status));
125 connection_attempt = connection_attempt->next;
126 }
127 }
128
129 if (active) {
130 *active = connection_attempt;
131 }
132
133 if (!connection_attempt) {
134 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] could not connect to any AMQP brokers\n", profile_name);
135 goto err;
136 }
137
138 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Profile[%s] opened socket connection to AMQP broker %s:%d\n",
139 profile_name, connection_attempt->hostname, connection_attempt->port);
140
141 /* We have a connection, now log in */
142 status = amqp_login_with_properties(newConnection,
143 connection_attempt->virtualhost,
144 channel_max,
145 frame_max,
146 connection_attempt->heartbeat,
147 &loginProperties,
148 AMQP_SASL_METHOD_PLAIN,
149 connection_attempt->username,
150 connection_attempt->password);
151
152 if (mod_amqp_log_if_amqp_error(status, "Logging in")) {
153 if (active) {
154 mod_amqp_connection_close(*active);
155 *active = NULL;
156 }
157 goto err;
158 }
159
160 // Open a channel (1). This is fairly standard
161 amqp_channel_open(newConnection, 1);
162 if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(newConnection), "Opening channel")) {
163 if (active) {
164 mod_amqp_connection_close(*active);
165 *active = NULL;
166 }
167 goto err;
168 }
169
170 if (active) {
171 (*active)->state = newConnection;
172 }
173
174 if (oldConnection) {
175 amqp_destroy_connection(oldConnection);
176 }
177
178 return SWITCH_STATUS_SUCCESS;
179
180 err:
181 if (newConnection) {
182 amqp_destroy_connection(newConnection);
183 }
184 return SWITCH_STATUS_GENERR;
185 }
186
mod_amqp_connection_create(mod_amqp_connection_t ** conn,switch_xml_t cfg,switch_memory_pool_t * pool)187 switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool)
188 {
189 mod_amqp_connection_t *new_con = switch_core_alloc(pool, sizeof(mod_amqp_connection_t));
190 switch_xml_t param;
191 char *name = (char *) switch_xml_attr_soft(cfg, "name");
192 char *hostname = NULL, *virtualhost = NULL, *username = NULL, *password = NULL;
193 unsigned int port = 0, heartbeat = 0;
194
195 if (zstr(name)) {
196 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Connection missing name attribute\n%s\n", switch_xml_toxml(cfg, 1));
197 return SWITCH_STATUS_GENERR;
198 }
199
200 new_con->name = switch_core_strdup(pool, name);
201 new_con->state = NULL;
202 new_con->next = NULL;
203
204 for (param = switch_xml_child(cfg, "param"); param; param = param->next) {
205 char *var = (char *) switch_xml_attr_soft(param, "name");
206 char *val = (char *) switch_xml_attr_soft(param, "value");
207
208 if (!var) {
209 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param missing 'name' attribute\n", name);
210 continue;
211 }
212
213 if (!val) {
214 switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP connection[%s] param[%s] missing 'value' attribute\n", name, var);
215 continue;
216 }
217
218 if (!strncmp(var, "hostname", 8)) {
219 hostname = switch_core_strdup(pool, val);
220 } else if (!strncmp(var, "virtualhost", 11)) {
221 virtualhost = switch_core_strdup(pool, val);
222 } else if (!strncmp(var, "username", 8)) {
223 username = switch_core_strdup(pool, val);
224 } else if (!strncmp(var, "password", 8)) {
225 password = switch_core_strdup(pool, val);
226 } else if (!strncmp(var, "port", 4)) {
227 int interval = atoi(val);
228 if (interval && interval > 0) {
229 port = interval;
230 }
231 } else if (!strncmp(var, "heartbeat", 9)) {
232 int interval = atoi(val);
233 if (interval && interval > 0) {
234 heartbeat = interval;
235 }
236 }
237 }
238
239 new_con->hostname = hostname ? hostname : "localhost";
240 new_con->virtualhost = virtualhost ? virtualhost : "/";
241 new_con->username = username ? username : "guest";
242 new_con->password = password ? password : "guest";
243 new_con->port = port ? port : 5672;
244 new_con->heartbeat = heartbeat ? heartbeat : 0;
245
246 *conn = new_con;
247 return SWITCH_STATUS_SUCCESS;
248 }
249
/*
 * Close the connection referenced through `conn` and clear the caller's
 * pointer. Does nothing when `conn` or `*conn` is NULL.
 */
void mod_amqp_connection_destroy(mod_amqp_connection_t **conn)
{
	if (!conn || !*conn) {
		return;
	}

	mod_amqp_connection_close(*conn);
	*conn = NULL;
}
257
258 /* For Emacs:
259 * Local Variables:
260 * mode:c
261 * indent-tabs-mode:t
262 * tab-width:4
263 * c-basic-offset:4
264 * End:
265 * For VIM:
266 * vim:set softtabstop=4 shiftwidth=4 tabstop=4
267 */
268