1 /*
2 * Copyright 2008-2014 Arsen Chaloyan
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * $Id: mrcp_client_connection.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17 */
18
19 #include "mrcp_connection.h"
20 #include "mrcp_client_connection.h"
21 #include "mrcp_control_descriptor.h"
22 #include "mrcp_resource_factory.h"
23 #include "mrcp_message.h"
24 #include "apt_text_stream.h"
25 #include "apt_poller_task.h"
26 #include "apt_log.h"
27
28
29 struct mrcp_connection_agent_t {
30 /** List (ring) of MRCP connections */
31 APR_RING_HEAD(mrcp_connection_head_t, mrcp_connection_t) connection_list;
32
33 apr_pool_t *pool;
34 apt_poller_task_t *task;
35 const mrcp_resource_factory_t *resource_factory;
36
37 apr_uint32_t request_timeout;
38 apt_bool_t offer_new_connection;
39 apr_size_t tx_buffer_size;
40 apr_size_t rx_buffer_size;
41
42 void *obj;
43 const mrcp_connection_event_vtable_t *vtable;
44 };
45
/** Types of the messages posted to the connection agent task */
typedef enum {
	/** Add a control channel */
	CONNECTION_TASK_MSG_ADD_CHANNEL,
	/** Modify a control channel */
	CONNECTION_TASK_MSG_MODIFY_CHANNEL,
	/** Remove a control channel */
	CONNECTION_TASK_MSG_REMOVE_CHANNEL,
	/** Send an MRCP message through a control channel */
	CONNECTION_TASK_MSG_SEND_MESSAGE
} connection_task_msg_type_e;
52
53 typedef struct connection_task_msg_t connection_task_msg_t;
54 struct connection_task_msg_t {
55 connection_task_msg_type_e type;
56 mrcp_connection_agent_t *agent;
57 mrcp_control_channel_t *channel;
58 mrcp_control_descriptor_t *descriptor;
59 mrcp_message_t *message;
60 };
61
62
63 static apt_bool_t mrcp_client_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg);
64 static apt_bool_t mrcp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
65 static void mrcp_client_timer_proc(apt_timer_t *timer, void *obj);
66
67 /** Create connection agent. */
mrcp_client_connection_agent_create(const char * id,apr_size_t max_connection_count,apt_bool_t offer_new_connection,apr_pool_t * pool)68 MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_client_connection_agent_create(
69 const char *id,
70 apr_size_t max_connection_count,
71 apt_bool_t offer_new_connection,
72 apr_pool_t *pool)
73 {
74 apt_task_t *task;
75 apt_task_vtable_t *vtable;
76 apt_task_msg_pool_t *msg_pool;
77 mrcp_connection_agent_t *agent;
78
79 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] [%"APR_SIZE_T_FMT"]",
80 id, max_connection_count);
81 agent = apr_palloc(pool,sizeof(mrcp_connection_agent_t));
82 agent->pool = pool;
83 agent->request_timeout = 0;
84 agent->offer_new_connection = offer_new_connection;
85 agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
86 agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
87
88 msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);
89
90 agent->task = apt_poller_task_create(
91 max_connection_count,
92 mrcp_client_poller_signal_process,
93 agent,
94 msg_pool,
95 pool);
96 if(!agent->task) {
97 return NULL;
98 }
99
100 task = apt_poller_task_base_get(agent->task);
101 if(task) {
102 apt_task_name_set(task,id);
103 }
104
105 vtable = apt_poller_task_vtable_get(agent->task);
106 if(vtable) {
107 vtable->process_msg = mrcp_client_agent_msg_process;
108 }
109
110 APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
111 return agent;
112 }
113
114 /** Destroy connection agent. */
mrcp_client_connection_agent_destroy(mrcp_connection_agent_t * agent)115 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_destroy(mrcp_connection_agent_t *agent)
116 {
117 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy MRCPv2 Agent [%s]",
118 mrcp_client_connection_agent_id_get(agent));
119 return apt_poller_task_destroy(agent->task);
120 }
121
122 /** Start connection agent. */
mrcp_client_connection_agent_start(mrcp_connection_agent_t * agent)123 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_start(mrcp_connection_agent_t *agent)
124 {
125 return apt_poller_task_start(agent->task);
126 }
127
128 /** Terminate connection agent. */
mrcp_client_connection_agent_terminate(mrcp_connection_agent_t * agent)129 MRCP_DECLARE(apt_bool_t) mrcp_client_connection_agent_terminate(mrcp_connection_agent_t *agent)
130 {
131 return apt_poller_task_terminate(agent->task);
132 }
133
134 /** Set connection event handler. */
mrcp_client_connection_agent_handler_set(mrcp_connection_agent_t * agent,void * obj,const mrcp_connection_event_vtable_t * vtable)135 MRCP_DECLARE(void) mrcp_client_connection_agent_handler_set(
136 mrcp_connection_agent_t *agent,
137 void *obj,
138 const mrcp_connection_event_vtable_t *vtable)
139 {
140 agent->obj = obj;
141 agent->vtable = vtable;
142 }
143
144 /** Set MRCP resource factory */
mrcp_client_connection_resource_factory_set(mrcp_connection_agent_t * agent,const mrcp_resource_factory_t * resource_factroy)145 MRCP_DECLARE(void) mrcp_client_connection_resource_factory_set(
146 mrcp_connection_agent_t *agent,
147 const mrcp_resource_factory_t *resource_factroy)
148 {
149 agent->resource_factory = resource_factroy;
150 }
151
152 /** Set rx buffer size */
mrcp_client_connection_rx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)153 MRCP_DECLARE(void) mrcp_client_connection_rx_size_set(
154 mrcp_connection_agent_t *agent,
155 apr_size_t size)
156 {
157 if(size < MRCP_STREAM_BUFFER_SIZE) {
158 size = MRCP_STREAM_BUFFER_SIZE;
159 }
160 agent->rx_buffer_size = size;
161 }
162
163 /** Set tx buffer size */
mrcp_client_connection_tx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)164 MRCP_DECLARE(void) mrcp_client_connection_tx_size_set(
165 mrcp_connection_agent_t *agent,
166 apr_size_t size)
167 {
168 if(size < MRCP_STREAM_BUFFER_SIZE) {
169 size = MRCP_STREAM_BUFFER_SIZE;
170 }
171 agent->tx_buffer_size = size;
172 }
173
174 /** Set request timeout */
mrcp_client_connection_timeout_set(mrcp_connection_agent_t * agent,apr_size_t timeout)175 MRCP_DECLARE(void) mrcp_client_connection_timeout_set(
176 mrcp_connection_agent_t *agent,
177 apr_size_t timeout)
178 {
179 agent->request_timeout = (apr_uint32_t)timeout;
180 }
181
182 /** Get task */
mrcp_client_connection_agent_task_get(const mrcp_connection_agent_t * agent)183 MRCP_DECLARE(apt_task_t*) mrcp_client_connection_agent_task_get(const mrcp_connection_agent_t *agent)
184 {
185 return apt_poller_task_base_get(agent->task);
186 }
187
188 /** Get external object */
mrcp_client_connection_agent_object_get(const mrcp_connection_agent_t * agent)189 MRCP_DECLARE(void*) mrcp_client_connection_agent_object_get(const mrcp_connection_agent_t *agent)
190 {
191 return agent->obj;
192 }
193
194 /** Get string identifier */
mrcp_client_connection_agent_id_get(const mrcp_connection_agent_t * agent)195 MRCP_DECLARE(const char*) mrcp_client_connection_agent_id_get(const mrcp_connection_agent_t *agent)
196 {
197 apt_task_t *task = apt_poller_task_base_get(agent->task);
198 return apt_task_name_get(task);
199 }
200
201
202 /** Create control channel */
mrcp_client_control_channel_create(mrcp_connection_agent_t * agent,void * obj,apr_pool_t * pool)203 MRCP_DECLARE(mrcp_control_channel_t*) mrcp_client_control_channel_create(mrcp_connection_agent_t *agent, void *obj, apr_pool_t *pool)
204 {
205 mrcp_control_channel_t *channel = apr_palloc(pool,sizeof(mrcp_control_channel_t));
206 channel->agent = agent;
207 channel->connection = NULL;
208 channel->active_request = NULL;
209 channel->request_timer = NULL;
210 channel->removed = FALSE;
211 channel->obj = obj;
212 channel->log_obj = NULL;
213 channel->pool = pool;
214
215 channel->request_timer = apt_poller_task_timer_create(
216 agent->task,
217 mrcp_client_timer_proc,
218 channel,
219 pool);
220 return channel;
221 }
222
223 /** Set the logger object */
mrcp_client_control_channel_log_obj_set(mrcp_control_channel_t * channel,void * log_obj)224 MRCP_DECLARE(void) mrcp_client_control_channel_log_obj_set(mrcp_control_channel_t *channel, void *log_obj)
225 {
226 channel->log_obj = log_obj;
227 }
228
229 /** Destroy MRCPv2 control channel */
mrcp_client_control_channel_destroy(mrcp_control_channel_t * channel)230 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_destroy(mrcp_control_channel_t *channel)
231 {
232 if(channel && channel->connection && channel->removed == TRUE) {
233 mrcp_connection_t *connection = channel->connection;
234 channel->connection = NULL;
235 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
236 mrcp_connection_destroy(connection);
237 }
238 return TRUE;
239 }
240
241 /** Signal task message */
mrcp_client_control_message_signal(connection_task_msg_type_e type,mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor,mrcp_message_t * message)242 static apt_bool_t mrcp_client_control_message_signal(
243 connection_task_msg_type_e type,
244 mrcp_connection_agent_t *agent,
245 mrcp_control_channel_t *channel,
246 mrcp_control_descriptor_t *descriptor,
247 mrcp_message_t *message)
248 {
249 apt_task_t *task = apt_poller_task_base_get(agent->task);
250 apt_task_msg_t *task_msg = apt_task_msg_get(task);
251 if(task_msg) {
252 connection_task_msg_t *msg = (connection_task_msg_t*)task_msg->data;
253 msg->type = type;
254 msg->agent = agent;
255 msg->channel = channel;
256 msg->descriptor = descriptor;
257 msg->message = message;
258 apt_task_msg_signal(task,task_msg);
259 }
260 return TRUE;
261 }
262
263 /** Add MRCPv2 control channel */
mrcp_client_control_channel_add(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)264 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_add(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
265 {
266 return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_ADD_CHANNEL,channel->agent,channel,descriptor,NULL);
267 }
268
269 /** Modify MRCPv2 control channel */
mrcp_client_control_channel_modify(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)270 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_modify(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
271 {
272 return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_MODIFY_CHANNEL,channel->agent,channel,descriptor,NULL);
273 }
274
275 /** Remove MRCPv2 control channel */
mrcp_client_control_channel_remove(mrcp_control_channel_t * channel)276 MRCP_DECLARE(apt_bool_t) mrcp_client_control_channel_remove(mrcp_control_channel_t *channel)
277 {
278 return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_REMOVE_CHANNEL,channel->agent,channel,NULL,NULL);
279 }
280
281 /** Send MRCPv2 message */
mrcp_client_control_message_send(mrcp_control_channel_t * channel,mrcp_message_t * message)282 MRCP_DECLARE(apt_bool_t) mrcp_client_control_message_send(mrcp_control_channel_t *channel, mrcp_message_t *message)
283 {
284 return mrcp_client_control_message_signal(CONNECTION_TASK_MSG_SEND_MESSAGE,channel->agent,channel,NULL,message);
285 }
286
mrcp_client_agent_connection_create(mrcp_connection_agent_t * agent,mrcp_control_descriptor_t * descriptor)287 static mrcp_connection_t* mrcp_client_agent_connection_create(mrcp_connection_agent_t *agent, mrcp_control_descriptor_t *descriptor)
288 {
289 char *local_ip = NULL;
290 char *remote_ip = NULL;
291 mrcp_connection_t *connection = mrcp_connection_create();
292
293 apr_sockaddr_info_get(&connection->r_sockaddr,descriptor->ip.buf,APR_INET,descriptor->port,0,connection->pool);
294 if(!connection->r_sockaddr) {
295 mrcp_connection_destroy(connection);
296 return NULL;
297 }
298
299 if(apr_socket_create(&connection->sock,connection->r_sockaddr->family,SOCK_STREAM,APR_PROTO_TCP,connection->pool) != APR_SUCCESS) {
300 mrcp_connection_destroy(connection);
301 return NULL;
302 }
303
304 apr_socket_opt_set(connection->sock, APR_SO_NONBLOCK, 0);
305 apr_socket_timeout_set(connection->sock, -1);
306 apr_socket_opt_set(connection->sock, APR_SO_REUSEADDR, 1);
307
308 if(apr_socket_connect(connection->sock, connection->r_sockaddr) != APR_SUCCESS) {
309 apr_socket_close(connection->sock);
310 mrcp_connection_destroy(connection);
311 return NULL;
312 }
313
314 if(apr_socket_addr_get(&connection->l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
315 apr_socket_close(connection->sock);
316 mrcp_connection_destroy(connection);
317 return NULL;
318 }
319
320 apr_sockaddr_ip_get(&local_ip,connection->l_sockaddr);
321 apr_sockaddr_ip_get(&remote_ip,connection->r_sockaddr);
322 connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
323 local_ip,connection->l_sockaddr->port,
324 remote_ip,connection->r_sockaddr->port);
325
326 memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
327 connection->sock_pfd.desc_type = APR_POLL_SOCKET;
328 connection->sock_pfd.reqevents = APR_POLLIN;
329 connection->sock_pfd.desc.s = connection->sock;
330 connection->sock_pfd.client_data = connection;
331 if(apt_poller_task_descriptor_add(agent->task, &connection->sock_pfd) != TRUE) {
332 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
333 apr_socket_close(connection->sock);
334 mrcp_connection_destroy(connection);
335 return NULL;
336 }
337
338 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Established TCP/MRCPv2 Connection %s",connection->id);
339 connection->agent = agent;
340 APR_RING_INSERT_TAIL(&agent->connection_list,connection,mrcp_connection_t,link);
341
342 connection->parser = mrcp_parser_create(agent->resource_factory,connection->pool);
343 connection->generator = mrcp_generator_create(agent->resource_factory,connection->pool);
344
345 connection->tx_buffer_size = agent->tx_buffer_size;
346 connection->tx_buffer = apr_palloc(connection->pool,connection->tx_buffer_size+1);
347
348 connection->rx_buffer_size = agent->rx_buffer_size;
349 connection->rx_buffer = apr_palloc(connection->pool,connection->rx_buffer_size+1);
350 apt_text_stream_init(&connection->rx_stream,connection->rx_buffer,connection->rx_buffer_size);
351
352 if(apt_log_masking_get() != APT_LOG_MASKING_NONE) {
353 connection->verbose = FALSE;
354 mrcp_parser_verbose_set(connection->parser,TRUE);
355 mrcp_generator_verbose_set(connection->generator,TRUE);
356 }
357
358 return connection;
359 }
360
mrcp_client_agent_connection_find(mrcp_connection_agent_t * agent,mrcp_control_descriptor_t * descriptor)361 static mrcp_connection_t* mrcp_client_agent_connection_find(mrcp_connection_agent_t *agent, mrcp_control_descriptor_t *descriptor)
362 {
363 apr_sockaddr_t *sockaddr;
364 mrcp_connection_t *connection;
365
366 for(connection = APR_RING_FIRST(&agent->connection_list);
367 connection != APR_RING_SENTINEL(&agent->connection_list, mrcp_connection_t, link);
368 connection = APR_RING_NEXT(connection, link)) {
369 if(apr_sockaddr_info_get(&sockaddr,descriptor->ip.buf,APR_INET,descriptor->port,0,connection->pool) == APR_SUCCESS) {
370 if(apr_sockaddr_equal(sockaddr,connection->r_sockaddr) != 0 &&
371 descriptor->port == connection->r_sockaddr->port) {
372 return connection;
373 }
374 }
375 }
376
377 return NULL;
378 }
379
mrcp_client_agent_connection_remove(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)380 static apt_bool_t mrcp_client_agent_connection_remove(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
381 {
382 /* remove from the list */
383 APR_RING_REMOVE(connection,link);
384
385 if(connection->sock) {
386 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close TCP/MRCPv2 Connection %s",connection->id);
387 apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
388 apr_socket_close(connection->sock);
389 connection->sock = NULL;
390 }
391 return TRUE;
392 }
393
mrcp_client_agent_channel_add(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)394 static apt_bool_t mrcp_client_agent_channel_add(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
395 {
396 if(agent->offer_new_connection == TRUE) {
397 descriptor->connection_type = MRCP_CONNECTION_TYPE_NEW;
398 }
399 else {
400 descriptor->connection_type = MRCP_CONNECTION_TYPE_EXISTING;
401 if(APR_RING_EMPTY(&agent->connection_list, mrcp_connection_t, link)) {
402 /* offer new connection if there is no established connection yet */
403 descriptor->connection_type = MRCP_CONNECTION_TYPE_NEW;
404 }
405 }
406 /* send response */
407 return mrcp_control_channel_add_respond(agent->vtable,channel,descriptor,TRUE);
408 }
409
mrcp_client_agent_channel_modify(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)410 static apt_bool_t mrcp_client_agent_channel_modify(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
411 {
412 apt_bool_t status = TRUE;
413 if(descriptor->port) {
414 if(!channel->connection) {
415 mrcp_connection_t *connection = NULL;
416 apt_id_resource_generate(&descriptor->session_id,&descriptor->resource_name,'@',&channel->identifier,channel->pool);
417 /* no connection yet */
418 if(descriptor->connection_type == MRCP_CONNECTION_TYPE_EXISTING) {
419 /* try to find existing connection */
420 connection = mrcp_client_agent_connection_find(agent,descriptor);
421 if(!connection) {
422 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Found No Existing TCP/MRCPv2 Connection");
423 }
424 }
425 if(!connection) {
426 /* create new connection */
427 connection = mrcp_client_agent_connection_create(agent,descriptor);
428 if(!connection) {
429 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Establish TCP/MRCPv2 Connection");
430 }
431 }
432
433 if(connection) {
434 mrcp_connection_channel_add(connection,channel);
435 apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Add Control Channel <%s> %s [%d]",
436 channel->identifier.buf,
437 connection->id,
438 apr_hash_count(connection->channel_table));
439 if(descriptor->connection_type == MRCP_CONNECTION_TYPE_NEW) {
440 /* set connection type to existing for the next offers / if any */
441 descriptor->connection_type = MRCP_CONNECTION_TYPE_EXISTING;
442 }
443 }
444 else {
445 descriptor->port = 0;
446 status = FALSE;
447 }
448 }
449 }
450 /* send response */
451 return mrcp_control_channel_modify_respond(agent->vtable,channel,descriptor,status);
452 }
453
mrcp_client_agent_channel_remove(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel)454 static apt_bool_t mrcp_client_agent_channel_remove(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel)
455 {
456 if(channel->connection) {
457 mrcp_connection_t *connection = channel->connection;
458 mrcp_connection_channel_remove(connection,channel);
459 apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Remove Control Channel <%s> [%d]",
460 channel->identifier.buf,
461 apr_hash_count(connection->channel_table));
462 if(!connection->access_count) {
463 mrcp_client_agent_connection_remove(agent,connection);
464 /* set connection to be destroyed on channel destroy */
465 channel->connection = connection;
466 channel->removed = TRUE;
467 }
468 }
469
470 /* send response */
471 return mrcp_control_channel_remove_respond(agent->vtable,channel,TRUE);
472 }
473
mrcp_client_agent_request_cancel(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_message_t * message)474 static apt_bool_t mrcp_client_agent_request_cancel(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_message_t *message)
475 {
476 mrcp_message_t *response;
477 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Cancel MRCP Request <%s@%s> [%d]",
478 MRCP_MESSAGE_SIDRES(message),
479 message->start_line.request_id);
480 response = mrcp_response_create(message,message->pool);
481 response->start_line.status_code = MRCP_STATUS_CODE_METHOD_FAILED;
482 return mrcp_connection_message_receive(agent->vtable,channel,response);
483 }
484
mrcp_client_agent_disconnect_raise(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)485 static apt_bool_t mrcp_client_agent_disconnect_raise(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
486 {
487 mrcp_control_channel_t *channel;
488 void *val;
489 apr_hash_index_t *it = apr_hash_first(connection->pool,connection->channel_table);
490 /* walk through the list of channels and raise disconnect event for them */
491 for(; it; it = apr_hash_next(it)) {
492 apr_hash_this(it,NULL,NULL,&val);
493 channel = val;
494 if(!channel) continue;
495
496 if(channel->active_request) {
497 mrcp_client_agent_request_cancel(channel->agent,channel,channel->active_request);
498 channel->active_request = NULL;
499 if(channel->request_timer) {
500 apt_timer_kill(channel->request_timer);
501 }
502 }
503 else if(agent->vtable->on_disconnect){
504 agent->vtable->on_disconnect(channel);
505 }
506 }
507 return TRUE;
508 }
509
mrcp_client_agent_messsage_send(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_message_t * message)510 static apt_bool_t mrcp_client_agent_messsage_send(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_message_t *message)
511 {
512 apt_bool_t status = FALSE;
513 mrcp_connection_t *connection = channel->connection;
514 apt_text_stream_t stream;
515 apt_message_status_e result;
516
517 if(!connection || !connection->sock) {
518 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Null MRCPv2 Connection "APT_SIDRES_FMT,MRCP_MESSAGE_SIDRES(message));
519 mrcp_client_agent_request_cancel(agent,channel,message);
520 return FALSE;
521 }
522
523 do {
524 apt_text_stream_init(&stream,connection->tx_buffer,connection->tx_buffer_size);
525 result = mrcp_generator_run(connection->generator,message,&stream);
526 if(result != APT_MESSAGE_STATUS_INVALID) {
527 stream.text.length = stream.pos - stream.text.buf;
528 *stream.pos = '\0';
529
530 apt_obj_log(APT_LOG_MARK,APT_PRIO_INFO,channel->log_obj,"Send MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
531 connection->id,
532 stream.text.length,
533 connection->verbose == TRUE ? stream.text.length : 0,
534 stream.text.buf);
535
536 if(apr_socket_send(connection->sock,stream.text.buf,&stream.text.length) == APR_SUCCESS) {
537 status = TRUE;
538 }
539 else {
540 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Send MRCPv2 Data %s",
541 connection->id);
542 }
543 }
544 else {
545 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Failed to Generate MRCPv2 Data %s",
546 connection->id);
547 }
548 }
549 while(result == APT_MESSAGE_STATUS_INCOMPLETE);
550
551 if(status == TRUE) {
552 channel->active_request = message;
553 if(channel->request_timer && agent->request_timeout) {
554 apt_timer_set(channel->request_timer,agent->request_timeout);
555 }
556 }
557 else {
558 mrcp_client_agent_request_cancel(agent,channel,message);
559 }
560 return status;
561 }
562
mrcp_client_message_handler(mrcp_connection_t * connection,mrcp_message_t * message,apt_message_status_e status)563 static apt_bool_t mrcp_client_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
564 {
565 if(status == APT_MESSAGE_STATUS_COMPLETE) {
566 /* message is completely parsed */
567 mrcp_control_channel_t *channel;
568 apt_str_t identifier;
569 apt_id_resource_generate(&message->channel_id.session_id,&message->channel_id.resource_name,'@',&identifier,message->pool);
570 channel = mrcp_connection_channel_find(connection,&identifier);
571 if(channel) {
572 mrcp_connection_agent_t *agent = connection->agent;
573 if(message->start_line.message_type == MRCP_MESSAGE_TYPE_RESPONSE) {
574 if(!channel->active_request ||
575 channel->active_request->start_line.request_id != message->start_line.request_id) {
576 apt_obj_log(APT_LOG_MARK,APT_PRIO_WARNING,channel->log_obj,"Unexpected MRCP Response "APT_SIDRES_FMT" [%d]",
577 MRCP_MESSAGE_SIDRES(message),
578 message->start_line.request_id);
579 return FALSE;
580 }
581 if(channel->request_timer) {
582 apt_timer_kill(channel->request_timer);
583 }
584 channel->active_request = NULL;
585 }
586
587 mrcp_connection_message_receive(agent->vtable,channel,message);
588 }
589 else {
590 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel "APT_SIDRES_FMT" in Connection %s [%d]",
591 MRCP_MESSAGE_SIDRES(message),
592 connection->id,
593 apr_hash_count(connection->channel_table));
594 }
595 }
596 return TRUE;
597 }
598
599 /* Receive MRCP message through TCP/MRCPv2 connection */
mrcp_client_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)600 static apt_bool_t mrcp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
601 {
602 mrcp_connection_agent_t *agent = obj;
603 mrcp_connection_t *connection = descriptor->client_data;
604 apr_status_t status;
605 apr_size_t offset;
606 apr_size_t length;
607 apt_text_stream_t *stream;
608 mrcp_message_t *message;
609 apt_message_status_e msg_status;
610
611 if(!connection || !connection->sock) {
612 return FALSE;
613 }
614 stream = &connection->rx_stream;
615
616 /* calculate offset remaining from the previous receive / if any */
617 offset = stream->pos - stream->text.buf;
618 /* calculate available length */
619 length = connection->rx_buffer_size - offset;
620
621 status = apr_socket_recv(connection->sock,stream->pos,&length);
622 if(status == APR_EOF || length == 0) {
623 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"TCP/MRCPv2 Peer Disconnected %s",connection->id);
624 apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
625 apr_socket_close(connection->sock);
626 connection->sock = NULL;
627
628 mrcp_client_agent_disconnect_raise(agent,connection);
629 return TRUE;
630 }
631
632 /* calculate actual length of the stream */
633 stream->text.length = offset + length;
634 stream->pos[length] = '\0';
635 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
636 connection->id,
637 length,
638 connection->verbose == TRUE ? length : 0,
639 stream->pos);
640
641 /* reset pos */
642 apt_text_stream_reset(stream);
643
644 do {
645 msg_status = mrcp_parser_run(connection->parser,stream,&message);
646 if(mrcp_client_message_handler(connection,message,msg_status) == FALSE) {
647 return FALSE;
648 }
649 }
650 while(apt_text_is_eos(stream) == FALSE);
651
652 /* scroll remaining stream */
653 apt_text_stream_scroll(stream);
654 return TRUE;
655 }
656
657 /* Process task message */
mrcp_client_agent_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)658 static apt_bool_t mrcp_client_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
659 {
660 apt_poller_task_t *poller_task = apt_task_object_get(task);
661 mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
662 connection_task_msg_t *msg = (connection_task_msg_t*) task_msg->data;
663
664 switch(msg->type) {
665 case CONNECTION_TASK_MSG_ADD_CHANNEL:
666 mrcp_client_agent_channel_add(agent,msg->channel,msg->descriptor);
667 break;
668 case CONNECTION_TASK_MSG_MODIFY_CHANNEL:
669 mrcp_client_agent_channel_modify(agent,msg->channel,msg->descriptor);
670 break;
671 case CONNECTION_TASK_MSG_REMOVE_CHANNEL:
672 mrcp_client_agent_channel_remove(agent,msg->channel);
673 break;
674 case CONNECTION_TASK_MSG_SEND_MESSAGE:
675 mrcp_client_agent_messsage_send(agent,msg->channel,msg->message);
676 break;
677 }
678
679 return TRUE;
680 }
681
682 /* Timer callback */
mrcp_client_timer_proc(apt_timer_t * timer,void * obj)683 static void mrcp_client_timer_proc(apt_timer_t *timer, void *obj)
684 {
685 mrcp_control_channel_t *channel = obj;
686 if(!channel) {
687 return;
688 }
689
690 if(channel->request_timer == timer) {
691 if(channel->active_request) {
692 mrcp_client_agent_request_cancel(channel->agent,channel,channel->active_request);
693 channel->active_request = NULL;
694 }
695 }
696 }
697