1 /*
2 * Copyright 2008-2014 Arsen Chaloyan
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * $Id: mrcp_server_connection.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17 */
18
19 #include "mrcp_connection.h"
20 #include "mrcp_server_connection.h"
21 #include "mrcp_control_descriptor.h"
22 #include "mrcp_resource_factory.h"
23 #include "mrcp_message.h"
24 #include "apt_text_stream.h"
25 #include "apt_poller_task.h"
26 #include "apt_pool.h"
27 #include "apt_log.h"
28
29
30 struct mrcp_connection_agent_t {
31 apr_pool_t *pool;
32 apt_poller_task_t *task;
33 const mrcp_resource_factory_t *resource_factory;
34
35 /** List (ring) of MRCP connections */
36 APR_RING_HEAD(mrcp_connection_head_t, mrcp_connection_t) connection_list;
37 /** Table of pending control channels */
38 apr_hash_t *pending_channel_table;
39
40 apt_bool_t force_new_connection;
41 apr_size_t tx_buffer_size;
42 apr_size_t rx_buffer_size;
43
44 /* Listening socket */
45 apr_sockaddr_t *sockaddr;
46 apr_socket_t *listen_sock;
47 apr_pollfd_t listen_sock_pfd;
48
49 void *obj;
50 const mrcp_connection_event_vtable_t *vtable;
51 };
52
/** Types of the messages posted to the connection agent task */
typedef enum {
    CONNECTION_TASK_MSG_ADD_CHANNEL,    /**< add control channel */
    CONNECTION_TASK_MSG_MODIFY_CHANNEL, /**< modify control channel */
    CONNECTION_TASK_MSG_REMOVE_CHANNEL, /**< remove control channel */
    CONNECTION_TASK_MSG_SEND_MESSAGE    /**< send MRCP message over the channel's connection */
} connection_task_msg_type_e;
59
60 typedef struct connection_task_msg_t connection_task_msg_t;
61 struct connection_task_msg_t {
62 connection_task_msg_type_e type;
63 mrcp_connection_agent_t *agent;
64 mrcp_control_channel_t *channel;
65 mrcp_control_descriptor_t *descriptor;
66 mrcp_message_t *message;
67 };
68
69 static apt_bool_t mrcp_server_agent_on_destroy(apt_task_t *task);
70 static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg);
71 static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
72
73 static apt_bool_t mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t *agent);
74 static void mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t *agent);
75
76
77 /** Create connection agent */
mrcp_server_connection_agent_create(const char * id,const char * listen_ip,apr_port_t listen_port,apr_size_t max_connection_count,apt_bool_t force_new_connection,apr_pool_t * pool)78 MRCP_DECLARE(mrcp_connection_agent_t*) mrcp_server_connection_agent_create(
79 const char *id,
80 const char *listen_ip,
81 apr_port_t listen_port,
82 apr_size_t max_connection_count,
83 apt_bool_t force_new_connection,
84 apr_pool_t *pool)
85 {
86 apt_task_t *task;
87 apt_task_vtable_t *vtable;
88 apt_task_msg_pool_t *msg_pool;
89 mrcp_connection_agent_t *agent;
90
91 if(!listen_ip) {
92 return NULL;
93 }
94
95 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create MRCPv2 Agent [%s] %s:%hu [%"APR_SIZE_T_FMT"]",
96 id,listen_ip,listen_port,max_connection_count);
97 agent = apr_palloc(pool,sizeof(mrcp_connection_agent_t));
98 agent->pool = pool;
99 agent->sockaddr = NULL;
100 agent->listen_sock = NULL;
101 agent->force_new_connection = force_new_connection;
102 agent->rx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
103 agent->tx_buffer_size = MRCP_STREAM_BUFFER_SIZE;
104
105 apr_sockaddr_info_get(&agent->sockaddr,listen_ip,APR_INET,listen_port,0,pool);
106 if(!agent->sockaddr) {
107 return NULL;
108 }
109
110 msg_pool = apt_task_msg_pool_create_dynamic(sizeof(connection_task_msg_t),pool);
111
112 agent->task = apt_poller_task_create(
113 max_connection_count + 1,
114 mrcp_server_poller_signal_process,
115 agent,
116 msg_pool,
117 pool);
118 if(!agent->task) {
119 return NULL;
120 }
121
122 task = apt_poller_task_base_get(agent->task);
123 if(task) {
124 apt_task_name_set(task,id);
125 }
126
127 vtable = apt_poller_task_vtable_get(agent->task);
128 if(vtable) {
129 vtable->destroy = mrcp_server_agent_on_destroy;
130 vtable->process_msg = mrcp_server_agent_msg_process;
131 }
132
133 APR_RING_INIT(&agent->connection_list, mrcp_connection_t, link);
134 agent->pending_channel_table = apr_hash_make(pool);
135
136 if(mrcp_server_agent_listening_socket_create(agent) != TRUE) {
137 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Listening Socket [%s] %s:%hu",
138 id,
139 listen_ip,
140 listen_port);
141 }
142 return agent;
143 }
144
mrcp_server_agent_on_destroy(apt_task_t * task)145 static apt_bool_t mrcp_server_agent_on_destroy(apt_task_t *task)
146 {
147 apt_poller_task_t *poller_task = apt_task_object_get(task);
148 mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
149
150 mrcp_server_agent_listening_socket_destroy(agent);
151 apt_poller_task_cleanup(poller_task);
152 return TRUE;
153 }
154
155 /** Destroy connection agent. */
mrcp_server_connection_agent_destroy(mrcp_connection_agent_t * agent)156 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_destroy(mrcp_connection_agent_t *agent)
157 {
158 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy MRCPv2 Agent [%s]",
159 mrcp_server_connection_agent_id_get(agent));
160 return apt_poller_task_destroy(agent->task);
161 }
162
163 /** Start connection agent. */
mrcp_server_connection_agent_start(mrcp_connection_agent_t * agent)164 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_start(mrcp_connection_agent_t *agent)
165 {
166 return apt_poller_task_start(agent->task);
167 }
168
169 /** Terminate connection agent. */
mrcp_server_connection_agent_terminate(mrcp_connection_agent_t * agent)170 MRCP_DECLARE(apt_bool_t) mrcp_server_connection_agent_terminate(mrcp_connection_agent_t *agent)
171 {
172 return apt_poller_task_terminate(agent->task);
173 }
174
175 /** Set connection event handler. */
mrcp_server_connection_agent_handler_set(mrcp_connection_agent_t * agent,void * obj,const mrcp_connection_event_vtable_t * vtable)176 MRCP_DECLARE(void) mrcp_server_connection_agent_handler_set(
177 mrcp_connection_agent_t *agent,
178 void *obj,
179 const mrcp_connection_event_vtable_t *vtable)
180 {
181 agent->obj = obj;
182 agent->vtable = vtable;
183 }
184
185 /** Set MRCP resource factory */
mrcp_server_connection_resource_factory_set(mrcp_connection_agent_t * agent,const mrcp_resource_factory_t * resource_factroy)186 MRCP_DECLARE(void) mrcp_server_connection_resource_factory_set(
187 mrcp_connection_agent_t *agent,
188 const mrcp_resource_factory_t *resource_factroy)
189 {
190 agent->resource_factory = resource_factroy;
191 }
192
193 /** Set rx buffer size */
mrcp_server_connection_rx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)194 MRCP_DECLARE(void) mrcp_server_connection_rx_size_set(
195 mrcp_connection_agent_t *agent,
196 apr_size_t size)
197 {
198 if(size < MRCP_STREAM_BUFFER_SIZE) {
199 size = MRCP_STREAM_BUFFER_SIZE;
200 }
201 agent->rx_buffer_size = size;
202 }
203
204 /** Set tx buffer size */
mrcp_server_connection_tx_size_set(mrcp_connection_agent_t * agent,apr_size_t size)205 MRCP_DECLARE(void) mrcp_server_connection_tx_size_set(
206 mrcp_connection_agent_t *agent,
207 apr_size_t size)
208 {
209 if(size < MRCP_STREAM_BUFFER_SIZE) {
210 size = MRCP_STREAM_BUFFER_SIZE;
211 }
212 agent->tx_buffer_size = size;
213 }
214
215 /** Get task */
mrcp_server_connection_agent_task_get(const mrcp_connection_agent_t * agent)216 MRCP_DECLARE(apt_task_t*) mrcp_server_connection_agent_task_get(const mrcp_connection_agent_t *agent)
217 {
218 return apt_poller_task_base_get(agent->task);
219 }
220
221 /** Get external object */
mrcp_server_connection_agent_object_get(const mrcp_connection_agent_t * agent)222 MRCP_DECLARE(void*) mrcp_server_connection_agent_object_get(const mrcp_connection_agent_t *agent)
223 {
224 return agent->obj;
225 }
226
227 /** Get string identifier */
mrcp_server_connection_agent_id_get(const mrcp_connection_agent_t * agent)228 MRCP_DECLARE(const char*) mrcp_server_connection_agent_id_get(const mrcp_connection_agent_t *agent)
229 {
230 apt_task_t *task = apt_poller_task_base_get(agent->task);
231 return apt_task_name_get(task);
232 }
233
234
235 /** Create MRCPv2 control channel */
mrcp_server_control_channel_create(mrcp_connection_agent_t * agent,void * obj,apr_pool_t * pool)236 MRCP_DECLARE(mrcp_control_channel_t*) mrcp_server_control_channel_create(mrcp_connection_agent_t *agent, void *obj, apr_pool_t *pool)
237 {
238 mrcp_control_channel_t *channel = apr_palloc(pool,sizeof(mrcp_control_channel_t));
239 channel->agent = agent;
240 channel->connection = NULL;
241 channel->active_request = NULL;
242 channel->request_timer = NULL;
243 channel->removed = FALSE;
244 channel->obj = obj;
245 channel->log_obj = NULL;
246 channel->pool = pool;
247 return channel;
248 }
249
250 /** Destroy MRCPv2 control channel */
mrcp_server_control_channel_destroy(mrcp_control_channel_t * channel)251 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_destroy(mrcp_control_channel_t *channel)
252 {
253 if(channel && channel->connection && channel->removed == TRUE) {
254 mrcp_connection_t *connection = channel->connection;
255 channel->connection = NULL;
256 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
257 mrcp_connection_destroy(connection);
258 }
259 return TRUE;
260 }
261
262 /** Signal task message */
mrcp_server_control_message_signal(connection_task_msg_type_e type,mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor,mrcp_message_t * message)263 static apt_bool_t mrcp_server_control_message_signal(
264 connection_task_msg_type_e type,
265 mrcp_connection_agent_t *agent,
266 mrcp_control_channel_t *channel,
267 mrcp_control_descriptor_t *descriptor,
268 mrcp_message_t *message)
269 {
270 apt_task_t *task = apt_poller_task_base_get(agent->task);
271 apt_task_msg_t *task_msg = apt_task_msg_get(task);
272 if(task_msg) {
273 connection_task_msg_t *msg = (connection_task_msg_t*)task_msg->data;
274 msg->type = type;
275 msg->agent = agent;
276 msg->channel = channel;
277 msg->descriptor = descriptor;
278 msg->message = message;
279 apt_task_msg_signal(task,task_msg);
280 }
281 return TRUE;
282 }
283
284 /** Add MRCPv2 control channel */
mrcp_server_control_channel_add(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)285 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_add(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
286 {
287 return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_ADD_CHANNEL,channel->agent,channel,descriptor,NULL);
288 }
289
290 /** Modify MRCPv2 control channel */
mrcp_server_control_channel_modify(mrcp_control_channel_t * channel,mrcp_control_descriptor_t * descriptor)291 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_modify(mrcp_control_channel_t *channel, mrcp_control_descriptor_t *descriptor)
292 {
293 return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_MODIFY_CHANNEL,channel->agent,channel,descriptor,NULL);
294 }
295
296 /** Remove MRCPv2 control channel */
mrcp_server_control_channel_remove(mrcp_control_channel_t * channel)297 MRCP_DECLARE(apt_bool_t) mrcp_server_control_channel_remove(mrcp_control_channel_t *channel)
298 {
299 return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_REMOVE_CHANNEL,channel->agent,channel,NULL,NULL);
300 }
301
302 /** Send MRCPv2 message */
mrcp_server_control_message_send(mrcp_control_channel_t * channel,mrcp_message_t * message)303 MRCP_DECLARE(apt_bool_t) mrcp_server_control_message_send(mrcp_control_channel_t *channel, mrcp_message_t *message)
304 {
305 return mrcp_server_control_message_signal(CONNECTION_TASK_MSG_SEND_MESSAGE,channel->agent,channel,NULL,message);
306 }
307
308 /** Create listening socket and add it to pollset */
mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t * agent)309 static apt_bool_t mrcp_server_agent_listening_socket_create(mrcp_connection_agent_t *agent)
310 {
311 apr_status_t status;
312 if(!agent->sockaddr) {
313 return FALSE;
314 }
315
316 /* create listening socket */
317 status = apr_socket_create(&agent->listen_sock, agent->sockaddr->family, SOCK_STREAM, APR_PROTO_TCP, agent->pool);
318 if(status != APR_SUCCESS) {
319 return FALSE;
320 }
321
322 apr_socket_opt_set(agent->listen_sock, APR_SO_NONBLOCK, 0);
323 apr_socket_timeout_set(agent->listen_sock, -1);
324 apr_socket_opt_set(agent->listen_sock, APR_SO_REUSEADDR, 1);
325
326 status = apr_socket_bind(agent->listen_sock, agent->sockaddr);
327 if(status != APR_SUCCESS) {
328 apr_socket_close(agent->listen_sock);
329 agent->listen_sock = NULL;
330 return FALSE;
331 }
332 status = apr_socket_listen(agent->listen_sock, SOMAXCONN);
333 if(status != APR_SUCCESS) {
334 apr_socket_close(agent->listen_sock);
335 agent->listen_sock = NULL;
336 return FALSE;
337 }
338
339 /* add listening socket to pollset */
340 memset(&agent->listen_sock_pfd,0,sizeof(apr_pollfd_t));
341 agent->listen_sock_pfd.desc_type = APR_POLL_SOCKET;
342 agent->listen_sock_pfd.reqevents = APR_POLLIN;
343 agent->listen_sock_pfd.desc.s = agent->listen_sock;
344 agent->listen_sock_pfd.client_data = agent->listen_sock;
345 if(apt_poller_task_descriptor_add(agent->task, &agent->listen_sock_pfd) != TRUE) {
346 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add Listening Socket to Pollset [%s]",
347 apt_task_name_get(apt_poller_task_base_get(agent->task)));
348 apr_socket_close(agent->listen_sock);
349 agent->listen_sock = NULL;
350 return FALSE;
351 }
352
353 return TRUE;
354 }
355
356 /** Remove from pollset and destroy listening socket */
mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t * agent)357 static void mrcp_server_agent_listening_socket_destroy(mrcp_connection_agent_t *agent)
358 {
359 if(agent->listen_sock) {
360 apt_poller_task_descriptor_remove(agent->task,&agent->listen_sock_pfd);
361 apr_socket_close(agent->listen_sock);
362 agent->listen_sock = NULL;
363 }
364 }
365
mrcp_connection_channel_associate(mrcp_connection_agent_t * agent,mrcp_connection_t * connection,const mrcp_message_t * message)366 static mrcp_control_channel_t* mrcp_connection_channel_associate(mrcp_connection_agent_t *agent, mrcp_connection_t *connection, const mrcp_message_t *message)
367 {
368 apt_str_t identifier;
369 mrcp_control_channel_t *channel;
370 if(!connection || !message) {
371 return NULL;
372 }
373 apt_id_resource_generate(&message->channel_id.session_id,&message->channel_id.resource_name,'@',&identifier,connection->pool);
374 channel = mrcp_connection_channel_find(connection,&identifier);
375 if(!channel) {
376 channel = apr_hash_get(agent->pending_channel_table,identifier.buf,identifier.length);
377 if(channel) {
378 apr_hash_set(agent->pending_channel_table,identifier.buf,identifier.length,NULL);
379 mrcp_connection_channel_add(connection,channel);
380 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Assign Control Channel <%s> to Connection %s [%d] -> [%d]",
381 channel->identifier.buf,
382 connection->id,
383 apr_hash_count(agent->pending_channel_table),
384 apr_hash_count(connection->channel_table));
385 }
386 }
387 return channel;
388 }
389
mrcp_connection_find(mrcp_connection_agent_t * agent,const apt_str_t * remote_ip)390 static mrcp_connection_t* mrcp_connection_find(mrcp_connection_agent_t *agent, const apt_str_t *remote_ip)
391 {
392 mrcp_connection_t *connection;
393 if(!agent || !remote_ip) {
394 return NULL;
395 }
396
397 for(connection = APR_RING_FIRST(&agent->connection_list);
398 connection != APR_RING_SENTINEL(&agent->connection_list, mrcp_connection_t, link);
399 connection = APR_RING_NEXT(connection, link)) {
400 if(apt_string_compare(&connection->remote_ip,remote_ip) == TRUE) {
401 return connection;
402 }
403 }
404
405 return NULL;
406 }
407
mrcp_connection_remove(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)408 static apt_bool_t mrcp_connection_remove(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
409 {
410 APR_RING_REMOVE(connection,link);
411 return TRUE;
412 }
413
mrcp_server_agent_connection_accept(mrcp_connection_agent_t * agent)414 static apt_bool_t mrcp_server_agent_connection_accept(mrcp_connection_agent_t *agent)
415 {
416 char *local_ip = NULL;
417 char *remote_ip = NULL;
418
419 mrcp_connection_t *connection = mrcp_connection_create();
420
421 if(apr_socket_accept(&connection->sock,agent->listen_sock,connection->pool) != APR_SUCCESS) {
422 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Accept Connection");
423 mrcp_connection_destroy(connection);
424 return FALSE;
425 }
426
427 if(apr_socket_addr_get(&connection->r_sockaddr,APR_REMOTE,connection->sock) != APR_SUCCESS ||
428 apr_socket_addr_get(&connection->l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
429 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Get Socket Address");
430 apr_socket_close(connection->sock);
431 mrcp_connection_destroy(connection);
432 return FALSE;
433 }
434
435 apr_sockaddr_ip_get(&local_ip,connection->l_sockaddr);
436 apr_sockaddr_ip_get(&remote_ip,connection->r_sockaddr);
437 apt_string_set(&connection->remote_ip,remote_ip);
438 connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
439 local_ip,connection->l_sockaddr->port,
440 remote_ip,connection->r_sockaddr->port);
441
442 if(apr_hash_count(agent->pending_channel_table) == 0) {
443 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Reject Unexpected TCP/MRCPv2 Connection %s",connection->id);
444 apr_socket_close(connection->sock);
445 mrcp_connection_destroy(connection);
446 return FALSE;
447 }
448
449 memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
450 connection->sock_pfd.desc_type = APR_POLL_SOCKET;
451 connection->sock_pfd.reqevents = APR_POLLIN;
452 connection->sock_pfd.desc.s = connection->sock;
453 connection->sock_pfd.client_data = connection;
454 if(apt_poller_task_descriptor_add(agent->task, &connection->sock_pfd) != TRUE) {
455 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
456 apr_socket_close(connection->sock);
457 mrcp_connection_destroy(connection);
458 return FALSE;
459 }
460
461 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Accepted TCP/MRCPv2 Connection %s",connection->id);
462 connection->agent = agent;
463 APR_RING_INSERT_TAIL(&agent->connection_list,connection,mrcp_connection_t,link);
464
465 connection->parser = mrcp_parser_create(agent->resource_factory,connection->pool);
466 connection->generator = mrcp_generator_create(agent->resource_factory,connection->pool);
467
468 connection->tx_buffer_size = agent->tx_buffer_size;
469 connection->tx_buffer = apr_palloc(connection->pool,connection->tx_buffer_size+1);
470
471 connection->rx_buffer_size = agent->rx_buffer_size;
472 connection->rx_buffer = apr_palloc(connection->pool,connection->rx_buffer_size+1);
473 apt_text_stream_init(&connection->rx_stream,connection->rx_buffer,connection->rx_buffer_size);
474
475 if(apt_log_masking_get() != APT_LOG_MASKING_NONE) {
476 connection->verbose = FALSE;
477 mrcp_parser_verbose_set(connection->parser,TRUE);
478 mrcp_generator_verbose_set(connection->generator,TRUE);
479 }
480 return TRUE;
481 }
482
mrcp_server_agent_connection_close(mrcp_connection_agent_t * agent,mrcp_connection_t * connection)483 static apt_bool_t mrcp_server_agent_connection_close(mrcp_connection_agent_t *agent, mrcp_connection_t *connection)
484 {
485 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"TCP/MRCPv2 Peer Disconnected %s",connection->id);
486 apt_poller_task_descriptor_remove(agent->task,&connection->sock_pfd);
487 apr_socket_close(connection->sock);
488 connection->sock = NULL;
489 if(!connection->access_count) {
490 mrcp_connection_remove(agent,connection);
491 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy TCP/MRCPv2 Connection %s",connection->id);
492 mrcp_connection_destroy(connection);
493 }
494 return TRUE;
495 }
496
mrcp_server_agent_channel_add(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * offer)497 static apt_bool_t mrcp_server_agent_channel_add(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *offer)
498 {
499 mrcp_control_descriptor_t *answer = mrcp_control_answer_create(offer,channel->pool);
500 apt_id_resource_generate(&offer->session_id,&offer->resource_name,'@',&channel->identifier,channel->pool);
501 if(offer->port) {
502 answer->port = agent->sockaddr->port;
503 }
504 if(offer->connection_type == MRCP_CONNECTION_TYPE_EXISTING) {
505 if(agent->force_new_connection == TRUE) {
506 /* force client to establish new connection */
507 answer->connection_type = MRCP_CONNECTION_TYPE_NEW;
508 }
509 else {
510 mrcp_connection_t *connection = NULL;
511 /* try to find any existing connection */
512 connection = mrcp_connection_find(agent,&offer->ip);
513 if(!connection) {
514 /* no existing conection found, force the new one */
515 answer->connection_type = MRCP_CONNECTION_TYPE_NEW;
516 }
517 }
518 }
519
520 apr_hash_set(agent->pending_channel_table,channel->identifier.buf,channel->identifier.length,channel);
521 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add Pending Control Channel <%s> [%d]",
522 channel->identifier.buf,
523 apr_hash_count(agent->pending_channel_table));
524 /* send response */
525 return mrcp_control_channel_add_respond(agent->vtable,channel,answer,TRUE);
526 }
527
mrcp_server_agent_channel_modify(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel,mrcp_control_descriptor_t * offer)528 static apt_bool_t mrcp_server_agent_channel_modify(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel, mrcp_control_descriptor_t *offer)
529 {
530 mrcp_control_descriptor_t *answer = mrcp_control_answer_create(offer,channel->pool);
531 if(offer->port) {
532 answer->port = agent->sockaddr->port;
533 }
534 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Modify Control Channel <%s>",channel->identifier.buf);
535 /* send response */
536 return mrcp_control_channel_modify_respond(agent->vtable,channel,answer,TRUE);
537 }
538
mrcp_server_agent_channel_remove(mrcp_connection_agent_t * agent,mrcp_control_channel_t * channel)539 static apt_bool_t mrcp_server_agent_channel_remove(mrcp_connection_agent_t *agent, mrcp_control_channel_t *channel)
540 {
541 mrcp_connection_t *connection = channel->connection;
542 if(connection) {
543 mrcp_connection_channel_remove(connection,channel);
544 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove Control Channel <%s> [%d]",
545 channel->identifier.buf,
546 apr_hash_count(connection->channel_table));
547 if(!connection->access_count) {
548 if(!connection->sock) {
549 mrcp_connection_remove(agent,connection);
550 /* set connection to be destroyed on channel destroy */
551 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Mark Connection for Removal %s",connection->id);
552 channel->connection = connection;
553 channel->removed = TRUE;
554 }
555 }
556 }
557 else {
558 apr_hash_set(agent->pending_channel_table,channel->identifier.buf,channel->identifier.length,NULL);
559 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove Pending Control Channel <%s> [%d]",
560 channel->identifier.buf,
561 apr_hash_count(agent->pending_channel_table));
562 }
563 /* send response */
564 return mrcp_control_channel_remove_respond(agent->vtable,channel,TRUE);
565 }
566
mrcp_server_agent_messsage_send(mrcp_connection_agent_t * agent,mrcp_connection_t * connection,mrcp_message_t * message)567 static apt_bool_t mrcp_server_agent_messsage_send(mrcp_connection_agent_t *agent, mrcp_connection_t *connection, mrcp_message_t *message)
568 {
569 apt_bool_t status = FALSE;
570 apt_text_stream_t stream;
571 apt_message_status_e result;
572 if(!connection || !connection->sock) {
573 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Null MRCPv2 Connection "APT_SIDRES_FMT,MRCP_MESSAGE_SIDRES(message));
574 return FALSE;
575 }
576
577 do {
578 apt_text_stream_init(&stream,connection->tx_buffer,connection->tx_buffer_size);
579 result = mrcp_generator_run(connection->generator,message,&stream);
580 if(result != APT_MESSAGE_STATUS_INVALID) {
581 stream.text.length = stream.pos - stream.text.buf;
582 *stream.pos = '\0';
583
584 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
585 connection->id,
586 stream.text.length,
587 connection->verbose == TRUE ? stream.text.length : 0,
588 stream.text.buf);
589
590 if(apr_socket_send(connection->sock,stream.text.buf,&stream.text.length) == APR_SUCCESS) {
591 status = TRUE;
592 }
593 else {
594 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send MRCPv2 Data");
595 }
596 }
597 else {
598 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Generate MRCPv2 Data");
599 }
600 }
601 while(result == APT_MESSAGE_STATUS_INCOMPLETE);
602
603 return status;
604 }
605
mrcp_server_message_handler(mrcp_connection_t * connection,mrcp_message_t * message,apt_message_status_e status)606 static apt_bool_t mrcp_server_message_handler(mrcp_connection_t *connection, mrcp_message_t *message, apt_message_status_e status)
607 {
608 mrcp_connection_agent_t *agent = connection->agent;
609 if(status == APT_MESSAGE_STATUS_COMPLETE) {
610 /* message is completely parsed */
611 mrcp_control_channel_t *channel = mrcp_connection_channel_associate(agent,connection,message);
612 if(channel) {
613 mrcp_connection_message_receive(agent->vtable,channel,message);
614 }
615 else {
616 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Channel "APT_SIDRES_FMT" in Connection %s",
617 MRCP_MESSAGE_SIDRES(message),
618 connection->id);
619 }
620 }
621 else if(status == APT_MESSAGE_STATUS_INVALID) {
622 /* error case */
623 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Parse MRCPv2 Data");
624 if(message && message->resource) {
625 mrcp_message_t *response;
626 response = mrcp_response_create(message,message->pool);
627 response->start_line.status_code = MRCP_STATUS_CODE_UNRECOGNIZED_MESSAGE;
628 if(mrcp_server_agent_messsage_send(agent,connection,response) == FALSE) {
629 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send MRCPv2 Response");
630 }
631 }
632 }
633 return TRUE;
634 }
635
636 /* Receive MRCP message through TCP/MRCPv2 connection */
mrcp_server_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)637 static apt_bool_t mrcp_server_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
638 {
639 mrcp_connection_agent_t *agent = obj;
640 mrcp_connection_t *connection = descriptor->client_data;
641 apr_status_t status;
642 apr_size_t offset;
643 apr_size_t length;
644 apt_text_stream_t *stream;
645 mrcp_message_t *message;
646 apt_message_status_e msg_status;
647
648 if(descriptor->desc.s == agent->listen_sock) {
649 return mrcp_server_agent_connection_accept(agent);
650 }
651
652 if(!connection || !connection->sock) {
653 return FALSE;
654 }
655 stream = &connection->rx_stream;
656
657 /* calculate offset remaining from the previous receive / if any */
658 offset = stream->pos - stream->text.buf;
659 /* calculate available length */
660 length = connection->rx_buffer_size - offset;
661
662 status = apr_socket_recv(connection->sock,stream->pos,&length);
663 if(status == APR_EOF || length == 0) {
664 return mrcp_server_agent_connection_close(agent,connection);
665 }
666
667 /* calculate actual length of the stream */
668 stream->text.length = offset + length;
669 stream->pos[length] = '\0';
670
671 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive MRCPv2 Data %s [%"APR_SIZE_T_FMT" bytes]\n%.*s",
672 connection->id,
673 length,
674 connection->verbose == TRUE ? length : 0,
675 stream->pos);
676
677 /* reset pos */
678 apt_text_stream_reset(stream);
679
680 do {
681 msg_status = mrcp_parser_run(connection->parser,stream,&message);
682 if(mrcp_server_message_handler(connection,message,msg_status) == FALSE) {
683 return FALSE;
684 }
685 }
686 while(apt_text_is_eos(stream) == FALSE);
687
688 /* scroll remaining stream */
689 apt_text_stream_scroll(stream);
690 return TRUE;
691 }
692
693 /* Process task message */
mrcp_server_agent_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)694 static apt_bool_t mrcp_server_agent_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
695 {
696 apt_poller_task_t *poller_task = apt_task_object_get(task);
697 mrcp_connection_agent_t *agent = apt_poller_task_object_get(poller_task);
698 connection_task_msg_t *msg = (connection_task_msg_t*) task_msg->data;
699 switch(msg->type) {
700 case CONNECTION_TASK_MSG_ADD_CHANNEL:
701 mrcp_server_agent_channel_add(agent,msg->channel,msg->descriptor);
702 break;
703 case CONNECTION_TASK_MSG_MODIFY_CHANNEL:
704 mrcp_server_agent_channel_modify(agent,msg->channel,msg->descriptor);
705 break;
706 case CONNECTION_TASK_MSG_REMOVE_CHANNEL:
707 mrcp_server_agent_channel_remove(agent,msg->channel);
708 break;
709 case CONNECTION_TASK_MSG_SEND_MESSAGE:
710 mrcp_server_agent_messsage_send(agent,msg->channel->connection,msg->message);
711 break;
712 }
713
714 return TRUE;
715 }
716