1 /*
2  * Copyright 2008-2014 Arsen Chaloyan
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * $Id: rtsp_client.c 2249 2014-11-19 05:26:24Z achaloyan@gmail.com $
17  */
18 
19 #ifdef WIN32
20 #pragma warning(disable: 4127)
21 #endif
22 #include <apr_ring.h>
23 #include <apr_hash.h>
24 #include "rtsp_client.h"
25 #include "rtsp_stream.h"
26 #include "apt_poller_task.h"
27 #include "apt_text_stream.h"
28 #include "apt_pool.h"
29 #include "apt_obj_list.h"
30 #include "apt_log.h"
31 
/** Size in bytes of the per-connection receive and transmit text buffers */
#define RTSP_STREAM_BUFFER_SIZE 1024

/** Forward declaration of the RTSP connection structure */
typedef struct rtsp_client_connection_t rtsp_client_connection_t;
35 
/** Progress of a session termination request */
typedef enum {
	/** termination has not been requested */
	TERMINATION_STATE_NONE = 0,
	/** termination was requested while a request was still in progress */
	TERMINATION_STATE_REQUESTED,
	/** remaining session resources are being torn down */
	TERMINATION_STATE_INPROGRESS
} termination_state_e;
41 
42 /** RTSP client */
43 struct rtsp_client_t {
44 	apr_pool_t                 *pool;
45 	apt_poller_task_t          *task;
46 
47 	/** List (ring) of RTSP connections */
48 	APR_RING_HEAD(rtsp_client_connection_head_t, rtsp_client_connection_t) connection_list;
49 
50 	apr_uint32_t                request_timeout;
51 
52 	void                       *obj;
53 	const rtsp_client_vtable_t *vtable;
54 };
55 
56 /** RTSP connection */
57 struct rtsp_client_connection_t {
58 	/** Ring entry */
59 	APR_RING_ENTRY(rtsp_client_connection_t) link;
60 
61 	/** Memory pool */
62 	apr_pool_t       *pool;
63 	/** Connected socket */
64 	apr_socket_t     *sock;
65 	/** Socket poll descriptor */
66 	apr_pollfd_t      sock_pfd;
67 	/** String identifier used for traces */
68 	const char       *id;
69 	/** RTSP client, connection belongs to */
70 	rtsp_client_t    *client;
71 
72 	/** Handle table (rtsp_client_session_t*) */
73 	apr_hash_t       *handle_table;
74 	/** Session table (rtsp_client_session_t*) */
75 	apr_hash_t       *session_table;
76 
77 	/** Inprogress request/session queue (rtsp_client_session_t*) */
78 	apt_obj_list_t   *inprogress_request_queue;
79 
80 	/** Last CSeq sent */
81 	apr_size_t        last_cseq;
82 
83 	char              rx_buffer[RTSP_STREAM_BUFFER_SIZE];
84 	apt_text_stream_t rx_stream;
85 	rtsp_parser_t    *parser;
86 
87 	char              tx_buffer[RTSP_STREAM_BUFFER_SIZE];
88 	apt_text_stream_t tx_stream;
89 	rtsp_generator_t *generator;
90 };
91 
92 /** RTSP session */
93 struct rtsp_client_session_t {
94 	apr_pool_t               *pool;
95 	void                     *obj;
96 
97 	/** Connection */
98 	rtsp_client_connection_t *connection;
99 	/** Session identifier */
100 	apt_str_t                 id;
101 
102 	apt_str_t                 server_ip;
103 	apr_port_t                server_port;
104 	apt_str_t                 resource_location;
105 
106 	/** In-progress request */
107 	rtsp_message_t           *active_request;
108 	/** Pending request queue (rtsp_message_t*) */
109 	apt_obj_list_t           *pending_request_queue;
110 
111 	/** Timer used for request timeouts */
112 	apt_timer_t              *request_timer;
113 
114 	/** Resource table */
115 	apr_hash_t               *resource_table;
116 
117 	/** termination state (none -> requested -> terminating) */
118 	termination_state_e       term_state;
119 };
120 
/** Types of control messages signalled to the client task */
typedef enum {
	/** send an RTSP message on behalf of a session */
	TASK_MSG_SEND_MESSAGE = 0,
	/** terminate a session */
	TASK_MSG_TERMINATE_SESSION
} task_msg_data_type_e;
125 
126 typedef struct task_msg_data_t task_msg_data_t;
127 
128 struct task_msg_data_t {
129 	task_msg_data_type_e   type;
130 	rtsp_client_t         *client;
131 	rtsp_client_session_t *session;
132 	rtsp_message_t        *message;
133 };
134 
135 static apt_bool_t rtsp_client_task_msg_process(apt_task_t *task, apt_task_msg_t *msg);
136 
137 static apt_bool_t rtsp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor);
138 
139 static apt_bool_t rtsp_client_message_handler(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message, apt_message_status_e status);
140 static apt_bool_t rtsp_client_message_send(rtsp_client_t *client, rtsp_client_connection_t *connection, rtsp_message_t *message);
141 static apt_bool_t rtsp_client_session_message_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message);
142 static apt_bool_t rtsp_client_session_response_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *request, rtsp_message_t *response);
143 
144 static void rtsp_client_timer_proc(apt_timer_t *timer, void *obj);
145 
146 /** Get string identifier */
rtsp_client_id_get(const rtsp_client_t * client)147 static const char* rtsp_client_id_get(const rtsp_client_t *client)
148 {
149 	apt_task_t *task = apt_poller_task_base_get(client->task);
150 	return apt_task_name_get(task);
151 }
152 
153 /** Create RTSP client */
rtsp_client_create(const char * id,apr_size_t max_connection_count,apr_size_t request_timeout,void * obj,const rtsp_client_vtable_t * handler,apr_pool_t * pool)154 RTSP_DECLARE(rtsp_client_t*) rtsp_client_create(
155 									const char *id,
156 									apr_size_t max_connection_count,
157 									apr_size_t request_timeout,
158 									void *obj,
159 									const rtsp_client_vtable_t *handler,
160 									apr_pool_t *pool)
161 {
162 	apt_task_t *task;
163 	apt_task_vtable_t *vtable;
164 	apt_task_msg_pool_t *msg_pool;
165 	rtsp_client_t *client;
166 
167 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create RTSP Client [%s] [%"APR_SIZE_T_FMT"]",
168 			id, max_connection_count);
169 	client = apr_palloc(pool,sizeof(rtsp_client_t));
170 	client->pool = pool;
171 	client->obj = obj;
172 	client->vtable = handler;
173 
174 	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(task_msg_data_t),pool);
175 
176 	client->task = apt_poller_task_create(
177 						max_connection_count,
178 						rtsp_client_poller_signal_process,
179 						client,
180 						msg_pool,
181 						pool);
182 	if(!client->task) {
183 		return NULL;
184 	}
185 
186 	task = apt_poller_task_base_get(client->task);
187 	if(task) {
188 		apt_task_name_set(task,id);
189 	}
190 
191 	vtable = apt_poller_task_vtable_get(client->task);
192 	if(vtable) {
193 		vtable->process_msg = rtsp_client_task_msg_process;
194 	}
195 
196 	APR_RING_INIT(&client->connection_list, rtsp_client_connection_t, link);
197 	client->request_timeout = (apr_uint32_t)request_timeout;
198 	return client;
199 }
200 
201 /** Destroy RTSP client */
rtsp_client_destroy(rtsp_client_t * client)202 RTSP_DECLARE(apt_bool_t) rtsp_client_destroy(rtsp_client_t *client)
203 {
204 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Client [%s]",
205 			rtsp_client_id_get(client));
206 	return apt_poller_task_destroy(client->task);
207 }
208 
209 /** Start connection agent */
rtsp_client_start(rtsp_client_t * client)210 RTSP_DECLARE(apt_bool_t) rtsp_client_start(rtsp_client_t *client)
211 {
212 	return apt_poller_task_start(client->task);
213 }
214 
215 /** Terminate connection agent */
rtsp_client_terminate(rtsp_client_t * client)216 RTSP_DECLARE(apt_bool_t) rtsp_client_terminate(rtsp_client_t *client)
217 {
218 	return apt_poller_task_terminate(client->task);
219 }
220 
221 /** Get task */
rtsp_client_task_get(const rtsp_client_t * client)222 RTSP_DECLARE(apt_task_t*) rtsp_client_task_get(const rtsp_client_t *client)
223 {
224 	return apt_poller_task_base_get(client->task);
225 }
226 
227 /** Get external object */
rtsp_client_object_get(const rtsp_client_t * client)228 RTSP_DECLARE(void*) rtsp_client_object_get(const rtsp_client_t *client)
229 {
230 	return client->obj;
231 }
232 
233 /** Get object associated with the session */
rtsp_client_session_object_get(const rtsp_client_session_t * session)234 RTSP_DECLARE(void*) rtsp_client_session_object_get(const rtsp_client_session_t *session)
235 {
236 	return session->obj;
237 }
238 
239 /** Set object associated with the session */
rtsp_client_session_object_set(rtsp_client_session_t * session,void * obj)240 RTSP_DECLARE(void) rtsp_client_session_object_set(rtsp_client_session_t *session, void *obj)
241 {
242 	session->obj = obj;
243 }
244 
245 /** Get the session identifier */
rtsp_client_session_id_get(const rtsp_client_session_t * session)246 RTSP_DECLARE(const apt_str_t*) rtsp_client_session_id_get(const rtsp_client_session_t *session)
247 {
248 	return &session->id;
249 }
250 
251 /** Signal task message */
rtsp_client_control_message_signal(task_msg_data_type_e type,rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)252 static apt_bool_t rtsp_client_control_message_signal(
253 								task_msg_data_type_e type,
254 								rtsp_client_t *client,
255 								rtsp_client_session_t *session,
256 								rtsp_message_t *message)
257 {
258 	apt_task_t *task = apt_poller_task_base_get(client->task);
259 	apt_task_msg_t *task_msg = apt_task_msg_get(task);
260 	if(task_msg) {
261 		task_msg_data_t *data = (task_msg_data_t*)task_msg->data;
262 		data->type = type;
263 		data->client = client;
264 		data->session = session;
265 		data->message = message;
266 		apt_task_msg_signal(task,task_msg);
267 	}
268 	return TRUE;
269 }
270 
271 /** Create RTSP session handle */
rtsp_client_session_create(rtsp_client_t * client,const char * server_ip,apr_port_t server_port,const char * resource_location)272 RTSP_DECLARE(rtsp_client_session_t*) rtsp_client_session_create(
273 											rtsp_client_t *client,
274 											const char *server_ip,
275 											apr_port_t server_port,
276 											const char *resource_location)
277 {
278 	rtsp_client_session_t *session;
279 	apr_pool_t *pool = apt_pool_create();
280 	session = apr_palloc(pool,sizeof(rtsp_client_session_t));
281 	session->pool = pool;
282 	session->obj = NULL;
283 	session->connection = NULL;
284 	session->active_request = NULL;
285 	session->pending_request_queue = apt_list_create(pool);
286 	session->request_timer = apt_poller_task_timer_create(
287 								client->task,
288 								rtsp_client_timer_proc,
289 								session,
290 								pool);
291 	session->resource_table = apr_hash_make(pool);
292 	session->term_state = TERMINATION_STATE_NONE;
293 
294 	apt_string_assign(&session->server_ip,server_ip,pool);
295 	session->server_port = server_port;
296 	apt_string_assign(&session->resource_location,resource_location,pool);
297 	apt_string_reset(&session->id);
298 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create RTSP Handle "APT_PTR_FMT,session);
299 	return session;
300 }
301 
302 /** Destroy RTSP session handle */
rtsp_client_session_destroy(rtsp_client_session_t * session)303 RTSP_DECLARE(void) rtsp_client_session_destroy(rtsp_client_session_t *session)
304 {
305 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Handle "APT_PTR_FMT,session);
306 	if(session && session->pool) {
307 		apr_pool_destroy(session->pool);
308 	}
309 }
310 
311 /** Signal terminate request */
rtsp_client_session_terminate(rtsp_client_t * client,rtsp_client_session_t * session)312 RTSP_DECLARE(apt_bool_t) rtsp_client_session_terminate(rtsp_client_t *client, rtsp_client_session_t *session)
313 {
314 	return rtsp_client_control_message_signal(TASK_MSG_TERMINATE_SESSION,client,session,NULL);
315 }
316 
317 /** Signal RTSP message */
rtsp_client_session_request(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)318 RTSP_DECLARE(apt_bool_t) rtsp_client_session_request(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
319 {
320 	return rtsp_client_control_message_signal(TASK_MSG_SEND_MESSAGE,client,session,message);
321 }
322 
323 
324 /** Create connection */
rtsp_client_connect(rtsp_client_t * client,rtsp_client_connection_t * connection,const char * ip,apr_port_t port)325 static apt_bool_t rtsp_client_connect(rtsp_client_t *client, rtsp_client_connection_t *connection, const char *ip, apr_port_t port)
326 {
327 	char *local_ip = NULL;
328 	char *remote_ip = NULL;
329 	apr_sockaddr_t *l_sockaddr = NULL;
330 	apr_sockaddr_t *r_sockaddr = NULL;
331 
332 	if(apr_sockaddr_info_get(&r_sockaddr,ip,APR_INET,port,0,connection->pool) != APR_SUCCESS) {
333 		return FALSE;
334 	}
335 
336 	if(apr_socket_create(&connection->sock,r_sockaddr->family,SOCK_STREAM,APR_PROTO_TCP,connection->pool) != APR_SUCCESS) {
337 		return FALSE;
338 	}
339 
340 	apr_socket_opt_set(connection->sock, APR_SO_NONBLOCK, 0);
341 	apr_socket_timeout_set(connection->sock, -1);
342 	apr_socket_opt_set(connection->sock, APR_SO_REUSEADDR, 1);
343 
344 	if(apr_socket_connect(connection->sock,r_sockaddr) != APR_SUCCESS) {
345 		apr_socket_close(connection->sock);
346 		connection->sock = NULL;
347 		return FALSE;
348 	}
349 
350 	if(apr_socket_addr_get(&l_sockaddr,APR_LOCAL,connection->sock) != APR_SUCCESS) {
351 		apr_socket_close(connection->sock);
352 		connection->sock = NULL;
353 		return FALSE;
354 	}
355 
356 	apr_sockaddr_ip_get(&local_ip,l_sockaddr);
357 	apr_sockaddr_ip_get(&remote_ip,r_sockaddr);
358 	connection->id = apr_psprintf(connection->pool,"%s:%hu <-> %s:%hu",
359 		local_ip,l_sockaddr->port,
360 		remote_ip,r_sockaddr->port);
361 
362 	memset(&connection->sock_pfd,0,sizeof(apr_pollfd_t));
363 	connection->sock_pfd.desc_type = APR_POLL_SOCKET;
364 	connection->sock_pfd.reqevents = APR_POLLIN;
365 	connection->sock_pfd.desc.s = connection->sock;
366 	connection->sock_pfd.client_data = connection;
367 	if(apt_poller_task_descriptor_add(client->task,&connection->sock_pfd) != TRUE) {
368 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Add to Pollset %s",connection->id);
369 		apr_socket_close(connection->sock);
370 		connection->sock = NULL;
371 		return FALSE;
372 	}
373 
374 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Established RTSP Connection %s",connection->id);
375 	return TRUE;
376 }
377 
378 /** Close connection */
rtsp_client_connection_close(rtsp_client_t * client,rtsp_client_connection_t * connection)379 static apt_bool_t rtsp_client_connection_close(rtsp_client_t *client, rtsp_client_connection_t *connection)
380 {
381 	if(connection->sock) {
382 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close RTSP Connection %s",connection->id);
383 		apt_poller_task_descriptor_remove(client->task,&connection->sock_pfd);
384 		apr_socket_close(connection->sock);
385 		connection->sock = NULL;
386 	}
387 	return TRUE;
388 }
389 
390 
391 /* Create RTSP connection */
rtsp_client_connection_create(rtsp_client_t * client,rtsp_client_session_t * session)392 static apt_bool_t rtsp_client_connection_create(rtsp_client_t *client, rtsp_client_session_t *session)
393 {
394 	rtsp_client_connection_t *rtsp_connection;
395 	apr_pool_t *pool = apt_pool_create();
396 	if(!pool) {
397 		return FALSE;
398 	}
399 
400 	rtsp_connection = apr_palloc(pool,sizeof(rtsp_client_connection_t));
401 	rtsp_connection->pool = pool;
402 	rtsp_connection->sock = NULL;
403 	APR_RING_ELEM_INIT(rtsp_connection,link);
404 
405 	if(rtsp_client_connect(client,rtsp_connection,session->server_ip.buf,session->server_port) == FALSE) {
406 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Connect to RTSP Server %s:%hu",
407 			session->server_ip.buf,session->server_port);
408 		apr_pool_destroy(pool);
409 		return FALSE;
410 	}
411 	rtsp_connection->handle_table = apr_hash_make(pool);
412 	rtsp_connection->session_table = apr_hash_make(pool);
413 	rtsp_connection->inprogress_request_queue = apt_list_create(pool);
414 	apt_text_stream_init(&rtsp_connection->rx_stream,rtsp_connection->rx_buffer,sizeof(rtsp_connection->rx_buffer)-1);
415 	apt_text_stream_init(&rtsp_connection->tx_stream,rtsp_connection->tx_buffer,sizeof(rtsp_connection->tx_buffer)-1);
416 	rtsp_connection->parser = rtsp_parser_create(pool);
417 	rtsp_connection->generator = rtsp_generator_create(pool);
418 	rtsp_connection->last_cseq = 0;
419 
420 	rtsp_connection->client = client;
421 	APR_RING_INSERT_TAIL(&client->connection_list,rtsp_connection,rtsp_client_connection_t,link);
422 	session->connection = rtsp_connection;
423 	return TRUE;
424 }
425 
426 /* Destroy RTSP connection */
rtsp_client_connection_destroy(rtsp_client_connection_t * rtsp_connection)427 static apt_bool_t rtsp_client_connection_destroy(rtsp_client_connection_t *rtsp_connection)
428 {
429 	rtsp_client_t *client = rtsp_connection->client;
430 	APR_RING_REMOVE(rtsp_connection,link);
431 	rtsp_client_connection_close(client,rtsp_connection);
432 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Destroy RTSP Connection %s",rtsp_connection->id);
433 	apr_pool_destroy(rtsp_connection->pool);
434 
435 	return TRUE;
436 }
437 
438 /* Respond to session termination request */
rtsp_client_session_terminate_respond(rtsp_client_t * client,rtsp_client_session_t * session)439 static apt_bool_t rtsp_client_session_terminate_respond(rtsp_client_t *client, rtsp_client_session_t *session)
440 {
441 	rtsp_client_connection_t *rtsp_connection = session->connection;
442 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove RTSP Handle "APT_PTR_FMT,session);
443 	apr_hash_set(rtsp_connection->handle_table,session,sizeof(void*),NULL);
444 
445 	session->term_state = TERMINATION_STATE_NONE;
446 	client->vtable->on_session_terminate_response(client,session);
447 	return TRUE;
448 }
449 
450 /* Teardown session resources */
rtsp_client_session_resources_teardown(rtsp_client_t * client,rtsp_client_session_t * session)451 static apt_bool_t rtsp_client_session_resources_teardown(rtsp_client_t *client, rtsp_client_session_t *session)
452 {
453 	void *val;
454 	rtsp_message_t *setup_request;
455 	rtsp_message_t *teardown_request;
456 	apr_hash_index_t *it;
457 
458 	/* set termination state to in-progress and teardown remaining resources */
459 	session->term_state = TERMINATION_STATE_INPROGRESS;
460 	it = apr_hash_first(session->pool,session->resource_table);
461 	for(; it; it = apr_hash_next(it)) {
462 		apr_hash_this(it,NULL,NULL,&val);
463 		setup_request = val;
464 		if(!setup_request) continue;
465 
466 		teardown_request = rtsp_request_create(session->pool);
467 		teardown_request->start_line.common.request_line.resource_name = setup_request->start_line.common.request_line.resource_name;
468 		teardown_request->start_line.common.request_line.method_id = RTSP_METHOD_TEARDOWN;
469 		rtsp_client_session_message_process(client,session,teardown_request);
470 	}
471 	return TRUE;
472 }
473 
474 /* Process session termination request */
rtsp_client_session_terminate_process(rtsp_client_t * client,rtsp_client_session_t * session)475 static apt_bool_t rtsp_client_session_terminate_process(rtsp_client_t *client, rtsp_client_session_t *session)
476 {
477 	rtsp_client_connection_t *rtsp_connection = session->connection;
478 	if(!rtsp_connection) {
479 		client->vtable->on_session_terminate_response(client,session);
480 		return FALSE;
481 	}
482 
483 	if(session->active_request) {
484 		/* set termination state to requested */
485 		session->term_state = TERMINATION_STATE_REQUESTED;
486 	}
487 	else {
488 		rtsp_client_session_resources_teardown(client,session);
489 
490 		/* respond immediately if no resources left */
491 		if(apr_hash_count(session->resource_table) == 0) {
492 			rtsp_client_session_terminate_respond(client,session);
493 
494 			if(apr_hash_count(rtsp_connection->handle_table) == 0) {
495 				rtsp_client_connection_destroy(rtsp_connection);
496 			}
497 		}
498 	}
499 
500 	return TRUE;
501 }
502 
rtsp_client_session_url_generate(rtsp_client_session_t * session,rtsp_message_t * message)503 static apt_bool_t rtsp_client_session_url_generate(rtsp_client_session_t *session, rtsp_message_t *message)
504 {
505 	apt_str_t *url = &message->start_line.common.request_line.url;
506 	if(session->resource_location.length) {
507 		url->buf = apr_psprintf(message->pool,"rtsp://%s:%hu/%s/%s",
508 						session->server_ip.buf,
509 						session->server_port,
510 						session->resource_location.buf,
511 						message->start_line.common.request_line.resource_name);
512 	}
513 	else {
514 		url->buf = apr_psprintf(message->pool,"rtsp://%s:%hu/%s",
515 						session->server_ip.buf,
516 						session->server_port,
517 						message->start_line.common.request_line.resource_name);
518 	}
519 	url->length = strlen(url->buf);
520 	return TRUE;
521 }
522 
rtsp_client_request_push(rtsp_client_connection_t * rtsp_connection,rtsp_client_session_t * session,rtsp_message_t * message)523 static apt_bool_t rtsp_client_request_push(rtsp_client_connection_t *rtsp_connection, rtsp_client_session_t *session, rtsp_message_t *message)
524 {
525 	/* add request to inprogress request queue */
526 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Push RTSP Request to In-Progress Queue "APT_PTRSID_FMT" CSeq:%"APR_SIZE_T_FMT,
527 		session,
528 		message->header.session_id.buf ? message->header.session_id.buf : "new",
529 		message->header.cseq);
530 	apt_list_push_back(rtsp_connection->inprogress_request_queue,session,session->pool);
531 	session->active_request = message;
532 	if(rtsp_connection->client->request_timeout) {
533 		apt_timer_set(session->request_timer,rtsp_connection->client->request_timeout);
534 	}
535 	return TRUE;
536 }
537 
rtsp_client_request_pop(rtsp_client_connection_t * rtsp_connection,rtsp_message_t * response,rtsp_message_t ** ret_request,rtsp_client_session_t ** ret_session)538 static apt_bool_t rtsp_client_request_pop(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *response, rtsp_message_t **ret_request, rtsp_client_session_t **ret_session)
539 {
540 	rtsp_client_session_t *session;
541 	apt_list_elem_t *elem = apt_list_first_elem_get(rtsp_connection->inprogress_request_queue);
542 	while(elem) {
543 		session = apt_list_elem_object_get(elem);
544 		if(session->active_request && session->active_request->header.cseq == response->header.cseq) {
545 			if(ret_session) {
546 				*ret_session = session;
547 			}
548 			if(ret_request) {
549 				*ret_request = session->active_request;
550 			}
551 			apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Pop In-Progress RTSP Request "APT_PTR_FMT" CSeq:%"APR_SIZE_T_FMT,
552 				session,
553 				response->header.cseq);
554 			apt_list_elem_remove(rtsp_connection->inprogress_request_queue,elem);
555 			session->active_request = NULL;
556 			apt_timer_kill(session->request_timer);
557 			return TRUE;
558 		}
559 		elem = apt_list_next_elem_get(rtsp_connection->inprogress_request_queue,elem);
560 	}
561 	return FALSE;
562 }
563 
564 /* Process outgoing RTSP request */
rtsp_client_session_request_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)565 static apt_bool_t rtsp_client_session_request_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
566 {
567 	if(!session->connection) {
568 		/* create RTSP connection */
569 		if(rtsp_client_connection_create(client,session) == FALSE) {
570 			/* respond with error */
571 			return FALSE;
572 		}
573 		apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add RTSP Handle "APT_PTR_FMT,session);
574 		apr_hash_set(session->connection->handle_table,session,sizeof(void*),session);
575 	}
576 
577 	rtsp_client_session_url_generate(session,message);
578 
579 	if(session->id.length) {
580 		message->header.session_id = session->id;
581 		rtsp_header_property_add(&message->header,RTSP_HEADER_FIELD_SESSION_ID,message->pool);
582 	}
583 
584 	message->header.cseq = ++session->connection->last_cseq;
585 	rtsp_header_property_add(&message->header,RTSP_HEADER_FIELD_CSEQ,message->pool);
586 
587 	if(rtsp_client_message_send(client,session->connection,message) == FALSE) {
588 		/* respond with error */
589 		return FALSE;
590 	}
591 
592 	return rtsp_client_request_push(session->connection,session,message);
593 }
594 
595 /* Process pending RTSP requests */
rtsp_client_session_pending_requests_process(rtsp_client_t * client,rtsp_client_session_t * session)596 static apt_bool_t rtsp_client_session_pending_requests_process(rtsp_client_t *client, rtsp_client_session_t *session)
597 {
598 	rtsp_message_t *request = apt_list_pop_front(session->pending_request_queue);
599 	if(!request) {
600 		/* pending queue is empty, no in-progress request */
601 		return FALSE;
602 	}
603 
604 	/* process pending request; get the next one, if current is failed */
605 	do {
606 		rtsp_message_t *response;
607 		if(rtsp_client_session_request_process(client,session,request) == TRUE) {
608 			return TRUE;
609 		}
610 
611 		/* respond with error */
612 		response = rtsp_response_create(
613 							request,
614 							RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
615 							RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
616 							session->pool);
617 		rtsp_client_session_response_process(client,session,request,response);
618 
619 		/* process the next pending request / if any */
620 		request = apt_list_pop_front(session->pending_request_queue);
621 	}
622 	while(request);
623 
624 	/* no in-progress request */
625 	return FALSE;
626 }
627 
628 
629 /* Process outgoing RTSP message */
rtsp_client_session_message_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * message)630 static apt_bool_t rtsp_client_session_message_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *message)
631 {
632 	if(session->active_request) {
633 		apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Push RTSP Request to Pending Queue "APT_PTR_FMT,session);
634 		apt_list_push_back(session->pending_request_queue,message,message->pool);
635 		return TRUE;
636 	}
637 
638 	if(rtsp_client_session_request_process(client,session,message) == FALSE) {
639 		/* respond with error in case request cannot be processed */
640 		rtsp_message_t *response = rtsp_response_create(
641 							message,
642 							RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
643 							RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
644 							session->pool);
645 		rtsp_client_session_response_process(client,session,message,response);
646 	}
647 	return TRUE;
648 }
649 
650 /* Process incoming RTSP event (request) */
rtsp_client_session_event_process(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message)651 static apt_bool_t rtsp_client_session_event_process(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message)
652 {
653 	rtsp_message_t *response = NULL;
654 	rtsp_client_session_t *session = NULL;
655 	if(rtsp_header_property_check(&message->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
656 		/* find existing session */
657 		session = apr_hash_get(
658 					rtsp_connection->session_table,
659 					message->header.session_id.buf,
660 					message->header.session_id.length);
661 	}
662 
663 	if(session) {
664 		response = rtsp_response_create(message,RTSP_STATUS_CODE_OK,RTSP_REASON_PHRASE_OK,message->pool);
665 		if(rtsp_header_property_check(&message->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
666 			response->header.session_id = message->header.session_id;
667 			rtsp_header_property_add(&response->header,RTSP_HEADER_FIELD_SESSION_ID,message->pool);
668 		}
669 		client->vtable->on_session_event(client,session,message);
670 	}
671 	else {
672 		response = rtsp_response_create(message,RTSP_STATUS_CODE_NOT_FOUND,RTSP_REASON_PHRASE_NOT_FOUND,message->pool);
673 	}
674 
675 	return rtsp_client_message_send(client,rtsp_connection,response);
676 }
677 
678 /* Process incoming RTSP response */
rtsp_client_session_response_process(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_message_t * request,rtsp_message_t * response)679 static apt_bool_t rtsp_client_session_response_process(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_message_t *request, rtsp_message_t *response)
680 {
681 	const char *resource_name;
682 	if(request->start_line.common.request_line.method_id == RTSP_METHOD_SETUP &&
683 		response->start_line.common.status_line.status_code == RTSP_STATUS_CODE_OK) {
684 
685 		if(apr_hash_count(session->resource_table) == 0) {
686 			if(rtsp_header_property_check(&response->header,RTSP_HEADER_FIELD_SESSION_ID) == TRUE) {
687 				session->id = response->header.session_id;
688 				apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Add RTSP Session "APT_PTRSID_FMT,
689 					session,
690 					session->id.buf);
691 				apr_hash_set(session->connection->session_table,session->id.buf,session->id.length,session);
692 			}
693 		}
694 
695 		/* add resource */
696 		resource_name = request->start_line.common.request_line.resource_name;
697 		apr_hash_set(session->resource_table,resource_name,APR_HASH_KEY_STRING,request);
698 	}
699 	else if(request->start_line.common.request_line.method_id == RTSP_METHOD_TEARDOWN) {
700 		/* remove resource */
701 		resource_name = request->start_line.common.request_line.resource_name;
702 		apr_hash_set(session->resource_table,resource_name,APR_HASH_KEY_STRING,NULL);
703 
704 		if(apr_hash_count(session->resource_table) == 0) {
705 			if(session->connection) {
706 				apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove RTSP Session "APT_PTRSID_FMT,
707 					session,
708 					session->id.buf);
709 				apr_hash_set(session->connection->session_table,session->id.buf,session->id.length,NULL);
710 			}
711 		}
712 	}
713 
714 	if(session->term_state != TERMINATION_STATE_INPROGRESS) {
715 		client->vtable->on_session_response(client,session,request,response);
716 	}
717 
718 	return TRUE;
719 }
720 
721 /* Raise RTSP session terminate event */
rtsp_client_session_terminate_raise(rtsp_client_t * client,rtsp_client_session_t * session)722 static apt_bool_t rtsp_client_session_terminate_raise(rtsp_client_t *client, rtsp_client_session_t *session)
723 {
724 	rtsp_message_t *request;
725 	rtsp_message_t *response;
726 
727 	/* cancel pending requests */
728 	do {
729 		request = apt_list_pop_front(session->pending_request_queue);
730 		if(request) {
731 			response = rtsp_response_create(
732 							session->active_request,
733 							RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
734 							RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR,
735 							session->pool);
736 			rtsp_client_session_response_process(client,session,request,response);
737 		}
738 	}
739 	while(request);
740 
741 	if(session->term_state == TERMINATION_STATE_NONE) {
742 		client->vtable->on_session_terminate_event(client,session);
743 	}
744 	else {
745 		rtsp_client_session_terminate_respond(client,session);
746 	}
747 	return TRUE;
748 }
749 
750 /* Cancel RTSP request */
rtsp_client_request_cancel(rtsp_client_t * client,rtsp_client_session_t * session,rtsp_status_code_e status_code,rtsp_reason_phrase_e reason)751 static apt_bool_t rtsp_client_request_cancel(rtsp_client_t *client, rtsp_client_session_t *session, rtsp_status_code_e status_code, rtsp_reason_phrase_e reason)
752 {
753 	rtsp_message_t *request;
754 	rtsp_message_t *response;
755 	if(!session->active_request) {
756 		return FALSE;
757 	}
758 
759 	request = session->active_request;
760 	response = rtsp_response_create(
761 						request,
762 						status_code,
763 						reason,
764 						session->pool);
765 
766 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Cancel RTSP Request "APT_PTRSID_FMT" CSeq:%"APR_SIZE_T_FMT" [%d]",
767 		session,
768 		request->header.session_id.buf ? request->header.session_id.buf : "new",
769 		request->header.cseq,
770 		status_code);
771 
772 	return rtsp_client_message_handler(session->connection, response, APT_MESSAGE_STATUS_COMPLETE);
773 }
774 
775 /* RTSP connection disconnected */
rtsp_client_on_disconnect(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection)776 static apt_bool_t rtsp_client_on_disconnect(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection)
777 {
778 	rtsp_client_session_t *session;
779 	apr_size_t remaining_handles;
780 
781 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"RTSP Peer Disconnected %s", rtsp_connection->id);
782 	rtsp_client_connection_close(client,rtsp_connection);
783 
784 	/* Cancel in-progreess requests */
785 	do {
786 		session = apt_list_pop_front(rtsp_connection->inprogress_request_queue);
787 		if(session) {
788 			if(rtsp_client_request_cancel(
789 						client,
790 						session,
791 						RTSP_STATUS_CODE_INTERNAL_SERVER_ERROR,
792 						RTSP_REASON_PHRASE_INTERNAL_SERVER_ERROR) == TRUE) {
793 				apt_timer_kill(session->request_timer);
794 			}
795 		}
796 	}
797 	while(session);
798 
799 	/* Walk through RTSP handles and raise termination event for them */
800 	remaining_handles = apr_hash_count(rtsp_connection->handle_table);
801 	if(remaining_handles) {
802 		void *val;
803 		apr_hash_index_t *it;
804 		apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Terminate Remaining RTSP Handles [%"APR_SIZE_T_FMT"]",remaining_handles);
805 		it = apr_hash_first(rtsp_connection->pool,rtsp_connection->session_table);
806 		for(; it; it = apr_hash_next(it)) {
807 			apr_hash_this(it,NULL,NULL,&val);
808 			session = val;
809 			if(session) {
810 				rtsp_client_session_terminate_raise(client,session);
811 			}
812 		}
813 	}
814 
815 	return TRUE;
816 }
817 
818 /* Send RTSP message through RTSP connection */
rtsp_client_message_send(rtsp_client_t * client,rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message)819 static apt_bool_t rtsp_client_message_send(rtsp_client_t *client, rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message)
820 {
821 	apt_bool_t status = FALSE;
822 	apt_text_stream_t *stream;
823 	apt_message_status_e result;
824 
825 	if(!rtsp_connection || !rtsp_connection->sock) {
826 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"No RTSP Connection");
827 		return FALSE;
828 	}
829 	stream = &rtsp_connection->tx_stream;
830 
831 	do {
832 		stream->text.length = sizeof(rtsp_connection->tx_buffer)-1;
833 		apt_text_stream_reset(stream);
834 		result = rtsp_generator_run(rtsp_connection->generator,message,stream);
835 		if(result != APT_MESSAGE_STATUS_INVALID) {
836 			stream->text.length = stream->pos - stream->text.buf;
837 			*stream->pos = '\0';
838 
839 			apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send RTSP Data %s [%"APR_SIZE_T_FMT" bytes]\n%s",
840 				rtsp_connection->id,
841 				stream->text.length,
842 				stream->text.buf);
843 			if(apr_socket_send(rtsp_connection->sock,stream->text.buf,&stream->text.length) == APR_SUCCESS) {
844 				status = TRUE;
845 			}
846 			else {
847 				apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send RTSP Data");
848 			}
849 		}
850 		else {
851 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Generate RTSP Data");
852 		}
853 	}
854 	while(result == APT_MESSAGE_STATUS_INCOMPLETE);
855 
856 	return status;
857 }
858 
859 /** Return TRUE to proceed with the next message in the stream (if any) */
rtsp_client_message_handler(rtsp_client_connection_t * rtsp_connection,rtsp_message_t * message,apt_message_status_e status)860 static apt_bool_t rtsp_client_message_handler(rtsp_client_connection_t *rtsp_connection, rtsp_message_t *message, apt_message_status_e status)
861 {
862 	if(status != APT_MESSAGE_STATUS_COMPLETE) {
863 		/* message is not completely parsed, nothing to do */
864 		return TRUE;
865 	}
866 	/* process parsed message */
867 	if(message->start_line.message_type == RTSP_MESSAGE_TYPE_RESPONSE) {
868 		rtsp_message_t *request;
869 		rtsp_client_session_t *session;
870 		/* at first, pop in-progress request/session */
871 		if(rtsp_client_request_pop(rtsp_connection,message,&request,&session) == FALSE) {
872 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Unexpected RTSP Response Received CSeq:%"APR_SIZE_T_FMT,
873 				message->header.cseq);
874 			return TRUE;
875 		}
876 
877 		/* next, process session response */
878 		rtsp_client_session_response_process(rtsp_connection->client,session,request,message);
879 
880 		/* process session pending requests */
881 		if(rtsp_client_session_pending_requests_process(rtsp_connection->client,session) == FALSE) {
882 			/* no in-progress request, check the termination state now */
883 			if(session->term_state != TERMINATION_STATE_NONE) {
884 				if(session->term_state == TERMINATION_STATE_REQUESTED) {
885 					rtsp_client_session_resources_teardown(rtsp_connection->client,session);
886 				}
887 
888 				/* respond if no resources left */
889 				if(apr_hash_count(session->resource_table) == 0) {
890 					rtsp_client_session_terminate_respond(rtsp_connection->client,session);
891 
892 					if(apr_hash_count(rtsp_connection->handle_table) == 0) {
893 						rtsp_client_connection_destroy(rtsp_connection);
894 						/* return FALSE to indicate connection has been destroyed */
895 						return FALSE;
896 					}
897 				}
898 			}
899 		}
900 	}
901 	else if(message->start_line.message_type == RTSP_MESSAGE_TYPE_REQUEST) {
902 		rtsp_client_session_event_process(rtsp_connection->client,rtsp_connection,message);
903 	}
904 	return TRUE;
905 }
906 
907 /* Receive RTSP message through RTSP connection */
rtsp_client_poller_signal_process(void * obj,const apr_pollfd_t * descriptor)908 static apt_bool_t rtsp_client_poller_signal_process(void *obj, const apr_pollfd_t *descriptor)
909 {
910 	rtsp_client_t *client = obj;
911 	rtsp_client_connection_t *rtsp_connection = descriptor->client_data;
912 	apr_status_t status;
913 	apr_size_t offset;
914 	apr_size_t length;
915 	apt_text_stream_t *stream;
916 	rtsp_message_t *message;
917 	apt_message_status_e msg_status;
918 
919 	if(!rtsp_connection || !rtsp_connection->sock) {
920 		return FALSE;
921 	}
922 	stream = &rtsp_connection->rx_stream;
923 
924 	/* calculate offset remaining from the previous receive / if any */
925 	offset = stream->pos - stream->text.buf;
926 	/* calculate available length */
927 	length = sizeof(rtsp_connection->rx_buffer) - 1 - offset;
928 
929 	status = apr_socket_recv(rtsp_connection->sock,stream->pos,&length);
930 	if(status == APR_EOF || length == 0) {
931 		return rtsp_client_on_disconnect(client,rtsp_connection);
932 	}
933 
934 	/* calculate actual length of the stream */
935 	stream->text.length = offset + length;
936 	stream->pos[length] = '\0';
937 	apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive RTSP Data %s [%"APR_SIZE_T_FMT" bytes]\n%s",
938 		rtsp_connection->id,
939 		length,
940 		stream->pos);
941 
942 	/* reset pos */
943 	apt_text_stream_reset(stream);
944 
945 	do {
946 		msg_status = rtsp_parser_run(rtsp_connection->parser,stream,&message);
947 		if(rtsp_client_message_handler(rtsp_connection,message,msg_status) == FALSE) {
948 			return FALSE;
949 		}
950 	}
951 	while(apt_text_is_eos(stream) == FALSE);
952 
953 	/* scroll remaining stream */
954 	apt_text_stream_scroll(stream);
955 	return TRUE;
956 }
957 
958 /* Process task message */
rtsp_client_task_msg_process(apt_task_t * task,apt_task_msg_t * task_msg)959 static apt_bool_t rtsp_client_task_msg_process(apt_task_t *task, apt_task_msg_t *task_msg)
960 {
961 	apt_poller_task_t *poller_task = apt_task_object_get(task);
962 	rtsp_client_t *client = apt_poller_task_object_get(poller_task);
963 
964 	task_msg_data_t *data = (task_msg_data_t*) task_msg->data;
965 	switch(data->type) {
966 		case TASK_MSG_SEND_MESSAGE:
967 			rtsp_client_session_message_process(client,data->session,data->message);
968 			break;
969 		case TASK_MSG_TERMINATE_SESSION:
970 			rtsp_client_session_terminate_process(client,data->session);
971 			break;
972 	}
973 
974 	return TRUE;
975 }
976 
977 /* Timer callback */
rtsp_client_timer_proc(apt_timer_t * timer,void * obj)978 static void rtsp_client_timer_proc(apt_timer_t *timer, void *obj)
979 {
980 	rtsp_client_session_t *session = obj;
981 	if(!session || !session->connection || !session->connection->client) {
982 		return;
983 	}
984 
985 	if(session->request_timer == timer) {
986 		rtsp_client_request_cancel(
987 				session->connection->client,
988 				session,
989 				RTSP_STATUS_CODE_REQUEST_TIMEOUT,
990 				RTSP_REASON_PHRASE_REQUEST_TIMEOUT);
991 	}
992 }
993