1 /*
2 * Copyright 2008-2014 Arsen Chaloyan
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * $Id: rtsp_client.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17 */
18
19 #ifdef WIN32
20 #pragma warning(disable: 4127)
21 #endif
22 #include <apr_ring.h>
23 #include <apr_hash.h>
24 #include "rtsp_client.h"
25 #include "rtsp_stream.h"
26 #include "apt_poller_task.h"
27 #include "apt_text_stream.h"
28 #include "apt_pool.h"
29 #include "apt_obj_list.h"
30 #include "apt_log.h"
31
32 #define RTSP_STREAM_BUFFER_SIZE 1024
33
34 typedef struct rtsp_client_connection_t rtsp_client_connection_t;
35
36 typedef enum {
37 TERMINATION_STATE_NONE,
38 TERMINATION_STATE_REQUESTED,
39 TERMINATION_STATE_INPROGRESS
40 } termination_state_e;
41
42 /** RTSP client */
43 struct rtsp_client_t {
44 apr_pool_t *pool;
45 apt_poller_task_t *task;
46
47 /** List (ring) of RTSP connections */
48 APR_RING_HEAD(rtsp_client_connection_head_t, rtsp_client_connection_t) connection_list;
49
50 apr_uint32_t request_timeout;
51
52 void *obj;
53 const rtsp_client_vtable_t *vtable;
54 };
55
56 /** RTSP connection */
57 struct rtsp_client_connection_t {
58 /** Ring entry */
59 APR_RING_ENTRY(rtsp_client_connection_t) link;
60
61 /** Memory pool */
62 apr_pool_t *pool;
63 /** Connected socket */
64 apr_socket_t *sock;
65 /** Socket poll descriptor */
66 apr_pollfd_t sock_pfd;
67 /** String identifier used for traces */
68 const char *id;
69 /** RTSP client, connection belongs to */
70 rtsp_client_t *client;
71
72 /** Handle table (rtsp_client_session_t*) */
73 apr_hash_t *handle_table;
74 /** Session table (rtsp_client_session_t*) */
75 apr_hash_t *session_table;
76
77 /** Inprogress request/session queue (rtsp_client_session_t*) */
78 apt_obj_list_t *inprogress_request_queue;
79
80 /** Last CSeq sent */
81 apr_size_t last_cseq;
82
83 char rx_buffer[RTSP_STREAM_BUFFER_SIZE];
84 apt_text_stream_t rx_stream;
85 rtsp_parser_t *parser;
86
87 char tx_buffer[RTSP_STREAM_BUFFER_SIZE];
88 apt_text_stream_t tx_stream;
89 rtsp_generator_t *generator;
90 };
91
92 /** RTSP session */
93 struct rtsp_client_session_t {
94 apr_pool_t *pool;
95 void *obj;
96
97 /** Connection */
98 rtsp_client_connection_t *connection;
99 /** Session identifier */
100 apt_str_t id;
101
102 apt_str_t server_ip;
103 apr_port_t server_port;
104 apt_str_t resource_location;
105
106 /** In-progress request */
107 rtsp_message_t *active_request;
108 /** Pending request queue (rtsp_message_t*) */
109 apt_obj_list_t *pending_request_queue;
110
111 /** Timer used for request timeouts */
112 apt_timer_t *request_timer;
113
114 /** Resource table */
115 apr_hash_t *resource_table;
116
117 /** termination state (none -> requested -> terminating) */
118 termination_state_e term_state;
119 };
120
121 typedef enum {
122 TASK_MSG_SEND_MESSAGE,
123 TASK_MSG_TERMINATE_SESSION
124 } task_msg_data_type_e;
125
126 typedef struct task_msg_data_t task_msg_data_t;
127
128 struct task_msg_data_t {
129 task_msg_data_type_e type;
130 rtsp_client_t *client;
131 rtsp_client_session_t *session;
132 rtsp_message_t *message;
133 };
134
135 static apt_bool_t rtsp_client_task_msg_process(apt_task_t *task, apt_task_msg_t *msg);
136
137 static apt_bool_t rtsp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
138
139 static apt_bool_t rtsp_client_message_handler(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message, apt_message_status_e status);
140 static apt_bool_t rtsp_client_message_send(rtsp_client_t *client, rtsp_client_connection_t *connection, rtsp_message_t *message);
141 static apt_bool_t rtsp_client_session_message_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message);
142 static apt_bool_t rtsp_client_session_response_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *request, rtsp_message_t *response);
143
144 static void rtsp_client_timer_proc(apt_timer_t *timer, void *obj);
145
146 /** Get string identifier */
rtsp_client_id_get(const rtsp_client_t * client)147 static const char* rtsp_client_id_get(const rtsp_client_t *client)
148 {
149 apt_task_t *task = apt_poller_task_base_get(client->task);
150 return apt_task_name_get(task);
151 }
152
153 /** Create RTSP client */
rtsp_client_create(const char * id,apr_size_t max_connection_count,apr_size_t request_timeout,void * obj,const rtsp_client_vtable_t * handler,apr_pool_t * pool)154 RTSP_DECLARE(rtsp_client_t*) rtsp_client_create(
155 const char *id,
156 apr_size_t max_connection_count,
157 apr_size_t request_timeout,
158 void *obj,
159 const rtsp_client_vtable_t *handler,
160 apr_pool_t *pool)
161 {
162 apt_task_t *task;
163 apt_task_vtable_t *vtable;
164 apt_task_msg_pool_t *msg_pool;
165 rtsp_client_t *client;
166
167 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create RTSP Client [%s] [%"APR_SIZE_T_FMT"]",
168 id, max_connection_count);
169 client = apr_palloc(pool,sizeof(rtsp_client_t));
170 client->pool = pool;
171 client->obj = obj;
172 client->vtable = handler;
173
174 msg_pool = apt_task_msg_pool_create_dynamic(sizeof(task_msg_data_t),pool);
175
176 client->task = apt_poller_task_create(
177 max_connection_count,
178 rtsp_client_poller_signal_process,
179 client,
180 msg_pool,
181 pool);
182 if(!client->task) {
183 return NULL;
184 }
185
186 task = apt_poller_task_base_get(client->task);
187 if(task) {
188 apt_task_name_set(task,id);
189 }
190
191 vtable = apt_poller_task_vtable_get(client->task);
192 if(vtable) {
193 vtable->process_msg = rtsp_client_task_msg_process;
194 }
195
196 APR_RING_INIT(&client->connection_list, rtsp_client_connection_t, link);
197 client->request_timeout = (apr_uint32_t)request_timeout;
198 return client;
199 }
200
201 /** Destroy RTSP client */
rtsp_client_destroy(rtsp_client_t * client)202 RTSP_DECLARE(apt_bool_t) rtsp_client_destroy(rtsp_client_t *client)
203 {
204 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Client [%s]",
205 rtsp_client_id_get(client));
206 return apt_poller_task_destroy(client->task);
207 }
208
209 /** Start connection agent */
rtsp_client_start(rtsp_client_t * client)210 RTSP_DECLARE(apt_bool_t) rtsp_client_start(rtsp_client_t *client)
211 {
212 return apt_poller_task_start(client->task);
213 }
214
215 /** Terminate connection agent */
rtsp_client_terminate(rtsp_client_t * client)216 RTSP_DECLARE(apt_bool_t) rtsp_client_terminate(rtsp_client_t *client)
217 {
218 return apt_poller_task_terminate(client->task);
219 }
220
221 /** Get task */
rtsp_client_task_get(const rtsp_client_t * client)222 RTSP_DECLARE(apt_task_t*) rtsp_client_task_get(const rtsp_client_t *client)
223 {
224 return apt_poller_task_base_get(client->task);
225 }
226
227 /** Get external object */
rtsp_client_object_get(const rtsp_client_t * client)228 RTSP_DECLARE(void*) rtsp_client_object_get(const rtsp_client_t *client)
229 {
230 return client->obj;
231 }
232
233 /** Get object associated with the session */
rtsp_client_session_object_get(const rtsp_client_session_t * session)234 RTSP_DECLARE(void*) rtsp_client_session_object_get(const rtsp_client_session_t *session)
235 {
236 return session->obj;
237 }
238
239 /** Set object associated with the session */
rtsp_client_session_object_set(rtsp_client_session_t * session,void * obj)240 RTSP_DECLARE(void) rtsp_client_session_object_set(rtsp_client_session_t *session, void *obj)
241 {
242 session->obj = obj;
243 }
244
245 /** Get the session identifier */
rtsp_client_session_id_get(const rtsp_client_session_t * session)246 RTSP_DECLARE(const apt_str_t*) rtsp_client_session_id_get(const rtsp_client_session_t *session)
247 {
248 return &session->id;
249 }
250
251 /** Signal task message */
rtsp_client_control_message_signal(task_msg_data_type_e type,rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)252 static apt_bool_t rtsp_client_control_message_signal(
253 task_msg_data_type_e type,
254 rtsp_client_t *client,
255 rtsp_client_session_t *session,
256 rtsp_message_t *message)
257 {
258 apt_task_t *task = apt_poller_task_base_get(client->task);
259 apt_task_msg_t *task_msg = apt_task_msg_get(task);
260 if(task_msg) {
261 task_msg_data_t *data = (task_msg_data_t*)task_msg->data;
262 data->type = type;
263 data->client = client;
264 data->session = session;
265 data->message = message;
266 apt_task_msg_signal(task,task_msg);
267 }
268 return TRUE;
269 }
270
271 /** Create RTSP session handle */
rtsp_client_session_create(rtsp_client_t * client,const char * server_ip,apr_port_t server_port,const char * resource_location)272 RTSP_DECLARE(rtsp_client_session_t*) rtsp_client_session_create(
273 rtsp_client_t *client,
274 const char *server_ip,
275 apr_port_t server_port,
276 const char *resource_location)
277 {
278 rtsp_client_session_t *session;
279 apr_pool_t *pool = apt_pool_create();
280 session = apr_palloc(pool,sizeof(rtsp_client_session_t));
281 session->pool = pool;
282 session->obj = NULL;
283 session->connection = NULL;
284 session->active_request = NULL;
285 session->pending_request_queue = apt_list_create(pool);
286 session->request_timer = apt_poller_task_timer_create(
287 client->task,
288 rtsp_client_timer_proc,
289 session,
290 pool);
291 session->resource_table = apr_hash_make(pool);
292 session->term_state = TERMINATION_STATE_NONE;
293
294 apt_string_assign(&session->server_ip,server_ip,pool);
295 session->server_port = server_port;
296 apt_string_assign(&session->resource_location,resource_location,pool);
297 apt_string_reset(&session->id);
298 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create RTSP Handle "APT_PTR_FMT,session);
299 return session;
300 }
301
302 /** Destroy RTSP session handle */
rtsp_client_session_destroy(rtsp_client_session_t * session)303 RTSP_DECLARE(void) rtsp_client_session_destroy(rtsp_client_session_t *session)
304 {
305 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Handle "APT_PTR_FMT,session);
306 if(session && session->pool) {
307 apr_pool_destroy(session->pool);
308 }
309 }
310
311 /** Signal terminate request */
rtsp_client_session_terminate(rtsp_client_t * client,rtsp_client_session_t * session)312 RTSP_DECLARE(apt_bool_t) rtsp_client_session_terminate(rtsp_client_t *client, rtsp_client_session_t *session)
313 {
314 return rtsp_client_control_message_signal(TASK_MSG_TERMINATE_SESSION,client,session,NULL);
315 }
316
317 /** Signal RTSP message */
rtsp_client_session_request(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)318 RTSP_DECLARE(apt_bool_t) rtsp_client_session_request(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
319 {
320 return rtsp_client_control_message_signal(TASK_MSG_SEND_MESSAGE,client,session,message);
321 }
322
323
324 /** Create connection */
rtsp_client_connect(rtsp_client_t * client,rtsp_client_connection_t * connection,const char * ip,apr_port_t port)325 static apt_bool_t rtsp_client_connect(rtsp_client_t *client, rtsp_client_connection_t *connection, const char *ip, apr_port_t port)
326 {
327 char *local_ip = NULL;
328 char *remote_ip = NULL;
329 apr_sockaddr_t *l_sockaddr = NULL;
330 apr_sockaddr_t *r_sockaddr = NULL;
331
332 if(apr_sockaddr_info_get(&r_sockaddr,ip,APR_INET,port,0,connection->pool) != APR_SUCCESS) {
333 return FALSE;
334 }
335
336 if(apr_socket_create(&connection->sock,r_sockaddr->family,SOCK_STREAM,APR_PROTO_TCP,connection->pool) != APR_SUCCESS) {
337 return FALSE;
338 }
339
340 apr_socket_opt_set(connection->sock, APR_SO_NONBLOCK, 0);
341 apr_socket_timeout_set(connection->sock, -1);
342 apr_socket_opt_set(connection->sock, APR_SO_REUSEADDR, 1);
343
344 if(apr_socket_connect(connection->sock,r_sockaddr) != APR_SUCCESS) {
345 apr_socket_close(connection->sock);
346 connection->sock = NULL;
347 return FALSE;
348 }
349
350 if(apr_socket_addr_get(&l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
351 apr_socket_close(connection->sock);
352 connection->sock = NULL;
353 return FALSE;
354 }
355
356 apr_sockaddr_ip_get(&local_ip,l_sockaddr);
357 apr_sockaddr_ip_get(&remote_ip,r_sockaddr);
358 connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
359 local_ip,l_sockaddr->port,
360 remote_ip,r_sockaddr->port);
361
362 memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
363 connection->sock_pfd.desc_type = APR_POLL_SOCKET;
364 connection->sock_pfd.reqevents = APR_POLLIN;
365 connection->sock_pfd.desc.s = connection->sock;
366 connection->sock_pfd.client_data = connection;
367 if(apt_poller_task_descriptor_add(client->task,&connection->sock_pfd) != TRUE) {
368 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
369 apr_socket_close(connection->sock);
370 connection->sock = NULL;
371 return FALSE;
372 }
373
374 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Established RTSP Connection %s",connection->id);
375 return TRUE;
376 }
377
378 /** Close connection */
rtsp_client_connection_close(rtsp_client_t * client,rtsp_client_connection_t * connection)379 static apt_bool_t rtsp_client_connection_close(rtsp_client_t *client, rtsp_client_connection_t *connection)
380 {
381 if(connection->sock) {
382 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close RTSP Connection %s",connection->id);
383 apt_poller_task_descriptor_remove(client->task,&connection->sock_pfd);
384 apr_socket_close(connection->sock);
385 connection->sock = NULL;
386 }
387 return TRUE;
388 }
389
390
391 /* Create RTSP connection */
rtsp_client_connection_create(rtsp_client_t * client,rtsp_client_session_t * session)392 static apt_bool_t rtsp_client_connection_create(rtsp_client_t *client, rtsp_client_session_t *session)
393 {
394 rtsp_client_connection_t *rtsp_connection;
395 apr_pool_t *pool = apt_pool_create();
396 if(!pool) {
397 return FALSE;
398 }
399
400 rtsp_connection = apr_palloc(pool,sizeof(rtsp_client_connection_t));
401 rtsp_connection->pool = pool;
402 rtsp_connection->sock = NULL;
403 APR_RING_ELEM_INIT(rtsp_connection,link);
404
405 if(rtsp_client_connect(client,rtsp_connection,session->server_ip.buf,session->server_port) == FALSE) {
406 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Connect to RTSP Server %s:%hu",
407 session->server_ip.buf,session->server_port);
408 apr_pool_destroy(pool);
409 return FALSE;
410 }
411 rtsp_connection->handle_table = apr_hash_make(pool);
412 rtsp_connection->session_table = apr_hash_make(pool);
413 rtsp_connection->inprogress_request_queue = apt_list_create(pool);
414 apt_text_stream_init(&rtsp_connection->rx_stream,rtsp_connection->rx_buffer,sizeof(rtsp_connection->rx_buffer)-1);
415 apt_text_stream_init(&rtsp_connection->tx_stream,rtsp_connection->tx_buffer,sizeof(rtsp_connection->tx_buffer)-1);
416 rtsp_connection->parser = rtsp_parser_create(pool);
417 rtsp_connection->generator = rtsp_generator_create(pool);
418 rtsp_connection->last_cseq = 0;
419
420 rtsp_connection->client = client;
421 APR_RING_INSERT_TAIL(&client->connection_list,rtsp_connection,rtsp_client_connection_t,link);
422 session->connection = rtsp_connection;
423 return TRUE;
424 }
425
426 /* Destroy RTSP connection */
rtsp_client_connection_destroy(rtsp_client_connection_t * rtsp_connection)427 static apt_bool_t rtsp_client_connection_destroy(rtsp_client_connection_t *rtsp_connection)
428 {
429 rtsp_client_t *client = rtsp_connection->client;
430 APR_RING_REMOVE(rtsp_connection,link);
431 rtsp_client_connection_close(client,rtsp_connection);
432 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Connection %s",rtsp_connection->id);
433 apr_pool_destroy(rtsp_connection->pool);
434
435 return TRUE;
436 }
437
438 /* Respond to session termination request */
rtsp_client_session_terminate_respond(rtsp_client_t * client,rtsp_client_session_t * session)439 static apt_bool_t rtsp_client_session_terminate_respond(rtsp_client_t *client, rtsp_client_session_t *session)
440 {
441 rtsp_client_connection_t *rtsp_connection = session->connection;
442 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove RTSP Handle "APT_PTR_FMT,session);
443 apr_hash_set(rtsp_connection->handle_table,session,sizeof(void*),NULL);
444
445 session->term_state = TERMINATION_STATE_NONE;
446 client->vtable->on_session_terminate_response(client,session);
447 return TRUE;
448 }
449
450 /* Teardown session resources */
rtsp_client_session_resources_teardown(rtsp_client_t * client,rtsp_client_session_t * session)451 static apt_bool_t rtsp_client_session_resources_teardown(rtsp_client_t *client, rtsp_client_session_t *session)
452 {
453 void *val;
454 rtsp_message_t *setup_request;
455 rtsp_message_t *teardown_request;
456 apr_hash_index_t *it;
457
458 /* set termination state to in-progress and teardown remaining resources */
459 session->term_state = TERMINATION_STATE_INPROGRESS;
460 it = apr_hash_first(session->pool,session->resource_table);
461 for(; it; it = apr_hash_next(it)) {
462 apr_hash_this(it,NULL,NULL,&val);
463 setup_request = val;
464 if(!setup_request) continue;
465
466 teardown_request = rtsp_request_create(session->pool);
467 teardown_request->start_line.common.request_line.resource_name = setup_request->start_line.common.request_line.resource_name;
468 teardown_request->start_line.common.request_line.method_id = RTSP_METHOD_TEARDOWN;
469 rtsp_client_session_message_process(client,session,teardown_request);
470 }
471 return TRUE;
472 }
473
474 /* Process session termination request */
rtsp_client_session_terminate_process(rtsp_client_t * client,rtsp_client_session_t * session)475 static apt_bool_t rtsp_client_session_terminate_process(rtsp_client_t *client, rtsp_client_session_t *session)
476 {
477 rtsp_client_connection_t *rtsp_connection = session->connection;
478 if(!rtsp_connection) {
479 client->vtable->on_session_terminate_response(client,session);
480 return FALSE;
481 }
482
483 if(session->active_request) {
484 /* set termination state to requested */
485 session->term_state = TERMINATION_STATE_REQUESTED;
486 }
487 else {
488 rtsp_client_session_resources_teardown(client,session);
489
490 /* respond immediately if no resources left */
491 if(apr_hash_count(session->resource_table) == 0) {
492 rtsp_client_session_terminate_respond(client,session);
493
494 if(apr_hash_count(rtsp_connection->handle_table) == 0) {
495 rtsp_client_connection_destroy(rtsp_connection);
496 }
497 }
498 }
499
500 return TRUE;
501 }
502
rtsp_client_session_url_generate(rtsp_client_session_t * session,rtsp_message_t * message)503 static apt_bool_t rtsp_client_session_url_generate(rtsp_client_session_t *session, rtsp_message_t *message)
504 {
505 apt_str_t *url = &message->start_line.common.request_line.url;
506 if(session->resource_location.length) {
507 url->buf = apr_psprintf(message->pool,"rtsp://%s:%hu/%s/%s",
508 session->server_ip.buf,
509 session->server_port,
510 session->resource_location.buf,
511 message->start_line.common.request_line.resource_name);
512 }
513 else {
514 url->buf = apr_psprintf(message->pool,"rtsp://%s:%hu/%s",
515 session->server_ip.buf,
516 session->server_port,
517 message->start_line.common.request_line.resource_name);
518 }
519 url->length = strlen(url->buf);
520 return TRUE;
521 }
522
rtsp_client_request_push(rtsp_client_connection_t * rtsp_connection,rtsp_client_session_t * session,rtsp_message_t * message)523 static apt_bool_t rtsp_client_request_push(rtsp_client_connection_t *rtsp_connection, rtsp_client_session_t *session, rtsp_message_t *message)
524 {
525 /* add request to inprogress request queue */
526 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Push RTSP Request to In-Progress Queue "APT_PTRSID_FMT" CSeq:%"APR_SIZE_T_FMT,
527 session,
528 message->header.session_id.buf ? message->header.session_id.buf : "new",
529 message->header.cseq);
530 apt_list_push_back(rtsp_connection->inprogress_request_queue,session,session->pool);
531 session->active_request = message;
532 if(rtsp_connection->client->request_timeout) {
533 apt_timer_set(session->request_timer,rtsp_connection->client->request_timeout);
534 }
535 return TRUE;
536 }
537
rtsp_client_request_pop(rtsp_client_connection_t * rtsp_connection,rtsp_message_t * response,rtsp_message_t ** ret_request,rtsp_client_session_t ** ret_session)538 static apt_bool_t rtsp_client_request_pop(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *response, rtsp_message_t **ret_request, rtsp_client_session_t **ret_session)
539 {
540 rtsp_client_session_t *session;
541 apt_list_elem_t *elem = apt_list_first_elem_get(rtsp_connection->inprogress_request_queue);
542 while(elem) {
543 session = apt_list_elem_object_get(elem);
544 if(session->active_request && session->active_request->header.cseq == response->header.cseq) {
545 if(ret_session) {
546 *ret_session = session;
547 }
548 if(ret_request) {
549 *ret_request = session->active_request;
550 }
551 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Pop In-Progress RTSP Request "APT_PTR_FMT" CSeq:%"APR_SIZE_T_FMT,
552 session,
553 response->header.cseq);
554 apt_list_elem_remove(rtsp_connection->inprogress_request_queue,elem);
555 session->active_request = NULL;
556 apt_timer_kill(session->request_timer);
557 return TRUE;
558 }
559 elem = apt_list_next_elem_get(rtsp_connection->inprogress_request_queue,elem);
560 }
561 return FALSE;
562 }
563
564 /* Process outgoing RTSP request */
rtsp_client_session_request_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)565 static apt_bool_t rtsp_client_session_request_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
566 {
567 if(!session->connection) {
568 /* create RTSP connection */
569 if(rtsp_client_connection_create(client,session) == FALSE) {
570 /* respond with error */
571 return FALSE;
572 }
573 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add RTSP Handle "APT_PTR_FMT,session);
574 apr_hash_set(session->connection->handle_table,session,sizeof(void*),session);
575 }
576
577 rtsp_client_session_url_generate(session,message);
578
579 if(session->id.length) {
580 message->header.session_id = session->id;
581 rtsp_header_property_add(&message->header,RTSP_HEADER_FIELD_SESSION_ID,message->pool);
582 }
583
584 message->header.cseq = ++session->connection->last_cseq;
585 rtsp_header_property_add(&message->header,RTSP_HEADER_FIELD_CSEQ,message->pool);
586
587 if(rtsp_client_message_send(client,session->connection,message) == FALSE) {
588 /* respond with error */
589 return FALSE;
590 }
591
592 return rtsp_client_request_push(session->connection,session,message);
593 }
594
595 /* Process pending RTSP requests */
rtsp_client_session_pending_requests_process(rtsp_client_t * client,rtsp_client_session_t * session)596 static apt_bool_t rtsp_client_session_pending_requests_process(rtsp_client_t *client, rtsp_client_session_t *session)
597 {
598 rtsp_message_t *request = apt_list_pop_front(session->pending_request_queue);
599 if(!request) {
600 /* pending queue is empty, no in-progress request */
601 return FALSE;
602 }
603
604 /* process pending request; get the next one, if current is failed */
605 do {
606 rtsp_message_t *response;
607 if(rtsp_client_session_request_process(client,session,request) == TRUE) {
608 return TRUE;
609 }
610
611 /* respond with error */
612 response = rtsp_response_create(
613 request,
614 RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
615 RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
616 session->pool);
617 rtsp_client_session_response_process(client,session,request,response);
618
619 /* process the next pending request / if any */
620 request = apt_list_pop_front(session->pending_request_queue);
621 }
622 while(request);
623
624 /* no in-progress request */
625 return FALSE;
626 }
627
628
629 /* Process outgoing RTSP message */
rtsp_client_session_message_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)630 static apt_bool_t rtsp_client_session_message_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
631 {
632 if(session->active_request) {
633 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Push RTSP Request to Pending Queue "APT_PTR_FMT,session);
634 apt_list_push_back(session->pending_request_queue,message,message->pool);
635 return TRUE;
636 }
637
638 if(rtsp_client_session_request_process(client,session,message) == FALSE) {
639 /* respond with error in case request cannot be processed */
640 rtsp_message_t *response = rtsp_response_create(
641 message,
642 RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
643 RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
644 session->pool);
645 rtsp_client_session_response_process(client,session,message,response);
646 }
647 return TRUE;
648 }
649
650 /* Process incoming RTSP event (request) */
rtsp_client_session_event_process(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message)651 static apt_bool_t rtsp_client_session_event_process(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message)
652 {
653 rtsp_message_t *response = NULL;
654 rtsp_client_session_t *session = NULL;
655 if(rtsp_header_property_check(&message->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
656 /* find existing session */
657 session = apr_hash_get(
658 rtsp_connection->session_table,
659 message->header.session_id.buf,
660 message->header.session_id.length);
661 }
662
663 if(session) {
664 response = rtsp_response_create(message,RTSP_STATUS_CODE_OK,RTSP_REASON_PHRASE_OK,message->pool);
665 if(rtsp_header_property_check(&message->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
666 response->header.session_id = message->header.session_id;
667 rtsp_header_property_add(&response->header,RTSP_HEADER_FIELD_SESSION_ID,message->pool);
668 }
669 client->vtable->on_session_event(client,session,message);
670 }
671 else {
672 response = rtsp_response_create(message,RTSP_STATUS_CODE_NOT_FOUND,RTSP_REASON_PHRASE_NOT_FOUND,message->pool);
673 }
674
675 return rtsp_client_message_send(client,rtsp_connection,response);
676 }
677
678 /* Process incoming RTSP response */
rtsp_client_session_response_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * request,rtsp_message_t * response)679 static apt_bool_t rtsp_client_session_response_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *request, rtsp_message_t *response)
680 {
681 const char *resource_name;
682 if(request->start_line.common.request_line.method_id == RTSP_METHOD_SETUP &&
683 response->start_line.common.status_line.status_code == RTSP_STATUS_CODE_OK) {
684
685 if(apr_hash_count(session->resource_table) == 0) {
686 if(rtsp_header_property_check(&response->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
687 session->id = response->header.session_id;
688 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add RTSP Session "APT_PTRSID_FMT,
689 session,
690 session->id.buf);
691 apr_hash_set(session->connection->session_table,session->id.buf,session->id.length,session);
692 }
693 }
694
695 /* add resource */
696 resource_name = request->start_line.common.request_line.resource_name;
697 apr_hash_set(session->resource_table,resource_name,APR_HASH_KEY_STRING,request);
698 }
699 else if(request->start_line.common.request_line.method_id == RTSP_METHOD_TEARDOWN) {
700 /* remove resource */
701 resource_name = request->start_line.common.request_line.resource_name;
702 apr_hash_set(session->resource_table,resource_name,APR_HASH_KEY_STRING,NULL);
703
704 if(apr_hash_count(session->resource_table) == 0) {
705 if(session->connection) {
706 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove RTSP Session "APT_PTRSID_FMT,
707 session,
708 session->id.buf);
709 apr_hash_set(session->connection->session_table,session->id.buf,session->id.length,NULL);
710 }
711 }
712 }
713
714 if(session->term_state != TERMINATION_STATE_INPROGRESS) {
715 client->vtable->on_session_response(client,session,request,response);
716 }
717
718 return TRUE;
719 }
720
721 /* Raise RTSP session terminate event */
rtsp_client_session_terminate_raise(rtsp_client_t * client,rtsp_client_session_t * session)722 static apt_bool_t rtsp_client_session_terminate_raise(rtsp_client_t *client, rtsp_client_session_t *session)
723 {
724 rtsp_message_t *request;
725 rtsp_message_t *response;
726
727 /* cancel pending requests */
728 do {
729 request = apt_list_pop_front(session->pending_request_queue);
730 if(request) {
731 response = rtsp_response_create(
732 session->active_request,
733 RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
734 RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
735 session->pool);
736 rtsp_client_session_response_process(client,session,request,response);
737 }
738 }
739 while(request);
740
741 if(session->term_state == TERMINATION_STATE_NONE) {
742 client->vtable->on_session_terminate_event(client,session);
743 }
744 else {
745 rtsp_client_session_terminate_respond(client,session);
746 }
747 return TRUE;
748 }
749
750 /* Cancel RTSP request */
rtsp_client_request_cancel(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_status_code_e status_code,rtsp_reason_phrase_e reason)751 static apt_bool_t rtsp_client_request_cancel(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_status_code_e status_code, rtsp_reason_phrase_e reason)
752 {
753 rtsp_message_t *request;
754 rtsp_message_t *response;
755 if(!session->active_request) {
756 return FALSE;
757 }
758
759 request = session->active_request;
760 response = rtsp_response_create(
761 request,
762 status_code,
763 reason,
764 session->pool);
765
766 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Cancel RTSP Request "APT_PTRSID_FMT" CSeq:%"APR_SIZE_T_FMT" [%d]",
767 session,
768 request->header.session_id.buf ? request->header.session_id.buf : "new",
769 request->header.cseq,
770 status_code);
771
772 return rtsp_client_message_handler(session->connection, response, APT_MESSAGE_STATUS_COMPLETE);
773 }
774
775 /* RTSP connection disconnected */
rtsp_client_on_disconnect(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection)776 static apt_bool_t rtsp_client_on_disconnect(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection)
777 {
778 rtsp_client_session_t *session;
779 apr_size_t remaining_handles;
780
781 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"RTSP Peer Disconnected %s", rtsp_connection->id);
782 rtsp_client_connection_close(client,rtsp_connection);
783
784 /* Cancel in-progreess requests */
785 do {
786 session = apt_list_pop_front(rtsp_connection->inprogress_request_queue);
787 if(session) {
788 if(rtsp_client_request_cancel(
789 client,
790 session,
791 RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
792 RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR) == TRUE) {
793 apt_timer_kill(session->request_timer);
794 }
795 }
796 }
797 while(session);
798
799 /* Walk through RTSP handles and raise termination event for them */
800 remaining_handles = apr_hash_count(rtsp_connection->handle_table);
801 if(remaining_handles) {
802 void *val;
803 apr_hash_index_t *it;
804 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Terminate Remaining RTSP Handles [%"APR_SIZE_T_FMT"]",remaining_handles);
805 it = apr_hash_first(rtsp_connection->pool,rtsp_connection->session_table);
806 for(; it; it = apr_hash_next(it)) {
807 apr_hash_this(it,NULL,NULL,&val);
808 session = val;
809 if(session) {
810 rtsp_client_session_terminate_raise(client,session);
811 }
812 }
813 }
814
815 return TRUE;
816 }
817
818 /* Send RTSP message through RTSP connection */
rtsp_client_message_send(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message)819 static apt_bool_t rtsp_client_message_send(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message)
820 {
821 apt_bool_t status = FALSE;
822 apt_text_stream_t *stream;
823 apt_message_status_e result;
824
825 if(!rtsp_connection || !rtsp_connection->sock) {
826 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"No RTSP Connection");
827 return FALSE;
828 }
829 stream = &rtsp_connection->tx_stream;
830
831 do {
832 stream->text.length = sizeof(rtsp_connection->tx_buffer)-1;
833 apt_text_stream_reset(stream);
834 result = rtsp_generator_run(rtsp_connection->generator,message,stream);
835 if(result != APT_MESSAGE_STATUS_INVALID) {
836 stream->text.length = stream->pos - stream->text.buf;
837 *stream->pos = '\0';
838
839 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send RTSP Data %s [%"APR_SIZE_T_FMT" bytes]\n%s",
840 rtsp_connection->id,
841 stream->text.length,
842 stream->text.buf);
843 if(apr_socket_send(rtsp_connection->sock,stream->text.buf,&stream->text.length) == APR_SUCCESS) {
844 status = TRUE;
845 }
846 else {
847 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send RTSP Data");
848 }
849 }
850 else {
851 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Generate RTSP Data");
852 }
853 }
854 while(result == APT_MESSAGE_STATUS_INCOMPLETE);
855
856 return status;
857 }
858
859 /** Return TRUE to proceed with the next message in the stream (if any) */
rtsp_client_message_handler(rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message,apt_message_status_e status)860 static apt_bool_t rtsp_client_message_handler(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message, apt_message_status_e status)
861 {
862 if(status != APT_MESSAGE_STATUS_COMPLETE) {
863 /* message is not completely parsed, nothing to do */
864 return TRUE;
865 }
866 /* process parsed message */
867 if(message->start_line.message_type == RTSP_MESSAGE_TYPE_RESPONSE) {
868 rtsp_message_t *request;
869 rtsp_client_session_t *session;
870 /* at first, pop in-progress request/session */
871 if(rtsp_client_request_pop(rtsp_connection,message,&request,&session) == FALSE) {
872 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Unexpected RTSP Response Received CSeq:%"APR_SIZE_T_FMT,
873 message->header.cseq);
874 return TRUE;
875 }
876
877 /* next, process session response */
878 rtsp_client_session_response_process(rtsp_connection->client,session,request,message);
879
880 /* process session pending requests */
881 if(rtsp_client_session_pending_requests_process(rtsp_connection->client,session) == FALSE) {
882 /* no in-progress request, check the termination state now */
883 if(session->term_state != TERMINATION_STATE_NONE) {
884 if(session->term_state == TERMINATION_STATE_REQUESTED) {
885 rtsp_client_session_resources_teardown(rtsp_connection->client,session);
886 }
887
888 /* respond if no resources left */
889 if(apr_hash_count(session->resource_table) == 0) {
890 rtsp_client_session_terminate_respond(rtsp_connection->client,session);
891
892 if(apr_hash_count(rtsp_connection->handle_table) == 0) {
893 rtsp_client_connection_destroy(rtsp_connection);
894 /* return FALSE to indicate connection has been destroyed */
895 return FALSE;
896 }
897 }
898 }
899 }
900 }
901 else if(message->start_line.message_type == RTSP_MESSAGE_TYPE_REQUEST) {
902 rtsp_client_session_event_process(rtsp_connection->client,rtsp_connection,message);
903 }
904 return TRUE;
905 }
906
907 /* Receive RTSP message through RTSP connection */
rtsp_client_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)908 static apt_bool_t rtsp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
909 {
910 rtsp_client_t *client = obj;
911 rtsp_client_connection_t *rtsp_connection = descriptor->client_data;
912 apr_status_t status;
913 apr_size_t offset;
914 apr_size_t length;
915 apt_text_stream_t *stream;
916 rtsp_message_t *message;
917 apt_message_status_e msg_status;
918
919 if(!rtsp_connection || !rtsp_connection->sock) {
920 return FALSE;
921 }
922 stream = &rtsp_connection->rx_stream;
923
924 /* calculate offset remaining from the previous receive / if any */
925 offset = stream->pos - stream->text.buf;
926 /* calculate available length */
927 length = sizeof(rtsp_connection->rx_buffer) - 1 - offset;
928
929 status = apr_socket_recv(rtsp_connection->sock,stream->pos,&length);
930 if(status == APR_EOF || length == 0) {
931 return rtsp_client_on_disconnect(client,rtsp_connection);
932 }
933
934 /* calculate actual length of the stream */
935 stream->text.length = offset + length;
936 stream->pos[length] = '\0';
937 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive RTSP Data %s [%"APR_SIZE_T_FMT" bytes]\n%s",
938 rtsp_connection->id,
939 length,
940 stream->pos);
941
942 /* reset pos */
943 apt_text_stream_reset(stream);
944
945 do {
946 msg_status = rtsp_parser_run(rtsp_connection->parser,stream,&message);
947 if(rtsp_client_message_handler(rtsp_connection,message,msg_status) == FALSE) {
948 return FALSE;
949 }
950 }
951 while(apt_text_is_eos(stream) == FALSE);
952
953 /* scroll remaining stream */
954 apt_text_stream_scroll(stream);
955 return TRUE;
956 }
957
958 /* Process task message */
rtsp_client_task_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)959 static apt_bool_t rtsp_client_task_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
960 {
961 apt_poller_task_t *poller_task = apt_task_object_get(task);
962 rtsp_client_t *client = apt_poller_task_object_get(poller_task);
963
964 task_msg_data_t *data = (task_msg_data_t*) task_msg->data;
965 switch(data->type) {
966 case TASK_MSG_SEND_MESSAGE:
967 rtsp_client_session_message_process(client,data->session,data->message);
968 break;
969 case TASK_MSG_TERMINATE_SESSION:
970 rtsp_client_session_terminate_process(client,data->session);
971 break;
972 }
973
974 return TRUE;
975 }
976
977 /* Timer callback */
rtsp_client_timer_proc(apt_timer_t * timer,void * obj)978 static void rtsp_client_timer_proc(apt_timer_t *timer, void *obj)
979 {
980 rtsp_client_session_t *session = obj;
981 if(!session || !session->connection || !session->connection->client) {
982 return;
983 }
984
985 if(session->request_timer == timer) {
986 rtsp_client_request_cancel(
987 session->connection->client,
988 session,
989 RTSP_STATUS_CODE_REQUEST_TIMEOUT,
990 RTSP_REASON_PHRASE_REQUEST_TIMEOUT);
991 }
992 }
993