1 /*
2  * Copyright 2008-2014 Arsen Chaloyan
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * $Id: mrcp_client_connection.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17  */
18 
19 #include "mrcp_connection.h"
20 #include "mrcp_client_connection.h"
21 #include "mrcp_control_descriptor.h"
22 #include "mrcp_resource_factory.h"
23 #include "mrcp_message.h"
24 #include "apt_text_stream.h"
25 #include "apt_poller_task.h"
26 #include "apt_log.h"
27 
28 
29 struct mrcp_connection_agent_t {
30 	/** List (ring) of MRCP connections */
31 	APR_RING_HEAD(mrcp_connection_head_t, mrcp_connection_t) connection_list;
32 
33 	apr_pool_t                           *pool;
34 	apt_poller_task_t                    *task;
35 	const mrcp_resource_factory_t        *resource_factory;
36 
37 	apr_uint32_t                          request_timeout;
38 	apt_bool_t                            offer_new_connection;
39 	apr_size_t                            tx_buffer_size;
40 	apr_size_t                            rx_buffer_size;
41 
42 	void                                 *obj;
43 	const mrcp_connection_event_vtable_t *vtable;
44 };
45 
46 typedef enum {
47 	CONNECTION_TASK_MSG_ADD_CHANNEL,
48 	CONNECTION_TASK_MSG_MODIFY_CHANNEL,
49 	CONNECTION_TASK_MSG_REMOVE_CHANNEL,
50 	CONNECTION_TASK_MSG_SEND_MESSAGE
51 } connection_task_msg_type_e;
52 
53 typedef struct connection_task_msg_t connection_task_msg_t;
54 struct connection_task_msg_t {
55 	connection_task_msg_type_e type;
56 	mrcp_connection_agent_t   *agent;
57 	mrcp_control_channel_t    *channel;
58 	mrcp_control_descriptor_t *descriptor;
59 	mrcp_message_t            *message;
60 };
61 
62 
63 static apt_bool_t mrcp_client_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg);
64 static apt_bool_t mrcp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
65 static void mrcp_client_timer_proc(apt_timer_t *timer, void *obj);
66 
67 /** Create connection agent. */
mrcp_client_connection_agent_create(const char * id,apr_size_t max_connection_count,apt_bool_t offer_new_connection,apr_pool_t * pool)68 MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_client_connection_agent_create(
69 											const char *id,
70 											apr_size_t max_connection_count,
71 											apt_bool_t offer_new_connection,
72 											apr_pool_t *pool)
73 {
74 	apt_task_t *task;
75 	apt_task_vtable_t *vtable;
76 	apt_task_msg_pool_t *msg_pool;
77 	mrcp_connection_agent_t *agent;
78 
79 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] [%"APR_SIZE_T_FMT"]",
80 				id,	max_connection_count);
81 	agent = apr_palloc(pool,sizeof(mrcp_connection_agent_t));
82 	agent->pool = pool;
83 	agent->request_timeout = 0;
84 	agent->offer_new_connection = offer_new_connection;
85 	agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
86 	agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
87 
88 	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);
89 
90 	agent->task = apt_poller_task_create(
91 					max_connection_count,
92 					mrcp_client_poller_signal_process,
93 					agent,
94 					msg_pool,
95 					pool);
96 	if(!agent->task) {
97 		return NULL;
98 	}
99 
100 	task = apt_poller_task_base_get(agent->task);
101 	if(task) {
102 		apt_task_name_set(task,id);
103 	}
104 
105 	vtable = apt_poller_task_vtable_get(agent->task);
106 	if(vtable) {
107 		vtable->process_msg = mrcp_client_agent_msg_process;
108 	}
109 
110 	APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
111 	return agent;
112 }
113 
114 /** Destroy connection agent. */
mrcp_client_connection_agent_destroy(mrcp_connection_agent_t * agent)115 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_destroy(mrcp_connection_agent_t *agent)
116 {
117 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy MRCPv2 Agent [%s]",
118 		mrcp_client_connection_agent_id_get(agent));
119 	return apt_poller_task_destroy(agent->task);
120 }
121 
122 /** Start connection agent. */
mrcp_client_connection_agent_start(mrcp_connection_agent_t * agent)123 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_start(mrcp_connection_agent_t *agent)
124 {
125 	return apt_poller_task_start(agent->task);
126 }
127 
128 /** Terminate connection agent. */
mrcp_client_connection_agent_terminate(mrcp_connection_agent_t * agent)129 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_terminate(mrcp_connection_agent_t *agent)
130 {
131 	return apt_poller_task_terminate(agent->task);
132 }
133 
134 /** Set connection event handler. */
mrcp_client_connection_agent_handler_set(mrcp_connection_agent_t * agent,void * obj,const mrcp_connection_event_vtable_t * vtable)135 MRCP_DECLARE(void) mrcp_client_connection_agent_handler_set(
136 									mrcp_connection_agent_t *agent,
137 									void *obj,
138 									const mrcp_connection_event_vtable_t *vtable)
139 {
140 	agent->obj = obj;
141 	agent->vtable = vtable;
142 }
143 
144 /** Set MRCP resource factory */
mrcp_client_connection_resource_factory_set(mrcp_connection_agent_t * agent,const mrcp_resource_factory_t * resource_factroy)145 MRCP_DECLARE(void) mrcp_client_connection_resource_factory_set(
146 								mrcp_connection_agent_t *agent,
147 								const mrcp_resource_factory_t *resource_factroy)
148 {
149 	agent->resource_factory = resource_factroy;
150 }
151 
152 /** Set rx buffer size */
mrcp_client_connection_rx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)153 MRCP_DECLARE(void) mrcp_client_connection_rx_size_set(
154 								mrcp_connection_agent_t *agent,
155 								apr_size_t size)
156 {
157 	if(size < MRCP_STREAM_BUFFER_SIZE) {
158 		size = MRCP_STREAM_BUFFER_SIZE;
159 	}
160 	agent->rx_buffer_size = size;
161 }
162 
163 /** Set tx buffer size */
mrcp_client_connection_tx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)164 MRCP_DECLARE(void) mrcp_client_connection_tx_size_set(
165 								mrcp_connection_agent_t *agent,
166 								apr_size_t size)
167 {
168 	if(size < MRCP_STREAM_BUFFER_SIZE) {
169 		size = MRCP_STREAM_BUFFER_SIZE;
170 	}
171 	agent->tx_buffer_size = size;
172 }
173 
174 /** Set request timeout */
mrcp_client_connection_timeout_set(mrcp_connection_agent_t * agent,apr_size_t timeout)175 MRCP_DECLARE(void) mrcp_client_connection_timeout_set(
176 								mrcp_connection_agent_t *agent,
177 								apr_size_t timeout)
178 {
179 	agent->request_timeout = (apr_uint32_t)timeout;
180 }
181 
182 /** Get task */
mrcp_client_connection_agent_task_get(const mrcp_connection_agent_t * agent)183 MRCP_DECLARE(apt_task_t*) mrcp_client_connection_agent_task_get(const mrcp_connection_agent_t *agent)
184 {
185 	return apt_poller_task_base_get(agent->task);
186 }
187 
188 /** Get external object */
mrcp_client_connection_agent_object_get(const mrcp_connection_agent_t * agent)189 MRCP_DECLARE(void*) mrcp_client_connection_agent_object_get(const mrcp_connection_agent_t *agent)
190 {
191 	return agent->obj;
192 }
193 
194 /** Get string identifier */
mrcp_client_connection_agent_id_get(const mrcp_connection_agent_t * agent)195 MRCP_DECLARE(const char*) mrcp_client_connection_agent_id_get(const mrcp_connection_agent_t *agent)
196 {
197 	apt_task_t *task = apt_poller_task_base_get(agent->task);
198 	return apt_task_name_get(task);
199 }
200 
201 
202 /** Create control channel */
mrcp_client_control_channel_create(mrcp_connection_agent_t * agent,void * obj,apr_pool_t * pool)203 MRCP_DECLARE(mrcp_control_channel_t*) mrcp_client_control_channel_create(mrcp_connection_agent_t *agent, void *obj, apr_pool_t *pool)
204 {
205 	mrcp_control_channel_t *channel = apr_palloc(pool,sizeof(mrcp_control_channel_t));
206 	channel->agent = agent;
207 	channel->connection = NULL;
208 	channel->active_request = NULL;
209 	channel->request_timer = NULL;
210 	channel->removed = FALSE;
211 	channel->obj = obj;
212 	channel->log_obj = NULL;
213 	channel->pool = pool;
214 
215 	channel->request_timer = apt_poller_task_timer_create(
216 								agent->task,
217 								mrcp_client_timer_proc,
218 								channel,
219 								pool);
220 	return channel;
221 }
222 
223 /** Set the logger object */
mrcp_client_control_channel_log_obj_set(mrcp_control_channel_t * channel,void * log_obj)224 MRCP_DECLARE(void) mrcp_client_control_channel_log_obj_set(mrcp_control_channel_t *channel, void *log_obj)
225 {
226 	channel->log_obj = log_obj;
227 }
228 
229 /** Destroy MRCPv2 control channel */
mrcp_client_control_channel_destroy(mrcp_control_channel_t * channel)230 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_destroy(mrcp_control_channel_t *channel)
231 {
232 	if(channel && channel->connection && channel->removed == TRUE) {
233 		mrcp_connection_t *connection = channel->connection;
234 		channel->connection = NULL;
235 		apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
236 		mrcp_connection_destroy(connection);
237 	}
238 	return TRUE;
239 }
240 
241 /** Signal task message */
mrcp_client_control_message_signal(connection_task_msg_type_e type,mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor,mrcp_message_t * message)242 static apt_bool_t mrcp_client_control_message_signal(
243 								connection_task_msg_type_e type,
244 								mrcp_connection_agent_t *agent,
245 								mrcp_control_channel_t *channel,
246 								mrcp_control_descriptor_t *descriptor,
247 								mrcp_message_t *message)
248 {
249 	apt_task_t *task = apt_poller_task_base_get(agent->task);
250 	apt_task_msg_t *task_msg = apt_task_msg_get(task);
251 	if(task_msg) {
252 		connection_task_msg_t *msg = (connection_task_msg_t*)task_msg->data;
253 		msg->type = type;
254 		msg->agent = agent;
255 		msg->channel = channel;
256 		msg->descriptor = descriptor;
257 		msg->message = message;
258 		apt_task_msg_signal(task,task_msg);
259 	}
260 	return TRUE;
261 }
262 
263 /** Add MRCPv2 control channel */
mrcp_client_control_channel_add(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)264 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_add(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
265 {
266 	return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_ADD_CHANNEL,channel->agent,channel,descriptor,NULL);
267 }
268 
269 /** Modify MRCPv2 control channel */
mrcp_client_control_channel_modify(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)270 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_modify(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
271 {
272 	return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_MODIFY_CHANNEL,channel->agent,channel,descriptor,NULL);
273 }
274 
275 /** Remove MRCPv2 control channel */
mrcp_client_control_channel_remove(mrcp_control_channel_t * channel)276 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_remove(mrcp_control_channel_t *channel)
277 {
278 	return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_REMOVE_CHANNEL,channel->agent,channel,NULL,NULL);
279 }
280 
281 /** Send MRCPv2 message */
mrcp_client_control_message_send(mrcp_control_channel_t * channel,mrcp_message_t * message)282 MRCP_DECLARE(apt_bool_t) mrcp_client_control_message_send(mrcp_control_channel_t *channel, mrcp_message_t *message)
283 {
284 	return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_SEND_MESSAGE,channel->agent,channel,NULL,message);
285 }
286 
mrcp_client_agent_connection_create(mrcp_connection_agent_t * agent,mrcp_control_descriptor_t * descriptor)287 static mrcp_connection_t* mrcp_client_agent_connection_create(mrcp_connection_agent_t *agent, mrcp_control_descriptor_t *descriptor)
288 {
289 	char *local_ip = NULL;
290 	char *remote_ip = NULL;
291 	mrcp_connection_t *connection = mrcp_connection_create();
292 
293 	apr_sockaddr_info_get(&connection->r_sockaddr,descriptor->ip.buf,APR_INET,descriptor->port,0,connection->pool);
294 	if(!connection->r_sockaddr) {
295 		mrcp_connection_destroy(connection);
296 		return NULL;
297 	}
298 
299 	if(apr_socket_create(&connection->sock,connection->r_sockaddr->family,SOCK_STREAM,APR_PROTO_TCP,connection->pool) != APR_SUCCESS) {
300 		mrcp_connection_destroy(connection);
301 		return NULL;
302 	}
303 
304 	apr_socket_opt_set(connection->sock, APR_SO_NONBLOCK, 0);
305 	apr_socket_timeout_set(connection->sock, -1);
306 	apr_socket_opt_set(connection->sock, APR_SO_REUSEADDR, 1);
307 
308 	if(apr_socket_connect(connection->sock, connection->r_sockaddr) != APR_SUCCESS) {
309 		apr_socket_close(connection->sock);
310 		mrcp_connection_destroy(connection);
311 		return NULL;
312 	}
313 
314 	if(apr_socket_addr_get(&connection->l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
315 		apr_socket_close(connection->sock);
316 		mrcp_connection_destroy(connection);
317 		return NULL;
318 	}
319 
320 	apr_sockaddr_ip_get(&local_ip,connection->l_sockaddr);
321 	apr_sockaddr_ip_get(&remote_ip,connection->r_sockaddr);
322 	connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
323 		local_ip,connection->l_sockaddr->port,
324 		remote_ip,connection->r_sockaddr->port);
325 
326 	memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
327 	connection->sock_pfd.desc_type = APR_POLL_SOCKET;
328 	connection->sock_pfd.reqevents = APR_POLLIN;
329 	connection->sock_pfd.desc.s = connection->sock;
330 	connection->sock_pfd.client_data = connection;
331 	if(apt_poller_task_descriptor_add(agent->task, &connection->sock_pfd) != TRUE) {
332 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
333 		apr_socket_close(connection->sock);
334 		mrcp_connection_destroy(connection);
335 		return NULL;
336 	}
337 
338 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Established TCP/MRCPv2 Connection %s",connection->id);
339 	connection->agent = agent;
340 	APR_RING_INSERT_TAIL(&agent->connection_list,connection,mrcp_connection_t,link);
341 
342 	connection->parser = mrcp_parser_create(agent->resource_factory,connection->pool);
343 	connection->generator = mrcp_generator_create(agent->resource_factory,connection->pool);
344 
345 	connection->tx_buffer_size = agent->tx_buffer_size;
346 	connection->tx_buffer = apr_palloc(connection->pool,connection->tx_buffer_size+1);
347 
348 	connection->rx_buffer_size = agent->rx_buffer_size;
349 	connection->rx_buffer = apr_palloc(connection->pool,connection->rx_buffer_size+1);
350 	apt_text_stream_init(&connection->rx_stream,connection->rx_buffer,connection->rx_buffer_size);
351 
352 	if(apt_log_masking_get() != APT_LOG_MASKING_NONE) {
353 		connection->verbose = FALSE;
354 		mrcp_parser_verbose_set(connection->parser,TRUE);
355 		mrcp_generator_verbose_set(connection->generator,TRUE);
356 	}
357 
358 	return connection;
359 }
360 
mrcp_client_agent_connection_find(mrcp_connection_agent_t * agent,mrcp_control_descriptor_t * descriptor)361 static mrcp_connection_t* mrcp_client_agent_connection_find(mrcp_connection_agent_t *agent, mrcp_control_descriptor_t *descriptor)
362 {
363 	apr_sockaddr_t *sockaddr;
364 	mrcp_connection_t *connection;
365 
366 	for(connection = APR_RING_FIRST(&agent->connection_list);
367 			connection != APR_RING_SENTINEL(&agent->connection_list, mrcp_connection_t, link);
368 				connection = APR_RING_NEXT(connection, link)) {
369 		if(apr_sockaddr_info_get(&sockaddr,descriptor->ip.buf,APR_INET,descriptor->port,0,connection->pool) == APR_SUCCESS) {
370 			if(apr_sockaddr_equal(sockaddr,connection->r_sockaddr) != 0 &&
371 				descriptor->port == connection->r_sockaddr->port) {
372 				return connection;
373 			}
374 		}
375 	}
376 
377 	return NULL;
378 }
379 
mrcp_client_agent_connection_remove(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)380 static apt_bool_t mrcp_client_agent_connection_remove(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
381 {
382 	/* remove from the list */
383 	APR_RING_REMOVE(connection,link);
384 
385 	if(connection->sock) {
386 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close TCP/MRCPv2 Connection %s",connection->id);
387 		apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
388 		apr_socket_close(connection->sock);
389 		connection->sock = NULL;
390 	}
391 	return TRUE;
392 }
393 
mrcp_client_agent_channel_add(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)394 static apt_bool_t mrcp_client_agent_channel_add(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
395 {
396 	if(agent->offer_new_connection == TRUE) {
397 		descriptor->connection_type = MRCP_CONNECTION_TYPE_NEW;
398 	}
399 	else {
400 		descriptor->connection_type = MRCP_CONNECTION_TYPE_EXISTING;
401 		if(APR_RING_EMPTY(&agent->connection_list, mrcp_connection_t, link)) {
402 			/* offer new connection if there is no established connection yet */
403 			descriptor->connection_type = MRCP_CONNECTION_TYPE_NEW;
404 		}
405 	}
406 	/* send response */
407 	return mrcp_control_channel_add_respond(agent->vtable,channel,descriptor,TRUE);
408 }
409 
mrcp_client_agent_channel_modify(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)410 static apt_bool_t mrcp_client_agent_channel_modify(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
411 {
412 	apt_bool_t status = TRUE;
413 	if(descriptor->port) {
414 		if(!channel->connection) {
415 			mrcp_connection_t *connection = NULL;
416 			apt_id_resource_generate(&descriptor->session_id,&descriptor->resource_name,'@',&channel->identifier,channel->pool);
417 			/* no connection yet */
418 			if(descriptor->connection_type == MRCP_CONNECTION_TYPE_EXISTING) {
419 				/* try to find existing connection */
420 				connection = mrcp_client_agent_connection_find(agent,descriptor);
421 				if(!connection) {
422 					apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Found No Existing TCP/MRCPv2 Connection");
423 				}
424 			}
425 			if(!connection) {
426 				/* create new connection */
427 				connection = mrcp_client_agent_connection_create(agent,descriptor);
428 				if(!connection) {
429 					apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Establish TCP/MRCPv2 Connection");
430 				}
431 			}
432 
433 			if(connection) {
434 				mrcp_connection_channel_add(connection,channel);
435 				apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Add Control Channel <%s> %s [%d]",
436 						channel->identifier.buf,
437 						connection->id,
438 						apr_hash_count(connection->channel_table));
439 				if(descriptor->connection_type == MRCP_CONNECTION_TYPE_NEW) {
440 					/* set connection type to existing for the next offers / if any */
441 					descriptor->connection_type = MRCP_CONNECTION_TYPE_EXISTING;
442 				}
443 			}
444 			else {
445 				descriptor->port = 0;
446 				status = FALSE;
447 			}
448 		}
449 	}
450 	/* send response */
451 	return mrcp_control_channel_modify_respond(agent->vtable,channel,descriptor,status);
452 }
453 
mrcp_client_agent_channel_remove(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel)454 static apt_bool_t mrcp_client_agent_channel_remove(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel)
455 {
456 	if(channel->connection) {
457 		mrcp_connection_t *connection = channel->connection;
458 		mrcp_connection_channel_remove(connection,channel);
459 		apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Remove Control Channel <%s> [%d]",
460 				channel->identifier.buf,
461 				apr_hash_count(connection->channel_table));
462 		if(!connection->access_count) {
463 			mrcp_client_agent_connection_remove(agent,connection);
464 			/* set connection to be destroyed on channel destroy */
465 			channel->connection = connection;
466 			channel->removed = TRUE;
467 		}
468 	}
469 
470 	/* send response */
471 	return mrcp_control_channel_remove_respond(agent->vtable,channel,TRUE);
472 }
473 
mrcp_client_agent_request_cancel(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_message_t * message)474 static apt_bool_t mrcp_client_agent_request_cancel(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_message_t *message)
475 {
476 	mrcp_message_t *response;
477 	apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Cancel MRCP Request <%s@%s> [%d]",
478 		MRCP_MESSAGE_SIDRES(message),
479 		message->start_line.request_id);
480 	response = mrcp_response_create(message,message->pool);
481 	response->start_line.status_code = MRCP_STATUS_CODE_METHOD_FAILED;
482 	return mrcp_connection_message_receive(agent->vtable,channel,response);
483 }
484 
mrcp_client_agent_disconnect_raise(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)485 static apt_bool_t mrcp_client_agent_disconnect_raise(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
486 {
487 	mrcp_control_channel_t *channel;
488 	void *val;
489 	apr_hash_index_t *it = apr_hash_first(connection->pool,connection->channel_table);
490 	/* walk through the list of channels and raise disconnect event for them */
491 	for(; it; it = apr_hash_next(it)) {
492 		apr_hash_this(it,NULL,NULL,&val);
493 		channel = val;
494 		if(!channel) continue;
495 
496 		if(channel->active_request) {
497 			mrcp_client_agent_request_cancel(channel->agent,channel,channel->active_request);
498 			channel->active_request = NULL;
499 			if(channel->request_timer) {
500 				apt_timer_kill(channel->request_timer);
501 			}
502 		}
503 		else if(agent->vtable->on_disconnect){
504 			agent->vtable->on_disconnect(channel);
505 		}
506 	}
507 	return TRUE;
508 }
509 
mrcp_client_agent_messsage_send(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_message_t * message)510 static apt_bool_t mrcp_client_agent_messsage_send(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_message_t *message)
511 {
512 	apt_bool_t status = FALSE;
513 	mrcp_connection_t *connection = channel->connection;
514 	apt_text_stream_t stream;
515 	apt_message_status_e result;
516 
517 	if(!connection || !connection->sock) {
518 		apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Null MRCPv2 Connection "APT_SIDRES_FMT,MRCP_MESSAGE_SIDRES(message));
519 		mrcp_client_agent_request_cancel(agent,channel,message);
520 		return FALSE;
521 	}
522 
523 	do {
524 		apt_text_stream_init(&stream,connection->tx_buffer,connection->tx_buffer_size);
525 		result = mrcp_generator_run(connection->generator,message,&stream);
526 		if(result != APT_MESSAGE_STATUS_INVALID) {
527 			stream.text.length = stream.pos - stream.text.buf;
528 			*stream.pos = '\0';
529 
530 			apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Send MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
531 				connection->id,
532 				stream.text.length,
533 				connection->verbose == TRUE ? stream.text.length : 0,
534 				stream.text.buf);
535 
536 			if(apr_socket_send(connection->sock,stream.text.buf,&stream.text.length) == APR_SUCCESS) {
537 				status = TRUE;
538 			}
539 			else {
540 				apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Send MRCPv2 Data %s",
541 					connection->id);
542 			}
543 		}
544 		else {
545 			apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Generate MRCPv2 Data %s",
546 				connection->id);
547 		}
548 	}
549 	while(result == APT_MESSAGE_STATUS_INCOMPLETE);
550 
551 	if(status == TRUE) {
552 		channel->active_request = message;
553 		if(channel->request_timer && agent->request_timeout) {
554 			apt_timer_set(channel->request_timer,agent->request_timeout);
555 		}
556 	}
557 	else {
558 		mrcp_client_agent_request_cancel(agent,channel,message);
559 	}
560 	return status;
561 }
562 
mrcp_client_message_handler(mrcp_connection_t * connection,mrcp_message_t * message,apt_message_status_e status)563 static apt_bool_t mrcp_client_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
564 {
565 	if(status == APT_MESSAGE_STATUS_COMPLETE) {
566 		/* message is completely parsed */
567 		mrcp_control_channel_t *channel;
568 		apt_str_t identifier;
569 		apt_id_resource_generate(&message->channel_id.session_id,&message->channel_id.resource_name,'@',&identifier,message->pool);
570 		channel = mrcp_connection_channel_find(connection,&identifier);
571 		if(channel) {
572 			mrcp_connection_agent_t *agent = connection->agent;
573 			if(message->start_line.message_type == MRCP_MESSAGE_TYPE_RESPONSE) {
574 				if(!channel->active_request ||
575 					channel->active_request->start_line.request_id != message->start_line.request_id) {
576 					apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Unexpected MRCP Response "APT_SIDRES_FMT" [%d]",
577 						MRCP_MESSAGE_SIDRES(message),
578 						message->start_line.request_id);
579 					return FALSE;
580 				}
581 				if(channel->request_timer) {
582 					apt_timer_kill(channel->request_timer);
583 				}
584 				channel->active_request = NULL;
585 			}
586 
587 			mrcp_connection_message_receive(agent->vtable,channel,message);
588 		}
589 		else {
590 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel "APT_SIDRES_FMT" in Connection %s [%d]",
591 				MRCP_MESSAGE_SIDRES(message),
592 				connection->id,
593 				apr_hash_count(connection->channel_table));
594 		}
595 	}
596 	return TRUE;
597 }
598 
599 /* Receive MRCP message through TCP/MRCPv2 connection */
mrcp_client_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)600 static apt_bool_t mrcp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
601 {
602 	mrcp_connection_agent_t *agent = obj;
603 	mrcp_connection_t *connection = descriptor->client_data;
604 	apr_status_t status;
605 	apr_size_t offset;
606 	apr_size_t length;
607 	apt_text_stream_t *stream;
608 	mrcp_message_t *message;
609 	apt_message_status_e msg_status;
610 
611 	if(!connection || !connection->sock) {
612 		return FALSE;
613 	}
614 	stream = &connection->rx_stream;
615 
616 	/* calculate offset remaining from the previous receive / if any */
617 	offset = stream->pos - stream->text.buf;
618 	/* calculate available length */
619 	length = connection->rx_buffer_size - offset;
620 
621 	status = apr_socket_recv(connection->sock,stream->pos,&length);
622 	if(status == APR_EOF || length == 0) {
623 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"TCP/MRCPv2 Peer Disconnected %s",connection->id);
624 		apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
625 		apr_socket_close(connection->sock);
626 		connection->sock = NULL;
627 
628 		mrcp_client_agent_disconnect_raise(agent,connection);
629 		return TRUE;
630 	}
631 
632 	/* calculate actual length of the stream */
633 	stream->text.length = offset + length;
634 	stream->pos[length] = '\0';
635 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
636 			connection->id,
637 			length,
638 			connection->verbose == TRUE ? length : 0,
639 			stream->pos);
640 
641 	/* reset pos */
642 	apt_text_stream_reset(stream);
643 
644 	do {
645 		msg_status = mrcp_parser_run(connection->parser,stream,&message);
646 		if(mrcp_client_message_handler(connection,message,msg_status) == FALSE) {
647 			return FALSE;
648 		}
649 	}
650 	while(apt_text_is_eos(stream) == FALSE);
651 
652 	/* scroll remaining stream */
653 	apt_text_stream_scroll(stream);
654 	return TRUE;
655 }
656 
657 /* Process task message */
mrcp_client_agent_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)658 static apt_bool_t mrcp_client_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
659 {
660 	apt_poller_task_t *poller_task = apt_task_object_get(task);
661 	mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
662 	connection_task_msg_t *msg = (connection_task_msg_t*) task_msg->data;
663 
664 	switch(msg->type) {
665 		case CONNECTION_TASK_MSG_ADD_CHANNEL:
666 			mrcp_client_agent_channel_add(agent,msg->channel,msg->descriptor);
667 			break;
668 		case CONNECTION_TASK_MSG_MODIFY_CHANNEL:
669 			mrcp_client_agent_channel_modify(agent,msg->channel,msg->descriptor);
670 			break;
671 		case CONNECTION_TASK_MSG_REMOVE_CHANNEL:
672 			mrcp_client_agent_channel_remove(agent,msg->channel);
673 			break;
674 		case CONNECTION_TASK_MSG_SEND_MESSAGE:
675 			mrcp_client_agent_messsage_send(agent,msg->channel,msg->message);
676 			break;
677 	}
678 
679 	return TRUE;
680 }
681 
682 /* Timer callback */
mrcp_client_timer_proc(apt_timer_t * timer,void * obj)683 static void mrcp_client_timer_proc(apt_timer_t *timer, void *obj)
684 {
685 	mrcp_control_channel_t *channel = obj;
686 	if(!channel) {
687 		return;
688 	}
689 
690 	if(channel->request_timer == timer) {
691 		if(channel->active_request) {
692 			mrcp_client_agent_request_cancel(channel->agent,channel,channel->active_request);
693 			channel->active_request = NULL;
694 		}
695 	}
696 }
697