1 /*
2  * Copyright 2008-2014 Arsen Chaloyan
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * $Id: mrcp_server_connection.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17  */
18 
19 #include "mrcp_connection.h"
20 #include "mrcp_server_connection.h"
21 #include "mrcp_control_descriptor.h"
22 #include "mrcp_resource_factory.h"
23 #include "mrcp_message.h"
24 #include "apt_text_stream.h"
25 #include "apt_poller_task.h"
26 #include "apt_pool.h"
27 #include "apt_log.h"
28 
29 
30 struct mrcp_connection_agent_t {
31 	apr_pool_t                           *pool;
32 	apt_poller_task_t                    *task;
33 	const mrcp_resource_factory_t        *resource_factory;
34 
35 	/** List (ring) of MRCP connections */
36 	APR_RING_HEAD(mrcp_connection_head_t, mrcp_connection_t) connection_list;
37 	/** Table of pending control channels */
38 	apr_hash_t                           *pending_channel_table;
39 
40 	apt_bool_t                            force_new_connection;
41 	apr_size_t                            tx_buffer_size;
42 	apr_size_t                            rx_buffer_size;
43 
44 	/* Listening socket */
45 	apr_sockaddr_t                       *sockaddr;
46 	apr_socket_t                         *listen_sock;
47 	apr_pollfd_t                          listen_sock_pfd;
48 
49 	void                                 *obj;
50 	const mrcp_connection_event_vtable_t *vtable;
51 };
52 
/** Kinds of requests posted to the agent task */
typedef enum {
	/** add a control channel */
	CONNECTION_TASK_MSG_ADD_CHANNEL = 0,
	/** modify a control channel */
	CONNECTION_TASK_MSG_MODIFY_CHANNEL = 1,
	/** remove a control channel */
	CONNECTION_TASK_MSG_REMOVE_CHANNEL = 2,
	/** send an MRCP message through the connection of a control channel */
	CONNECTION_TASK_MSG_SEND_MESSAGE = 3
} connection_task_msg_type_e;
59 
60 typedef struct connection_task_msg_t connection_task_msg_t;
61 struct connection_task_msg_t {
62 	connection_task_msg_type_e type;
63 	mrcp_connection_agent_t   *agent;
64 	mrcp_control_channel_t    *channel;
65 	mrcp_control_descriptor_t *descriptor;
66 	mrcp_message_t            *message;
67 };
68 
69 static apt_bool_t mrcp_server_agent_on_destroy(apt_task_t *task);
70 static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg);
71 static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
72 
73 static apt_bool_t mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t *agent);
74 static void mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t *agent);
75 
76 
77 /** Create connection agent */
mrcp_server_connection_agent_create(const char * id,const char * listen_ip,apr_port_t listen_port,apr_size_t max_connection_count,apt_bool_t force_new_connection,apr_pool_t * pool)78 MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_server_connection_agent_create(
79 										const char *id,
80 										const char *listen_ip,
81 										apr_port_t listen_port,
82 										apr_size_t max_connection_count,
83 										apt_bool_t force_new_connection,
84 										apr_pool_t *pool)
85 {
86 	apt_task_t *task;
87 	apt_task_vtable_t *vtable;
88 	apt_task_msg_pool_t *msg_pool;
89 	mrcp_connection_agent_t *agent;
90 
91 	if(!listen_ip) {
92 		return NULL;
93 	}
94 
95 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] %s:%hu [%"APR_SIZE_T_FMT"]",
96 		id,listen_ip,listen_port,max_connection_count);
97 	agent = apr_palloc(pool,sizeof(mrcp_connection_agent_t));
98 	agent->pool = pool;
99 	agent->sockaddr = NULL;
100 	agent->listen_sock = NULL;
101 	agent->force_new_connection = force_new_connection;
102 	agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
103 	agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
104 
105 	apr_sockaddr_info_get(&agent->sockaddr,listen_ip,APR_INET,listen_port,0,pool);
106 	if(!agent->sockaddr) {
107 		return NULL;
108 	}
109 
110 	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);
111 
112 	agent->task = apt_poller_task_create(
113 					max_connection_count + 1,
114 					mrcp_server_poller_signal_process,
115 					agent,
116 					msg_pool,
117 					pool);
118 	if(!agent->task) {
119 		return NULL;
120 	}
121 
122 	task = apt_poller_task_base_get(agent->task);
123 	if(task) {
124 		apt_task_name_set(task,id);
125 	}
126 
127 	vtable = apt_poller_task_vtable_get(agent->task);
128 	if(vtable) {
129 		vtable->destroy = mrcp_server_agent_on_destroy;
130 		vtable->process_msg = mrcp_server_agent_msg_process;
131 	}
132 
133 	APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
134 	agent->pending_channel_table = apr_hash_make(pool);
135 
136 	if(mrcp_server_agent_listening_socket_create(agent) != TRUE) {
137 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Listening Socket [%s] %s:%hu",
138 				id,
139 				listen_ip,
140 				listen_port);
141 	}
142 	return agent;
143 }
144 
mrcp_server_agent_on_destroy(apt_task_t * task)145 static apt_bool_t mrcp_server_agent_on_destroy(apt_task_t *task)
146 {
147 	apt_poller_task_t *poller_task = apt_task_object_get(task);
148 	mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
149 
150 	mrcp_server_agent_listening_socket_destroy(agent);
151 	apt_poller_task_cleanup(poller_task);
152 	return TRUE;
153 }
154 
155 /** Destroy connection agent. */
mrcp_server_connection_agent_destroy(mrcp_connection_agent_t * agent)156 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_destroy(mrcp_connection_agent_t *agent)
157 {
158 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy MRCPv2 Agent [%s]",
159 		mrcp_server_connection_agent_id_get(agent));
160 	return apt_poller_task_destroy(agent->task);
161 }
162 
163 /** Start connection agent. */
mrcp_server_connection_agent_start(mrcp_connection_agent_t * agent)164 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_start(mrcp_connection_agent_t *agent)
165 {
166 	return apt_poller_task_start(agent->task);
167 }
168 
169 /** Terminate connection agent. */
mrcp_server_connection_agent_terminate(mrcp_connection_agent_t * agent)170 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_terminate(mrcp_connection_agent_t *agent)
171 {
172 	return apt_poller_task_terminate(agent->task);
173 }
174 
175 /** Set connection event handler. */
mrcp_server_connection_agent_handler_set(mrcp_connection_agent_t * agent,void * obj,const mrcp_connection_event_vtable_t * vtable)176 MRCP_DECLARE(void) mrcp_server_connection_agent_handler_set(
177 									mrcp_connection_agent_t *agent,
178 									void *obj,
179 									const mrcp_connection_event_vtable_t *vtable)
180 {
181 	agent->obj = obj;
182 	agent->vtable = vtable;
183 }
184 
185 /** Set MRCP resource factory */
mrcp_server_connection_resource_factory_set(mrcp_connection_agent_t * agent,const mrcp_resource_factory_t * resource_factroy)186 MRCP_DECLARE(void) mrcp_server_connection_resource_factory_set(
187 								mrcp_connection_agent_t *agent,
188 								const mrcp_resource_factory_t *resource_factroy)
189 {
190 	agent->resource_factory = resource_factroy;
191 }
192 
193 /** Set rx buffer size */
mrcp_server_connection_rx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)194 MRCP_DECLARE(void) mrcp_server_connection_rx_size_set(
195 								mrcp_connection_agent_t *agent,
196 								apr_size_t size)
197 {
198 	if(size < MRCP_STREAM_BUFFER_SIZE) {
199 		size = MRCP_STREAM_BUFFER_SIZE;
200 	}
201 	agent->rx_buffer_size = size;
202 }
203 
204 /** Set tx buffer size */
mrcp_server_connection_tx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)205 MRCP_DECLARE(void) mrcp_server_connection_tx_size_set(
206 								mrcp_connection_agent_t *agent,
207 								apr_size_t size)
208 {
209 	if(size < MRCP_STREAM_BUFFER_SIZE) {
210 		size = MRCP_STREAM_BUFFER_SIZE;
211 	}
212 	agent->tx_buffer_size = size;
213 }
214 
215 /** Get task */
mrcp_server_connection_agent_task_get(const mrcp_connection_agent_t * agent)216 MRCP_DECLARE(apt_task_t*) mrcp_server_connection_agent_task_get(const mrcp_connection_agent_t *agent)
217 {
218 	return apt_poller_task_base_get(agent->task);
219 }
220 
221 /** Get external object */
mrcp_server_connection_agent_object_get(const mrcp_connection_agent_t * agent)222 MRCP_DECLARE(void*) mrcp_server_connection_agent_object_get(const mrcp_connection_agent_t *agent)
223 {
224 	return agent->obj;
225 }
226 
227 /** Get string identifier */
mrcp_server_connection_agent_id_get(const mrcp_connection_agent_t * agent)228 MRCP_DECLARE(const char*) mrcp_server_connection_agent_id_get(const mrcp_connection_agent_t *agent)
229 {
230 	apt_task_t *task = apt_poller_task_base_get(agent->task);
231 	return apt_task_name_get(task);
232 }
233 
234 
235 /** Create MRCPv2 control channel */
mrcp_server_control_channel_create(mrcp_connection_agent_t * agent,void * obj,apr_pool_t * pool)236 MRCP_DECLARE(mrcp_control_channel_t*) mrcp_server_control_channel_create(mrcp_connection_agent_t *agent, void *obj, apr_pool_t *pool)
237 {
238 	mrcp_control_channel_t *channel = apr_palloc(pool,sizeof(mrcp_control_channel_t));
239 	channel->agent = agent;
240 	channel->connection = NULL;
241 	channel->active_request = NULL;
242 	channel->request_timer = NULL;
243 	channel->removed = FALSE;
244 	channel->obj = obj;
245 	channel->log_obj = NULL;
246 	channel->pool = pool;
247 	return channel;
248 }
249 
250 /** Destroy MRCPv2 control channel */
mrcp_server_control_channel_destroy(mrcp_control_channel_t * channel)251 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_destroy(mrcp_control_channel_t *channel)
252 {
253 	if(channel && channel->connection && channel->removed == TRUE) {
254 		mrcp_connection_t *connection = channel->connection;
255 		channel->connection = NULL;
256 		apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
257 		mrcp_connection_destroy(connection);
258 	}
259 	return TRUE;
260 }
261 
262 /** Signal task message */
mrcp_server_control_message_signal(connection_task_msg_type_e type,mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor,mrcp_message_t * message)263 static apt_bool_t mrcp_server_control_message_signal(
264 								connection_task_msg_type_e type,
265 								mrcp_connection_agent_t *agent,
266 								mrcp_control_channel_t *channel,
267 								mrcp_control_descriptor_t *descriptor,
268 								mrcp_message_t *message)
269 {
270 	apt_task_t *task = apt_poller_task_base_get(agent->task);
271 	apt_task_msg_t *task_msg = apt_task_msg_get(task);
272 	if(task_msg) {
273 		connection_task_msg_t *msg = (connection_task_msg_t*)task_msg->data;
274 		msg->type = type;
275 		msg->agent = agent;
276 		msg->channel = channel;
277 		msg->descriptor = descriptor;
278 		msg->message = message;
279 		apt_task_msg_signal(task,task_msg);
280 	}
281 	return TRUE;
282 }
283 
284 /** Add MRCPv2 control channel */
mrcp_server_control_channel_add(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)285 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_add(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
286 {
287 	return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_ADD_CHANNEL,channel->agent,channel,descriptor,NULL);
288 }
289 
290 /** Modify MRCPv2 control channel */
mrcp_server_control_channel_modify(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)291 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_modify(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
292 {
293 	return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_MODIFY_CHANNEL,channel->agent,channel,descriptor,NULL);
294 }
295 
296 /** Remove MRCPv2 control channel */
mrcp_server_control_channel_remove(mrcp_control_channel_t * channel)297 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_remove(mrcp_control_channel_t *channel)
298 {
299 	return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_REMOVE_CHANNEL,channel->agent,channel,NULL,NULL);
300 }
301 
302 /** Send MRCPv2 message */
mrcp_server_control_message_send(mrcp_control_channel_t * channel,mrcp_message_t * message)303 MRCP_DECLARE(apt_bool_t) mrcp_server_control_message_send(mrcp_control_channel_t *channel, mrcp_message_t *message)
304 {
305 	return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_SEND_MESSAGE,channel->agent,channel,NULL,message);
306 }
307 
308 /** Create listening socket and add it to pollset */
mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t * agent)309 static apt_bool_t mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t *agent)
310 {
311 	apr_status_t status;
312 	if(!agent->sockaddr) {
313 		return FALSE;
314 	}
315 
316 	/* create listening socket */
317 	status = apr_socket_create(&agent->listen_sock, agent->sockaddr->family, SOCK_STREAM, APR_PROTO_TCP, agent->pool);
318 	if(status != APR_SUCCESS) {
319 		return FALSE;
320 	}
321 
322 	apr_socket_opt_set(agent->listen_sock, APR_SO_NONBLOCK, 0);
323 	apr_socket_timeout_set(agent->listen_sock, -1);
324 	apr_socket_opt_set(agent->listen_sock, APR_SO_REUSEADDR, 1);
325 
326 	status = apr_socket_bind(agent->listen_sock, agent->sockaddr);
327 	if(status != APR_SUCCESS) {
328 		apr_socket_close(agent->listen_sock);
329 		agent->listen_sock = NULL;
330 		return FALSE;
331 	}
332 	status = apr_socket_listen(agent->listen_sock, SOMAXCONN);
333 	if(status != APR_SUCCESS) {
334 		apr_socket_close(agent->listen_sock);
335 		agent->listen_sock = NULL;
336 		return FALSE;
337 	}
338 
339 	/* add listening socket to pollset */
340 	memset(&agent->listen_sock_pfd,0,sizeof(apr_pollfd_t));
341 	agent->listen_sock_pfd.desc_type = APR_POLL_SOCKET;
342 	agent->listen_sock_pfd.reqevents = APR_POLLIN;
343 	agent->listen_sock_pfd.desc.s = agent->listen_sock;
344 	agent->listen_sock_pfd.client_data = agent->listen_sock;
345 	if(apt_poller_task_descriptor_add(agent->task, &agent->listen_sock_pfd) != TRUE) {
346 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add Listening Socket to Pollset [%s]",
347 			apt_task_name_get(apt_poller_task_base_get(agent->task)));
348 		apr_socket_close(agent->listen_sock);
349 		agent->listen_sock = NULL;
350 		return FALSE;
351 	}
352 
353 	return TRUE;
354 }
355 
356 /** Remove from pollset and destroy listening socket */
mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t * agent)357 static void mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t *agent)
358 {
359 	if(agent->listen_sock) {
360 		apt_poller_task_descriptor_remove(agent->task,&agent->listen_sock_pfd);
361 		apr_socket_close(agent->listen_sock);
362 		agent->listen_sock = NULL;
363 	}
364 }
365 
mrcp_connection_channel_associate(mrcp_connection_agent_t * agent,mrcp_connection_t * connection,const mrcp_message_t * message)366 static mrcp_control_channel_t* mrcp_connection_channel_associate(mrcp_connection_agent_t *agent, mrcp_connection_t *connection, const mrcp_message_t *message)
367 {
368 	apt_str_t identifier;
369 	mrcp_control_channel_t *channel;
370 	if(!connection || !message) {
371 		return NULL;
372 	}
373 	apt_id_resource_generate(&message->channel_id.session_id,&message->channel_id.resource_name,'@',&identifier,connection->pool);
374 	channel = mrcp_connection_channel_find(connection,&identifier);
375 	if(!channel) {
376 		channel = apr_hash_get(agent->pending_channel_table,identifier.buf,identifier.length);
377 		if(channel) {
378 			apr_hash_set(agent->pending_channel_table,identifier.buf,identifier.length,NULL);
379 			mrcp_connection_channel_add(connection,channel);
380 			apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Assign Control Channel <%s> to Connection %s [%d] -> [%d]",
381 				channel->identifier.buf,
382 				connection->id,
383 				apr_hash_count(agent->pending_channel_table),
384 				apr_hash_count(connection->channel_table));
385 		}
386 	}
387 	return channel;
388 }
389 
mrcp_connection_find(mrcp_connection_agent_t * agent,const apt_str_t * remote_ip)390 static mrcp_connection_t* mrcp_connection_find(mrcp_connection_agent_t *agent, const apt_str_t *remote_ip)
391 {
392 	mrcp_connection_t *connection;
393 	if(!agent || !remote_ip) {
394 		return NULL;
395 	}
396 
397 	for(connection = APR_RING_FIRST(&agent->connection_list);
398 			connection != APR_RING_SENTINEL(&agent->connection_list, mrcp_connection_t, link);
399 				connection = APR_RING_NEXT(connection, link)) {
400 		if(apt_string_compare(&connection->remote_ip,remote_ip) == TRUE) {
401 			return connection;
402 		}
403 	}
404 
405 	return NULL;
406 }
407 
mrcp_connection_remove(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)408 static apt_bool_t mrcp_connection_remove(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
409 {
410 	APR_RING_REMOVE(connection,link);
411 	return TRUE;
412 }
413 
mrcp_server_agent_connection_accept(mrcp_connection_agent_t * agent)414 static apt_bool_t mrcp_server_agent_connection_accept(mrcp_connection_agent_t *agent)
415 {
416 	char *local_ip = NULL;
417 	char *remote_ip = NULL;
418 
419 	mrcp_connection_t *connection = mrcp_connection_create();
420 
421 	if(apr_socket_accept(&connection->sock,agent->listen_sock,connection->pool) != APR_SUCCESS) {
422 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Accept Connection");
423 		mrcp_connection_destroy(connection);
424 		return FALSE;
425 	}
426 
427 	if(apr_socket_addr_get(&connection->r_sockaddr,APR_REMOTE,connection->sock) != APR_SUCCESS ||
428 		apr_socket_addr_get(&connection->l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
429 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Get Socket Address");
430 		apr_socket_close(connection->sock);
431 		mrcp_connection_destroy(connection);
432 		return FALSE;
433 	}
434 
435 	apr_sockaddr_ip_get(&local_ip,connection->l_sockaddr);
436 	apr_sockaddr_ip_get(&remote_ip,connection->r_sockaddr);
437 	apt_string_set(&connection->remote_ip,remote_ip);
438 	connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
439 		local_ip,connection->l_sockaddr->port,
440 		remote_ip,connection->r_sockaddr->port);
441 
442 	if(apr_hash_count(agent->pending_channel_table) == 0) {
443 		apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Reject Unexpected TCP/MRCPv2 Connection %s",connection->id);
444 		apr_socket_close(connection->sock);
445 		mrcp_connection_destroy(connection);
446 		return FALSE;
447 	}
448 
449 	memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
450 	connection->sock_pfd.desc_type = APR_POLL_SOCKET;
451 	connection->sock_pfd.reqevents = APR_POLLIN;
452 	connection->sock_pfd.desc.s = connection->sock;
453 	connection->sock_pfd.client_data = connection;
454 	if(apt_poller_task_descriptor_add(agent->task, &connection->sock_pfd) != TRUE) {
455 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
456 		apr_socket_close(connection->sock);
457 		mrcp_connection_destroy(connection);
458 		return FALSE;
459 	}
460 
461 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Accepted TCP/MRCPv2 Connection %s",connection->id);
462 	connection->agent = agent;
463 	APR_RING_INSERT_TAIL(&agent->connection_list,connection,mrcp_connection_t,link);
464 
465 	connection->parser = mrcp_parser_create(agent->resource_factory,connection->pool);
466 	connection->generator = mrcp_generator_create(agent->resource_factory,connection->pool);
467 
468 	connection->tx_buffer_size = agent->tx_buffer_size;
469 	connection->tx_buffer = apr_palloc(connection->pool,connection->tx_buffer_size+1);
470 
471 	connection->rx_buffer_size = agent->rx_buffer_size;
472 	connection->rx_buffer = apr_palloc(connection->pool,connection->rx_buffer_size+1);
473 	apt_text_stream_init(&connection->rx_stream,connection->rx_buffer,connection->rx_buffer_size);
474 
475 	if(apt_log_masking_get() != APT_LOG_MASKING_NONE) {
476 		connection->verbose = FALSE;
477 		mrcp_parser_verbose_set(connection->parser,TRUE);
478 		mrcp_generator_verbose_set(connection->generator,TRUE);
479 	}
480 	return TRUE;
481 }
482 
mrcp_server_agent_connection_close(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)483 static apt_bool_t mrcp_server_agent_connection_close(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
484 {
485 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"TCP/MRCPv2 Peer Disconnected %s",connection->id);
486 	apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
487 	apr_socket_close(connection->sock);
488 	connection->sock = NULL;
489 	if(!connection->access_count) {
490 		mrcp_connection_remove(agent,connection);
491 		apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
492 		mrcp_connection_destroy(connection);
493 	}
494 	return TRUE;
495 }
496 
mrcp_server_agent_channel_add(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * offer)497 static apt_bool_t mrcp_server_agent_channel_add(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *offer)
498 {
499 	mrcp_control_descriptor_t *answer = mrcp_control_answer_create(offer,channel->pool);
500 	apt_id_resource_generate(&offer->session_id,&offer->resource_name,'@',&channel->identifier,channel->pool);
501 	if(offer->port) {
502 		answer->port = agent->sockaddr->port;
503 	}
504 	if(offer->connection_type == MRCP_CONNECTION_TYPE_EXISTING) {
505 		if(agent->force_new_connection == TRUE) {
506 			/* force client to establish new connection */
507 			answer->connection_type = MRCP_CONNECTION_TYPE_NEW;
508 		}
509 		else {
510 			mrcp_connection_t *connection = NULL;
511 			/* try to find any existing connection */
512 			connection = mrcp_connection_find(agent,&offer->ip);
513 			if(!connection) {
514 				/* no existing conection found, force the new one */
515 				answer->connection_type = MRCP_CONNECTION_TYPE_NEW;
516 			}
517 		}
518 	}
519 
520 	apr_hash_set(agent->pending_channel_table,channel->identifier.buf,channel->identifier.length,channel);
521 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add Pending Control Channel <%s> [%d]",
522 			channel->identifier.buf,
523 			apr_hash_count(agent->pending_channel_table));
524 	/* send response */
525 	return mrcp_control_channel_add_respond(agent->vtable,channel,answer,TRUE);
526 }
527 
mrcp_server_agent_channel_modify(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * offer)528 static apt_bool_t mrcp_server_agent_channel_modify(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *offer)
529 {
530 	mrcp_control_descriptor_t *answer = mrcp_control_answer_create(offer,channel->pool);
531 	if(offer->port) {
532 		answer->port = agent->sockaddr->port;
533 	}
534 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Modify Control Channel <%s>",channel->identifier.buf);
535 	/* send response */
536 	return mrcp_control_channel_modify_respond(agent->vtable,channel,answer,TRUE);
537 }
538 
mrcp_server_agent_channel_remove(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel)539 static apt_bool_t mrcp_server_agent_channel_remove(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel)
540 {
541 	mrcp_connection_t *connection = channel->connection;
542 	if(connection) {
543 		mrcp_connection_channel_remove(connection,channel);
544 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove Control Channel <%s> [%d]",
545 				channel->identifier.buf,
546 				apr_hash_count(connection->channel_table));
547 		if(!connection->access_count) {
548 			if(!connection->sock) {
549 				mrcp_connection_remove(agent,connection);
550 				/* set connection to be destroyed on channel destroy */
551 				apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Mark Connection for Removal %s",connection->id);
552 				channel->connection = connection;
553 				channel->removed = TRUE;
554 			}
555 		}
556 	}
557 	else {
558 		apr_hash_set(agent->pending_channel_table,channel->identifier.buf,channel->identifier.length,NULL);
559 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove Pending Control Channel <%s> [%d]",
560 				channel->identifier.buf,
561 				apr_hash_count(agent->pending_channel_table));
562 	}
563 	/* send response */
564 	return mrcp_control_channel_remove_respond(agent->vtable,channel,TRUE);
565 }
566 
mrcp_server_agent_messsage_send(mrcp_connection_agent_t * agent,mrcp_connection_t * connection,mrcp_message_t * message)567 static apt_bool_t mrcp_server_agent_messsage_send(mrcp_connection_agent_t *agent, mrcp_connection_t *connection, mrcp_message_t *message)
568 {
569 	apt_bool_t status = FALSE;
570 	apt_text_stream_t stream;
571 	apt_message_status_e result;
572 	if(!connection || !connection->sock) {
573 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Null MRCPv2 Connection "APT_SIDRES_FMT,MRCP_MESSAGE_SIDRES(message));
574 		return FALSE;
575 	}
576 
577 	do {
578 		apt_text_stream_init(&stream,connection->tx_buffer,connection->tx_buffer_size);
579 		result = mrcp_generator_run(connection->generator,message,&stream);
580 		if(result != APT_MESSAGE_STATUS_INVALID) {
581 			stream.text.length = stream.pos - stream.text.buf;
582 			*stream.pos = '\0';
583 
584 			apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
585 					connection->id,
586 					stream.text.length,
587 					connection->verbose == TRUE ? stream.text.length : 0,
588 					stream.text.buf);
589 
590 			if(apr_socket_send(connection->sock,stream.text.buf,&stream.text.length) == APR_SUCCESS) {
591 				status = TRUE;
592 			}
593 			else {
594 				apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send MRCPv2 Data");
595 			}
596 		}
597 		else {
598 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Generate MRCPv2 Data");
599 		}
600 	}
601 	while(result == APT_MESSAGE_STATUS_INCOMPLETE);
602 
603 	return status;
604 }
605 
mrcp_server_message_handler(mrcp_connection_t * connection,mrcp_message_t * message,apt_message_status_e status)606 static apt_bool_t mrcp_server_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
607 {
608 	mrcp_connection_agent_t *agent = connection->agent;
609 	if(status == APT_MESSAGE_STATUS_COMPLETE) {
610 		/* message is completely parsed */
611 		mrcp_control_channel_t *channel = mrcp_connection_channel_associate(agent,connection,message);
612 		if(channel) {
613 			mrcp_connection_message_receive(agent->vtable,channel,message);
614 		}
615 		else {
616 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel "APT_SIDRES_FMT" in Connection %s",
617 				MRCP_MESSAGE_SIDRES(message),
618 				connection->id);
619 		}
620 	}
621 	else if(status == APT_MESSAGE_STATUS_INVALID) {
622 		/* error case */
623 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Parse MRCPv2 Data");
624 		if(message && message->resource) {
625 			mrcp_message_t *response;
626 			response = mrcp_response_create(message,message->pool);
627 			response->start_line.status_code = MRCP_STATUS_CODE_UNRECOGNIZED_MESSAGE;
628 			if(mrcp_server_agent_messsage_send(agent,connection,response) == FALSE) {
629 				apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send MRCPv2 Response");
630 			}
631 		}
632 	}
633 	return TRUE;
634 }
635 
636 /* Receive MRCP message through TCP/MRCPv2 connection */
mrcp_server_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)637 static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
638 {
639 	mrcp_connection_agent_t *agent = obj;
640 	mrcp_connection_t *connection = descriptor->client_data;
641 	apr_status_t status;
642 	apr_size_t offset;
643 	apr_size_t length;
644 	apt_text_stream_t *stream;
645 	mrcp_message_t *message;
646 	apt_message_status_e msg_status;
647 
648 	if(descriptor->desc.s == agent->listen_sock) {
649 		return mrcp_server_agent_connection_accept(agent);
650 	}
651 
652 	if(!connection || !connection->sock) {
653 		return FALSE;
654 	}
655 	stream = &connection->rx_stream;
656 
657 	/* calculate offset remaining from the previous receive / if any */
658 	offset = stream->pos - stream->text.buf;
659 	/* calculate available length */
660 	length = connection->rx_buffer_size - offset;
661 
662 	status = apr_socket_recv(connection->sock,stream->pos,&length);
663 	if(status == APR_EOF || length == 0) {
664 		return mrcp_server_agent_connection_close(agent,connection);
665 	}
666 
667 	/* calculate actual length of the stream */
668 	stream->text.length = offset + length;
669 	stream->pos[length] = '\0';
670 
671 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
672 			connection->id,
673 			length,
674 			connection->verbose == TRUE ? length : 0,
675 			stream->pos);
676 
677 	/* reset pos */
678 	apt_text_stream_reset(stream);
679 
680 	do {
681 		msg_status = mrcp_parser_run(connection->parser,stream,&message);
682 		if(mrcp_server_message_handler(connection,message,msg_status) == FALSE) {
683 			return FALSE;
684 		}
685 	}
686 	while(apt_text_is_eos(stream) == FALSE);
687 
688 	/* scroll remaining stream */
689 	apt_text_stream_scroll(stream);
690 	return TRUE;
691 }
692 
693 /* Process task message */
mrcp_server_agent_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)694 static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
695 {
696 	apt_poller_task_t *poller_task = apt_task_object_get(task);
697 	mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
698 	connection_task_msg_t *msg = (connection_task_msg_t*) task_msg->data;
699 	switch(msg->type) {
700 		case CONNECTION_TASK_MSG_ADD_CHANNEL:
701 			mrcp_server_agent_channel_add(agent,msg->channel,msg->descriptor);
702 			break;
703 		case CONNECTION_TASK_MSG_MODIFY_CHANNEL:
704 			mrcp_server_agent_channel_modify(agent,msg->channel,msg->descriptor);
705 			break;
706 		case CONNECTION_TASK_MSG_REMOVE_CHANNEL:
707 			mrcp_server_agent_channel_remove(agent,msg->channel);
708 			break;
709 		case CONNECTION_TASK_MSG_SEND_MESSAGE:
710 			mrcp_server_agent_messsage_send(agent,msg->channel->connection,msg->message);
711 			break;
712 	}
713 
714 	return TRUE;
715 }
716