1 /* GStreamer
2 * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19 /*
20 * Unless otherwise indicated, Source Code is licensed under MIT license.
21 * See further explanation attached in License Statement (distributed in the file
22 * LICENSE).
23 *
24 * Permission is hereby granted, free of charge, to any person obtaining a copy of
25 * this software and associated documentation files (the "Software"), to deal in
26 * the Software without restriction, including without limitation the rights to
27 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28 * of the Software, and to permit persons to whom the Software is furnished to do
29 * so, subject to the following conditions:
30 *
31 * The above copyright notice and this permission notice shall be included in all
32 * copies or substantial portions of the Software.
33 *
34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40 * SOFTWARE.
41 */
42
/**
 * SECTION:gstrtspconnection
 * @title: GstRTSPConnection
 * @short_description: manage RTSP connections
 * @see_also: gstrtspurl
 *
 * This object manages the RTSP connection to the server. It provides functions
 * to receive and send bytes and messages.
 */
52
53 #ifdef HAVE_CONFIG_H
54 # include <config.h>
55 #endif
56
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70
71 #include "gstrtspconnection.h"
72
#ifdef IP_TOS
/* Overlay of the different socket address layouts, so that one object can be
 * accessed as a generic, IPv4, IPv6 or storage-sized address.
 * Only compiled when IP_TOS is defined. */
union gst_sockaddr
{
  struct sockaddr sa;
  struct sockaddr_in sa_in;
  struct sockaddr_in6 sa_in6;
  struct sockaddr_storage sa_stor;
};
#endif
82
/* Incremental decoder state embedded in every GstRTSPConnection (see its
 * ctx / ctxp members).
 * NOTE(review): presumably the base64 decoding context used for tunneled
 * connections -- confirm against the code that reads these fields. */
typedef struct
{
  gint state;
  guint save;
  guchar out[3];                /* the size must be evenly divisible by 3 */
  guint cout;
  guint coutl;
} DecodeCtx;
91
/* A message in its on-the-wire byte form, together with the bookkeeping
 * needed to write it out asynchronously.
 *
 * Release the contents with gst_rtsp_serialized_message_clear(): data is
 * always freed, the body (body_data / body_buffer) only when the message is
 * not borrowed. */
typedef struct
{
  /* If %TRUE we only own data and none of the
   * other fields
   */
  gboolean borrowed;

  /* Header or full message */
  guint8 *data;
  guint data_size;
  gboolean data_is_data_header;

  /* Payload following data, if any */
  guint8 *body_data;
  guint body_data_size;
  /* or */
  GstBuffer *body_buffer;

  /* DATA packet header statically allocated for above */
  guint8 data_header[4];

  /* all below only for async writing */

  guint data_offset;            /* == data_size when done */
  guint body_offset;            /* into body_data or the buffer */

  /* ID of the message for notification */
  guint id;
} GstRTSPSerializedMessage;
121
122 static void
gst_rtsp_serialized_message_clear(GstRTSPSerializedMessage * msg)123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125 if (!msg->borrowed) {
126 g_free (msg->body_data);
127 gst_buffer_replace (&msg->body_buffer, NULL);
128 }
129 g_free (msg->data);
130 }
131
/* Flags to pass to send-style socket calls: MSG_NOSIGNAL on platforms that
 * define it, no flags otherwise. */
#ifdef MSG_NOSIGNAL
#define SEND_FLAGS MSG_NOSIGNAL
#else
#define SEND_FLAGS 0
#endif
137
/* Progress of tunnel setup, stored in GstRTSPConnection.tstate.
 * NOTE(review): the GET / POST states presumably track which of the two
 * HTTP requests of the tunnel has been seen -- confirm against the code
 * that updates tstate. */
typedef enum
{
  TUNNEL_STATE_NONE,
  TUNNEL_STATE_GET,
  TUNNEL_STATE_POST,
  TUNNEL_STATE_COMPLETE
} GstRTSPTunnelState;
145
/* Size in bytes of GstRTSPConnection.tunnelid, including the terminating
 * NUL byte. */
#define TUNNELID_LEN   24
147
/* Private state of one RTSP connection: the remote URL, the sockets and
 * streams used for reading and writing, HTTP tunneling state, session
 * bookkeeping, authentication data and TLS configuration. */
struct _GstRTSPConnection
{
  /*< private > */
  /* URL for the remote connection */
  GstRTSPUrl *url;
  GstRTSPVersion version;

  /* TRUE for connections created from an already accepted socket */
  gboolean server;
  GSocketClient *client;
  GIOStream *stream0;
  GIOStream *stream1;

  GInputStream *input_stream;
  GOutputStream *output_stream;
  /* this is a read source we add on the write socket in tunneled mode to be
   * able to detect when client disconnects the GET channel */
  GInputStream *control_stream;

  /* connection state */
  GSocket *read_socket;
  GSocket *write_socket;
  GSocket *socket0, *socket1;
  gboolean manual_http;
  gboolean may_cancel;
  GCancellable *cancellable;

  /* NUL-terminated cookie sent in the x-sessioncookie header when tunneling */
  gchar tunnelid[TUNNELID_LEN];
  gboolean tunneled;
  GstRTSPTunnelState tstate;

  /* the remote and local ip */
  gchar *remote_ip;
  gchar *local_ip;

  gint read_ahead;

  /* data that was already read from the socket before the connection object
   * was created, and how far it has been consumed */
  gchar *initial_buffer;
  gsize initial_buffer_offset;

  gboolean remember_session_id; /* remember the session id or not */

  /* Session state */
  gint cseq;                    /* sequence number */
  gchar session_id[512];        /* session id */
  gint timeout;                 /* session timeout in seconds */
  GTimer *timer;                /* timeout timer */

  /* Authentication */
  GstRTSPAuthMethod auth_method;
  gchar *username;
  gchar *passwd;
  GHashTable *auth_params;

  /* TLS */
  GTlsDatabase *tls_database;
  GTlsInteraction *tls_interaction;

  /* optional user callback consulted by the "accept-certificate" handler */
  GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
  GDestroyNotify accept_certificate_destroy_notify;
  gpointer accept_certificate_user_data;

  DecodeCtx ctx;
  DecodeCtx *ctxp;

  /* when proxy_host is set, connections are made to the proxy instead of
   * directly to the URL host */
  gchar *proxy_host;
  guint proxy_port;
};
215
/* Parser states.
 * NOTE(review): presumably the values stored in GstRTSPBuilder.state while
 * a message is being assembled -- confirm against the parsing code. */
enum
{
  STATE_START = 0,
  STATE_DATA_HEADER,
  STATE_DATA_BODY,
  STATE_READ_LINES,
  STATE_END,
  STATE_LAST
};
225
/* Negative marker values.
 * NOTE(review): presumably stored in GstRTSPConnection.read_ahead to flag
 * special line-ending conditions instead of a pending character -- confirm
 * against the line-reading code. */
enum
{
  READ_AHEAD_EOH = -1,          /* end of headers */
  READ_AHEAD_CRLF = -2,
  READ_AHEAD_CRLFCR = -3
};
232
/* a structure for constructing RTSPMessages
 *
 * Holds the current state, a fixed-size scratch buffer with its fill
 * offset, and the (heap allocated) body collected so far. Use
 * build_reset() to free the body and return the builder to its zeroed
 * initial state. */
typedef struct
{
  gint state;
  GstRTSPResult status;
  guint8 buffer[4096];
  guint offset;

  guint line;
  guint8 *body_data;
  glong body_len;
} GstRTSPBuilder;
245
/* function prototypes */
/* Forward declaration so that add_auth_header() can be called before its
 * definition in this file. */
static void add_auth_header (GstRTSPConnection * conn,
    GstRTSPMessage * message);
249
250 static void
build_reset(GstRTSPBuilder * builder)251 build_reset (GstRTSPBuilder * builder)
252 {
253 g_free (builder->body_data);
254 memset (builder, 0, sizeof (GstRTSPBuilder));
255 }
256
257 static gboolean
tls_accept_certificate(GTlsConnection * conn,GTlsCertificate * peer_cert,GTlsCertificateFlags errors,GstRTSPConnection * rtspconn)258 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
259 GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
260 {
261 GError *error = NULL;
262 gboolean accept = FALSE;
263
264 if (rtspconn->tls_database) {
265 GSocketConnectable *peer_identity;
266 GTlsCertificateFlags validation_flags;
267
268 GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
269
270 peer_identity =
271 g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
272 (conn));
273
274 errors =
275 g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
276 G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
277 g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
278 NULL, &error);
279
280 if (error)
281 goto verify_error;
282
283 validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
284
285 accept = ((errors & validation_flags) == 0);
286 if (accept)
287 GST_DEBUG ("Peer certificate accepted");
288 else
289 GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
290 }
291
292 if (!accept && rtspconn->accept_certificate_func) {
293 accept =
294 rtspconn->accept_certificate_func (conn, peer_cert, errors,
295 rtspconn->accept_certificate_user_data);
296 GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
297 accept ? "" : "not ");
298 }
299
300 return accept;
301
302 /* ERRORS */
303 verify_error:
304 {
305 GST_ERROR ("An error occurred while verifying the peer certificate: %s",
306 error->message);
307 g_clear_error (&error);
308 return FALSE;
309 }
310 }
311
312 static void
socket_client_event(GSocketClient * client,GSocketClientEvent event,GSocketConnectable * connectable,GTlsConnection * connection,GstRTSPConnection * rtspconn)313 socket_client_event (GSocketClient * client, GSocketClientEvent event,
314 GSocketConnectable * connectable, GTlsConnection * connection,
315 GstRTSPConnection * rtspconn)
316 {
317 if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
318 GST_DEBUG ("TLS handshaking about to start...");
319
320 g_signal_connect (connection, "accept-certificate",
321 (GCallback) tls_accept_certificate, rtspconn);
322
323 g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
324 }
325 }
326
327 /**
328 * gst_rtsp_connection_create:
329 * @url: a #GstRTSPUrl
330 * @conn: (out) (transfer full): storage for a #GstRTSPConnection
331 *
332 * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
333 * The connection will not yet attempt to connect to @url, use
334 * gst_rtsp_connection_connect().
335 *
336 * A copy of @url will be made.
337 *
338 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
339 */
340 GstRTSPResult
gst_rtsp_connection_create(const GstRTSPUrl * url,GstRTSPConnection ** conn)341 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
342 {
343 GstRTSPConnection *newconn;
344
345 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
346 g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
347
348 newconn = g_new0 (GstRTSPConnection, 1);
349
350 newconn->may_cancel = TRUE;
351 newconn->cancellable = g_cancellable_new ();
352 newconn->client = g_socket_client_new ();
353
354 if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
355 g_socket_client_set_tls (newconn->client, TRUE);
356
357 g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
358 newconn);
359
360 newconn->url = gst_rtsp_url_copy (url);
361 newconn->timer = g_timer_new ();
362 newconn->timeout = 60;
363 newconn->cseq = 1; /* RFC 7826: "it is RECOMMENDED to start at 0.",
364 but some servers don't copy values <1 due to bugs. */
365
366 newconn->remember_session_id = TRUE;
367
368 newconn->auth_method = GST_RTSP_AUTH_NONE;
369 newconn->username = NULL;
370 newconn->passwd = NULL;
371 newconn->auth_params = NULL;
372 newconn->version = 0;
373
374 *conn = newconn;
375
376 return GST_RTSP_OK;
377 }
378
379 static gboolean
collect_addresses(GSocket * socket,gchar ** ip,guint16 * port,gboolean remote,GError ** error)380 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
381 gboolean remote, GError ** error)
382 {
383 GSocketAddress *addr;
384
385 if (remote)
386 addr = g_socket_get_remote_address (socket, error);
387 else
388 addr = g_socket_get_local_address (socket, error);
389 if (!addr)
390 return FALSE;
391
392 if (ip)
393 *ip = g_inet_address_to_string (g_inet_socket_address_get_address
394 (G_INET_SOCKET_ADDRESS (addr)));
395 if (port)
396 *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
397
398 g_object_unref (addr);
399
400 return TRUE;
401 }
402
403
404 /**
405 * gst_rtsp_connection_create_from_socket:
406 * @socket: a #GSocket
407 * @ip: the IP address of the other end
408 * @port: the port used by the other end
409 * @initial_buffer: data already read from @fd
410 * @conn: (out) (transfer full): storage for a #GstRTSPConnection
411 *
412 * Create a new #GstRTSPConnection for handling communication on the existing
413 * socket @socket. The @initial_buffer contains zero terminated data already
414 * read from @socket which should be used before starting to read new data.
415 *
416 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
417 */
418 /* FIXME 2.0 We don't need the ip and port since they can be got from the
419 * GSocket */
420 GstRTSPResult
gst_rtsp_connection_create_from_socket(GSocket * socket,const gchar * ip,guint16 port,const gchar * initial_buffer,GstRTSPConnection ** conn)421 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
422 guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
423 {
424 GstRTSPConnection *newconn = NULL;
425 GstRTSPUrl *url;
426 GstRTSPResult res;
427 GError *err = NULL;
428 gchar *local_ip;
429 GIOStream *stream;
430
431 g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
432 g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
433 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
434
435 if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
436 goto getnameinfo_failed;
437
438 /* create a url for the client address */
439 url = g_new0 (GstRTSPUrl, 1);
440 url->host = g_strdup (ip);
441 url->port = port;
442
443 /* now create the connection object */
444 GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
445 gst_rtsp_url_free (url);
446
447 stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
448
449 /* both read and write initially */
450 newconn->server = TRUE;
451 newconn->socket0 = socket;
452 newconn->stream0 = stream;
453 newconn->write_socket = newconn->read_socket = newconn->socket0;
454 newconn->input_stream = g_io_stream_get_input_stream (stream);
455 newconn->output_stream = g_io_stream_get_output_stream (stream);
456 newconn->control_stream = NULL;
457 newconn->remote_ip = g_strdup (ip);
458 newconn->local_ip = local_ip;
459 newconn->initial_buffer = g_strdup (initial_buffer);
460
461 *conn = newconn;
462
463 return GST_RTSP_OK;
464
465 /* ERRORS */
466 getnameinfo_failed:
467 {
468 GST_ERROR ("failed to get local address: %s", err->message);
469 g_clear_error (&err);
470 return GST_RTSP_ERROR;
471 }
472 newconn_failed:
473 {
474 GST_ERROR ("failed to make connection");
475 g_free (local_ip);
476 gst_rtsp_url_free (url);
477 return res;
478 }
479 }
480
481 /**
482 * gst_rtsp_connection_accept:
483 * @socket: a socket
484 * @conn: (out) (transfer full): storage for a #GstRTSPConnection
485 * @cancellable: a #GCancellable to cancel the operation
486 *
487 * Accept a new connection on @socket and create a new #GstRTSPConnection for
488 * handling communication on new socket.
489 *
490 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
491 */
492 GstRTSPResult
gst_rtsp_connection_accept(GSocket * socket,GstRTSPConnection ** conn,GCancellable * cancellable)493 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
494 GCancellable * cancellable)
495 {
496 GError *err = NULL;
497 gchar *ip;
498 guint16 port;
499 GSocket *client_sock;
500 GstRTSPResult ret;
501
502 g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
503 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
504
505 client_sock = g_socket_accept (socket, cancellable, &err);
506 if (!client_sock)
507 goto accept_failed;
508
509 /* get the remote ip address and port */
510 if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
511 goto getnameinfo_failed;
512
513 ret =
514 gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
515 conn);
516 g_object_unref (client_sock);
517 g_free (ip);
518
519 return ret;
520
521 /* ERRORS */
522 accept_failed:
523 {
524 GST_DEBUG ("Accepting client failed: %s", err->message);
525 g_clear_error (&err);
526 return GST_RTSP_ESYS;
527 }
528 getnameinfo_failed:
529 {
530 GST_DEBUG ("getnameinfo failed: %s", err->message);
531 g_clear_error (&err);
532 if (!g_socket_close (client_sock, &err)) {
533 GST_DEBUG ("Closing socket failed: %s", err->message);
534 g_clear_error (&err);
535 }
536 g_object_unref (client_sock);
537 return GST_RTSP_ERROR;
538 }
539 }
540
541 /**
542 * gst_rtsp_connection_get_tls:
543 * @conn: a #GstRTSPConnection
544 * @error: #GError for error reporting, or NULL to ignore.
545 *
546 * Get the TLS connection of @conn.
547 *
548 * For client side this will return the #GTlsClientConnection when connected
549 * over TLS.
550 *
551 * For server side connections, this function will create a GTlsServerConnection
552 * when called the first time and will return that same connection on subsequent
553 * calls. The server is then responsible for configuring the TLS connection.
554 *
555 * Returns: (transfer none): the TLS connection for @conn.
556 *
557 * Since: 1.2
558 */
559 GTlsConnection *
gst_rtsp_connection_get_tls(GstRTSPConnection * conn,GError ** error)560 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
561 {
562 GTlsConnection *result;
563
564 if (G_IS_TLS_CONNECTION (conn->stream0)) {
565 /* we already had one, return it */
566 result = G_TLS_CONNECTION (conn->stream0);
567 } else if (conn->server) {
568 /* no TLS connection but we are server, make one */
569 result = (GTlsConnection *)
570 g_tls_server_connection_new (conn->stream0, NULL, error);
571 if (result) {
572 g_object_unref (conn->stream0);
573 conn->stream0 = G_IO_STREAM (result);
574 conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
575 conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
576 }
577 } else {
578 /* client */
579 result = NULL;
580 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
581 "client not connected with TLS");
582 }
583 return result;
584 }
585
586 /**
587 * gst_rtsp_connection_set_tls_validation_flags:
588 * @conn: a #GstRTSPConnection
589 * @flags: the validation flags.
590 *
591 * Sets the TLS validation flags to be used to verify the peer
592 * certificate when a TLS connection is established.
593 *
594 * Returns: TRUE if the validation flags are set correctly, or FALSE if
595 * @conn is NULL or is not a TLS connection.
596 *
597 * Since: 1.2.1
598 */
599 gboolean
gst_rtsp_connection_set_tls_validation_flags(GstRTSPConnection * conn,GTlsCertificateFlags flags)600 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
601 GTlsCertificateFlags flags)
602 {
603 gboolean res = FALSE;
604
605 g_return_val_if_fail (conn != NULL, FALSE);
606
607 res = g_socket_client_get_tls (conn->client);
608 if (res)
609 g_socket_client_set_tls_validation_flags (conn->client, flags);
610
611 return res;
612 }
613
614 /**
615 * gst_rtsp_connection_get_tls_validation_flags:
616 * @conn: a #GstRTSPConnection
617 *
618 * Gets the TLS validation flags used to verify the peer certificate
619 * when a TLS connection is established.
620 *
621 * Returns: the validationg flags.
622 *
623 * Since: 1.2.1
624 */
625 GTlsCertificateFlags
gst_rtsp_connection_get_tls_validation_flags(GstRTSPConnection * conn)626 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
627 {
628 g_return_val_if_fail (conn != NULL, 0);
629
630 return g_socket_client_get_tls_validation_flags (conn->client);
631 }
632
633 /**
634 * gst_rtsp_connection_set_tls_database:
635 * @conn: a #GstRTSPConnection
636 * @database: a #GTlsDatabase
637 *
638 * Sets the anchor certificate authorities database. This certificate
639 * database will be used to verify the server's certificate in case it
640 * can't be verified with the default certificate database first.
641 *
642 * Since: 1.4
643 */
644 void
gst_rtsp_connection_set_tls_database(GstRTSPConnection * conn,GTlsDatabase * database)645 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
646 GTlsDatabase * database)
647 {
648 GTlsDatabase *old_db;
649
650 g_return_if_fail (conn != NULL);
651
652 if (database)
653 g_object_ref (database);
654
655 old_db = conn->tls_database;
656 conn->tls_database = database;
657
658 if (old_db)
659 g_object_unref (old_db);
660 }
661
662 /**
663 * gst_rtsp_connection_get_tls_database:
664 * @conn: a #GstRTSPConnection
665 *
666 * Gets the anchor certificate authorities database that will be used
667 * after a server certificate can't be verified with the default
668 * certificate database.
669 *
670 * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
671 * database has been previously set. Use g_object_unref() to release the
672 * certificate database.
673 *
674 * Since: 1.4
675 */
676 GTlsDatabase *
gst_rtsp_connection_get_tls_database(GstRTSPConnection * conn)677 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
678 {
679 GTlsDatabase *result;
680
681 g_return_val_if_fail (conn != NULL, NULL);
682
683 if ((result = conn->tls_database))
684 g_object_ref (result);
685
686 return result;
687 }
688
689 /**
690 * gst_rtsp_connection_set_tls_interaction:
691 * @conn: a #GstRTSPConnection
692 * @interaction: a #GTlsInteraction
693 *
694 * Sets a #GTlsInteraction object to be used when the connection or certificate
695 * database need to interact with the user. This will be used to prompt the
696 * user for passwords where necessary.
697 *
698 * Since: 1.6
699 */
700 void
gst_rtsp_connection_set_tls_interaction(GstRTSPConnection * conn,GTlsInteraction * interaction)701 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
702 GTlsInteraction * interaction)
703 {
704 GTlsInteraction *old_interaction;
705
706 g_return_if_fail (conn != NULL);
707
708 if (interaction)
709 g_object_ref (interaction);
710
711 old_interaction = conn->tls_interaction;
712 conn->tls_interaction = interaction;
713
714 if (old_interaction)
715 g_object_unref (old_interaction);
716 }
717
718 /**
719 * gst_rtsp_connection_get_tls_interaction:
720 * @conn: a #GstRTSPConnection
721 *
722 * Gets a #GTlsInteraction object to be used when the connection or certificate
723 * database need to interact with the user. This will be used to prompt the
724 * user for passwords where necessary.
725 *
726 * Returns: (transfer full): a reference on the #GTlsInteraction. Use
727 * g_object_unref() to release.
728 *
729 * Since: 1.6
730 */
731 GTlsInteraction *
gst_rtsp_connection_get_tls_interaction(GstRTSPConnection * conn)732 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
733 {
734 GTlsInteraction *result;
735
736 g_return_val_if_fail (conn != NULL, NULL);
737
738 if ((result = conn->tls_interaction))
739 g_object_ref (result);
740
741 return result;
742 }
743
744 /**
745 * gst_rtsp_connection_set_accept_certificate_func:
746 * @conn: a #GstRTSPConnection
747 * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
748 * @destroy_notify: #GDestroyNotify for @user_data
749 * @user_data: User data passed to @func
750 *
751 * Sets a custom accept-certificate function for checking certificates for
752 * validity. This will directly map to #GTlsConnection 's "accept-certificate"
753 * signal and be performed after the default checks of #GstRTSPConnection
754 * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
755 * have failed. If no #GTlsDatabase is set on this connection, only @func will
756 * be called.
757 *
758 * Since: 1.14
759 */
760 void
gst_rtsp_connection_set_accept_certificate_func(GstRTSPConnection * conn,GstRTSPConnectionAcceptCertificateFunc func,gpointer user_data,GDestroyNotify destroy_notify)761 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
762 GstRTSPConnectionAcceptCertificateFunc func,
763 gpointer user_data, GDestroyNotify destroy_notify)
764 {
765 if (conn->accept_certificate_destroy_notify)
766 conn->
767 accept_certificate_destroy_notify (conn->accept_certificate_user_data);
768 conn->accept_certificate_func = func;
769 conn->accept_certificate_user_data = user_data;
770 conn->accept_certificate_destroy_notify = destroy_notify;
771 }
772
773 static GstRTSPResult
setup_tunneling(GstRTSPConnection * conn,GTimeVal * timeout,gchar * uri,GstRTSPMessage * response)774 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri,
775 GstRTSPMessage * response)
776 {
777 gint i;
778 GstRTSPResult res;
779 gchar *value;
780 guint16 url_port;
781 GstRTSPMessage *msg;
782 gboolean old_http;
783 GstRTSPUrl *url;
784 GError *error = NULL;
785 GSocketConnection *connection;
786 GSocket *socket;
787 gchar *connection_uri = NULL;
788 gchar *request_uri = NULL;
789 gchar *host = NULL;
790
791 url = conn->url;
792
793 gst_rtsp_url_get_port (url, &url_port);
794 host = g_strdup_printf ("%s:%d", url->host, url_port);
795
796 /* create a random sessionid */
797 for (i = 0; i < TUNNELID_LEN; i++)
798 conn->tunnelid[i] = g_random_int_range ('a', 'z');
799 conn->tunnelid[TUNNELID_LEN - 1] = '\0';
800
801 /* create the GET request for the read connection */
802 GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
803 no_message);
804 msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
805
806 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
807 conn->tunnelid);
808 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
809 "application/x-rtsp-tunnelled");
810 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
811 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
812 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
813
814 /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
815 * request from being base64 encoded */
816 conn->tunneled = FALSE;
817 GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed);
818 gst_rtsp_message_free (msg);
819 conn->tunneled = TRUE;
820
821 /* receive the response to the GET request */
822 /* we need to temporarily set manual_http to TRUE since
823 * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
824 * failure otherwise */
825 old_http = conn->manual_http;
826 conn->manual_http = TRUE;
827 GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, response, timeout),
828 read_failed);
829 conn->manual_http = old_http;
830
831 if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
832 response->type_data.response.code != GST_RTSP_STS_OK)
833 goto wrong_result;
834
835 if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
836 &value, 0) == GST_RTSP_OK) {
837 g_free (url->host);
838 url->host = g_strdup (value);
839 g_free (conn->remote_ip);
840 conn->remote_ip = g_strdup (value);
841 }
842
843 connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
844 url->abspath, url->query ? "?" : "", url->query ? url->query : "");
845
846 /* connect to the host/port */
847 if (conn->proxy_host) {
848 connection = g_socket_client_connect_to_host (conn->client,
849 conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
850 request_uri = g_strdup (connection_uri);
851 } else {
852 connection = g_socket_client_connect_to_uri (conn->client,
853 connection_uri, 0, conn->cancellable, &error);
854 request_uri =
855 g_strdup_printf ("%s%s%s", url->abspath,
856 url->query ? "?" : "", url->query ? url->query : "");
857 }
858 if (connection == NULL)
859 goto connect_failed;
860
861 socket = g_socket_connection_get_socket (connection);
862
863 /* get remote address */
864 g_free (conn->remote_ip);
865 conn->remote_ip = NULL;
866
867 if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
868 goto remote_address_failed;
869
870 /* this is now our writing socket */
871 conn->stream1 = G_IO_STREAM (connection);
872 conn->socket1 = socket;
873 conn->write_socket = conn->socket1;
874 conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
875 conn->control_stream = NULL;
876
877 /* create the POST request for the write connection */
878 GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
879 request_uri), no_message);
880 msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
881
882 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
883 conn->tunnelid);
884 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
885 "application/x-rtsp-tunnelled");
886 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
887 "application/x-rtsp-tunnelled");
888 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
889 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
890 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
891 "Sun, 9 Jan 1972 00:00:00 GMT");
892 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
893 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
894
895 /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
896 * request from being base64 encoded */
897 conn->tunneled = FALSE;
898 GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed);
899 gst_rtsp_message_free (msg);
900 conn->tunneled = TRUE;
901
902 exit:
903 g_free (connection_uri);
904 g_free (request_uri);
905 g_free (host);
906
907 return res;
908
909 /* ERRORS */
910 no_message:
911 {
912 GST_ERROR ("failed to create request (%d)", res);
913 goto exit;
914 }
915 write_failed:
916 {
917 GST_ERROR ("write failed (%d)", res);
918 gst_rtsp_message_free (msg);
919 conn->tunneled = TRUE;
920 goto exit;
921 }
922 read_failed:
923 {
924 GST_ERROR ("read failed (%d)", res);
925 conn->manual_http = FALSE;
926 goto exit;
927 }
928 wrong_result:
929 {
930 GST_ERROR ("got failure response %d %s",
931 response->type_data.response.code, response->type_data.response.reason);
932 res = GST_RTSP_ERROR;
933 goto exit;
934 }
935 connect_failed:
936 {
937 GST_ERROR ("failed to connect: %s", error->message);
938 res = GST_RTSP_ERROR;
939 g_clear_error (&error);
940 goto exit;
941 }
942 remote_address_failed:
943 {
944 GST_ERROR ("failed to resolve address: %s", error->message);
945 g_object_unref (connection);
946 g_clear_error (&error);
947 return GST_RTSP_ERROR;
948 }
949 }
950
951 /**
952 * gst_rtsp_connection_connect_with_response:
953 * @conn: a #GstRTSPConnection
954 * @timeout: a #GTimeVal timeout
955 * @response: a #GstRTSPMessage
956 *
957 * Attempt to connect to the url of @conn made with
958 * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
959 * forever. If @timeout contains a valid timeout, this function will return
960 * #GST_RTSP_ETIMEOUT after the timeout expired. If @conn is set to tunneled,
961 * @response will contain a response to the tunneling request messages.
962 *
963 * This function can be cancelled with gst_rtsp_connection_flush().
964 *
965 * Returns: #GST_RTSP_OK when a connection could be made.
966 *
967 * Since: 1.8
968 */
969 GstRTSPResult
gst_rtsp_connection_connect_with_response(GstRTSPConnection * conn,GTimeVal * timeout,GstRTSPMessage * response)970 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
971 GTimeVal * timeout, GstRTSPMessage * response)
972 {
973 GstRTSPResult res;
974 GSocketConnection *connection;
975 GSocket *socket;
976 GError *error = NULL;
977 gchar *connection_uri, *request_uri, *remote_ip;
978 GstClockTime to;
979 guint16 url_port;
980 GstRTSPUrl *url;
981
982 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
983 g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
984 g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
985
986 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
987 g_socket_client_set_timeout (conn->client,
988 (to + GST_SECOND - 1) / GST_SECOND);
989
990 url = conn->url;
991
992 gst_rtsp_url_get_port (url, &url_port);
993
994 if (conn->tunneled) {
995 connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
996 url->abspath, url->query ? "?" : "", url->query ? url->query : "");
997 } else {
998 connection_uri = gst_rtsp_url_get_request_uri (url);
999 }
1000
1001 if (conn->proxy_host) {
1002 connection = g_socket_client_connect_to_host (conn->client,
1003 conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1004 request_uri = g_strdup (connection_uri);
1005 } else {
1006 connection = g_socket_client_connect_to_uri (conn->client,
1007 connection_uri, url_port, conn->cancellable, &error);
1008
1009 /* use the relative component of the uri for non-proxy connections */
1010 request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1011 url->query ? "?" : "", url->query ? url->query : "");
1012 }
1013 if (connection == NULL)
1014 goto connect_failed;
1015
1016 /* get remote address */
1017 socket = g_socket_connection_get_socket (connection);
1018
1019 if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1020 goto remote_address_failed;
1021
1022 g_free (conn->remote_ip);
1023 conn->remote_ip = remote_ip;
1024 conn->stream0 = G_IO_STREAM (connection);
1025 conn->socket0 = socket;
1026 /* this is our read socket */
1027 conn->read_socket = conn->socket0;
1028 conn->write_socket = conn->socket0;
1029 conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1030 conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1031 conn->control_stream = NULL;
1032
1033 if (conn->tunneled) {
1034 res = setup_tunneling (conn, timeout, request_uri, response);
1035 if (res != GST_RTSP_OK)
1036 goto tunneling_failed;
1037 }
1038 g_free (connection_uri);
1039 g_free (request_uri);
1040
1041 return GST_RTSP_OK;
1042
1043 /* ERRORS */
1044 connect_failed:
1045 {
1046 GST_ERROR ("failed to connect: %s", error->message);
1047 g_clear_error (&error);
1048 g_free (connection_uri);
1049 g_free (request_uri);
1050 return GST_RTSP_ERROR;
1051 }
1052 remote_address_failed:
1053 {
1054 GST_ERROR ("failed to connect: %s", error->message);
1055 g_object_unref (connection);
1056 g_clear_error (&error);
1057 g_free (connection_uri);
1058 g_free (request_uri);
1059 return GST_RTSP_ERROR;
1060 }
1061 tunneling_failed:
1062 {
1063 GST_ERROR ("failed to setup tunneling");
1064 g_free (connection_uri);
1065 g_free (request_uri);
1066 return res;
1067 }
1068 }
1069
1070 static void
add_auth_header(GstRTSPConnection * conn,GstRTSPMessage * message)1071 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1072 {
1073 switch (conn->auth_method) {
1074 case GST_RTSP_AUTH_BASIC:{
1075 gchar *user_pass;
1076 gchar *user_pass64;
1077 gchar *auth_string;
1078
1079 if (conn->username == NULL || conn->passwd == NULL)
1080 break;
1081
1082 user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1083 user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1084 auth_string = g_strdup_printf ("Basic %s", user_pass64);
1085
1086 gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1087 auth_string);
1088
1089 g_free (user_pass);
1090 g_free (user_pass64);
1091 break;
1092 }
1093 case GST_RTSP_AUTH_DIGEST:{
1094 gchar *response;
1095 gchar *auth_string, *auth_string2;
1096 gchar *realm;
1097 gchar *nonce;
1098 gchar *opaque;
1099 const gchar *uri;
1100 const gchar *method;
1101
1102 /* we need to have some params set */
1103 if (conn->auth_params == NULL || conn->username == NULL ||
1104 conn->passwd == NULL)
1105 break;
1106
1107 /* we need the realm and nonce */
1108 realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1109 nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1110 if (realm == NULL || nonce == NULL)
1111 break;
1112
1113 method = gst_rtsp_method_as_text (message->type_data.request.method);
1114 uri = message->type_data.request.uri;
1115
1116 response =
1117 gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1118 conn->username, conn->passwd, uri, nonce);
1119 auth_string =
1120 g_strdup_printf ("Digest username=\"%s\", "
1121 "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1122 conn->username, realm, nonce, uri, response);
1123 g_free (response);
1124
1125 opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1126 if (opaque) {
1127 auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1128 opaque);
1129 g_free (auth_string);
1130 auth_string = auth_string2;
1131 }
1132 /* Do not keep any old Authorization headers */
1133 gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1134 gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1135 auth_string);
1136 break;
1137 }
1138 default:
1139 /* Nothing to do */
1140 break;
1141 }
1142 }
1143
1144 /**
1145 * gst_rtsp_connection_connect:
1146 * @conn: a #GstRTSPConnection
1147 * @timeout: a #GTimeVal timeout
1148 *
1149 * Attempt to connect to the url of @conn made with
1150 * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
1151 * forever. If @timeout contains a valid timeout, this function will return
1152 * #GST_RTSP_ETIMEOUT after the timeout expired.
1153 *
1154 * This function can be cancelled with gst_rtsp_connection_flush().
1155 *
1156 * Returns: #GST_RTSP_OK when a connection could be made.
1157 */
1158 GstRTSPResult
gst_rtsp_connection_connect(GstRTSPConnection * conn,GTimeVal * timeout)1159 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
1160 {
1161 GstRTSPResult result;
1162 GstRTSPMessage response;
1163
1164 memset (&response, 0, sizeof (response));
1165 gst_rtsp_message_init (&response);
1166
1167 result = gst_rtsp_connection_connect_with_response (conn, timeout, &response);
1168
1169 gst_rtsp_message_unset (&response);
1170
1171 return result;
1172 }
1173
1174 static void
gen_date_string(gchar * date_string,guint len)1175 gen_date_string (gchar * date_string, guint len)
1176 {
1177 static const char wkdays[7][4] =
1178 { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1179 static const char months[12][4] =
1180 { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1181 "Nov", "Dec"
1182 };
1183 struct tm tm;
1184 time_t t;
1185
1186 time (&t);
1187
1188 #ifdef HAVE_GMTIME_R
1189 gmtime_r (&t, &tm);
1190 #else
1191 tm = *gmtime (&t);
1192 #endif
1193
1194 g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1195 wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1196 tm.tm_hour, tm.tm_min, tm.tm_sec);
1197 }
1198
1199 static GstRTSPResult
write_bytes(GOutputStream * stream,const guint8 * buffer,guint * idx,guint size,gboolean block,GCancellable * cancellable)1200 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1201 guint size, gboolean block, GCancellable * cancellable)
1202 {
1203 guint left;
1204 gssize r;
1205 GError *err = NULL;
1206
1207 if (G_UNLIKELY (*idx > size))
1208 return GST_RTSP_ERROR;
1209
1210 left = size - *idx;
1211
1212 while (left) {
1213 if (block)
1214 r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1215 cancellable, &err);
1216 else
1217 r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1218 (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1219 if (G_UNLIKELY (r < 0))
1220 goto error;
1221
1222 left -= r;
1223 *idx += r;
1224 }
1225 return GST_RTSP_OK;
1226
1227 /* ERRORS */
1228 error:
1229 {
1230 if (G_UNLIKELY (r == 0))
1231 return GST_RTSP_EEOF;
1232
1233 if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1234 GST_WARNING ("%s", err->message);
1235 else
1236 GST_DEBUG ("%s", err->message);
1237
1238 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1239 g_clear_error (&err);
1240 return GST_RTSP_EINTR;
1241 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1242 g_clear_error (&err);
1243 return GST_RTSP_EINTR;
1244 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1245 g_clear_error (&err);
1246 return GST_RTSP_ETIMEOUT;
1247 }
1248 g_clear_error (&err);
1249 return GST_RTSP_ESYS;
1250 }
1251 }
1252
1253 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1254 #if GLIB_CHECK_VERSION(2, 59, 1)
1255 static GstRTSPResult
writev_bytes(GOutputStream * stream,GOutputVector * vectors,gint n_vectors,gsize * bytes_written,gboolean block,GCancellable * cancellable)1256 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1257 gsize * bytes_written, gboolean block, GCancellable * cancellable)
1258 {
1259 gsize _bytes_written = 0;
1260 gsize written;
1261 GError *err = NULL;
1262 GPollableReturn res = G_POLLABLE_RETURN_OK;
1263
1264 while (n_vectors > 0) {
1265 if (block) {
1266 if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1267 &written, cancellable, &err))) {
1268 /* This will never return G_IO_ERROR_WOULD_BLOCK */
1269 res = G_POLLABLE_RETURN_FAILED;
1270 goto error;
1271 }
1272 } else {
1273 res =
1274 g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1275 (stream), vectors, n_vectors, &written, cancellable, &err);
1276
1277 if (res != G_POLLABLE_RETURN_OK) {
1278 g_assert (written == 0);
1279 goto error;
1280 }
1281 }
1282 _bytes_written += written;
1283
1284 /* skip vectors that have been written in full */
1285 while (written > 0 && written >= vectors[0].size) {
1286 written -= vectors[0].size;
1287 ++vectors;
1288 --n_vectors;
1289 }
1290
1291 /* skip partially written vector data */
1292 if (written > 0) {
1293 vectors[0].size -= written;
1294 vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1295 }
1296 }
1297
1298 *bytes_written = _bytes_written;
1299
1300 return GST_RTSP_OK;
1301
1302 /* ERRORS */
1303 error:
1304 {
1305 *bytes_written = _bytes_written;
1306
1307 if (err)
1308 GST_WARNING ("%s", err->message);
1309 if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1310 g_assert (!err);
1311 return GST_RTSP_EINTR;
1312 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1313 g_clear_error (&err);
1314 return GST_RTSP_EINTR;
1315 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1316 g_clear_error (&err);
1317 return GST_RTSP_ETIMEOUT;
1318 } else if (G_UNLIKELY (written == 0)) {
1319 g_clear_error (&err);
1320 return GST_RTSP_EEOF;
1321 }
1322
1323 g_clear_error (&err);
1324 return GST_RTSP_ESYS;
1325 }
1326 }
1327 #else
1328 static GstRTSPResult
writev_bytes(GOutputStream * stream,GOutputVector * vectors,gint n_vectors,gsize * bytes_written,gboolean block,GCancellable * cancellable)1329 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1330 gsize * bytes_written, gboolean block, GCancellable * cancellable)
1331 {
1332 gsize _bytes_written = 0;
1333 guint written;
1334 gint i;
1335 GstRTSPResult res = GST_RTSP_OK;
1336
1337 for (i = 0; i < n_vectors; i++) {
1338 written = 0;
1339 res =
1340 write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
1341 block, cancellable);
1342 _bytes_written += written;
1343 if (G_UNLIKELY (res != GST_RTSP_OK))
1344 break;
1345 }
1346
1347 *bytes_written = _bytes_written;
1348
1349 return res;
1350 }
1351 #endif
1352
1353 static gint
fill_raw_bytes(GstRTSPConnection * conn,guint8 * buffer,guint size,gboolean block,GError ** err)1354 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1355 gboolean block, GError ** err)
1356 {
1357 gint out = 0;
1358
1359 if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1360 gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1361
1362 out = MIN (left, size);
1363 memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1364
1365 if (left == (gsize) out) {
1366 g_free (conn->initial_buffer);
1367 conn->initial_buffer = NULL;
1368 conn->initial_buffer_offset = 0;
1369 } else
1370 conn->initial_buffer_offset += out;
1371 }
1372
1373 if (G_LIKELY (size > (guint) out)) {
1374 gssize r;
1375 gsize count = size - out;
1376 if (block)
1377 r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1378 count, conn->may_cancel ? conn->cancellable : NULL, err);
1379 else
1380 r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1381 (conn->input_stream), (gchar *) & buffer[out], count,
1382 conn->may_cancel ? conn->cancellable : NULL, err);
1383
1384 if (G_UNLIKELY (r < 0)) {
1385 if (out == 0) {
1386 /* propagate the error */
1387 out = r;
1388 } else {
1389 /* we have some data ignore error */
1390 g_clear_error (err);
1391 }
1392 } else
1393 out += r;
1394 }
1395
1396 return out;
1397 }
1398
1399 static gint
fill_bytes(GstRTSPConnection * conn,guint8 * buffer,guint size,gboolean block,GError ** err)1400 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1401 gboolean block, GError ** err)
1402 {
1403 DecodeCtx *ctx = conn->ctxp;
1404 gint out = 0;
1405
1406 if (ctx) {
1407 while (size > 0) {
1408 guint8 in[sizeof (ctx->out) * 4 / 3];
1409 gint r;
1410
1411 while (size > 0 && ctx->cout < ctx->coutl) {
1412 /* we have some leftover bytes */
1413 *buffer++ = ctx->out[ctx->cout++];
1414 size--;
1415 out++;
1416 }
1417
1418 /* got what we needed? */
1419 if (size == 0)
1420 break;
1421
1422 /* try to read more bytes */
1423 r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1424 if (r <= 0) {
1425 if (out == 0) {
1426 out = r;
1427 } else {
1428 /* we have some data ignore error */
1429 g_clear_error (err);
1430 }
1431 break;
1432 }
1433
1434 ctx->cout = 0;
1435 ctx->coutl =
1436 g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1437 &ctx->save);
1438 }
1439 } else {
1440 out = fill_raw_bytes (conn, buffer, size, block, err);
1441 }
1442
1443 return out;
1444 }
1445
1446 static GstRTSPResult
read_bytes(GstRTSPConnection * conn,guint8 * buffer,guint * idx,guint size,gboolean block)1447 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1448 gboolean block)
1449 {
1450 guint left;
1451 gint r;
1452 GError *err = NULL;
1453
1454 if (G_UNLIKELY (*idx > size))
1455 return GST_RTSP_ERROR;
1456
1457 left = size - *idx;
1458
1459 while (left) {
1460 r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1461 if (G_UNLIKELY (r <= 0))
1462 goto error;
1463
1464 left -= r;
1465 *idx += r;
1466 }
1467 return GST_RTSP_OK;
1468
1469 /* ERRORS */
1470 error:
1471 {
1472 if (G_UNLIKELY (r == 0))
1473 return GST_RTSP_EEOF;
1474
1475 GST_DEBUG ("%s", err->message);
1476 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1477 g_clear_error (&err);
1478 return GST_RTSP_EINTR;
1479 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1480 g_clear_error (&err);
1481 return GST_RTSP_EINTR;
1482 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1483 g_clear_error (&err);
1484 return GST_RTSP_ETIMEOUT;
1485 }
1486 g_clear_error (&err);
1487 return GST_RTSP_ESYS;
1488 }
1489 }
1490
1491 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1492 * end of a line. It even does its best to handle clients which mix them (even
1493 * though this is a really stupid idea (tm).) It also handles Line White Space
1494 * (LWS), where a line end followed by whitespace is considered LWS. This is
1495 * the method used in RTSP (and HTTP) to break long lines.
1496 */
1497 static GstRTSPResult
read_line(GstRTSPConnection * conn,guint8 * buffer,guint * idx,guint size,gboolean block)1498 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1499 gboolean block)
1500 {
1501 GstRTSPResult res;
1502
1503 while (TRUE) {
1504 guint8 c;
1505 guint i;
1506
1507 if (conn->read_ahead == READ_AHEAD_EOH) {
1508 /* the last call to read_line() already determined that we have reached
1509 * the end of the headers, so convey that information now */
1510 conn->read_ahead = 0;
1511 break;
1512 } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1513 /* the last call to read_line() left off after having read \r\n */
1514 c = '\n';
1515 } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1516 /* the last call to read_line() left off after having read \r\n\r */
1517 c = '\r';
1518 } else if (conn->read_ahead != 0) {
1519 /* the last call to read_line() left us with a character to start with */
1520 c = (guint8) conn->read_ahead;
1521 conn->read_ahead = 0;
1522 } else {
1523 /* read the next character */
1524 i = 0;
1525 res = read_bytes (conn, &c, &i, 1, block);
1526 if (G_UNLIKELY (res != GST_RTSP_OK))
1527 return res;
1528 }
1529
1530 /* special treatment of line endings */
1531 if (c == '\r' || c == '\n') {
1532 guint8 read_ahead;
1533
1534 retry:
1535 /* need to read ahead one more character to know what to do... */
1536 i = 0;
1537 res = read_bytes (conn, &read_ahead, &i, 1, block);
1538 if (G_UNLIKELY (res != GST_RTSP_OK))
1539 return res;
1540
1541 if (read_ahead == ' ' || read_ahead == '\t') {
1542 if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1543 /* got \r\n\r followed by whitespace, treat it as a normal line
1544 * followed by one starting with LWS */
1545 conn->read_ahead = read_ahead;
1546 break;
1547 } else {
1548 /* got LWS, change the line ending to a space and continue */
1549 c = ' ';
1550 conn->read_ahead = read_ahead;
1551 }
1552 } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1553 if (read_ahead == '\r' || read_ahead == '\n') {
1554 /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1555 conn->read_ahead = READ_AHEAD_EOH;
1556 break;
1557 } else {
1558 /* got \r\n\r followed by something else, this is not really
1559 * supported since we have probably just eaten the first character
1560 * of the body or the next message, so just ignore the second \r
1561 * and live with it... */
1562 conn->read_ahead = read_ahead;
1563 break;
1564 }
1565 } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1566 if (read_ahead == '\r') {
1567 /* got \r\n\r so far, need one more character... */
1568 conn->read_ahead = READ_AHEAD_CRLFCR;
1569 goto retry;
1570 } else if (read_ahead == '\n') {
1571 /* got \r\n\n, treat it as the end of the headers */
1572 conn->read_ahead = READ_AHEAD_EOH;
1573 break;
1574 } else {
1575 /* found the end of a line, keep read_ahead for the next line */
1576 conn->read_ahead = read_ahead;
1577 break;
1578 }
1579 } else if (c == read_ahead) {
1580 /* got double \r or \n, treat it as the end of the headers */
1581 conn->read_ahead = READ_AHEAD_EOH;
1582 break;
1583 } else if (c == '\r' && read_ahead == '\n') {
1584 /* got \r\n so far, still need more to know what to do... */
1585 conn->read_ahead = READ_AHEAD_CRLF;
1586 goto retry;
1587 } else {
1588 /* found the end of a line, keep read_ahead for the next line */
1589 conn->read_ahead = read_ahead;
1590 break;
1591 }
1592 }
1593
1594 if (G_LIKELY (*idx < size - 1))
1595 buffer[(*idx)++] = c;
1596 }
1597 buffer[*idx] = '\0';
1598
1599 return GST_RTSP_OK;
1600 }
1601
1602 /**
1603 * gst_rtsp_connection_write:
1604 * @conn: a #GstRTSPConnection
1605 * @data: the data to write
1606 * @size: the size of @data
1607 * @timeout: a timeout value or %NULL
1608 *
1609 * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1610 * the specified @timeout. @timeout can be %NULL, in which case this function
1611 * might block forever.
1612 *
1613 * This function can be cancelled with gst_rtsp_connection_flush().
1614 *
1615 * Returns: #GST_RTSP_OK on success.
1616 */
1617 /* FIXME 2.0: This should've been static! */
1618 GstRTSPResult
gst_rtsp_connection_write(GstRTSPConnection * conn,const guint8 * data,guint size,GTimeVal * timeout)1619 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1620 guint size, GTimeVal * timeout)
1621 {
1622 guint offset;
1623 GstClockTime to;
1624 GstRTSPResult res;
1625
1626 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1627 g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1628 g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1629
1630 offset = 0;
1631
1632 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
1633
1634 g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1635 res =
1636 write_bytes (conn->output_stream, data, &offset, size, TRUE,
1637 conn->cancellable);
1638 g_socket_set_timeout (conn->write_socket, 0);
1639
1640 return res;
1641 }
1642
1643 static gboolean
serialize_message(GstRTSPConnection * conn,GstRTSPMessage * message,GstRTSPSerializedMessage * serialized_message)1644 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1645 GstRTSPSerializedMessage * serialized_message)
1646 {
1647 GString *str = NULL;
1648
1649 memset (serialized_message, 0, sizeof (*serialized_message));
1650
1651 /* Initially we borrow the body_data / body_buffer fields from
1652 * the message */
1653 serialized_message->borrowed = TRUE;
1654
1655 switch (message->type) {
1656 case GST_RTSP_MESSAGE_REQUEST:
1657 str = g_string_new ("");
1658
1659 /* create request string, add CSeq */
1660 g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1661 "CSeq: %d\r\n",
1662 gst_rtsp_method_as_text (message->type_data.request.method),
1663 message->type_data.request.uri,
1664 gst_rtsp_version_as_text (message->type_data.request.version),
1665 conn->cseq++);
1666 /* add session id if we have one */
1667 if (conn->session_id[0] != '\0') {
1668 gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1669 gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1670 conn->session_id);
1671 }
1672 /* add any authentication headers */
1673 add_auth_header (conn, message);
1674 break;
1675 case GST_RTSP_MESSAGE_RESPONSE:
1676 str = g_string_new ("");
1677
1678 /* create response string */
1679 g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1680 gst_rtsp_version_as_text (message->type_data.response.version),
1681 message->type_data.response.code, message->type_data.response.reason);
1682 break;
1683 case GST_RTSP_MESSAGE_HTTP_REQUEST:
1684 str = g_string_new ("");
1685
1686 /* create request string */
1687 g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1688 gst_rtsp_method_as_text (message->type_data.request.method),
1689 message->type_data.request.uri,
1690 gst_rtsp_version_as_text (message->type_data.request.version));
1691 /* add any authentication headers */
1692 add_auth_header (conn, message);
1693 break;
1694 case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1695 str = g_string_new ("");
1696
1697 /* create response string */
1698 g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1699 gst_rtsp_version_as_text (message->type_data.request.version),
1700 message->type_data.response.code, message->type_data.response.reason);
1701 break;
1702 case GST_RTSP_MESSAGE_DATA:
1703 {
1704 guint8 *data_header = serialized_message->data_header;
1705
1706 /* prepare data header */
1707 data_header[0] = '$';
1708 data_header[1] = message->type_data.data.channel;
1709 data_header[2] = (message->body_size >> 8) & 0xff;
1710 data_header[3] = message->body_size & 0xff;
1711
1712 /* create serialized message with header and data */
1713 serialized_message->data_is_data_header = TRUE;
1714 serialized_message->data_size = 4;
1715
1716 if (message->body) {
1717 serialized_message->body_data = message->body;
1718 serialized_message->body_data_size = message->body_size;
1719 } else {
1720 g_assert (message->body_buffer != NULL);
1721 serialized_message->body_buffer = message->body_buffer;
1722 }
1723 break;
1724 }
1725 default:
1726 g_string_free (str, TRUE);
1727 g_return_val_if_reached (FALSE);
1728 break;
1729 }
1730
1731 /* append headers and body */
1732 if (message->type != GST_RTSP_MESSAGE_DATA) {
1733 gchar date_string[100];
1734
1735 g_assert (str != NULL);
1736
1737 gen_date_string (date_string, sizeof (date_string));
1738
1739 /* add date header */
1740 gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1741 gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1742
1743 /* append headers */
1744 gst_rtsp_message_append_headers (message, str);
1745
1746 /* append Content-Length and body if needed */
1747 if (message->body_size > 0) {
1748 gchar *len;
1749
1750 len = g_strdup_printf ("%d", message->body_size);
1751 g_string_append_printf (str, "%s: %s\r\n",
1752 gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1753 g_free (len);
1754 /* header ends here */
1755 g_string_append (str, "\r\n");
1756
1757 if (message->body) {
1758 serialized_message->body_data = message->body;
1759 serialized_message->body_data_size = message->body_size;
1760 } else {
1761 g_assert (message->body_buffer != NULL);
1762 serialized_message->body_buffer = message->body_buffer;
1763 }
1764 } else {
1765 /* just end headers */
1766 g_string_append (str, "\r\n");
1767 }
1768
1769 serialized_message->data_size = str->len;
1770 serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1771 }
1772
1773 return TRUE;
1774 }
1775
1776 /**
1777 * gst_rtsp_connection_send:
1778 * @conn: a #GstRTSPConnection
1779 * @message: the message to send
1780 * @timeout: a timeout value or %NULL
1781 *
1782 * Attempt to send @message to the connected @conn, blocking up to
1783 * the specified @timeout. @timeout can be %NULL, in which case this function
1784 * might block forever.
1785 *
1786 * This function can be cancelled with gst_rtsp_connection_flush().
1787 *
1788 * Returns: #GST_RTSP_OK on success.
1789 */
1790 GstRTSPResult
gst_rtsp_connection_send(GstRTSPConnection * conn,GstRTSPMessage * message,GTimeVal * timeout)1791 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1792 GTimeVal * timeout)
1793 {
1794 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1795 g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1796
1797 return gst_rtsp_connection_send_messages (conn, message, 1, timeout);
1798 }
1799
1800 /**
1801 * gst_rtsp_connection_send_messages:
1802 * @conn: a #GstRTSPConnection
1803 * @messages: (array length=n_messages): the messages to send
1804 * @n_messages: the number of messages to send
1805 * @timeout: a timeout value or %NULL
1806 *
1807 * Attempt to send @messages to the connected @conn, blocking up to
1808 * the specified @timeout. @timeout can be %NULL, in which case this function
1809 * might block forever.
1810 *
1811 * This function can be cancelled with gst_rtsp_connection_flush().
1812 *
1813 * Returns: #GST_RTSP_OK on success.
1814 *
1815 * Since: 1.16
1816 */
1817 GstRTSPResult
gst_rtsp_connection_send_messages(GstRTSPConnection * conn,GstRTSPMessage * messages,guint n_messages,GTimeVal * timeout)1818 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
1819 GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
1820 {
1821 GstClockTime to;
1822 GstRTSPResult res;
1823 GstRTSPSerializedMessage *serialized_messages;
1824 GOutputVector *vectors;
1825 GstMapInfo *map_infos;
1826 guint n_vectors, n_memories;
1827 gint i, j, k;
1828 gsize bytes_to_write, bytes_written;
1829
1830 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1831 g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1832
1833 serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1834 memset (serialized_messages, 0,
1835 sizeof (GstRTSPSerializedMessage) * n_messages);
1836
1837 for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1838 i++) {
1839 if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1840 &serialized_messages[i])))
1841 goto no_message;
1842
1843 if (conn->tunneled) {
1844 gint state = 0, save = 0;
1845 gchar *base64_buffer, *out_buffer;
1846 gsize written = 0;
1847 gsize in_length;
1848
1849 in_length = serialized_messages[i].data_size;
1850 if (serialized_messages[i].body_data)
1851 in_length += serialized_messages[i].body_data_size;
1852 else if (serialized_messages[i].body_buffer)
1853 in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1854
1855 in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1856 base64_buffer = out_buffer = g_malloc0 (in_length);
1857
1858 written =
1859 g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1860 serialized_messages[i].data_header : serialized_messages[i].data,
1861 serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1862 out_buffer += written;
1863
1864 if (serialized_messages[i].body_data) {
1865 written =
1866 g_base64_encode_step (serialized_messages[i].body_data,
1867 serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1868 &save);
1869 out_buffer += written;
1870 } else if (serialized_messages[i].body_buffer) {
1871 guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1872
1873 for (j = 0; j < n; j++) {
1874 GstMemory *mem =
1875 gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1876 GstMapInfo map;
1877
1878 gst_memory_map (mem, &map, GST_MAP_READ);
1879
1880 written = g_base64_encode_step (map.data, map.size,
1881 FALSE, out_buffer, &state, &save);
1882 out_buffer += written;
1883
1884 gst_memory_unmap (mem, &map);
1885 }
1886 }
1887
1888 written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
1889 out_buffer += written;
1890
1891 gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1892 memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
1893
1894 serialized_messages[i].data = (guint8 *) base64_buffer;
1895 serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1;
1896 n_vectors++;
1897 } else {
1898 n_vectors++;
1899 if (serialized_messages[i].body_data) {
1900 n_vectors++;
1901 } else if (serialized_messages[i].body_buffer) {
1902 n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1903 n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1904 }
1905 }
1906 }
1907
1908 vectors = g_newa (GOutputVector, n_vectors);
1909 map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
1910
1911 for (i = 0, j = 0, k = 0; i < n_messages; i++) {
1912 vectors[j].buffer = serialized_messages[i].data_is_data_header ?
1913 serialized_messages[i].data_header : serialized_messages[i].data;
1914 vectors[j].size = serialized_messages[i].data_size;
1915 bytes_to_write += vectors[j].size;
1916 j++;
1917
1918 if (serialized_messages[i].body_data) {
1919 vectors[j].buffer = serialized_messages[i].body_data;
1920 vectors[j].size = serialized_messages[i].body_data_size;
1921 bytes_to_write += vectors[j].size;
1922 j++;
1923 } else if (serialized_messages[i].body_buffer) {
1924 gint l, n;
1925
1926 n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1927 for (l = 0; l < n; l++) {
1928 GstMemory *mem =
1929 gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1930
1931 gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
1932 vectors[j].buffer = map_infos[k].data;
1933 vectors[j].size = map_infos[k].size;
1934 bytes_to_write += vectors[j].size;
1935
1936 k++;
1937 j++;
1938 }
1939 }
1940 }
1941
1942 /* write request: this is synchronous */
1943 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
1944
1945 g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1946 res =
1947 writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
1948 TRUE, conn->cancellable);
1949 g_socket_set_timeout (conn->write_socket, 0);
1950
1951 g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
1952
1953 /* free everything */
1954 for (i = 0, k = 0; i < n_messages; i++) {
1955 if (serialized_messages[i].body_buffer) {
1956 gint l, n;
1957
1958 n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1959 for (l = 0; l < n; l++) {
1960 GstMemory *mem =
1961 gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1962
1963 gst_memory_unmap (mem, &map_infos[k]);
1964 k++;
1965 }
1966 }
1967
1968 g_free (serialized_messages[i].data);
1969 }
1970
1971 return res;
1972
1973 no_message:
1974 {
1975 for (i = 0; i < n_messages; i++) {
1976 gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1977 }
1978 g_warning ("Wrong message");
1979 return GST_RTSP_EINVAL;
1980 }
1981 }
1982
1983 static GstRTSPResult
parse_string(gchar * dest,gint size,gchar ** src)1984 parse_string (gchar * dest, gint size, gchar ** src)
1985 {
1986 GstRTSPResult res = GST_RTSP_OK;
1987 gint idx;
1988
1989 idx = 0;
1990 /* skip spaces */
1991 while (g_ascii_isspace (**src))
1992 (*src)++;
1993
1994 while (!g_ascii_isspace (**src) && **src != '\0') {
1995 if (idx < size - 1)
1996 dest[idx++] = **src;
1997 else
1998 res = GST_RTSP_EPARSE;
1999 (*src)++;
2000 }
2001 if (size > 0)
2002 dest[idx] = '\0';
2003
2004 return res;
2005 }
2006
2007 static GstRTSPResult
parse_protocol_version(gchar * protocol,GstRTSPMsgType * type,GstRTSPVersion * version)2008 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2009 GstRTSPVersion * version)
2010 {
2011 GstRTSPVersion rversion;
2012 GstRTSPResult res = GST_RTSP_OK;
2013 gchar *ver;
2014
2015 if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2016 guint major;
2017 guint minor;
2018 gchar dummychar;
2019
2020 *ver++ = '\0';
2021
2022 /* the version number must be formatted as X.Y with nothing following */
2023 if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2024 res = GST_RTSP_EPARSE;
2025
2026 rversion = major * 0x10 + minor;
2027 if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2028
2029 if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2030 *version = GST_RTSP_VERSION_INVALID;
2031 res = GST_RTSP_ERROR;
2032 }
2033 } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2034 if (*type == GST_RTSP_MESSAGE_REQUEST)
2035 *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2036 else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2037 *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2038
2039 if (rversion != GST_RTSP_VERSION_1_0 &&
2040 rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2041 res = GST_RTSP_ERROR;
2042 } else
2043 res = GST_RTSP_EPARSE;
2044 } else
2045 res = GST_RTSP_EPARSE;
2046
2047 if (res == GST_RTSP_OK)
2048 *version = rversion;
2049
2050 return res;
2051 }
2052
2053 static GstRTSPResult
parse_response_status(guint8 * buffer,GstRTSPMessage * msg)2054 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2055 {
2056 GstRTSPResult res = GST_RTSP_OK;
2057 GstRTSPResult res2;
2058 gchar versionstr[20];
2059 gchar codestr[4];
2060 gint code;
2061 gchar *bptr;
2062
2063 bptr = (gchar *) buffer;
2064
2065 if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2066 res = GST_RTSP_EPARSE;
2067
2068 if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2069 res = GST_RTSP_EPARSE;
2070 code = atoi (codestr);
2071 if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2072 res = GST_RTSP_EPARSE;
2073
2074 while (g_ascii_isspace (*bptr))
2075 bptr++;
2076
2077 if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2078 NULL) != GST_RTSP_OK))
2079 res = GST_RTSP_EPARSE;
2080
2081 res2 = parse_protocol_version (versionstr, &msg->type,
2082 &msg->type_data.response.version);
2083 if (G_LIKELY (res == GST_RTSP_OK))
2084 res = res2;
2085
2086 return res;
2087 }
2088
2089 static GstRTSPResult
parse_request_line(guint8 * buffer,GstRTSPMessage * msg)2090 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2091 {
2092 GstRTSPResult res = GST_RTSP_OK;
2093 GstRTSPResult res2;
2094 gchar versionstr[20];
2095 gchar methodstr[20];
2096 gchar urlstr[4096];
2097 gchar *bptr;
2098 GstRTSPMethod method;
2099
2100 bptr = (gchar *) buffer;
2101
2102 if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2103 res = GST_RTSP_EPARSE;
2104 method = gst_rtsp_find_method (methodstr);
2105
2106 if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2107 res = GST_RTSP_EPARSE;
2108 if (G_UNLIKELY (*urlstr == '\0'))
2109 res = GST_RTSP_EPARSE;
2110
2111 if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2112 res = GST_RTSP_EPARSE;
2113
2114 if (G_UNLIKELY (*bptr != '\0'))
2115 res = GST_RTSP_EPARSE;
2116
2117 if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2118 urlstr) != GST_RTSP_OK))
2119 res = GST_RTSP_EPARSE;
2120
2121 res2 = parse_protocol_version (versionstr, &msg->type,
2122 &msg->type_data.request.version);
2123 if (G_LIKELY (res == GST_RTSP_OK))
2124 res = res2;
2125
2126 if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2127 /* GET and POST are not allowed as RTSP methods */
2128 if (msg->type_data.request.method == GST_RTSP_GET ||
2129 msg->type_data.request.method == GST_RTSP_POST) {
2130 msg->type_data.request.method = GST_RTSP_INVALID;
2131 if (res == GST_RTSP_OK)
2132 res = GST_RTSP_ERROR;
2133 }
2134 } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2135 /* only GET and POST are allowed as HTTP methods */
2136 if (msg->type_data.request.method != GST_RTSP_GET &&
2137 msg->type_data.request.method != GST_RTSP_POST) {
2138 msg->type_data.request.method = GST_RTSP_INVALID;
2139 if (res == GST_RTSP_OK)
2140 res = GST_RTSP_ERROR;
2141 }
2142 }
2143
2144 return res;
2145 }
2146
2147 /* parsing lines means reading a Key: Value pair */
2148 static GstRTSPResult
parse_line(guint8 * buffer,GstRTSPMessage * msg)2149 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2150 {
2151 GstRTSPHeaderField field;
2152 gchar *line = (gchar *) buffer;
2153 gchar *field_name = NULL;
2154 gchar *value;
2155
2156 if ((value = strchr (line, ':')) == NULL || value == line)
2157 goto parse_error;
2158
2159 /* trim space before the colon */
2160 if (value[-1] == ' ')
2161 value[-1] = '\0';
2162
2163 /* replace the colon with a NUL */
2164 *value++ = '\0';
2165
2166 /* find the header */
2167 field = gst_rtsp_find_header_field (line);
2168 /* custom header not present in the list of pre-defined headers */
2169 if (field == GST_RTSP_HDR_INVALID)
2170 field_name = line;
2171
2172 /* split up the value in multiple key:value pairs if it contains comma(s) */
2173 while (*value != '\0') {
2174 gchar *next_value;
2175 gchar *comma = NULL;
2176 gboolean quoted = FALSE;
2177 guint comment = 0;
2178
2179 /* trim leading space */
2180 if (*value == ' ')
2181 value++;
2182
2183 /* for headers which may not appear multiple times, and thus may not
2184 * contain multiple values on the same line, we can short-circuit the loop
2185 * below and the entire value results in just one key:value pair*/
2186 if (!gst_rtsp_header_allow_multiple (field))
2187 next_value = value + strlen (value);
2188 else
2189 next_value = value;
2190
2191 /* find the next value, taking special care of quotes and comments */
2192 while (*next_value != '\0') {
2193 if ((quoted || comment != 0) && *next_value == '\\' &&
2194 next_value[1] != '\0')
2195 next_value++;
2196 else if (comment == 0 && *next_value == '"')
2197 quoted = !quoted;
2198 else if (!quoted && *next_value == '(')
2199 comment++;
2200 else if (comment != 0 && *next_value == ')')
2201 comment--;
2202 else if (!quoted && comment == 0) {
2203 /* To quote RFC 2068: "User agents MUST take special care in parsing
2204 * the WWW-Authenticate field value if it contains more than one
2205 * challenge, or if more than one WWW-Authenticate header field is
2206 * provided, since the contents of a challenge may itself contain a
2207 * comma-separated list of authentication parameters."
2208 *
2209 * What this means is that we cannot just look for an unquoted comma
2210 * when looking for multiple values in Proxy-Authenticate and
2211 * WWW-Authenticate headers. Instead we need to look for the sequence
2212 * "comma [space] token space token" before we can split after the
2213 * comma...
2214 */
2215 if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2216 field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2217 if (*next_value == ',') {
2218 if (next_value[1] == ' ') {
2219 /* skip any space following the comma so we do not mistake it for
2220 * separating between two tokens */
2221 next_value++;
2222 }
2223 comma = next_value;
2224 } else if (*next_value == ' ' && next_value[1] != ',' &&
2225 next_value[1] != '=' && comma != NULL) {
2226 next_value = comma;
2227 comma = NULL;
2228 break;
2229 }
2230 } else if (*next_value == ',')
2231 break;
2232 }
2233
2234 next_value++;
2235 }
2236
2237 if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2238 /* The timeout parameter is only allowed in a session response header
2239 * but some clients send it as part of the session request header.
2240 * Ignore everything from the semicolon to the end of the line. */
2241 next_value = value;
2242 while (*next_value != '\0') {
2243 if (*next_value == ';') {
2244 break;
2245 }
2246 next_value++;
2247 }
2248 }
2249
2250 /* trim space */
2251 if (value != next_value && next_value[-1] == ' ')
2252 next_value[-1] = '\0';
2253
2254 if (*next_value != '\0')
2255 *next_value++ = '\0';
2256
2257 /* add the key:value pair */
2258 if (*value != '\0') {
2259 if (field != GST_RTSP_HDR_INVALID)
2260 gst_rtsp_message_add_header (msg, field, value);
2261 else
2262 gst_rtsp_message_add_header_by_name (msg, field_name, value);
2263 }
2264
2265 value = next_value;
2266 }
2267
2268 return GST_RTSP_OK;
2269
2270 /* ERRORS */
2271 parse_error:
2272 {
2273 return GST_RTSP_EPARSE;
2274 }
2275 }
2276
2277 /* convert all consecutive whitespace to a single space */
2278 static void
normalize_line(guint8 * buffer)2279 normalize_line (guint8 * buffer)
2280 {
2281 while (*buffer) {
2282 if (g_ascii_isspace (*buffer)) {
2283 guint8 *tmp;
2284
2285 *buffer++ = ' ';
2286 for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2287 }
2288 if (buffer != tmp)
2289 memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2290 } else {
2291 buffer++;
2292 }
2293 }
2294 }
2295
2296 static gboolean
cseq_validation(GstRTSPConnection * conn,GstRTSPMessage * message)2297 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2298 {
2299 gchar *cseq_header;
2300 gint64 cseq = 0;
2301 GstRTSPResult res;
2302
2303 if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2304 message->type == GST_RTSP_MESSAGE_REQUEST) {
2305 if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2306 &cseq_header, 0)) != GST_RTSP_OK) {
2307 /* rfc2326 This field MUST be present in all RTSP req and resp */
2308 goto invalid_format;
2309 }
2310
2311 errno = 0;
2312 cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2313 if (errno != 0 || cseq < 0) {
2314 /* CSeq has no valid value */
2315 goto invalid_format;
2316 }
2317
2318 if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2319 (conn->cseq == 0 || conn->cseq < cseq)) {
2320 /* Response CSeq can't be higher than the number of outgoing requests
2321 * neither is a response valid if no request has been made */
2322 goto invalid_format;
2323 }
2324 }
2325 return GST_RTSP_OK;
2326
2327 invalid_format:
2328 {
2329 return GST_RTSP_EPARSE;
2330 }
2331 }
2332
2333 /* returns:
2334 * GST_RTSP_OK when a complete message was read.
2335 * GST_RTSP_EEOF: when the read socket is closed
2336 * GST_RTSP_EINTR: when more data is needed.
2337 * GST_RTSP_..: some other error occured.
2338 */
2339 static GstRTSPResult
build_next(GstRTSPBuilder * builder,GstRTSPMessage * message,GstRTSPConnection * conn,gboolean block)2340 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2341 GstRTSPConnection * conn, gboolean block)
2342 {
2343 GstRTSPResult res;
2344
2345 while (TRUE) {
2346 switch (builder->state) {
2347 case STATE_START:
2348 {
2349 guint8 c;
2350
2351 builder->offset = 0;
2352 res =
2353 read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2354 block);
2355 if (res != GST_RTSP_OK)
2356 goto done;
2357
2358 c = builder->buffer[0];
2359
2360 /* we have 1 bytes now and we can see if this is a data message or
2361 * not */
2362 if (c == '$') {
2363 /* data message, prepare for the header */
2364 builder->state = STATE_DATA_HEADER;
2365 conn->may_cancel = FALSE;
2366 } else if (c == '\n' || c == '\r') {
2367 /* skip \n and \r */
2368 builder->offset = 0;
2369 } else {
2370 builder->line = 0;
2371 builder->state = STATE_READ_LINES;
2372 conn->may_cancel = FALSE;
2373 }
2374 break;
2375 }
2376 case STATE_DATA_HEADER:
2377 {
2378 res =
2379 read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2380 block);
2381 if (res != GST_RTSP_OK)
2382 goto done;
2383
2384 gst_rtsp_message_init_data (message, builder->buffer[1]);
2385
2386 builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2387 builder->body_data = g_malloc (builder->body_len + 1);
2388 builder->body_data[builder->body_len] = '\0';
2389 builder->offset = 0;
2390 builder->state = STATE_DATA_BODY;
2391 break;
2392 }
2393 case STATE_DATA_BODY:
2394 {
2395 res =
2396 read_bytes (conn, builder->body_data, &builder->offset,
2397 builder->body_len, block);
2398 if (res != GST_RTSP_OK)
2399 goto done;
2400
2401 /* we have the complete body now, store in the message adjusting the
2402 * length to include the trailing '\0' */
2403 gst_rtsp_message_take_body (message,
2404 (guint8 *) builder->body_data, builder->body_len + 1);
2405 builder->body_data = NULL;
2406 builder->body_len = 0;
2407
2408 builder->state = STATE_END;
2409 break;
2410 }
2411 case STATE_READ_LINES:
2412 {
2413 res = read_line (conn, builder->buffer, &builder->offset,
2414 sizeof (builder->buffer), block);
2415 if (res != GST_RTSP_OK)
2416 goto done;
2417
2418 /* we have a regular response */
2419 if (builder->buffer[0] == '\0') {
2420 gchar *hdrval;
2421
2422 /* empty line, end of message header */
2423 /* see if there is a Content-Length header, but ignore it if this
2424 * is a POST request with an x-sessioncookie header */
2425 if (gst_rtsp_message_get_header (message,
2426 GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2427 (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2428 message->type_data.request.method != GST_RTSP_POST ||
2429 gst_rtsp_message_get_header (message,
2430 GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2431 /* there is, prepare to read the body */
2432 builder->body_len = atol (hdrval);
2433 builder->body_data = g_try_malloc (builder->body_len + 1);
2434 /* we can't do much here, we need the length to know how many bytes
2435 * we need to read next and when allocation fails, something is
2436 * probably wrong with the length. */
2437 if (builder->body_data == NULL)
2438 goto invalid_body_len;
2439
2440 builder->body_data[builder->body_len] = '\0';
2441 builder->offset = 0;
2442 builder->state = STATE_DATA_BODY;
2443 } else {
2444 builder->state = STATE_END;
2445 }
2446 break;
2447 }
2448
2449 /* we have a line */
2450 normalize_line (builder->buffer);
2451 if (builder->line == 0) {
2452 /* first line, check for response status */
2453 if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2454 memcmp (builder->buffer, "HTTP", 4) == 0) {
2455 builder->status = parse_response_status (builder->buffer, message);
2456 } else {
2457 builder->status = parse_request_line (builder->buffer, message);
2458 }
2459 } else {
2460 /* else just parse the line */
2461 res = parse_line (builder->buffer, message);
2462 if (res != GST_RTSP_OK)
2463 builder->status = res;
2464 }
2465 if (builder->status != GST_RTSP_OK) {
2466 res = builder->status;
2467 goto invalid_format;
2468 }
2469
2470 builder->line++;
2471 builder->offset = 0;
2472 break;
2473 }
2474 case STATE_END:
2475 {
2476 gchar *session_cookie;
2477 gchar *session_id;
2478
2479 conn->may_cancel = TRUE;
2480
2481 if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2482 /* message don't comply with rfc2326 regarding CSeq */
2483 goto invalid_format;
2484 }
2485
2486 if (message->type == GST_RTSP_MESSAGE_DATA) {
2487 /* data messages don't have headers */
2488 res = GST_RTSP_OK;
2489 goto done;
2490 }
2491
2492 /* save the tunnel session in the connection */
2493 if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2494 !conn->manual_http &&
2495 conn->tstate == TUNNEL_STATE_NONE &&
2496 gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2497 &session_cookie, 0) == GST_RTSP_OK) {
2498 strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2499 conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2500 conn->tunneled = TRUE;
2501 }
2502
2503 /* save session id in the connection for further use */
2504 if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2505 gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2506 &session_id, 0) == GST_RTSP_OK) {
2507 gint maxlen, i;
2508
2509 maxlen = sizeof (conn->session_id) - 1;
2510 /* the sessionid can have attributes marked with ;
2511 * Make sure we strip them */
2512 for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2513 if (session_id[i] == ';') {
2514 maxlen = i;
2515 /* parse timeout */
2516 do {
2517 i++;
2518 } while (g_ascii_isspace (session_id[i]));
2519 if (g_str_has_prefix (&session_id[i], "timeout=")) {
2520 gint to;
2521
2522 /* if we parsed something valid, configure */
2523 if ((to = atoi (&session_id[i + 8])) > 0)
2524 conn->timeout = to;
2525 }
2526 break;
2527 }
2528 }
2529
2530 /* make sure to not overflow */
2531 if (conn->remember_session_id) {
2532 strncpy (conn->session_id, session_id, maxlen);
2533 conn->session_id[maxlen] = '\0';
2534 }
2535 }
2536 res = builder->status;
2537 goto done;
2538 }
2539 default:
2540 res = GST_RTSP_ERROR;
2541 goto done;
2542 }
2543 }
2544 done:
2545 conn->may_cancel = TRUE;
2546 return res;
2547
2548 /* ERRORS */
2549 invalid_body_len:
2550 {
2551 conn->may_cancel = TRUE;
2552 GST_DEBUG ("could not allocate body");
2553 return GST_RTSP_ERROR;
2554 }
2555 invalid_format:
2556 {
2557 conn->may_cancel = TRUE;
2558 GST_DEBUG ("could not parse");
2559 return res;
2560 }
2561 }
2562
2563 /**
2564 * gst_rtsp_connection_read:
2565 * @conn: a #GstRTSPConnection
2566 * @data: the data to read
2567 * @size: the size of @data
2568 * @timeout: a timeout value or %NULL
2569 *
2570 * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2571 * the specified @timeout. @timeout can be %NULL, in which case this function
2572 * might block forever.
2573 *
2574 * This function can be cancelled with gst_rtsp_connection_flush().
2575 *
2576 * Returns: #GST_RTSP_OK on success.
2577 */
2578 GstRTSPResult
gst_rtsp_connection_read(GstRTSPConnection * conn,guint8 * data,guint size,GTimeVal * timeout)2579 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
2580 GTimeVal * timeout)
2581 {
2582 guint offset;
2583 GstClockTime to;
2584 GstRTSPResult res;
2585
2586 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2587 g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2588 g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2589
2590 if (G_UNLIKELY (size == 0))
2591 return GST_RTSP_OK;
2592
2593 offset = 0;
2594
2595 /* configure timeout if any */
2596 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
2597
2598 g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2599 res = read_bytes (conn, data, &offset, size, TRUE);
2600 g_socket_set_timeout (conn->read_socket, 0);
2601
2602 return res;
2603 }
2604
2605 static GstRTSPMessage *
gen_tunnel_reply(GstRTSPConnection * conn,GstRTSPStatusCode code,const GstRTSPMessage * request)2606 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2607 const GstRTSPMessage * request)
2608 {
2609 GstRTSPMessage *msg;
2610 GstRTSPResult res;
2611
2612 if (gst_rtsp_status_as_text (code) == NULL)
2613 code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2614
2615 GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2616 no_message);
2617
2618 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2619 "GStreamer RTSP Server");
2620 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2621 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2622 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2623
2624 if (code == GST_RTSP_STS_OK) {
2625 /* add the local ip address to the tunnel reply, this is where the client
2626 * should send the POST request to */
2627 if (conn->local_ip)
2628 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2629 conn->local_ip);
2630 gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2631 "application/x-rtsp-tunnelled");
2632 }
2633
2634 return msg;
2635
2636 /* ERRORS */
2637 no_message:
2638 {
2639 return NULL;
2640 }
2641 }
2642
2643 /**
2644 * gst_rtsp_connection_receive:
2645 * @conn: a #GstRTSPConnection
2646 * @message: the message to read
2647 * @timeout: a timeout value or %NULL
2648 *
2649 * Attempt to read into @message from the connected @conn, blocking up to
2650 * the specified @timeout. @timeout can be %NULL, in which case this function
2651 * might block forever.
2652 *
2653 * This function can be cancelled with gst_rtsp_connection_flush().
2654 *
2655 * Returns: #GST_RTSP_OK on success.
2656 */
2657 GstRTSPResult
gst_rtsp_connection_receive(GstRTSPConnection * conn,GstRTSPMessage * message,GTimeVal * timeout)2658 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
2659 GTimeVal * timeout)
2660 {
2661 GstRTSPResult res;
2662 GstRTSPBuilder builder;
2663 GstClockTime to;
2664
2665 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2666 g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2667 g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2668
2669 /* configure timeout if any */
2670 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
2671
2672 g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2673 memset (&builder, 0, sizeof (GstRTSPBuilder));
2674 res = build_next (&builder, message, conn, TRUE);
2675 g_socket_set_timeout (conn->read_socket, 0);
2676
2677 if (G_UNLIKELY (res != GST_RTSP_OK))
2678 goto read_error;
2679
2680 if (!conn->manual_http) {
2681 if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2682 if (conn->tstate == TUNNEL_STATE_NONE &&
2683 message->type_data.request.method == GST_RTSP_GET) {
2684 GstRTSPMessage *response;
2685
2686 conn->tstate = TUNNEL_STATE_GET;
2687
2688 /* tunnel GET request, we can reply now */
2689 response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2690 res = gst_rtsp_connection_send (conn, response, timeout);
2691 gst_rtsp_message_free (response);
2692 if (res == GST_RTSP_OK)
2693 res = GST_RTSP_ETGET;
2694 goto cleanup;
2695 } else if (conn->tstate == TUNNEL_STATE_NONE &&
2696 message->type_data.request.method == GST_RTSP_POST) {
2697 conn->tstate = TUNNEL_STATE_POST;
2698
2699 /* tunnel POST request, the caller now has to link the two
2700 * connections. */
2701 res = GST_RTSP_ETPOST;
2702 goto cleanup;
2703 } else {
2704 res = GST_RTSP_EPARSE;
2705 goto cleanup;
2706 }
2707 } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2708 res = GST_RTSP_EPARSE;
2709 goto cleanup;
2710 }
2711 }
2712
2713 /* we have a message here */
2714 build_reset (&builder);
2715
2716 return GST_RTSP_OK;
2717
2718 /* ERRORS */
2719 read_error:
2720 cleanup:
2721 {
2722 build_reset (&builder);
2723 gst_rtsp_message_unset (message);
2724 return res;
2725 }
2726 }
2727
2728 /**
2729 * gst_rtsp_connection_close:
2730 * @conn: a #GstRTSPConnection
2731 *
2732 * Close the connected @conn. After this call, the connection is in the same
2733 * state as when it was first created.
2734 *
2735 * Returns: #GST_RTSP_OK on success.
2736 */
2737 GstRTSPResult
gst_rtsp_connection_close(GstRTSPConnection * conn)2738 gst_rtsp_connection_close (GstRTSPConnection * conn)
2739 {
2740 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2741
2742 /* last unref closes the connection we don't want to explicitly close here
2743 * because these sockets might have been provided at construction */
2744 if (conn->stream0) {
2745 g_object_unref (conn->stream0);
2746 conn->stream0 = NULL;
2747 conn->socket0 = NULL;
2748 }
2749 if (conn->stream1) {
2750 g_object_unref (conn->stream1);
2751 conn->stream1 = NULL;
2752 conn->socket1 = NULL;
2753 }
2754
2755 /* these were owned by the stream */
2756 conn->input_stream = NULL;
2757 conn->output_stream = NULL;
2758 conn->control_stream = NULL;
2759
2760 g_free (conn->remote_ip);
2761 conn->remote_ip = NULL;
2762 g_free (conn->local_ip);
2763 conn->local_ip = NULL;
2764
2765 conn->read_ahead = 0;
2766
2767 g_free (conn->initial_buffer);
2768 conn->initial_buffer = NULL;
2769 conn->initial_buffer_offset = 0;
2770
2771 conn->write_socket = NULL;
2772 conn->read_socket = NULL;
2773 conn->tunneled = FALSE;
2774 conn->tstate = TUNNEL_STATE_NONE;
2775 conn->ctxp = NULL;
2776 g_free (conn->username);
2777 conn->username = NULL;
2778 g_free (conn->passwd);
2779 conn->passwd = NULL;
2780 gst_rtsp_connection_clear_auth_params (conn);
2781 conn->timeout = 60;
2782 conn->cseq = 0;
2783 conn->session_id[0] = '\0';
2784
2785 return GST_RTSP_OK;
2786 }
2787
2788 /**
2789 * gst_rtsp_connection_free:
2790 * @conn: a #GstRTSPConnection
2791 *
2792 * Close and free @conn.
2793 *
2794 * Returns: #GST_RTSP_OK on success.
2795 */
2796 GstRTSPResult
gst_rtsp_connection_free(GstRTSPConnection * conn)2797 gst_rtsp_connection_free (GstRTSPConnection * conn)
2798 {
2799 GstRTSPResult res;
2800
2801 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2802
2803 res = gst_rtsp_connection_close (conn);
2804
2805 if (conn->cancellable)
2806 g_object_unref (conn->cancellable);
2807 if (conn->client)
2808 g_object_unref (conn->client);
2809 if (conn->tls_database)
2810 g_object_unref (conn->tls_database);
2811 if (conn->tls_interaction)
2812 g_object_unref (conn->tls_interaction);
2813 if (conn->accept_certificate_destroy_notify)
2814 conn->
2815 accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2816
2817 g_timer_destroy (conn->timer);
2818 gst_rtsp_url_free (conn->url);
2819 g_free (conn->proxy_host);
2820 g_free (conn);
2821
2822 return res;
2823 }
2824
2825 /**
2826 * gst_rtsp_connection_poll:
2827 * @conn: a #GstRTSPConnection
2828 * @events: a bitmask of #GstRTSPEvent flags to check
2829 * @revents: location for result flags
2830 * @timeout: a timeout
2831 *
2832 * Wait up to the specified @timeout for the connection to become available for
2833 * at least one of the operations specified in @events. When the function returns
2834 * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2835 * @conn.
2836 *
2837 * @timeout can be %NULL, in which case this function might block forever.
2838 *
2839 * This function can be cancelled with gst_rtsp_connection_flush().
2840 *
2841 * Returns: #GST_RTSP_OK on success.
2842 */
2843 GstRTSPResult
gst_rtsp_connection_poll(GstRTSPConnection * conn,GstRTSPEvent events,GstRTSPEvent * revents,GTimeVal * timeout)2844 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2845 GstRTSPEvent * revents, GTimeVal * timeout)
2846 {
2847 GMainContext *ctx;
2848 GSource *rs, *ws, *ts;
2849 GIOCondition condition;
2850
2851 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2852 g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2853 g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2854 g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2855 g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2856
2857 ctx = g_main_context_new ();
2858
2859 /* configure timeout if any */
2860 if (timeout) {
2861 ts = g_timeout_source_new (GST_TIMEVAL_TO_TIME (*timeout) / GST_MSECOND);
2862 g_source_set_dummy_callback (ts);
2863 g_source_attach (ts, ctx);
2864 g_source_unref (ts);
2865 }
2866
2867 if (events & GST_RTSP_EV_READ) {
2868 rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2869 conn->cancellable);
2870 g_source_set_dummy_callback (rs);
2871 g_source_attach (rs, ctx);
2872 g_source_unref (rs);
2873 }
2874
2875 if (events & GST_RTSP_EV_WRITE) {
2876 ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
2877 conn->cancellable);
2878 g_source_set_dummy_callback (ws);
2879 g_source_attach (ws, ctx);
2880 g_source_unref (ws);
2881 }
2882
2883 /* Returns after handling all pending events */
2884 while (!g_main_context_iteration (ctx, TRUE));
2885
2886 g_main_context_unref (ctx);
2887
2888 *revents = 0;
2889 if (events & GST_RTSP_EV_READ) {
2890 condition = g_socket_condition_check (conn->read_socket,
2891 G_IO_IN | G_IO_PRI);
2892 if ((condition & G_IO_IN) || (condition & G_IO_PRI))
2893 *revents |= GST_RTSP_EV_READ;
2894 }
2895 if (events & GST_RTSP_EV_WRITE) {
2896 condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
2897 if ((condition & G_IO_OUT))
2898 *revents |= GST_RTSP_EV_WRITE;
2899 }
2900
2901 if (*revents == 0)
2902 return GST_RTSP_ETIMEOUT;
2903
2904 return GST_RTSP_OK;
2905 }
2906
2907 /**
2908 * gst_rtsp_connection_next_timeout:
2909 * @conn: a #GstRTSPConnection
2910 * @timeout: a timeout
2911 *
2912 * Calculate the next timeout for @conn, storing the result in @timeout.
2913 *
2914 * Returns: #GST_RTSP_OK.
2915 */
2916 GstRTSPResult
gst_rtsp_connection_next_timeout(GstRTSPConnection * conn,GTimeVal * timeout)2917 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2918 {
2919 gdouble elapsed;
2920 glong sec;
2921 gulong usec;
2922 gint ctimeout;
2923
2924 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2925 g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2926
2927 ctimeout = conn->timeout;
2928 if (ctimeout >= 20) {
2929 /* Because we should act before the timeout we timeout 5
2930 * seconds in advance. */
2931 ctimeout -= 5;
2932 } else if (ctimeout >= 5) {
2933 /* else timeout 20% earlier */
2934 ctimeout -= ctimeout / 5;
2935 } else if (ctimeout >= 1) {
2936 /* else timeout 1 second earlier */
2937 ctimeout -= 1;
2938 }
2939
2940 elapsed = g_timer_elapsed (conn->timer, &usec);
2941 if (elapsed >= ctimeout) {
2942 sec = 0;
2943 usec = 0;
2944 } else {
2945 sec = ctimeout - elapsed;
2946 if (usec <= G_USEC_PER_SEC)
2947 usec = G_USEC_PER_SEC - usec;
2948 else
2949 usec = 0;
2950 }
2951
2952 timeout->tv_sec = sec;
2953 timeout->tv_usec = usec;
2954
2955 return GST_RTSP_OK;
2956 }
2957
2958 /**
2959 * gst_rtsp_connection_reset_timeout:
2960 * @conn: a #GstRTSPConnection
2961 *
2962 * Reset the timeout of @conn.
2963 *
2964 * Returns: #GST_RTSP_OK.
2965 */
2966 GstRTSPResult
gst_rtsp_connection_reset_timeout(GstRTSPConnection * conn)2967 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2968 {
2969 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2970
2971 g_timer_start (conn->timer);
2972
2973 return GST_RTSP_OK;
2974 }
2975
2976 /**
2977 * gst_rtsp_connection_flush:
2978 * @conn: a #GstRTSPConnection
2979 * @flush: start or stop the flush
2980 *
2981 * Start or stop the flushing action on @conn. When flushing, all current
2982 * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2983 * is set to non-flushing mode again.
2984 *
2985 * Returns: #GST_RTSP_OK.
2986 */
2987 GstRTSPResult
gst_rtsp_connection_flush(GstRTSPConnection * conn,gboolean flush)2988 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2989 {
2990 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2991
2992 if (flush) {
2993 g_cancellable_cancel (conn->cancellable);
2994 } else {
2995 g_object_unref (conn->cancellable);
2996 conn->cancellable = g_cancellable_new ();
2997 }
2998
2999 return GST_RTSP_OK;
3000 }
3001
3002 /**
3003 * gst_rtsp_connection_set_proxy:
3004 * @conn: a #GstRTSPConnection
3005 * @host: the proxy host
3006 * @port: the proxy port
3007 *
3008 * Set the proxy host and port.
3009 *
3010 * Returns: #GST_RTSP_OK.
3011 */
3012 GstRTSPResult
gst_rtsp_connection_set_proxy(GstRTSPConnection * conn,const gchar * host,guint port)3013 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3014 const gchar * host, guint port)
3015 {
3016 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3017
3018 g_free (conn->proxy_host);
3019 conn->proxy_host = g_strdup (host);
3020 conn->proxy_port = port;
3021
3022 return GST_RTSP_OK;
3023 }
3024
3025 /**
3026 * gst_rtsp_connection_set_auth:
3027 * @conn: a #GstRTSPConnection
3028 * @method: authentication method
3029 * @user: the user
3030 * @pass: the password
3031 *
3032 * Configure @conn for authentication mode @method with @user and @pass as the
3033 * user and password respectively.
3034 *
3035 * Returns: #GST_RTSP_OK.
3036 */
3037 GstRTSPResult
gst_rtsp_connection_set_auth(GstRTSPConnection * conn,GstRTSPAuthMethod method,const gchar * user,const gchar * pass)3038 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3039 GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3040 {
3041 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3042
3043 if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3044 || g_strrstr (user, ":") != NULL))
3045 return GST_RTSP_EINVAL;
3046
3047 /* Make sure the username and passwd are being set for authentication */
3048 if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3049 return GST_RTSP_EINVAL;
3050
3051 /* ":" chars are not allowed in usernames for basic auth */
3052 if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3053 return GST_RTSP_EINVAL;
3054
3055 g_free (conn->username);
3056 g_free (conn->passwd);
3057
3058 conn->auth_method = method;
3059 conn->username = g_strdup (user);
3060 conn->passwd = g_strdup (pass);
3061
3062 return GST_RTSP_OK;
3063 }
3064
3065 /**
3066 * str_case_hash:
3067 * @key: ASCII string to hash
3068 *
3069 * Hashes @key in a case-insensitive manner.
3070 *
3071 * Returns: the hash code.
3072 **/
3073 static guint
str_case_hash(gconstpointer key)3074 str_case_hash (gconstpointer key)
3075 {
3076 const char *p = key;
3077 guint h = g_ascii_toupper (*p);
3078
3079 if (h)
3080 for (p += 1; *p != '\0'; p++)
3081 h = (h << 5) - h + g_ascii_toupper (*p);
3082
3083 return h;
3084 }
3085
3086 /**
3087 * str_case_equal:
3088 * @v1: an ASCII string
3089 * @v2: another ASCII string
3090 *
3091 * Compares @v1 and @v2 in a case-insensitive manner
3092 *
3093 * Returns: %TRUE if they are equal (modulo case)
3094 **/
3095 static gboolean
str_case_equal(gconstpointer v1,gconstpointer v2)3096 str_case_equal (gconstpointer v1, gconstpointer v2)
3097 {
3098 const char *string1 = v1;
3099 const char *string2 = v2;
3100
3101 return g_ascii_strcasecmp (string1, string2) == 0;
3102 }
3103
3104 /**
3105 * gst_rtsp_connection_set_auth_param:
3106 * @conn: a #GstRTSPConnection
3107 * @param: authentication directive
3108 * @value: value
3109 *
3110 * Setup @conn with authentication directives. This is not necesary for
3111 * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3112 * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3113 * in the WWW-Authenticate response header and can include realm, domain,
3114 * nonce, opaque, stale, algorithm, qop as per RFC2617.
3115 */
3116 void
gst_rtsp_connection_set_auth_param(GstRTSPConnection * conn,const gchar * param,const gchar * value)3117 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3118 const gchar * param, const gchar * value)
3119 {
3120 g_return_if_fail (conn != NULL);
3121 g_return_if_fail (param != NULL);
3122
3123 if (conn->auth_params == NULL) {
3124 conn->auth_params =
3125 g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3126 }
3127 g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3128 }
3129
3130 /**
3131 * gst_rtsp_connection_clear_auth_params:
3132 * @conn: a #GstRTSPConnection
3133 *
3134 * Clear the list of authentication directives stored in @conn.
3135 */
3136 void
gst_rtsp_connection_clear_auth_params(GstRTSPConnection * conn)3137 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3138 {
3139 g_return_if_fail (conn != NULL);
3140
3141 if (conn->auth_params != NULL) {
3142 g_hash_table_destroy (conn->auth_params);
3143 conn->auth_params = NULL;
3144 }
3145 }
3146
3147 static GstRTSPResult
set_qos_dscp(GSocket * socket,guint qos_dscp)3148 set_qos_dscp (GSocket * socket, guint qos_dscp)
3149 {
3150 #ifndef IP_TOS
3151 GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3152 return GST_RTSP_OK;
3153 #else
3154 gint fd;
3155 union gst_sockaddr sa;
3156 socklen_t slen = sizeof (sa);
3157 gint af;
3158 gint tos;
3159
3160 if (!socket)
3161 return GST_RTSP_OK;
3162
3163 fd = g_socket_get_fd (socket);
3164 if (getsockname (fd, &sa.sa, &slen) < 0)
3165 goto no_getsockname;
3166
3167 af = sa.sa.sa_family;
3168
3169 /* if this is an IPv4-mapped address then do IPv4 QoS */
3170 if (af == AF_INET6) {
3171 if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3172 af = AF_INET;
3173 }
3174
3175 /* extract and shift 6 bits of the DSCP */
3176 tos = (qos_dscp & 0x3f) << 2;
3177
3178 #ifdef G_OS_WIN32
3179 # define SETSOCKOPT_ARG4_TYPE const char *
3180 #else
3181 # define SETSOCKOPT_ARG4_TYPE const void *
3182 #endif
3183
3184 switch (af) {
3185 case AF_INET:
3186 if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3187 sizeof (tos)) < 0)
3188 goto no_setsockopt;
3189 break;
3190 case AF_INET6:
3191 #ifdef IPV6_TCLASS
3192 if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3193 (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3194 goto no_setsockopt;
3195 break;
3196 #endif
3197 default:
3198 goto wrong_family;
3199 }
3200
3201 return GST_RTSP_OK;
3202
3203 /* ERRORS */
3204 no_getsockname:
3205 no_setsockopt:
3206 {
3207 return GST_RTSP_ESYS;
3208 }
3209 wrong_family:
3210 {
3211 return GST_RTSP_ERROR;
3212 }
3213 #endif
3214 }
3215
3216 /**
3217 * gst_rtsp_connection_set_qos_dscp:
3218 * @conn: a #GstRTSPConnection
3219 * @qos_dscp: DSCP value
3220 *
3221 * Configure @conn to use the specified DSCP value.
3222 *
3223 * Returns: #GST_RTSP_OK on success.
3224 */
3225 GstRTSPResult
gst_rtsp_connection_set_qos_dscp(GstRTSPConnection * conn,guint qos_dscp)3226 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3227 {
3228 GstRTSPResult res;
3229
3230 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3231 g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3232 g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3233
3234 res = set_qos_dscp (conn->socket0, qos_dscp);
3235 if (res == GST_RTSP_OK)
3236 res = set_qos_dscp (conn->socket1, qos_dscp);
3237
3238 return res;
3239 }
3240
3241
3242 /**
3243 * gst_rtsp_connection_get_url:
3244 * @conn: a #GstRTSPConnection
3245 *
3246 * Retrieve the URL of the other end of @conn.
3247 *
3248 * Returns: The URL. This value remains valid until the
3249 * connection is freed.
3250 */
3251 GstRTSPUrl *
gst_rtsp_connection_get_url(const GstRTSPConnection * conn)3252 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3253 {
3254 g_return_val_if_fail (conn != NULL, NULL);
3255
3256 return conn->url;
3257 }
3258
3259 /**
3260 * gst_rtsp_connection_get_ip:
3261 * @conn: a #GstRTSPConnection
3262 *
3263 * Retrieve the IP address of the other end of @conn.
3264 *
3265 * Returns: The IP address as a string. this value remains valid until the
3266 * connection is closed.
3267 */
3268 const gchar *
gst_rtsp_connection_get_ip(const GstRTSPConnection * conn)3269 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3270 {
3271 g_return_val_if_fail (conn != NULL, NULL);
3272
3273 return conn->remote_ip;
3274 }
3275
3276 /**
3277 * gst_rtsp_connection_set_ip:
3278 * @conn: a #GstRTSPConnection
3279 * @ip: an ip address
3280 *
3281 * Set the IP address of the server.
3282 */
3283 void
gst_rtsp_connection_set_ip(GstRTSPConnection * conn,const gchar * ip)3284 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3285 {
3286 g_return_if_fail (conn != NULL);
3287
3288 g_free (conn->remote_ip);
3289 conn->remote_ip = g_strdup (ip);
3290 }
3291
3292 /**
3293 * gst_rtsp_connection_get_read_socket:
3294 * @conn: a #GstRTSPConnection
3295 *
3296 * Get the file descriptor for reading.
3297 *
3298 * Returns: (transfer none): the file descriptor used for reading or %NULL on
3299 * error. The file descriptor remains valid until the connection is closed.
3300 */
3301 GSocket *
gst_rtsp_connection_get_read_socket(const GstRTSPConnection * conn)3302 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3303 {
3304 g_return_val_if_fail (conn != NULL, NULL);
3305 g_return_val_if_fail (conn->read_socket != NULL, NULL);
3306
3307 return conn->read_socket;
3308 }
3309
3310 /**
3311 * gst_rtsp_connection_get_write_socket:
3312 * @conn: a #GstRTSPConnection
3313 *
3314 * Get the file descriptor for writing.
3315 *
3316 * Returns: (transfer none): the file descriptor used for writing or NULL on
3317 * error. The file descriptor remains valid until the connection is closed.
3318 */
3319 GSocket *
gst_rtsp_connection_get_write_socket(const GstRTSPConnection * conn)3320 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3321 {
3322 g_return_val_if_fail (conn != NULL, NULL);
3323 g_return_val_if_fail (conn->write_socket != NULL, NULL);
3324
3325 return conn->write_socket;
3326 }
3327
3328 /**
3329 * gst_rtsp_connection_set_http_mode:
3330 * @conn: a #GstRTSPConnection
3331 * @enable: %TRUE to enable manual HTTP mode
3332 *
3333 * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3334 * messages in addition to the RTSP messages. It will also disable the
3335 * automatic handling of setting up an HTTP tunnel.
3336 */
3337 void
gst_rtsp_connection_set_http_mode(GstRTSPConnection * conn,gboolean enable)3338 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3339 {
3340 g_return_if_fail (conn != NULL);
3341
3342 conn->manual_http = enable;
3343 }
3344
3345 /**
3346 * gst_rtsp_connection_set_tunneled:
3347 * @conn: a #GstRTSPConnection
3348 * @tunneled: the new state
3349 *
3350 * Set the HTTP tunneling state of the connection. This must be configured before
3351 * the @conn is connected.
3352 */
3353 void
gst_rtsp_connection_set_tunneled(GstRTSPConnection * conn,gboolean tunneled)3354 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3355 {
3356 g_return_if_fail (conn != NULL);
3357 g_return_if_fail (conn->read_socket == NULL);
3358 g_return_if_fail (conn->write_socket == NULL);
3359
3360 conn->tunneled = tunneled;
3361 }
3362
3363 /**
3364 * gst_rtsp_connection_is_tunneled:
3365 * @conn: a #GstRTSPConnection
3366 *
3367 * Get the tunneling state of the connection.
3368 *
3369 * Returns: if @conn is using HTTP tunneling.
3370 */
3371 gboolean
gst_rtsp_connection_is_tunneled(const GstRTSPConnection * conn)3372 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3373 {
3374 g_return_val_if_fail (conn != NULL, FALSE);
3375
3376 return conn->tunneled;
3377 }
3378
3379 /**
3380 * gst_rtsp_connection_get_tunnelid:
3381 * @conn: a #GstRTSPConnection
3382 *
3383 * Get the tunnel session id the connection.
3384 *
3385 * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3386 */
3387 const gchar *
gst_rtsp_connection_get_tunnelid(const GstRTSPConnection * conn)3388 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3389 {
3390 g_return_val_if_fail (conn != NULL, NULL);
3391
3392 if (!conn->tunneled)
3393 return NULL;
3394
3395 return conn->tunnelid;
3396 }
3397
3398 /**
3399 * gst_rtsp_connection_do_tunnel:
3400 * @conn: a #GstRTSPConnection
3401 * @conn2: a #GstRTSPConnection or %NULL
3402 *
3403 * If @conn received the first tunnel connection and @conn2 received
3404 * the second tunnel connection, link the two connections together so that
3405 * @conn manages the tunneled connection.
3406 *
3407 * After this call, @conn2 cannot be used anymore and must be freed with
3408 * gst_rtsp_connection_free().
3409 *
3410 * If @conn2 is %NULL then only the base64 decoding context will be setup for
3411 * @conn.
3412 *
3413 * Returns: return GST_RTSP_OK on success.
3414 */
3415 GstRTSPResult
gst_rtsp_connection_do_tunnel(GstRTSPConnection * conn,GstRTSPConnection * conn2)3416 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3417 GstRTSPConnection * conn2)
3418 {
3419 g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3420
3421 if (conn2 != NULL) {
3422 GstRTSPTunnelState ts1 = conn->tstate;
3423 GstRTSPTunnelState ts2 = conn2->tstate;
3424
3425 g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3426 || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3427 GST_RTSP_EINVAL);
3428 g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3429 TUNNELID_LEN), GST_RTSP_EINVAL);
3430
3431 /* both connections have socket0 as the read/write socket */
3432 if (ts1 == TUNNEL_STATE_GET) {
3433 /* conn2 is the HTTP POST channel. take its socket and set it as read
3434 * socket in conn */
3435 conn->socket1 = conn2->socket0;
3436 conn->stream1 = conn2->stream0;
3437 conn->input_stream = conn2->input_stream;
3438 conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3439 conn2->output_stream = NULL;
3440 } else {
3441 /* conn2 is the HTTP GET channel. take its socket and set it as write
3442 * socket in conn */
3443 conn->socket1 = conn->socket0;
3444 conn->stream1 = conn->stream0;
3445 conn->socket0 = conn2->socket0;
3446 conn->stream0 = conn2->stream0;
3447 conn->output_stream = conn2->output_stream;
3448 conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3449 }
3450
3451 /* clean up some of the state of conn2 */
3452 g_cancellable_cancel (conn2->cancellable);
3453 conn2->write_socket = conn2->read_socket = NULL;
3454 conn2->socket0 = NULL;
3455 conn2->stream0 = NULL;
3456 conn2->socket1 = NULL;
3457 conn2->stream1 = NULL;
3458 conn2->input_stream = NULL;
3459 conn2->control_stream = NULL;
3460 g_object_unref (conn2->cancellable);
3461 conn2->cancellable = NULL;
3462
3463 /* We make socket0 the write socket and socket1 the read socket. */
3464 conn->write_socket = conn->socket0;
3465 conn->read_socket = conn->socket1;
3466
3467 conn->tstate = TUNNEL_STATE_COMPLETE;
3468
3469 g_free (conn->initial_buffer);
3470 conn->initial_buffer = conn2->initial_buffer;
3471 conn2->initial_buffer = NULL;
3472 conn->initial_buffer_offset = conn2->initial_buffer_offset;
3473 }
3474
3475 /* we need base64 decoding for the readfd */
3476 conn->ctx.state = 0;
3477 conn->ctx.save = 0;
3478 conn->ctx.cout = 0;
3479 conn->ctx.coutl = 0;
3480 conn->ctxp = &conn->ctx;
3481
3482 return GST_RTSP_OK;
3483 }
3484
3485 /**
3486 * gst_rtsp_connection_set_remember_session_id:
3487 * @conn: a #GstRTSPConnection
3488 * @remember: %TRUE if the connection should remember the session id
3489 *
3490 * Sets if the #GstRTSPConnection should remember the session id from the last
3491 * response received and force it onto any further requests.
3492 *
3493 * The default value is %TRUE
3494 */
3495
3496 void
gst_rtsp_connection_set_remember_session_id(GstRTSPConnection * conn,gboolean remember)3497 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3498 gboolean remember)
3499 {
3500 conn->remember_session_id = remember;
3501 if (!remember)
3502 conn->session_id[0] = '\0';
3503 }
3504
3505 /**
3506 * gst_rtsp_connection_get_remember_session_id:
3507 * @conn: a #GstRTSPConnection
3508 *
3509 * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3510 * last response to set it on any further request.
3511 */
3512
3513 gboolean
gst_rtsp_connection_get_remember_session_id(GstRTSPConnection * conn)3514 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3515 {
3516 return conn->remember_session_id;
3517 }
3518
3519
/* GIOCondition masks. The *_ERR masks group the hang-up, error and
 * invalid-request conditions; the *_COND masks add the readiness bit for the
 * respective direction (G_IO_IN for reading, G_IO_OUT for writing). */
3520 #define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3521 #define READ_COND (G_IO_IN | READ_ERR)
3522 #define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3523 #define WRITE_COND (G_IO_OUT | WRITE_ERR)
3524
/* State of an asynchronous watch on a GstRTSPConnection. The structure is
 * used as a GSource: the source callbacks in this file cast their GSource
 * argument to GstRTSPWatch. */
3525 /* async functions */
3526 struct _GstRTSPWatch
3527 {
/* must stay the first member, GSource pointers are cast to GstRTSPWatch */
3528 GSource source;
3529
/* the connection being watched */
3530 GstRTSPConnection *conn;
3531
/* parser state and the message being assembled by the read dispatcher */
3532 GstRTSPBuilder builder;
3533 GstRTSPMessage message;
3534
/* child sources; controlsrc dispatches
 * gst_rtsp_source_dispatch_read_get_channel() on the control stream */
3535 GSource *readsrc;
3536 GSource *writesrc;
3537 GSource *controlsrc;
3538
/* value returned by the source dispatch function; cleared when the read
 * side reports EOF or an error */
3539 gboolean keep_running;
3540
3541 /* queued message for transmission */
/* NOTE(review): id is not used in this part of the file; presumably the
 * last message id handed out - confirm against the send path */
3542 guint id;
/* taken by the dispatchers around accesses to the child sources and the
 * message queue */
3543 GMutex mutex;
/* queue of GstRTSPSerializedMessage waiting to be written */
3544 GstQueueArray *messages;
/* bytes still queued, reduced as data is written */
3545 gsize messages_bytes;
/* queued messages that carry an id, reduced as they are reported sent */
3546 guint messages_count;
3547
/* backlog limits used by IS_BACKLOG_FULL(); 0 disables the limit */
3548 gsize max_bytes;
3549 guint max_messages;
/* signalled by the write dispatcher when the backlog is not full */
3550 GCond queue_not_full;
/* NOTE(review): not used in this part of the file - confirm semantics */
3551 gboolean flushing;
3552
/* callbacks invoked by the dispatchers */
3553 GstRTSPWatchFuncs funcs;
3554
/* passed to every callback; notify is called with it on finalize */
3555 gpointer user_data;
3556 GDestroyNotify notify;
3557 };
3558
/* Evaluates to TRUE when watch @w has reached one of its backlog limits:
 * messages_bytes >= max_bytes or messages_count >= max_messages. A limit of
 * 0 is ignored. @w is evaluated several times. */
3559 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3560 ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3561
3562 static gboolean
gst_rtsp_source_prepare(GSource * source,gint * timeout)3563 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3564 {
3565 GstRTSPWatch *watch = (GstRTSPWatch *) source;
3566
3567 if (watch->conn->initial_buffer != NULL)
3568 return TRUE;
3569
3570 *timeout = (watch->conn->timeout * 1000);
3571
3572 return FALSE;
3573 }
3574
3575 static gboolean
gst_rtsp_source_check(GSource * source)3576 gst_rtsp_source_check (GSource * source)
3577 {
3578 return FALSE;
3579 }
3580
3581 static gboolean
gst_rtsp_source_dispatch_read_get_channel(GPollableInputStream * stream,GstRTSPWatch * watch)3582 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3583 GstRTSPWatch * watch)
3584 {
3585 gssize count;
3586 guint8 buffer[1024];
3587 GError *error = NULL;
3588
3589 /* try to read in order to be able to detect errors, we read 1k in case some
3590 * client actually decides to send data on the GET channel */
3591 count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3592 &error);
3593 if (count == 0) {
3594 /* other end closed the socket */
3595 goto eof;
3596 }
3597
3598 if (count < 0) {
3599 GST_DEBUG ("%s", error->message);
3600 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3601 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3602 g_clear_error (&error);
3603 goto done;
3604 }
3605 g_clear_error (&error);
3606 goto read_error;
3607 }
3608
3609 /* client sent data on the GET channel, ignore it */
3610
3611 done:
3612 return TRUE;
3613
3614 /* ERRORS */
3615 eof:
3616 {
3617 if (watch->funcs.closed)
3618 watch->funcs.closed (watch, watch->user_data);
3619
3620 /* the read connection was closed, stop the watch now */
3621 watch->keep_running = FALSE;
3622
3623 return FALSE;
3624 }
3625 read_error:
3626 {
3627 if (watch->funcs.error_full)
3628 watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3629 0, watch->user_data);
3630 else if (watch->funcs.error)
3631 watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3632
3633 goto eof;
3634 }
3635 }
3636
3637 static gboolean
gst_rtsp_source_dispatch_read(GPollableInputStream * stream,GstRTSPWatch * watch)3638 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3639 GstRTSPWatch * watch)
3640 {
3641 GstRTSPResult res = GST_RTSP_ERROR;
3642 GstRTSPConnection *conn = watch->conn;
3643
3644 /* if this connection was already closed, stop now */
3645 if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3646 goto eof;
3647
3648 res = build_next (&watch->builder, &watch->message, conn, FALSE);
3649 if (res == GST_RTSP_EINTR)
3650 goto done;
3651 else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3652 g_mutex_lock (&watch->mutex);
3653 if (watch->readsrc) {
3654 if (!g_source_is_destroyed ((GSource *) watch))
3655 g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3656 g_source_unref (watch->readsrc);
3657 watch->readsrc = NULL;
3658 }
3659
3660 if (conn->stream1) {
3661 g_object_unref (conn->stream1);
3662 conn->stream1 = NULL;
3663 conn->socket1 = NULL;
3664 conn->input_stream = NULL;
3665 }
3666 g_mutex_unlock (&watch->mutex);
3667
3668 /* When we are in tunnelled mode, the read socket can be closed and we
3669 * should be prepared for a new POST method to reopen it */
3670 if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3671 /* remove the read connection for the tunnel */
3672 /* we accept a new POST request */
3673 conn->tstate = TUNNEL_STATE_GET;
3674 /* and signal that we lost our tunnel */
3675 if (watch->funcs.tunnel_lost)
3676 res = watch->funcs.tunnel_lost (watch, watch->user_data);
3677 /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3678 g_mutex_lock (&watch->mutex);
3679 if (watch->conn->control_stream && !watch->controlsrc) {
3680 watch->controlsrc =
3681 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3682 (watch->conn->control_stream), NULL);
3683 g_source_set_callback (watch->controlsrc,
3684 (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3685 NULL);
3686 g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3687 }
3688 g_mutex_unlock (&watch->mutex);
3689 goto read_done;
3690 } else
3691 goto eof;
3692 } else if (G_LIKELY (res == GST_RTSP_OK)) {
3693 if (!conn->manual_http &&
3694 watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3695 if (conn->tstate == TUNNEL_STATE_NONE &&
3696 watch->message.type_data.request.method == GST_RTSP_GET) {
3697 GstRTSPMessage *response;
3698 GstRTSPStatusCode code;
3699
3700 conn->tstate = TUNNEL_STATE_GET;
3701
3702 if (watch->funcs.tunnel_start)
3703 code = watch->funcs.tunnel_start (watch, watch->user_data);
3704 else
3705 code = GST_RTSP_STS_OK;
3706
3707 /* queue the response */
3708 response = gen_tunnel_reply (conn, code, &watch->message);
3709 if (watch->funcs.tunnel_http_response)
3710 watch->funcs.tunnel_http_response (watch, &watch->message, response,
3711 watch->user_data);
3712 gst_rtsp_watch_send_message (watch, response, NULL);
3713 gst_rtsp_message_free (response);
3714 goto read_done;
3715 } else if (conn->tstate == TUNNEL_STATE_NONE &&
3716 watch->message.type_data.request.method == GST_RTSP_POST) {
3717 conn->tstate = TUNNEL_STATE_POST;
3718
3719 /* in the callback the connection should be tunneled with the
3720 * GET connection */
3721 if (watch->funcs.tunnel_complete) {
3722 watch->funcs.tunnel_complete (watch, watch->user_data);
3723 }
3724 goto read_done;
3725 }
3726 }
3727 } else
3728 goto read_error;
3729
3730 if (!conn->manual_http) {
3731 /* if manual HTTP support is not enabled, then restore the message to
3732 * what it would have looked like without the support for parsing HTTP
3733 * messages being present */
3734 if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3735 watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3736 watch->message.type_data.request.method = GST_RTSP_INVALID;
3737 if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3738 watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3739 res = GST_RTSP_EPARSE;
3740 } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3741 watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3742 if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3743 watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3744 res = GST_RTSP_EPARSE;
3745 }
3746 }
3747 if (G_LIKELY (res != GST_RTSP_OK))
3748 goto read_error;
3749
3750 if (watch->funcs.message_received)
3751 watch->funcs.message_received (watch, &watch->message, watch->user_data);
3752
3753 read_done:
3754 gst_rtsp_message_unset (&watch->message);
3755 build_reset (&watch->builder);
3756
3757 done:
3758 return TRUE;
3759
3760 /* ERRORS */
3761 eof:
3762 {
3763 if (watch->funcs.closed)
3764 watch->funcs.closed (watch, watch->user_data);
3765
3766 /* we closed the read connection, stop the watch now */
3767 watch->keep_running = FALSE;
3768
3769 /* always stop when the input returns EOF in non-tunneled mode */
3770 return FALSE;
3771 }
3772 read_error:
3773 {
3774 if (watch->funcs.error_full)
3775 watch->funcs.error_full (watch, res, &watch->message,
3776 0, watch->user_data);
3777 else if (watch->funcs.error)
3778 watch->funcs.error (watch, res, watch->user_data);
3779
3780 goto eof;
3781 }
3782 }
3783
3784 static gboolean
gst_rtsp_source_dispatch(GSource * source,GSourceFunc callback G_GNUC_UNUSED,gpointer user_data G_GNUC_UNUSED)3785 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3786 gpointer user_data G_GNUC_UNUSED)
3787 {
3788 GstRTSPWatch *watch = (GstRTSPWatch *) source;
3789 GstRTSPConnection *conn = watch->conn;
3790
3791 if (conn->initial_buffer != NULL) {
3792 gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3793 watch);
3794 }
3795 return watch->keep_running;
3796 }
3797
3798 static gboolean
gst_rtsp_source_dispatch_write(GPollableOutputStream * stream,GstRTSPWatch * watch)3799 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3800 GstRTSPWatch * watch)
3801 {
3802 GstRTSPResult res = GST_RTSP_ERROR;
3803 GstRTSPConnection *conn = watch->conn;
3804
3805 /* if this connection was already closed, stop now */
3806 if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3807 !watch->messages)
3808 goto eof;
3809
3810 g_mutex_lock (&watch->mutex);
3811 do {
3812 guint n_messages = gst_queue_array_get_length (watch->messages);
3813 GOutputVector *vectors;
3814 GstMapInfo *map_infos;
3815 guint *ids;
3816 gsize bytes_to_write, bytes_written;
3817 guint n_vectors, n_memories, n_ids, drop_messages;
3818 gint i, j, l, n_mmap;
3819 GstRTSPSerializedMessage *msg;
3820
3821 /* if this connection was already closed, stop now */
3822 if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3823 !watch->messages) {
3824 g_mutex_unlock (&watch->mutex);
3825 goto eof;
3826 }
3827
3828 if (n_messages == 0) {
3829 if (watch->writesrc) {
3830 if (!g_source_is_destroyed ((GSource *) watch))
3831 g_source_remove_child_source ((GSource *) watch, watch->writesrc);
3832 g_source_unref (watch->writesrc);
3833 watch->writesrc = NULL;
3834 /* we create and add the write source again when we actually have
3835 * something to write */
3836
3837 /* since write source is now removed we add read source on the write
3838 * socket instead to be able to detect when client closes get channel
3839 * in tunneled mode */
3840 if (watch->conn->control_stream) {
3841 watch->controlsrc =
3842 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3843 (watch->conn->control_stream), NULL);
3844 g_source_set_callback (watch->controlsrc,
3845 (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3846 NULL);
3847 g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3848 } else {
3849 watch->controlsrc = NULL;
3850 }
3851 }
3852 break;
3853 }
3854
3855 for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
3856 msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3857 if (msg->id != 0)
3858 n_ids++;
3859
3860 if (msg->data_offset < msg->data_size)
3861 n_vectors++;
3862
3863 if (msg->body_data && msg->body_offset < msg->body_data_size) {
3864 n_vectors++;
3865 } else if (msg->body_buffer) {
3866 guint m, n;
3867 guint offset = 0;
3868
3869 n = gst_buffer_n_memory (msg->body_buffer);
3870 for (m = 0; m < n; m++) {
3871 GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3872
3873 /* Skip all memories we already wrote */
3874 if (offset + mem->size <= msg->body_offset) {
3875 offset += mem->size;
3876 continue;
3877 }
3878 offset += mem->size;
3879
3880 n_memories++;
3881 n_vectors++;
3882 }
3883 }
3884 }
3885
3886 vectors = g_newa (GOutputVector, n_vectors);
3887 map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
3888 ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
3889 if (ids)
3890 memset (ids, 0, sizeof (guint) * (n_ids + 1));
3891
3892 for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
3893 i++) {
3894 msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3895
3896 if (msg->data_offset < msg->data_size) {
3897 vectors[j].buffer = (msg->data_is_data_header ?
3898 msg->data_header : msg->data) + msg->data_offset;
3899 vectors[j].size = msg->data_size - msg->data_offset;
3900 bytes_to_write += vectors[j].size;
3901 j++;
3902 }
3903
3904 if (msg->body_data) {
3905 if (msg->body_offset < msg->body_data_size) {
3906 vectors[j].buffer = msg->body_data + msg->body_offset;
3907 vectors[j].size = msg->body_data_size - msg->body_offset;
3908 bytes_to_write += vectors[j].size;
3909 j++;
3910 }
3911 } else if (msg->body_buffer) {
3912 guint m, n;
3913 guint offset = 0;
3914 n = gst_buffer_n_memory (msg->body_buffer);
3915 for (m = 0; m < n; m++) {
3916 GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3917 guint off;
3918
3919 /* Skip all memories we already wrote */
3920 if (offset + mem->size <= msg->body_offset) {
3921 offset += mem->size;
3922 continue;
3923 }
3924
3925 if (offset < msg->body_offset)
3926 off = msg->body_offset - offset;
3927 else
3928 off = 0;
3929
3930 offset += mem->size;
3931
3932 g_assert (off < mem->size);
3933
3934 gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
3935 vectors[j].buffer = map_infos[n_mmap].data + off;
3936 vectors[j].size = map_infos[n_mmap].size - off;
3937 bytes_to_write += vectors[j].size;
3938
3939 n_mmap++;
3940 j++;
3941 }
3942 }
3943 }
3944
3945 res =
3946 writev_bytes (watch->conn->output_stream, vectors, n_vectors,
3947 &bytes_written, FALSE, watch->conn->cancellable);
3948 g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
3949
3950 /* First unmap all memories here, this simplifies the code below
3951 * as we don't have to skip all memories that were already written
3952 * before */
3953 for (i = 0; i < n_mmap; i++) {
3954 gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
3955 }
3956
3957 if (bytes_written == bytes_to_write) {
3958 /* fast path, just unmap all memories, free memory, drop all messages and notify them */
3959 l = 0;
3960 while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
3961 if (msg->id) {
3962 ids[l] = msg->id;
3963 l++;
3964 }
3965
3966 gst_rtsp_serialized_message_clear (msg);
3967 }
3968
3969 g_assert (watch->messages_bytes >= bytes_written);
3970 watch->messages_bytes -= bytes_written;
3971 } else if (bytes_written > 0) {
3972 /* not done, let's skip all messages that were sent already and free them */
3973 for (i = 0, drop_messages = 0; i < n_messages; i++) {
3974 msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3975
3976 if (bytes_written >= msg->data_size - msg->data_offset) {
3977 guint body_size;
3978
3979 /* all data of this message is sent, check body and otherwise
3980 * skip the whole message for next time */
3981 bytes_written -= (msg->data_size - msg->data_offset);
3982 watch->messages_bytes -= (msg->data_size - msg->data_offset);
3983 msg->data_offset = msg->data_size;
3984
3985 if (msg->body_data) {
3986 body_size = msg->body_data_size;
3987 } else if (msg->body_buffer) {
3988 body_size = gst_buffer_get_size (msg->body_buffer);
3989 } else {
3990 body_size = 0;
3991 }
3992
3993 if (bytes_written + msg->body_offset >= body_size) {
3994 /* body written, drop this message */
3995 bytes_written -= body_size - msg->body_offset;
3996 watch->messages_bytes -= body_size - msg->body_offset;
3997 msg->body_offset = body_size;
3998 drop_messages++;
3999
4000 if (msg->id) {
4001 ids[l] = msg->id;
4002 l++;
4003 }
4004
4005 gst_rtsp_serialized_message_clear (msg);
4006 } else {
4007 msg->body_offset += bytes_written;
4008 watch->messages_bytes -= bytes_written;
4009 bytes_written = 0;
4010 }
4011 } else {
4012 /* Need to continue sending from the data of this message */
4013 msg->data_offset += bytes_written;
4014 watch->messages_bytes -= bytes_written;
4015 bytes_written = 0;
4016 }
4017 }
4018
4019 while (drop_messages > 0) {
4020 msg = gst_queue_array_pop_head_struct (watch->messages);
4021 g_assert (msg);
4022 drop_messages--;
4023 }
4024
4025 g_assert (watch->messages_bytes >= bytes_written);
4026 watch->messages_bytes -= bytes_written;
4027 }
4028
4029 if (!IS_BACKLOG_FULL (watch))
4030 g_cond_signal (&watch->queue_not_full);
4031 g_mutex_unlock (&watch->mutex);
4032
4033 /* notify all messages that were successfully written */
4034 if (ids) {
4035 while (*ids) {
4036 /* only decrease the counter for messages that have an id. Only
4037 * the last message of a messages chunk is counted */
4038 watch->messages_count--;
4039
4040 if (watch->funcs.message_sent)
4041 watch->funcs.message_sent (watch, *ids, watch->user_data);
4042 ids++;
4043 }
4044 }
4045
4046 if (res == GST_RTSP_EINTR) {
4047 goto write_blocked;
4048 } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4049 goto write_error;
4050 }
4051 g_mutex_lock (&watch->mutex);
4052 } while (TRUE);
4053 g_mutex_unlock (&watch->mutex);
4054
4055 write_blocked:
4056 return TRUE;
4057
4058 /* ERRORS */
4059 eof:
4060 {
4061 return FALSE;
4062 }
4063 write_error:
4064 {
4065 if (watch->funcs.error_full) {
4066 guint i, n_messages;
4067
4068 n_messages = gst_queue_array_get_length (watch->messages);
4069 for (i = 0; i < n_messages; i++) {
4070 GstRTSPSerializedMessage *msg =
4071 gst_queue_array_peek_nth_struct (watch->messages, i);
4072 if (msg->id)
4073 watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4074 }
4075 } else if (watch->funcs.error) {
4076 watch->funcs.error (watch, res, watch->user_data);
4077 }
4078
4079 return FALSE;
4080 }
4081 }
4082
4083 static void
gst_rtsp_source_finalize(GSource * source)4084 gst_rtsp_source_finalize (GSource * source)
4085 {
4086 GstRTSPWatch *watch = (GstRTSPWatch *) source;
4087 GstRTSPSerializedMessage *msg;
4088
4089 if (watch->notify)
4090 watch->notify (watch->user_data);
4091
4092 build_reset (&watch->builder);
4093 gst_rtsp_message_unset (&watch->message);
4094
4095 while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4096 gst_rtsp_serialized_message_clear (msg);
4097 }
4098 gst_queue_array_free (watch->messages);
4099 watch->messages = NULL;
4100 watch->messages_bytes = 0;
4101 watch->messages_count = 0;
4102
4103 g_cond_clear (&watch->queue_not_full);
4104
4105 if (watch->readsrc)
4106 g_source_unref (watch->readsrc);
4107 if (watch->writesrc)
4108 g_source_unref (watch->writesrc);
4109 if (watch->controlsrc)
4110 g_source_unref (watch->controlsrc);
4111
4112 g_mutex_clear (&watch->mutex);
4113 }
4114
/* GSource function table for a GstRTSPWatch; handed to g_source_new() by
 * gst_rtsp_watch_new(). The initializer is positional: prepare, check,
 * dispatch and finalize, followed by two entries left as NULL. */
static GSourceFuncs gst_rtsp_source_funcs = {
  gst_rtsp_source_prepare,
  gst_rtsp_source_check,
  gst_rtsp_source_dispatch,
  gst_rtsp_source_finalize,
  NULL,
  NULL
};
4123
4124 /**
4125 * gst_rtsp_watch_new: (skip)
4126 * @conn: a #GstRTSPConnection
4127 * @funcs: watch functions
4128 * @user_data: user data to pass to @funcs
4129 * @notify: notify when @user_data is not referenced anymore
4130 *
4131 * Create a watch object for @conn. The functions provided in @funcs will be
4132 * called with @user_data when activity happened on the watch.
4133 *
4134 * The new watch is usually created so that it can be attached to a
4135 * maincontext with gst_rtsp_watch_attach().
4136 *
4137 * @conn must exist for the entire lifetime of the watch.
4138 *
4139 * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4140 * communication. Free with gst_rtsp_watch_unref () after usage.
4141 */
4142 GstRTSPWatch *
gst_rtsp_watch_new(GstRTSPConnection * conn,GstRTSPWatchFuncs * funcs,gpointer user_data,GDestroyNotify notify)4143 gst_rtsp_watch_new (GstRTSPConnection * conn,
4144 GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4145 {
4146 GstRTSPWatch *result;
4147
4148 g_return_val_if_fail (conn != NULL, NULL);
4149 g_return_val_if_fail (funcs != NULL, NULL);
4150 g_return_val_if_fail (conn->read_socket != NULL, NULL);
4151 g_return_val_if_fail (conn->write_socket != NULL, NULL);
4152
4153 result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4154 sizeof (GstRTSPWatch));
4155
4156 result->conn = conn;
4157 result->builder.state = STATE_START;
4158
4159 g_mutex_init (&result->mutex);
4160 result->messages =
4161 gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4162 g_cond_init (&result->queue_not_full);
4163
4164 gst_rtsp_watch_reset (result);
4165 result->keep_running = TRUE;
4166 result->flushing = FALSE;
4167
4168 result->funcs = *funcs;
4169 result->user_data = user_data;
4170 result->notify = notify;
4171
4172 return result;
4173 }
4174
4175 /**
4176 * gst_rtsp_watch_reset:
4177 * @watch: a #GstRTSPWatch
4178 *
4179 * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4180 * when the file descriptors of the connection might have changed.
4181 */
4182 void
gst_rtsp_watch_reset(GstRTSPWatch * watch)4183 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4184 {
4185 g_mutex_lock (&watch->mutex);
4186 if (watch->readsrc) {
4187 g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4188 g_source_unref (watch->readsrc);
4189 }
4190 if (watch->writesrc) {
4191 g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4192 g_source_unref (watch->writesrc);
4193 watch->writesrc = NULL;
4194 }
4195 if (watch->controlsrc) {
4196 g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4197 g_source_unref (watch->controlsrc);
4198 watch->controlsrc = NULL;
4199 }
4200
4201 if (watch->conn->input_stream) {
4202 watch->readsrc =
4203 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4204 (watch->conn->input_stream), NULL);
4205 g_source_set_callback (watch->readsrc,
4206 (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4207 g_source_add_child_source ((GSource *) watch, watch->readsrc);
4208 } else {
4209 watch->readsrc = NULL;
4210 }
4211
4212 /* we create and add the write source when we actually have something to
4213 * write */
4214
4215 /* when write source is not added we add read source on the write socket
4216 * instead to be able to detect when client closes get channel in tunneled
4217 * mode */
4218 if (watch->conn->control_stream) {
4219 watch->controlsrc =
4220 g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4221 (watch->conn->control_stream), NULL);
4222 g_source_set_callback (watch->controlsrc,
4223 (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4224 g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4225 } else {
4226 watch->controlsrc = NULL;
4227 }
4228 g_mutex_unlock (&watch->mutex);
4229 }
4230
4231 /**
4232 * gst_rtsp_watch_attach:
4233 * @watch: a #GstRTSPWatch
4234 * @context: a GMainContext (if NULL, the default context will be used)
4235 *
4236 * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4237 *
4238 * Returns: the ID (greater than 0) for the watch within the GMainContext.
4239 */
4240 guint
gst_rtsp_watch_attach(GstRTSPWatch * watch,GMainContext * context)4241 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4242 {
4243 g_return_val_if_fail (watch != NULL, 0);
4244
4245 return g_source_attach ((GSource *) watch, context);
4246 }
4247
4248 /**
4249 * gst_rtsp_watch_unref:
4250 * @watch: a #GstRTSPWatch
4251 *
4252 * Decreases the reference count of @watch by one. If the resulting reference
4253 * count is zero the watch and associated memory will be destroyed.
4254 */
4255 void
gst_rtsp_watch_unref(GstRTSPWatch * watch)4256 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4257 {
4258 g_return_if_fail (watch != NULL);
4259
4260 g_source_unref ((GSource *) watch);
4261 }
4262
4263 /**
4264 * gst_rtsp_watch_set_send_backlog:
4265 * @watch: a #GstRTSPWatch
4266 * @bytes: maximum bytes
4267 * @messages: maximum messages
4268 *
4269 * Set the maximum amount of bytes and messages that will be queued in @watch.
4270 * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4271 * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4272 *
4273 * A value of 0 for @bytes or @messages means no limits.
4274 *
4275 * Since: 1.2
4276 */
4277 void
gst_rtsp_watch_set_send_backlog(GstRTSPWatch * watch,gsize bytes,guint messages)4278 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4279 gsize bytes, guint messages)
4280 {
4281 g_return_if_fail (watch != NULL);
4282
4283 g_mutex_lock (&watch->mutex);
4284 watch->max_bytes = bytes;
4285 watch->max_messages = messages;
4286 if (!IS_BACKLOG_FULL (watch))
4287 g_cond_signal (&watch->queue_not_full);
4288 g_mutex_unlock (&watch->mutex);
4289
4290 GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4291 bytes, messages);
4292 }
4293
4294 /**
4295 * gst_rtsp_watch_get_send_backlog:
4296 * @watch: a #GstRTSPWatch
4297 * @bytes: (out) (allow-none): maximum bytes
4298 * @messages: (out) (allow-none): maximum messages
4299 *
4300 * Get the maximum amount of bytes and messages that will be queued in @watch.
4301 * See gst_rtsp_watch_set_send_backlog().
4302 *
4303 * Since: 1.2
4304 */
4305 void
gst_rtsp_watch_get_send_backlog(GstRTSPWatch * watch,gsize * bytes,guint * messages)4306 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4307 gsize * bytes, guint * messages)
4308 {
4309 g_return_if_fail (watch != NULL);
4310
4311 g_mutex_lock (&watch->mutex);
4312 if (bytes)
4313 *bytes = watch->max_bytes;
4314 if (messages)
4315 *messages = watch->max_messages;
4316 g_mutex_unlock (&watch->mutex);
4317 }
4318
4319 static GstRTSPResult
gst_rtsp_watch_write_serialized_messages(GstRTSPWatch * watch,GstRTSPSerializedMessage * messages,guint n_messages,guint * id)4320 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4321 GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4322 {
4323 GstRTSPResult res;
4324 GMainContext *context = NULL;
4325 gint i;
4326
4327 g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4328 g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4329
4330 g_mutex_lock (&watch->mutex);
4331 if (watch->flushing)
4332 goto flushing;
4333
4334 /* try to send the message synchronously first */
4335 if (gst_queue_array_get_length (watch->messages) == 0) {
4336 gint j, k;
4337 GOutputVector *vectors;
4338 GstMapInfo *map_infos;
4339 gsize bytes_to_write, bytes_written;
4340 guint n_vectors, n_memories, drop_messages;
4341
4342 for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4343 n_vectors++;
4344 if (messages[i].body_data) {
4345 n_vectors++;
4346 } else if (messages[i].body_buffer) {
4347 n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4348 n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4349 }
4350 }
4351
4352 vectors = g_newa (GOutputVector, n_vectors);
4353 map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4354
4355 for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4356 vectors[j].buffer = messages[i].data_is_data_header ?
4357 messages[i].data_header : messages[i].data;
4358 vectors[j].size = messages[i].data_size;
4359 bytes_to_write += vectors[j].size;
4360 j++;
4361
4362 if (messages[i].body_data) {
4363 vectors[j].buffer = messages[i].body_data;
4364 vectors[j].size = messages[i].body_data_size;
4365 bytes_to_write += vectors[j].size;
4366 j++;
4367 } else if (messages[i].body_buffer) {
4368 gint l, n;
4369
4370 n = gst_buffer_n_memory (messages[i].body_buffer);
4371 for (l = 0; l < n; l++) {
4372 GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4373
4374 gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4375 vectors[j].buffer = map_infos[k].data;
4376 vectors[j].size = map_infos[k].size;
4377 bytes_to_write += vectors[j].size;
4378
4379 k++;
4380 j++;
4381 }
4382 }
4383 }
4384
4385 res =
4386 writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4387 &bytes_written, FALSE, watch->conn->cancellable);
4388 g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4389
4390 /* At this point we sent everything we could without blocking or
4391 * error and updated the offsets inside the message accordingly */
4392
4393 /* First of all unmap all memories. This simplifies the code below */
4394 for (k = 0; k < n_memories; k++) {
4395 gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4396 }
4397
4398 if (res != GST_RTSP_EINTR) {
4399 /* actual error or done completely */
4400 if (id != NULL)
4401 *id = 0;
4402
4403 /* free everything */
4404 for (i = 0, k = 0; i < n_messages; i++) {
4405 gst_rtsp_serialized_message_clear (&messages[i]);
4406 }
4407
4408 goto done;
4409 }
4410
4411 /* not done, let's skip all messages that were sent already and free them */
4412 for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4413 if (bytes_written >= messages[i].data_size) {
4414 guint body_size;
4415
4416 /* all data of this message is sent, check body and otherwise
4417 * skip the whole message for next time */
4418 messages[i].data_offset = messages[i].data_size;
4419 bytes_written -= messages[i].data_size;
4420
4421 if (messages[i].body_data) {
4422 body_size = messages[i].body_data_size;
4423
4424 } else if (messages[i].body_buffer) {
4425 body_size = gst_buffer_get_size (messages[i].body_buffer);
4426 } else {
4427 body_size = 0;
4428 }
4429
4430 if (bytes_written >= body_size) {
4431 /* body written, drop this message */
4432 messages[i].body_offset = body_size;
4433 bytes_written -= body_size;
4434 drop_messages++;
4435
4436 gst_rtsp_serialized_message_clear (&messages[i]);
4437 } else {
4438 messages[i].body_offset = bytes_written;
4439 bytes_written = 0;
4440 }
4441 } else {
4442 /* Need to continue sending from the data of this message */
4443 messages[i].data_offset = bytes_written;
4444 bytes_written = 0;
4445 }
4446 }
4447
4448 g_assert (n_messages > drop_messages);
4449
4450 messages += drop_messages;
4451 n_messages -= drop_messages;
4452 }
4453
4454 /* check limits */
4455 if (IS_BACKLOG_FULL (watch))
4456 goto too_much_backlog;
4457
4458 for (i = 0; i < n_messages; i++) {
4459 GstRTSPSerializedMessage local_message;
4460
4461 /* make a record with the data and id for sending async */
4462 local_message = messages[i];
4463
4464 /* copy the body data or take an additional reference to the body buffer
4465 * we don't own them here */
4466 if (local_message.body_data) {
4467 local_message.body_data =
4468 g_memdup (local_message.body_data, local_message.body_data_size);
4469 } else if (local_message.body_buffer) {
4470 gst_buffer_ref (local_message.body_buffer);
4471 }
4472 local_message.borrowed = FALSE;
4473
4474 /* set an id for the very last message */
4475 if (i == n_messages - 1) {
4476 do {
4477 /* make sure rec->id is never 0 */
4478 local_message.id = ++watch->id;
4479 } while (G_UNLIKELY (local_message.id == 0));
4480
4481 if (id != NULL)
4482 *id = local_message.id;
4483 } else {
4484 local_message.id = 0;
4485 }
4486
4487 /* add the record to a queue. */
4488 gst_queue_array_push_tail_struct (watch->messages, &local_message);
4489 watch->messages_bytes +=
4490 (local_message.data_size - local_message.data_offset);
4491 if (local_message.body_data)
4492 watch->messages_bytes +=
4493 (local_message.body_data_size - local_message.body_offset);
4494 else if (local_message.body_buffer)
4495 watch->messages_bytes +=
4496 (gst_buffer_get_size (local_message.body_buffer) -
4497 local_message.body_offset);
4498 }
4499 /* each message chunks is one unit */
4500 watch->messages_count++;
4501
4502 /* make sure the main context will now also check for writability on the
4503 * socket */
4504 context = ((GSource *) watch)->context;
4505 if (!watch->writesrc) {
4506 /* remove the read source on the write socket, we will be able to detect
4507 * errors while writing */
4508 if (watch->controlsrc) {
4509 g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4510 g_source_unref (watch->controlsrc);
4511 watch->controlsrc = NULL;
4512 }
4513
4514 watch->writesrc =
4515 g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4516 (watch->conn->output_stream), NULL);
4517 g_source_set_callback (watch->writesrc,
4518 (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4519 g_source_add_child_source ((GSource *) watch, watch->writesrc);
4520 }
4521 res = GST_RTSP_OK;
4522
4523 done:
4524 g_mutex_unlock (&watch->mutex);
4525
4526 if (context)
4527 g_main_context_wakeup (context);
4528
4529 return res;
4530
4531 /* ERRORS */
4532 flushing:
4533 {
4534 GST_DEBUG ("we are flushing");
4535 g_mutex_unlock (&watch->mutex);
4536 for (i = 0; i < n_messages; i++) {
4537 gst_rtsp_serialized_message_clear (&messages[i]);
4538 }
4539 return GST_RTSP_EINTR;
4540 }
4541 too_much_backlog:
4542 {
4543 GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4544 G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4545 watch->messages_bytes, watch->max_messages, watch->messages_count);
4546 g_mutex_unlock (&watch->mutex);
4547 for (i = 0; i < n_messages; i++) {
4548 gst_rtsp_serialized_message_clear (&messages[i]);
4549 }
4550 return GST_RTSP_ENOMEM;
4551 }
4552
4553 return GST_RTSP_OK;
4554 }
4555
4556 /**
4557 * gst_rtsp_watch_write_data:
4558 * @watch: a #GstRTSPWatch
4559 * @data: (array length=size) (transfer full): the data to queue
4560 * @size: the size of @data
4561 * @id: (out) (allow-none): location for a message ID or %NULL
4562 *
4563 * Write @data using the connection of the @watch. If it cannot be sent
4564 * immediately, it will be queued for transmission in @watch. The contents of
4565 * @message will then be serialized and transmitted when the connection of the
4566 * @watch becomes writable. In case the @message is queued, the ID returned in
4567 * @id will be non-zero and used as the ID argument in the message_sent
4568 * callback.
4569 *
4570 * This function will take ownership of @data and g_free() it after use.
4571 *
4572 * If the amount of queued data exceeds the limits set with
4573 * gst_rtsp_watch_set_send_backlog(), this function will return
4574 * #GST_RTSP_ENOMEM.
4575 *
4576 * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4577 * are reached. #GST_RTSP_EINTR when @watch was flushing.
4578 */
4579 /* FIXME 2.0: This should've been static! */
4580 GstRTSPResult
gst_rtsp_watch_write_data(GstRTSPWatch * watch,const guint8 * data,guint size,guint * id)4581 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4582 guint size, guint * id)
4583 {
4584 GstRTSPSerializedMessage serialized_message;
4585
4586 memset (&serialized_message, 0, sizeof (serialized_message));
4587 serialized_message.data = (guint8 *) data;
4588 serialized_message.data_size = size;
4589
4590 return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4591 1, id);
4592 }
4593
4594 /**
4595 * gst_rtsp_watch_send_message:
4596 * @watch: a #GstRTSPWatch
4597 * @message: a #GstRTSPMessage
4598 * @id: (out) (allow-none): location for a message ID or %NULL
4599 *
4600 * Send a @message using the connection of the @watch. If it cannot be sent
4601 * immediately, it will be queued for transmission in @watch. The contents of
4602 * @message will then be serialized and transmitted when the connection of the
4603 * @watch becomes writable. In case the @message is queued, the ID returned in
4604 * @id will be non-zero and used as the ID argument in the message_sent
4605 * callback.
4606 *
4607 * Returns: #GST_RTSP_OK on success.
4608 */
4609 GstRTSPResult
gst_rtsp_watch_send_message(GstRTSPWatch * watch,GstRTSPMessage * message,guint * id)4610 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4611 guint * id)
4612 {
4613 g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4614 g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4615
4616 return gst_rtsp_watch_send_messages (watch, message, 1, id);
4617 }
4618
4619 /**
4620 * gst_rtsp_watch_send_messages:
4621 * @watch: a #GstRTSPWatch
4622 * @messages: (array length=n_messages): the messages to send
4623 * @n_messages: the number of messages to send
4624 * @id: (out) (allow-none): location for a message ID or %NULL
4625 *
4626 * Sends @messages using the connection of the @watch. If they cannot be sent
4627 * immediately, they will be queued for transmission in @watch. The contents of
4628 * @messages will then be serialized and transmitted when the connection of the
4629 * @watch becomes writable. In case the @messages are queued, the ID returned in
4630 * @id will be non-zero and used as the ID argument in the message_sent
4631 * callback once the last message is sent. The callback will only be called
4632 * once for the last message.
4633 *
4634 * Returns: #GST_RTSP_OK on success.
4635 *
4636 * Since: 1.16
4637 */
4638 GstRTSPResult
gst_rtsp_watch_send_messages(GstRTSPWatch * watch,GstRTSPMessage * messages,guint n_messages,guint * id)4639 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4640 guint n_messages, guint * id)
4641 {
4642 GstRTSPSerializedMessage *serialized_messages;
4643 gint i;
4644
4645 g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4646 g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4647
4648 serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4649 memset (serialized_messages, 0,
4650 sizeof (GstRTSPSerializedMessage) * n_messages);
4651
4652 for (i = 0; i < n_messages; i++) {
4653 if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4654 goto error;
4655 }
4656
4657 return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4658 n_messages, id);
4659
4660 error:
4661 for (i = 0; i < n_messages; i++) {
4662 gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4663 }
4664
4665 return GST_RTSP_EINVAL;
4666 }
4667
4668 /**
4669 * gst_rtsp_watch_wait_backlog:
4670 * @watch: a #GstRTSPWatch
4671 * @timeout: a #GTimeVal timeout
4672 *
4673 * Wait until there is place in the backlog queue, @timeout is reached
4674 * or @watch is set to flushing.
4675 *
4676 * If @timeout is %NULL this function can block forever. If @timeout
4677 * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4678 * after the timeout expired.
4679 *
4680 * The typically use of this function is when gst_rtsp_watch_write_data
4681 * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4682 * free space in the backlog queue and try again.
4683 *
4684 * Returns: %GST_RTSP_OK when if there is room in queue.
4685 * %GST_RTSP_ETIMEOUT when @timeout was reached.
4686 * %GST_RTSP_EINTR when @watch is flushing
4687 * %GST_RTSP_EINVAL when called with invalid parameters.
4688 *
4689 * Since: 1.4
4690 */
4691 GstRTSPResult
gst_rtsp_watch_wait_backlog(GstRTSPWatch * watch,GTimeVal * timeout)4692 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
4693 {
4694 gint64 end_time;
4695 GstClockTime to;
4696
4697 g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4698
4699 to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
4700 end_time = g_get_monotonic_time () + GST_TIME_AS_USECONDS (to);
4701
4702 g_mutex_lock (&watch->mutex);
4703 if (watch->flushing)
4704 goto flushing;
4705
4706 while (IS_BACKLOG_FULL (watch)) {
4707 gboolean res;
4708
4709 res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4710 if (watch->flushing)
4711 goto flushing;
4712
4713 if (!res)
4714 goto timeout;
4715 }
4716 g_mutex_unlock (&watch->mutex);
4717
4718 return GST_RTSP_OK;
4719
4720 /* ERRORS */
4721 flushing:
4722 {
4723 GST_DEBUG ("we are flushing");
4724 g_mutex_unlock (&watch->mutex);
4725 return GST_RTSP_EINTR;
4726 }
4727 timeout:
4728 {
4729 GST_DEBUG ("we timed out");
4730 g_mutex_unlock (&watch->mutex);
4731 return GST_RTSP_ETIMEOUT;
4732 }
4733 }
4734
4735 /**
4736 * gst_rtsp_watch_set_flushing:
4737 * @watch: a #GstRTSPWatch
4738 * @flushing: new flushing state
4739 *
4740 * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4741 * and make sure gst_rtsp_watch_write_data() returns immediately with
4742 * #GST_RTSP_EINTR. And empty the queue.
4743 *
4744 * Since: 1.4
4745 */
4746 void
gst_rtsp_watch_set_flushing(GstRTSPWatch * watch,gboolean flushing)4747 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4748 {
4749 g_return_if_fail (watch != NULL);
4750
4751 g_mutex_lock (&watch->mutex);
4752 watch->flushing = flushing;
4753 g_cond_signal (&watch->queue_not_full);
4754 if (flushing) {
4755 GstRTSPSerializedMessage *msg;
4756
4757 while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4758 gst_rtsp_serialized_message_clear (msg);
4759 }
4760 }
4761 g_mutex_unlock (&watch->mutex);
4762 }
4763