1 /* GStreamer
2  * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /*
20  * Unless otherwise indicated, Source Code is licensed under MIT license.
21  * See further explanation attached in License Statement (distributed in the file
22  * LICENSE).
23  *
24  * Permission is hereby granted, free of charge, to any person obtaining a copy of
25  * this software and associated documentation files (the "Software"), to deal in
26  * the Software without restriction, including without limitation the rights to
27  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28  * of the Software, and to permit persons to whom the Software is furnished to do
29  * so, subject to the following conditions:
30  *
31  * The above copyright notice and this permission notice shall be included in all
32  * copies or substantial portions of the Software.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40  * SOFTWARE.
41  */
42 
43 /**
44  * SECTION:gstrtspconnection
45  * @title: GstRTSPConnection
46  * @short_description: manage RTSP connections
47  * @see_also: gstrtspurl
48  *
49  * This object manages the RTSP connection to the server. It provides function
50  * to receive and send bytes and messages.
51  */
52 
53 #ifdef HAVE_CONFIG_H
54 #  include <config.h>
55 #endif
56 
57 #include <stdio.h>
58 #include <errno.h>
59 #include <stdlib.h>
60 #include <string.h>
61 #include <time.h>
62 
63 /* we include this here to get the G_OS_* defines */
64 #include <glib.h>
65 #include <gst/gst.h>
66 #include <gst/base/base.h>
67 
68 /* necessary for IP_TOS define */
69 #include <gio/gnetworking.h>
70 
71 #include "gstrtspconnection.h"
72 
73 #ifdef IP_TOS
74 union gst_sockaddr
75 {
76   struct sockaddr sa;
77   struct sockaddr_in sa_in;
78   struct sockaddr_in6 sa_in6;
79   struct sockaddr_storage sa_stor;
80 };
81 #endif
82 
83 typedef struct
84 {
85   gint state;
86   guint save;
87   guchar out[3];                /* the size must be evenly divisible by 3 */
88   guint cout;
89   guint coutl;
90 } DecodeCtx;
91 
92 typedef struct
93 {
94   /* If %TRUE we only own data and none of the
95    * other fields
96    */
97   gboolean borrowed;
98 
99   /* Header or full message */
100   guint8 *data;
101   guint data_size;
102   gboolean data_is_data_header;
103 
104   /* Payload following data, if any */
105   guint8 *body_data;
106   guint body_data_size;
107   /* or */
108   GstBuffer *body_buffer;
109 
110   /* DATA packet header statically allocated for above */
111   guint8 data_header[4];
112 
113   /* all below only for async writing */
114 
115   guint data_offset;            /* == data_size when done */
116   guint body_offset;            /* into body_data or the buffer */
117 
118   /* ID of the message for notification */
119   guint id;
120 } GstRTSPSerializedMessage;
121 
122 static void
gst_rtsp_serialized_message_clear(GstRTSPSerializedMessage * msg)123 gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
124 {
125   if (!msg->borrowed) {
126     g_free (msg->body_data);
127     gst_buffer_replace (&msg->body_buffer, NULL);
128   }
129   g_free (msg->data);
130 }
131 
132 #ifdef MSG_NOSIGNAL
133 #define SEND_FLAGS MSG_NOSIGNAL
134 #else
135 #define SEND_FLAGS 0
136 #endif
137 
138 typedef enum
139 {
140   TUNNEL_STATE_NONE,
141   TUNNEL_STATE_GET,
142   TUNNEL_STATE_POST,
143   TUNNEL_STATE_COMPLETE
144 } GstRTSPTunnelState;
145 
146 #define TUNNELID_LEN   24
147 
148 struct _GstRTSPConnection
149 {
150   /*< private > */
151   /* URL for the remote connection */
152   GstRTSPUrl *url;
153   GstRTSPVersion version;
154 
155   gboolean server;
156   GSocketClient *client;
157   GIOStream *stream0;
158   GIOStream *stream1;
159 
160   GInputStream *input_stream;
161   GOutputStream *output_stream;
162   /* this is a read source we add on the write socket in tunneled mode to be
163    * able to detect when client disconnects the GET channel */
164   GInputStream *control_stream;
165 
166   /* connection state */
167   GSocket *read_socket;
168   GSocket *write_socket;
169   GSocket *socket0, *socket1;
170   gboolean manual_http;
171   gboolean may_cancel;
172   GCancellable *cancellable;
173 
174   gchar tunnelid[TUNNELID_LEN];
175   gboolean tunneled;
176   GstRTSPTunnelState tstate;
177 
178   /* the remote and local ip */
179   gchar *remote_ip;
180   gchar *local_ip;
181 
182   gint read_ahead;
183 
184   gchar *initial_buffer;
185   gsize initial_buffer_offset;
186 
187   gboolean remember_session_id; /* remember the session id or not */
188 
189   /* Session state */
190   gint cseq;                    /* sequence number */
191   gchar session_id[512];        /* session id */
192   gint timeout;                 /* session timeout in seconds */
193   GTimer *timer;                /* timeout timer */
194 
195   /* Authentication */
196   GstRTSPAuthMethod auth_method;
197   gchar *username;
198   gchar *passwd;
199   GHashTable *auth_params;
200 
201   /* TLS */
202   GTlsDatabase *tls_database;
203   GTlsInteraction *tls_interaction;
204 
205   GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
206   GDestroyNotify accept_certificate_destroy_notify;
207   gpointer accept_certificate_user_data;
208 
209   DecodeCtx ctx;
210   DecodeCtx *ctxp;
211 
212   gchar *proxy_host;
213   guint proxy_port;
214 };
215 
216 enum
217 {
218   STATE_START = 0,
219   STATE_DATA_HEADER,
220   STATE_DATA_BODY,
221   STATE_READ_LINES,
222   STATE_END,
223   STATE_LAST
224 };
225 
226 enum
227 {
228   READ_AHEAD_EOH = -1,          /* end of headers */
229   READ_AHEAD_CRLF = -2,
230   READ_AHEAD_CRLFCR = -3
231 };
232 
233 /* a structure for constructing RTSPMessages */
234 typedef struct
235 {
236   gint state;
237   GstRTSPResult status;
238   guint8 buffer[4096];
239   guint offset;
240 
241   guint line;
242   guint8 *body_data;
243   glong body_len;
244 } GstRTSPBuilder;
245 
246 /* function prototypes */
247 static void add_auth_header (GstRTSPConnection * conn,
248     GstRTSPMessage * message);
249 
250 static void
build_reset(GstRTSPBuilder * builder)251 build_reset (GstRTSPBuilder * builder)
252 {
253   g_free (builder->body_data);
254   memset (builder, 0, sizeof (GstRTSPBuilder));
255 }
256 
257 static gboolean
tls_accept_certificate(GTlsConnection * conn,GTlsCertificate * peer_cert,GTlsCertificateFlags errors,GstRTSPConnection * rtspconn)258 tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
259     GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
260 {
261   GError *error = NULL;
262   gboolean accept = FALSE;
263 
264   if (rtspconn->tls_database) {
265     GSocketConnectable *peer_identity;
266     GTlsCertificateFlags validation_flags;
267 
268     GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
269 
270     peer_identity =
271         g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
272         (conn));
273 
274     errors =
275         g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
276         G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
277         g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
278         NULL, &error);
279 
280     if (error)
281       goto verify_error;
282 
283     validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
284 
285     accept = ((errors & validation_flags) == 0);
286     if (accept)
287       GST_DEBUG ("Peer certificate accepted");
288     else
289       GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
290   }
291 
292   if (!accept && rtspconn->accept_certificate_func) {
293     accept =
294         rtspconn->accept_certificate_func (conn, peer_cert, errors,
295         rtspconn->accept_certificate_user_data);
296     GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
297         accept ? "" : "not ");
298   }
299 
300   return accept;
301 
302 /* ERRORS */
303 verify_error:
304   {
305     GST_ERROR ("An error occurred while verifying the peer certificate: %s",
306         error->message);
307     g_clear_error (&error);
308     return FALSE;
309   }
310 }
311 
312 static void
socket_client_event(GSocketClient * client,GSocketClientEvent event,GSocketConnectable * connectable,GTlsConnection * connection,GstRTSPConnection * rtspconn)313 socket_client_event (GSocketClient * client, GSocketClientEvent event,
314     GSocketConnectable * connectable, GTlsConnection * connection,
315     GstRTSPConnection * rtspconn)
316 {
317   if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
318     GST_DEBUG ("TLS handshaking about to start...");
319 
320     g_signal_connect (connection, "accept-certificate",
321         (GCallback) tls_accept_certificate, rtspconn);
322 
323     g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
324   }
325 }
326 
327 /**
328  * gst_rtsp_connection_create:
329  * @url: a #GstRTSPUrl
330  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
331  *
332  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
333  * The connection will not yet attempt to connect to @url, use
334  * gst_rtsp_connection_connect().
335  *
336  * A copy of @url will be made.
337  *
338  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
339  */
340 GstRTSPResult
gst_rtsp_connection_create(const GstRTSPUrl * url,GstRTSPConnection ** conn)341 gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
342 {
343   GstRTSPConnection *newconn;
344 
345   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
346   g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
347 
348   newconn = g_new0 (GstRTSPConnection, 1);
349 
350   newconn->may_cancel = TRUE;
351   newconn->cancellable = g_cancellable_new ();
352   newconn->client = g_socket_client_new ();
353 
354   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
355     g_socket_client_set_tls (newconn->client, TRUE);
356 
357   g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
358       newconn);
359 
360   newconn->url = gst_rtsp_url_copy (url);
361   newconn->timer = g_timer_new ();
362   newconn->timeout = 60;
363   newconn->cseq = 1;            /* RFC 7826: "it is RECOMMENDED to start at 0.",
364                                    but some servers don't copy values <1 due to bugs. */
365 
366   newconn->remember_session_id = TRUE;
367 
368   newconn->auth_method = GST_RTSP_AUTH_NONE;
369   newconn->username = NULL;
370   newconn->passwd = NULL;
371   newconn->auth_params = NULL;
372   newconn->version = 0;
373 
374   *conn = newconn;
375 
376   return GST_RTSP_OK;
377 }
378 
379 static gboolean
collect_addresses(GSocket * socket,gchar ** ip,guint16 * port,gboolean remote,GError ** error)380 collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
381     gboolean remote, GError ** error)
382 {
383   GSocketAddress *addr;
384 
385   if (remote)
386     addr = g_socket_get_remote_address (socket, error);
387   else
388     addr = g_socket_get_local_address (socket, error);
389   if (!addr)
390     return FALSE;
391 
392   if (ip)
393     *ip = g_inet_address_to_string (g_inet_socket_address_get_address
394         (G_INET_SOCKET_ADDRESS (addr)));
395   if (port)
396     *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
397 
398   g_object_unref (addr);
399 
400   return TRUE;
401 }
402 
403 
404 /**
405  * gst_rtsp_connection_create_from_socket:
406  * @socket: a #GSocket
407  * @ip: the IP address of the other end
408  * @port: the port used by the other end
409  * @initial_buffer: data already read from @fd
410  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
411  *
412  * Create a new #GstRTSPConnection for handling communication on the existing
413  * socket @socket. The @initial_buffer contains zero terminated data already
414  * read from @socket which should be used before starting to read new data.
415  *
416  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
417  */
418 /* FIXME 2.0 We don't need the ip and port since they can be got from the
419  * GSocket */
420 GstRTSPResult
gst_rtsp_connection_create_from_socket(GSocket * socket,const gchar * ip,guint16 port,const gchar * initial_buffer,GstRTSPConnection ** conn)421 gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
422     guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
423 {
424   GstRTSPConnection *newconn = NULL;
425   GstRTSPUrl *url;
426   GstRTSPResult res;
427   GError *err = NULL;
428   gchar *local_ip;
429   GIOStream *stream;
430 
431   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
432   g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
433   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
434 
435   if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
436     goto getnameinfo_failed;
437 
438   /* create a url for the client address */
439   url = g_new0 (GstRTSPUrl, 1);
440   url->host = g_strdup (ip);
441   url->port = port;
442 
443   /* now create the connection object */
444   GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
445   gst_rtsp_url_free (url);
446 
447   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
448 
449   /* both read and write initially */
450   newconn->server = TRUE;
451   newconn->socket0 = socket;
452   newconn->stream0 = stream;
453   newconn->write_socket = newconn->read_socket = newconn->socket0;
454   newconn->input_stream = g_io_stream_get_input_stream (stream);
455   newconn->output_stream = g_io_stream_get_output_stream (stream);
456   newconn->control_stream = NULL;
457   newconn->remote_ip = g_strdup (ip);
458   newconn->local_ip = local_ip;
459   newconn->initial_buffer = g_strdup (initial_buffer);
460 
461   *conn = newconn;
462 
463   return GST_RTSP_OK;
464 
465   /* ERRORS */
466 getnameinfo_failed:
467   {
468     GST_ERROR ("failed to get local address: %s", err->message);
469     g_clear_error (&err);
470     return GST_RTSP_ERROR;
471   }
472 newconn_failed:
473   {
474     GST_ERROR ("failed to make connection");
475     g_free (local_ip);
476     gst_rtsp_url_free (url);
477     return res;
478   }
479 }
480 
481 /**
482  * gst_rtsp_connection_accept:
483  * @socket: a socket
484  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
485  * @cancellable: a #GCancellable to cancel the operation
486  *
487  * Accept a new connection on @socket and create a new #GstRTSPConnection for
488  * handling communication on new socket.
489  *
490  * Returns: #GST_RTSP_OK when @conn contains a valid connection.
491  */
492 GstRTSPResult
gst_rtsp_connection_accept(GSocket * socket,GstRTSPConnection ** conn,GCancellable * cancellable)493 gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
494     GCancellable * cancellable)
495 {
496   GError *err = NULL;
497   gchar *ip;
498   guint16 port;
499   GSocket *client_sock;
500   GstRTSPResult ret;
501 
502   g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
503   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
504 
505   client_sock = g_socket_accept (socket, cancellable, &err);
506   if (!client_sock)
507     goto accept_failed;
508 
509   /* get the remote ip address and port */
510   if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
511     goto getnameinfo_failed;
512 
513   ret =
514       gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
515       conn);
516   g_object_unref (client_sock);
517   g_free (ip);
518 
519   return ret;
520 
521   /* ERRORS */
522 accept_failed:
523   {
524     GST_DEBUG ("Accepting client failed: %s", err->message);
525     g_clear_error (&err);
526     return GST_RTSP_ESYS;
527   }
528 getnameinfo_failed:
529   {
530     GST_DEBUG ("getnameinfo failed: %s", err->message);
531     g_clear_error (&err);
532     if (!g_socket_close (client_sock, &err)) {
533       GST_DEBUG ("Closing socket failed: %s", err->message);
534       g_clear_error (&err);
535     }
536     g_object_unref (client_sock);
537     return GST_RTSP_ERROR;
538   }
539 }
540 
541 /**
542  * gst_rtsp_connection_get_tls:
543  * @conn: a #GstRTSPConnection
544  * @error: #GError for error reporting, or NULL to ignore.
545  *
546  * Get the TLS connection of @conn.
547  *
548  * For client side this will return the #GTlsClientConnection when connected
549  * over TLS.
550  *
551  * For server side connections, this function will create a GTlsServerConnection
552  * when called the first time and will return that same connection on subsequent
553  * calls. The server is then responsible for configuring the TLS connection.
554  *
555  * Returns: (transfer none): the TLS connection for @conn.
556  *
557  * Since: 1.2
558  */
559 GTlsConnection *
gst_rtsp_connection_get_tls(GstRTSPConnection * conn,GError ** error)560 gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
561 {
562   GTlsConnection *result;
563 
564   if (G_IS_TLS_CONNECTION (conn->stream0)) {
565     /* we already had one, return it */
566     result = G_TLS_CONNECTION (conn->stream0);
567   } else if (conn->server) {
568     /* no TLS connection but we are server, make one */
569     result = (GTlsConnection *)
570         g_tls_server_connection_new (conn->stream0, NULL, error);
571     if (result) {
572       g_object_unref (conn->stream0);
573       conn->stream0 = G_IO_STREAM (result);
574       conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
575       conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
576     }
577   } else {
578     /* client */
579     result = NULL;
580     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
581         "client not connected with TLS");
582   }
583   return result;
584 }
585 
586 /**
587  * gst_rtsp_connection_set_tls_validation_flags:
588  * @conn: a #GstRTSPConnection
589  * @flags: the validation flags.
590  *
591  * Sets the TLS validation flags to be used to verify the peer
592  * certificate when a TLS connection is established.
593  *
594  * Returns: TRUE if the validation flags are set correctly, or FALSE if
595  * @conn is NULL or is not a TLS connection.
596  *
597  * Since: 1.2.1
598  */
599 gboolean
gst_rtsp_connection_set_tls_validation_flags(GstRTSPConnection * conn,GTlsCertificateFlags flags)600 gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
601     GTlsCertificateFlags flags)
602 {
603   gboolean res = FALSE;
604 
605   g_return_val_if_fail (conn != NULL, FALSE);
606 
607   res = g_socket_client_get_tls (conn->client);
608   if (res)
609     g_socket_client_set_tls_validation_flags (conn->client, flags);
610 
611   return res;
612 }
613 
614 /**
615  * gst_rtsp_connection_get_tls_validation_flags:
616  * @conn: a #GstRTSPConnection
617  *
618  * Gets the TLS validation flags used to verify the peer certificate
619  * when a TLS connection is established.
620  *
621  * Returns: the validationg flags.
622  *
623  * Since: 1.2.1
624  */
625 GTlsCertificateFlags
gst_rtsp_connection_get_tls_validation_flags(GstRTSPConnection * conn)626 gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
627 {
628   g_return_val_if_fail (conn != NULL, 0);
629 
630   return g_socket_client_get_tls_validation_flags (conn->client);
631 }
632 
633 /**
634  * gst_rtsp_connection_set_tls_database:
635  * @conn: a #GstRTSPConnection
636  * @database: a #GTlsDatabase
637  *
638  * Sets the anchor certificate authorities database. This certificate
639  * database will be used to verify the server's certificate in case it
640  * can't be verified with the default certificate database first.
641  *
642  * Since: 1.4
643  */
644 void
gst_rtsp_connection_set_tls_database(GstRTSPConnection * conn,GTlsDatabase * database)645 gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
646     GTlsDatabase * database)
647 {
648   GTlsDatabase *old_db;
649 
650   g_return_if_fail (conn != NULL);
651 
652   if (database)
653     g_object_ref (database);
654 
655   old_db = conn->tls_database;
656   conn->tls_database = database;
657 
658   if (old_db)
659     g_object_unref (old_db);
660 }
661 
662 /**
663  * gst_rtsp_connection_get_tls_database:
664  * @conn: a #GstRTSPConnection
665  *
666  * Gets the anchor certificate authorities database that will be used
667  * after a server certificate can't be verified with the default
668  * certificate database.
669  *
670  * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
671  * database has been previously set. Use g_object_unref() to release the
672  * certificate database.
673  *
674  * Since: 1.4
675  */
676 GTlsDatabase *
gst_rtsp_connection_get_tls_database(GstRTSPConnection * conn)677 gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
678 {
679   GTlsDatabase *result;
680 
681   g_return_val_if_fail (conn != NULL, NULL);
682 
683   if ((result = conn->tls_database))
684     g_object_ref (result);
685 
686   return result;
687 }
688 
689 /**
690  * gst_rtsp_connection_set_tls_interaction:
691  * @conn: a #GstRTSPConnection
692  * @interaction: a #GTlsInteraction
693  *
694  * Sets a #GTlsInteraction object to be used when the connection or certificate
695  * database need to interact with the user. This will be used to prompt the
696  * user for passwords where necessary.
697  *
698  * Since: 1.6
699  */
700 void
gst_rtsp_connection_set_tls_interaction(GstRTSPConnection * conn,GTlsInteraction * interaction)701 gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
702     GTlsInteraction * interaction)
703 {
704   GTlsInteraction *old_interaction;
705 
706   g_return_if_fail (conn != NULL);
707 
708   if (interaction)
709     g_object_ref (interaction);
710 
711   old_interaction = conn->tls_interaction;
712   conn->tls_interaction = interaction;
713 
714   if (old_interaction)
715     g_object_unref (old_interaction);
716 }
717 
718 /**
719  * gst_rtsp_connection_get_tls_interaction:
720  * @conn: a #GstRTSPConnection
721  *
722  * Gets a #GTlsInteraction object to be used when the connection or certificate
723  * database need to interact with the user. This will be used to prompt the
724  * user for passwords where necessary.
725  *
726  * Returns: (transfer full): a reference on the #GTlsInteraction. Use
727  * g_object_unref() to release.
728  *
729  * Since: 1.6
730  */
731 GTlsInteraction *
gst_rtsp_connection_get_tls_interaction(GstRTSPConnection * conn)732 gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
733 {
734   GTlsInteraction *result;
735 
736   g_return_val_if_fail (conn != NULL, NULL);
737 
738   if ((result = conn->tls_interaction))
739     g_object_ref (result);
740 
741   return result;
742 }
743 
744 /**
745  * gst_rtsp_connection_set_accept_certificate_func:
746  * @conn: a #GstRTSPConnection
747  * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
748  * @destroy_notify: #GDestroyNotify for @user_data
749  * @user_data: User data passed to @func
750  *
751  * Sets a custom accept-certificate function for checking certificates for
752  * validity. This will directly map to #GTlsConnection 's "accept-certificate"
753  * signal and be performed after the default checks of #GstRTSPConnection
754  * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
755  * have failed. If no #GTlsDatabase is set on this connection, only @func will
756  * be called.
757  *
758  * Since: 1.14
759  */
760 void
gst_rtsp_connection_set_accept_certificate_func(GstRTSPConnection * conn,GstRTSPConnectionAcceptCertificateFunc func,gpointer user_data,GDestroyNotify destroy_notify)761 gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn,
762     GstRTSPConnectionAcceptCertificateFunc func,
763     gpointer user_data, GDestroyNotify destroy_notify)
764 {
765   if (conn->accept_certificate_destroy_notify)
766     conn->
767         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
768   conn->accept_certificate_func = func;
769   conn->accept_certificate_user_data = user_data;
770   conn->accept_certificate_destroy_notify = destroy_notify;
771 }
772 
773 static GstRTSPResult
setup_tunneling(GstRTSPConnection * conn,GTimeVal * timeout,gchar * uri,GstRTSPMessage * response)774 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri,
775     GstRTSPMessage * response)
776 {
777   gint i;
778   GstRTSPResult res;
779   gchar *value;
780   guint16 url_port;
781   GstRTSPMessage *msg;
782   gboolean old_http;
783   GstRTSPUrl *url;
784   GError *error = NULL;
785   GSocketConnection *connection;
786   GSocket *socket;
787   gchar *connection_uri = NULL;
788   gchar *request_uri = NULL;
789   gchar *host = NULL;
790 
791   url = conn->url;
792 
793   gst_rtsp_url_get_port (url, &url_port);
794   host = g_strdup_printf ("%s:%d", url->host, url_port);
795 
796   /* create a random sessionid */
797   for (i = 0; i < TUNNELID_LEN; i++)
798     conn->tunnelid[i] = g_random_int_range ('a', 'z');
799   conn->tunnelid[TUNNELID_LEN - 1] = '\0';
800 
801   /* create the GET request for the read connection */
802   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
803       no_message);
804   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
805 
806   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
807       conn->tunnelid);
808   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
809       "application/x-rtsp-tunnelled");
810   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
811   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
812   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
813 
814   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
815    * request from being base64 encoded */
816   conn->tunneled = FALSE;
817   GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed);
818   gst_rtsp_message_free (msg);
819   conn->tunneled = TRUE;
820 
821   /* receive the response to the GET request */
822   /* we need to temporarily set manual_http to TRUE since
823    * gst_rtsp_connection_receive() will treat the HTTP response as a parsing
824    * failure otherwise */
825   old_http = conn->manual_http;
826   conn->manual_http = TRUE;
827   GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, response, timeout),
828       read_failed);
829   conn->manual_http = old_http;
830 
831   if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE ||
832       response->type_data.response.code != GST_RTSP_STS_OK)
833     goto wrong_result;
834 
835   if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
836           &value, 0) == GST_RTSP_OK) {
837     g_free (url->host);
838     url->host = g_strdup (value);
839     g_free (conn->remote_ip);
840     conn->remote_ip = g_strdup (value);
841   }
842 
843   connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
844       url->abspath, url->query ? "?" : "", url->query ? url->query : "");
845 
846   /* connect to the host/port */
847   if (conn->proxy_host) {
848     connection = g_socket_client_connect_to_host (conn->client,
849         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
850     request_uri = g_strdup (connection_uri);
851   } else {
852     connection = g_socket_client_connect_to_uri (conn->client,
853         connection_uri, 0, conn->cancellable, &error);
854     request_uri =
855         g_strdup_printf ("%s%s%s", url->abspath,
856         url->query ? "?" : "", url->query ? url->query : "");
857   }
858   if (connection == NULL)
859     goto connect_failed;
860 
861   socket = g_socket_connection_get_socket (connection);
862 
863   /* get remote address */
864   g_free (conn->remote_ip);
865   conn->remote_ip = NULL;
866 
867   if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
868     goto remote_address_failed;
869 
870   /* this is now our writing socket */
871   conn->stream1 = G_IO_STREAM (connection);
872   conn->socket1 = socket;
873   conn->write_socket = conn->socket1;
874   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
875   conn->control_stream = NULL;
876 
877   /* create the POST request for the write connection */
878   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST,
879           request_uri), no_message);
880   msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
881 
882   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
883       conn->tunnelid);
884   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
885       "application/x-rtsp-tunnelled");
886   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
887       "application/x-rtsp-tunnelled");
888   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
889   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
890   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES,
891       "Sun, 9 Jan 1972 00:00:00 GMT");
892   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767");
893   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host);
894 
895   /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
896    * request from being base64 encoded */
897   conn->tunneled = FALSE;
898   GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed);
899   gst_rtsp_message_free (msg);
900   conn->tunneled = TRUE;
901 
902 exit:
903   g_free (connection_uri);
904   g_free (request_uri);
905   g_free (host);
906 
907   return res;
908 
909   /* ERRORS */
910 no_message:
911   {
912     GST_ERROR ("failed to create request (%d)", res);
913     goto exit;
914   }
915 write_failed:
916   {
917     GST_ERROR ("write failed (%d)", res);
918     gst_rtsp_message_free (msg);
919     conn->tunneled = TRUE;
920     goto exit;
921   }
922 read_failed:
923   {
924     GST_ERROR ("read failed (%d)", res);
925     conn->manual_http = FALSE;
926     goto exit;
927   }
928 wrong_result:
929   {
930     GST_ERROR ("got failure response %d %s",
931         response->type_data.response.code, response->type_data.response.reason);
932     res = GST_RTSP_ERROR;
933     goto exit;
934   }
935 connect_failed:
936   {
937     GST_ERROR ("failed to connect: %s", error->message);
938     res = GST_RTSP_ERROR;
939     g_clear_error (&error);
940     goto exit;
941   }
942 remote_address_failed:
943   {
944     GST_ERROR ("failed to resolve address: %s", error->message);
945     g_object_unref (connection);
946     g_clear_error (&error);
947     return GST_RTSP_ERROR;
948   }
949 }
950 
951 /**
952  * gst_rtsp_connection_connect_with_response:
953  * @conn: a #GstRTSPConnection
954  * @timeout: a #GTimeVal timeout
955  * @response: a #GstRTSPMessage
956  *
957  * Attempt to connect to the url of @conn made with
958  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
959  * forever. If @timeout contains a valid timeout, this function will return
960  * #GST_RTSP_ETIMEOUT after the timeout expired.  If @conn is set to tunneled,
961  * @response will contain a response to the tunneling request messages.
962  *
963  * This function can be cancelled with gst_rtsp_connection_flush().
964  *
965  * Returns: #GST_RTSP_OK when a connection could be made.
966  *
967  * Since: 1.8
968  */
969 GstRTSPResult
gst_rtsp_connection_connect_with_response(GstRTSPConnection * conn,GTimeVal * timeout,GstRTSPMessage * response)970 gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn,
971     GTimeVal * timeout, GstRTSPMessage * response)
972 {
973   GstRTSPResult res;
974   GSocketConnection *connection;
975   GSocket *socket;
976   GError *error = NULL;
977   gchar *connection_uri, *request_uri, *remote_ip;
978   GstClockTime to;
979   guint16 url_port;
980   GstRTSPUrl *url;
981 
982   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
983   g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
984   g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
985 
986   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
987   g_socket_client_set_timeout (conn->client,
988       (to + GST_SECOND - 1) / GST_SECOND);
989 
990   url = conn->url;
991 
992   gst_rtsp_url_get_port (url, &url_port);
993 
994   if (conn->tunneled) {
995     connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
996         url->abspath, url->query ? "?" : "", url->query ? url->query : "");
997   } else {
998     connection_uri = gst_rtsp_url_get_request_uri (url);
999   }
1000 
1001   if (conn->proxy_host) {
1002     connection = g_socket_client_connect_to_host (conn->client,
1003         conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
1004     request_uri = g_strdup (connection_uri);
1005   } else {
1006     connection = g_socket_client_connect_to_uri (conn->client,
1007         connection_uri, url_port, conn->cancellable, &error);
1008 
1009     /* use the relative component of the uri for non-proxy connections */
1010     request_uri = g_strdup_printf ("%s%s%s", url->abspath,
1011         url->query ? "?" : "", url->query ? url->query : "");
1012   }
1013   if (connection == NULL)
1014     goto connect_failed;
1015 
1016   /* get remote address */
1017   socket = g_socket_connection_get_socket (connection);
1018 
1019   if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
1020     goto remote_address_failed;
1021 
1022   g_free (conn->remote_ip);
1023   conn->remote_ip = remote_ip;
1024   conn->stream0 = G_IO_STREAM (connection);
1025   conn->socket0 = socket;
1026   /* this is our read socket */
1027   conn->read_socket = conn->socket0;
1028   conn->write_socket = conn->socket0;
1029   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
1030   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
1031   conn->control_stream = NULL;
1032 
1033   if (conn->tunneled) {
1034     res = setup_tunneling (conn, timeout, request_uri, response);
1035     if (res != GST_RTSP_OK)
1036       goto tunneling_failed;
1037   }
1038   g_free (connection_uri);
1039   g_free (request_uri);
1040 
1041   return GST_RTSP_OK;
1042 
1043   /* ERRORS */
1044 connect_failed:
1045   {
1046     GST_ERROR ("failed to connect: %s", error->message);
1047     g_clear_error (&error);
1048     g_free (connection_uri);
1049     g_free (request_uri);
1050     return GST_RTSP_ERROR;
1051   }
1052 remote_address_failed:
1053   {
1054     GST_ERROR ("failed to connect: %s", error->message);
1055     g_object_unref (connection);
1056     g_clear_error (&error);
1057     g_free (connection_uri);
1058     g_free (request_uri);
1059     return GST_RTSP_ERROR;
1060   }
1061 tunneling_failed:
1062   {
1063     GST_ERROR ("failed to setup tunneling");
1064     g_free (connection_uri);
1065     g_free (request_uri);
1066     return res;
1067   }
1068 }
1069 
1070 static void
add_auth_header(GstRTSPConnection * conn,GstRTSPMessage * message)1071 add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
1072 {
1073   switch (conn->auth_method) {
1074     case GST_RTSP_AUTH_BASIC:{
1075       gchar *user_pass;
1076       gchar *user_pass64;
1077       gchar *auth_string;
1078 
1079       if (conn->username == NULL || conn->passwd == NULL)
1080         break;
1081 
1082       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
1083       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
1084       auth_string = g_strdup_printf ("Basic %s", user_pass64);
1085 
1086       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1087           auth_string);
1088 
1089       g_free (user_pass);
1090       g_free (user_pass64);
1091       break;
1092     }
1093     case GST_RTSP_AUTH_DIGEST:{
1094       gchar *response;
1095       gchar *auth_string, *auth_string2;
1096       gchar *realm;
1097       gchar *nonce;
1098       gchar *opaque;
1099       const gchar *uri;
1100       const gchar *method;
1101 
1102       /* we need to have some params set */
1103       if (conn->auth_params == NULL || conn->username == NULL ||
1104           conn->passwd == NULL)
1105         break;
1106 
1107       /* we need the realm and nonce */
1108       realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
1109       nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
1110       if (realm == NULL || nonce == NULL)
1111         break;
1112 
1113       method = gst_rtsp_method_as_text (message->type_data.request.method);
1114       uri = message->type_data.request.uri;
1115 
1116       response =
1117           gst_rtsp_generate_digest_auth_response (NULL, method, realm,
1118           conn->username, conn->passwd, uri, nonce);
1119       auth_string =
1120           g_strdup_printf ("Digest username=\"%s\", "
1121           "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
1122           conn->username, realm, nonce, uri, response);
1123       g_free (response);
1124 
1125       opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
1126       if (opaque) {
1127         auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
1128             opaque);
1129         g_free (auth_string);
1130         auth_string = auth_string2;
1131       }
1132       /* Do not keep any old Authorization headers */
1133       gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1);
1134       gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
1135           auth_string);
1136       break;
1137     }
1138     default:
1139       /* Nothing to do */
1140       break;
1141   }
1142 }
1143 
1144 /**
1145  * gst_rtsp_connection_connect:
1146  * @conn: a #GstRTSPConnection
1147  * @timeout: a #GTimeVal timeout
1148  *
1149  * Attempt to connect to the url of @conn made with
1150  * gst_rtsp_connection_create(). If @timeout is %NULL this function can block
1151  * forever. If @timeout contains a valid timeout, this function will return
1152  * #GST_RTSP_ETIMEOUT after the timeout expired.
1153  *
1154  * This function can be cancelled with gst_rtsp_connection_flush().
1155  *
1156  * Returns: #GST_RTSP_OK when a connection could be made.
1157  */
1158 GstRTSPResult
gst_rtsp_connection_connect(GstRTSPConnection * conn,GTimeVal * timeout)1159 gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
1160 {
1161   GstRTSPResult result;
1162   GstRTSPMessage response;
1163 
1164   memset (&response, 0, sizeof (response));
1165   gst_rtsp_message_init (&response);
1166 
1167   result = gst_rtsp_connection_connect_with_response (conn, timeout, &response);
1168 
1169   gst_rtsp_message_unset (&response);
1170 
1171   return result;
1172 }
1173 
1174 static void
gen_date_string(gchar * date_string,guint len)1175 gen_date_string (gchar * date_string, guint len)
1176 {
1177   static const char wkdays[7][4] =
1178       { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
1179   static const char months[12][4] =
1180       { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
1181     "Nov", "Dec"
1182   };
1183   struct tm tm;
1184   time_t t;
1185 
1186   time (&t);
1187 
1188 #ifdef HAVE_GMTIME_R
1189   gmtime_r (&t, &tm);
1190 #else
1191   tm = *gmtime (&t);
1192 #endif
1193 
1194   g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
1195       wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
1196       tm.tm_hour, tm.tm_min, tm.tm_sec);
1197 }
1198 
1199 static GstRTSPResult
write_bytes(GOutputStream * stream,const guint8 * buffer,guint * idx,guint size,gboolean block,GCancellable * cancellable)1200 write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
1201     guint size, gboolean block, GCancellable * cancellable)
1202 {
1203   guint left;
1204   gssize r;
1205   GError *err = NULL;
1206 
1207   if (G_UNLIKELY (*idx > size))
1208     return GST_RTSP_ERROR;
1209 
1210   left = size - *idx;
1211 
1212   while (left) {
1213     if (block)
1214       r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
1215           cancellable, &err);
1216     else
1217       r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
1218           (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
1219     if (G_UNLIKELY (r < 0))
1220       goto error;
1221 
1222     left -= r;
1223     *idx += r;
1224   }
1225   return GST_RTSP_OK;
1226 
1227   /* ERRORS */
1228 error:
1229   {
1230     if (G_UNLIKELY (r == 0))
1231       return GST_RTSP_EEOF;
1232 
1233     if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1234       GST_WARNING ("%s", err->message);
1235     else
1236       GST_DEBUG ("%s", err->message);
1237 
1238     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1239       g_clear_error (&err);
1240       return GST_RTSP_EINTR;
1241     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1242       g_clear_error (&err);
1243       return GST_RTSP_EINTR;
1244     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1245       g_clear_error (&err);
1246       return GST_RTSP_ETIMEOUT;
1247     }
1248     g_clear_error (&err);
1249     return GST_RTSP_ESYS;
1250   }
1251 }
1252 
1253 /* NOTE: This changes the values of vectors if multiple iterations are needed! */
1254 #if GLIB_CHECK_VERSION(2, 59, 1)
1255 static GstRTSPResult
writev_bytes(GOutputStream * stream,GOutputVector * vectors,gint n_vectors,gsize * bytes_written,gboolean block,GCancellable * cancellable)1256 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1257     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1258 {
1259   gsize _bytes_written = 0;
1260   gsize written;
1261   GError *err = NULL;
1262   GPollableReturn res = G_POLLABLE_RETURN_OK;
1263 
1264   while (n_vectors > 0) {
1265     if (block) {
1266       if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors,
1267                   &written, cancellable, &err))) {
1268         /* This will never return G_IO_ERROR_WOULD_BLOCK */
1269         res = G_POLLABLE_RETURN_FAILED;
1270         goto error;
1271       }
1272     } else {
1273       res =
1274           g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
1275           (stream), vectors, n_vectors, &written, cancellable, &err);
1276 
1277       if (res != G_POLLABLE_RETURN_OK) {
1278         g_assert (written == 0);
1279         goto error;
1280       }
1281     }
1282     _bytes_written += written;
1283 
1284     /* skip vectors that have been written in full */
1285     while (written > 0 && written >= vectors[0].size) {
1286       written -= vectors[0].size;
1287       ++vectors;
1288       --n_vectors;
1289     }
1290 
1291     /* skip partially written vector data */
1292     if (written > 0) {
1293       vectors[0].size -= written;
1294       vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
1295     }
1296   }
1297 
1298   *bytes_written = _bytes_written;
1299 
1300   return GST_RTSP_OK;
1301 
1302   /* ERRORS */
1303 error:
1304   {
1305     *bytes_written = _bytes_written;
1306 
1307     if (err)
1308       GST_WARNING ("%s", err->message);
1309     if (res == G_POLLABLE_RETURN_WOULD_BLOCK) {
1310       g_assert (!err);
1311       return GST_RTSP_EINTR;
1312     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1313       g_clear_error (&err);
1314       return GST_RTSP_EINTR;
1315     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1316       g_clear_error (&err);
1317       return GST_RTSP_ETIMEOUT;
1318     } else if (G_UNLIKELY (written == 0)) {
1319       g_clear_error (&err);
1320       return GST_RTSP_EEOF;
1321     }
1322 
1323     g_clear_error (&err);
1324     return GST_RTSP_ESYS;
1325   }
1326 }
1327 #else
1328 static GstRTSPResult
writev_bytes(GOutputStream * stream,GOutputVector * vectors,gint n_vectors,gsize * bytes_written,gboolean block,GCancellable * cancellable)1329 writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
1330     gsize * bytes_written, gboolean block, GCancellable * cancellable)
1331 {
1332   gsize _bytes_written = 0;
1333   guint written;
1334   gint i;
1335   GstRTSPResult res = GST_RTSP_OK;
1336 
1337   for (i = 0; i < n_vectors; i++) {
1338     written = 0;
1339     res =
1340         write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
1341         block, cancellable);
1342     _bytes_written += written;
1343     if (G_UNLIKELY (res != GST_RTSP_OK))
1344       break;
1345   }
1346 
1347   *bytes_written = _bytes_written;
1348 
1349   return res;
1350 }
1351 #endif
1352 
1353 static gint
fill_raw_bytes(GstRTSPConnection * conn,guint8 * buffer,guint size,gboolean block,GError ** err)1354 fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1355     gboolean block, GError ** err)
1356 {
1357   gint out = 0;
1358 
1359   if (G_UNLIKELY (conn->initial_buffer != NULL)) {
1360     gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
1361 
1362     out = MIN (left, size);
1363     memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
1364 
1365     if (left == (gsize) out) {
1366       g_free (conn->initial_buffer);
1367       conn->initial_buffer = NULL;
1368       conn->initial_buffer_offset = 0;
1369     } else
1370       conn->initial_buffer_offset += out;
1371   }
1372 
1373   if (G_LIKELY (size > (guint) out)) {
1374     gssize r;
1375     gsize count = size - out;
1376     if (block)
1377       r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
1378           count, conn->may_cancel ? conn->cancellable : NULL, err);
1379     else
1380       r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
1381           (conn->input_stream), (gchar *) & buffer[out], count,
1382           conn->may_cancel ? conn->cancellable : NULL, err);
1383 
1384     if (G_UNLIKELY (r < 0)) {
1385       if (out == 0) {
1386         /* propagate the error */
1387         out = r;
1388       } else {
1389         /* we have some data ignore error */
1390         g_clear_error (err);
1391       }
1392     } else
1393       out += r;
1394   }
1395 
1396   return out;
1397 }
1398 
1399 static gint
fill_bytes(GstRTSPConnection * conn,guint8 * buffer,guint size,gboolean block,GError ** err)1400 fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
1401     gboolean block, GError ** err)
1402 {
1403   DecodeCtx *ctx = conn->ctxp;
1404   gint out = 0;
1405 
1406   if (ctx) {
1407     while (size > 0) {
1408       guint8 in[sizeof (ctx->out) * 4 / 3];
1409       gint r;
1410 
1411       while (size > 0 && ctx->cout < ctx->coutl) {
1412         /* we have some leftover bytes */
1413         *buffer++ = ctx->out[ctx->cout++];
1414         size--;
1415         out++;
1416       }
1417 
1418       /* got what we needed? */
1419       if (size == 0)
1420         break;
1421 
1422       /* try to read more bytes */
1423       r = fill_raw_bytes (conn, in, sizeof (in), block, err);
1424       if (r <= 0) {
1425         if (out == 0) {
1426           out = r;
1427         } else {
1428           /* we have some data ignore error */
1429           g_clear_error (err);
1430         }
1431         break;
1432       }
1433 
1434       ctx->cout = 0;
1435       ctx->coutl =
1436           g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
1437           &ctx->save);
1438     }
1439   } else {
1440     out = fill_raw_bytes (conn, buffer, size, block, err);
1441   }
1442 
1443   return out;
1444 }
1445 
1446 static GstRTSPResult
read_bytes(GstRTSPConnection * conn,guint8 * buffer,guint * idx,guint size,gboolean block)1447 read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1448     gboolean block)
1449 {
1450   guint left;
1451   gint r;
1452   GError *err = NULL;
1453 
1454   if (G_UNLIKELY (*idx > size))
1455     return GST_RTSP_ERROR;
1456 
1457   left = size - *idx;
1458 
1459   while (left) {
1460     r = fill_bytes (conn, &buffer[*idx], left, block, &err);
1461     if (G_UNLIKELY (r <= 0))
1462       goto error;
1463 
1464     left -= r;
1465     *idx += r;
1466   }
1467   return GST_RTSP_OK;
1468 
1469   /* ERRORS */
1470 error:
1471   {
1472     if (G_UNLIKELY (r == 0))
1473       return GST_RTSP_EEOF;
1474 
1475     GST_DEBUG ("%s", err->message);
1476     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1477       g_clear_error (&err);
1478       return GST_RTSP_EINTR;
1479     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1480       g_clear_error (&err);
1481       return GST_RTSP_EINTR;
1482     } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1483       g_clear_error (&err);
1484       return GST_RTSP_ETIMEOUT;
1485     }
1486     g_clear_error (&err);
1487     return GST_RTSP_ESYS;
1488   }
1489 }
1490 
1491 /* The code below tries to handle clients using \r, \n or \r\n to indicate the
1492  * end of a line. It even does its best to handle clients which mix them (even
1493  * though this is a really stupid idea (tm).) It also handles Line White Space
1494  * (LWS), where a line end followed by whitespace is considered LWS. This is
1495  * the method used in RTSP (and HTTP) to break long lines.
1496  */
1497 static GstRTSPResult
read_line(GstRTSPConnection * conn,guint8 * buffer,guint * idx,guint size,gboolean block)1498 read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
1499     gboolean block)
1500 {
1501   GstRTSPResult res;
1502 
1503   while (TRUE) {
1504     guint8 c;
1505     guint i;
1506 
1507     if (conn->read_ahead == READ_AHEAD_EOH) {
1508       /* the last call to read_line() already determined that we have reached
1509        * the end of the headers, so convey that information now */
1510       conn->read_ahead = 0;
1511       break;
1512     } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1513       /* the last call to read_line() left off after having read \r\n */
1514       c = '\n';
1515     } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1516       /* the last call to read_line() left off after having read \r\n\r */
1517       c = '\r';
1518     } else if (conn->read_ahead != 0) {
1519       /* the last call to read_line() left us with a character to start with */
1520       c = (guint8) conn->read_ahead;
1521       conn->read_ahead = 0;
1522     } else {
1523       /* read the next character */
1524       i = 0;
1525       res = read_bytes (conn, &c, &i, 1, block);
1526       if (G_UNLIKELY (res != GST_RTSP_OK))
1527         return res;
1528     }
1529 
1530     /* special treatment of line endings */
1531     if (c == '\r' || c == '\n') {
1532       guint8 read_ahead;
1533 
1534     retry:
1535       /* need to read ahead one more character to know what to do... */
1536       i = 0;
1537       res = read_bytes (conn, &read_ahead, &i, 1, block);
1538       if (G_UNLIKELY (res != GST_RTSP_OK))
1539         return res;
1540 
1541       if (read_ahead == ' ' || read_ahead == '\t') {
1542         if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1543           /* got \r\n\r followed by whitespace, treat it as a normal line
1544            * followed by one starting with LWS */
1545           conn->read_ahead = read_ahead;
1546           break;
1547         } else {
1548           /* got LWS, change the line ending to a space and continue */
1549           c = ' ';
1550           conn->read_ahead = read_ahead;
1551         }
1552       } else if (conn->read_ahead == READ_AHEAD_CRLFCR) {
1553         if (read_ahead == '\r' || read_ahead == '\n') {
1554           /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */
1555           conn->read_ahead = READ_AHEAD_EOH;
1556           break;
1557         } else {
1558           /* got \r\n\r followed by something else, this is not really
1559            * supported since we have probably just eaten the first character
1560            * of the body or the next message, so just ignore the second \r
1561            * and live with it... */
1562           conn->read_ahead = read_ahead;
1563           break;
1564         }
1565       } else if (conn->read_ahead == READ_AHEAD_CRLF) {
1566         if (read_ahead == '\r') {
1567           /* got \r\n\r so far, need one more character... */
1568           conn->read_ahead = READ_AHEAD_CRLFCR;
1569           goto retry;
1570         } else if (read_ahead == '\n') {
1571           /* got \r\n\n, treat it as the end of the headers */
1572           conn->read_ahead = READ_AHEAD_EOH;
1573           break;
1574         } else {
1575           /* found the end of a line, keep read_ahead for the next line */
1576           conn->read_ahead = read_ahead;
1577           break;
1578         }
1579       } else if (c == read_ahead) {
1580         /* got double \r or \n, treat it as the end of the headers */
1581         conn->read_ahead = READ_AHEAD_EOH;
1582         break;
1583       } else if (c == '\r' && read_ahead == '\n') {
1584         /* got \r\n so far, still need more to know what to do... */
1585         conn->read_ahead = READ_AHEAD_CRLF;
1586         goto retry;
1587       } else {
1588         /* found the end of a line, keep read_ahead for the next line */
1589         conn->read_ahead = read_ahead;
1590         break;
1591       }
1592     }
1593 
1594     if (G_LIKELY (*idx < size - 1))
1595       buffer[(*idx)++] = c;
1596   }
1597   buffer[*idx] = '\0';
1598 
1599   return GST_RTSP_OK;
1600 }
1601 
1602 /**
1603  * gst_rtsp_connection_write:
1604  * @conn: a #GstRTSPConnection
1605  * @data: the data to write
1606  * @size: the size of @data
1607  * @timeout: a timeout value or %NULL
1608  *
1609  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
1610  * the specified @timeout. @timeout can be %NULL, in which case this function
1611  * might block forever.
1612  *
1613  * This function can be cancelled with gst_rtsp_connection_flush().
1614  *
1615  * Returns: #GST_RTSP_OK on success.
1616  */
1617 /* FIXME 2.0: This should've been static! */
1618 GstRTSPResult
gst_rtsp_connection_write(GstRTSPConnection * conn,const guint8 * data,guint size,GTimeVal * timeout)1619 gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
1620     guint size, GTimeVal * timeout)
1621 {
1622   guint offset;
1623   GstClockTime to;
1624   GstRTSPResult res;
1625 
1626   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1627   g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
1628   g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
1629 
1630   offset = 0;
1631 
1632   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
1633 
1634   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1635   res =
1636       write_bytes (conn->output_stream, data, &offset, size, TRUE,
1637       conn->cancellable);
1638   g_socket_set_timeout (conn->write_socket, 0);
1639 
1640   return res;
1641 }
1642 
1643 static gboolean
serialize_message(GstRTSPConnection * conn,GstRTSPMessage * message,GstRTSPSerializedMessage * serialized_message)1644 serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
1645     GstRTSPSerializedMessage * serialized_message)
1646 {
1647   GString *str = NULL;
1648 
1649   memset (serialized_message, 0, sizeof (*serialized_message));
1650 
1651   /* Initially we borrow the body_data / body_buffer fields from
1652    * the message */
1653   serialized_message->borrowed = TRUE;
1654 
1655   switch (message->type) {
1656     case GST_RTSP_MESSAGE_REQUEST:
1657       str = g_string_new ("");
1658 
1659       /* create request string, add CSeq */
1660       g_string_append_printf (str, "%s %s RTSP/%s\r\n"
1661           "CSeq: %d\r\n",
1662           gst_rtsp_method_as_text (message->type_data.request.method),
1663           message->type_data.request.uri,
1664           gst_rtsp_version_as_text (message->type_data.request.version),
1665           conn->cseq++);
1666       /* add session id if we have one */
1667       if (conn->session_id[0] != '\0') {
1668         gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1);
1669         gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
1670             conn->session_id);
1671       }
1672       /* add any authentication headers */
1673       add_auth_header (conn, message);
1674       break;
1675     case GST_RTSP_MESSAGE_RESPONSE:
1676       str = g_string_new ("");
1677 
1678       /* create response string */
1679       g_string_append_printf (str, "RTSP/%s %d %s\r\n",
1680           gst_rtsp_version_as_text (message->type_data.response.version),
1681           message->type_data.response.code, message->type_data.response.reason);
1682       break;
1683     case GST_RTSP_MESSAGE_HTTP_REQUEST:
1684       str = g_string_new ("");
1685 
1686       /* create request string */
1687       g_string_append_printf (str, "%s %s HTTP/%s\r\n",
1688           gst_rtsp_method_as_text (message->type_data.request.method),
1689           message->type_data.request.uri,
1690           gst_rtsp_version_as_text (message->type_data.request.version));
1691       /* add any authentication headers */
1692       add_auth_header (conn, message);
1693       break;
1694     case GST_RTSP_MESSAGE_HTTP_RESPONSE:
1695       str = g_string_new ("");
1696 
1697       /* create response string */
1698       g_string_append_printf (str, "HTTP/%s %d %s\r\n",
1699           gst_rtsp_version_as_text (message->type_data.request.version),
1700           message->type_data.response.code, message->type_data.response.reason);
1701       break;
1702     case GST_RTSP_MESSAGE_DATA:
1703     {
1704       guint8 *data_header = serialized_message->data_header;
1705 
1706       /* prepare data header */
1707       data_header[0] = '$';
1708       data_header[1] = message->type_data.data.channel;
1709       data_header[2] = (message->body_size >> 8) & 0xff;
1710       data_header[3] = message->body_size & 0xff;
1711 
1712       /* create serialized message with header and data */
1713       serialized_message->data_is_data_header = TRUE;
1714       serialized_message->data_size = 4;
1715 
1716       if (message->body) {
1717         serialized_message->body_data = message->body;
1718         serialized_message->body_data_size = message->body_size;
1719       } else {
1720         g_assert (message->body_buffer != NULL);
1721         serialized_message->body_buffer = message->body_buffer;
1722       }
1723       break;
1724     }
1725     default:
1726       g_string_free (str, TRUE);
1727       g_return_val_if_reached (FALSE);
1728       break;
1729   }
1730 
1731   /* append headers and body */
1732   if (message->type != GST_RTSP_MESSAGE_DATA) {
1733     gchar date_string[100];
1734 
1735     g_assert (str != NULL);
1736 
1737     gen_date_string (date_string, sizeof (date_string));
1738 
1739     /* add date header */
1740     gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1);
1741     gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
1742 
1743     /* append headers */
1744     gst_rtsp_message_append_headers (message, str);
1745 
1746     /* append Content-Length and body if needed */
1747     if (message->body_size > 0) {
1748       gchar *len;
1749 
1750       len = g_strdup_printf ("%d", message->body_size);
1751       g_string_append_printf (str, "%s: %s\r\n",
1752           gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
1753       g_free (len);
1754       /* header ends here */
1755       g_string_append (str, "\r\n");
1756 
1757       if (message->body) {
1758         serialized_message->body_data = message->body;
1759         serialized_message->body_data_size = message->body_size;
1760       } else {
1761         g_assert (message->body_buffer != NULL);
1762         serialized_message->body_buffer = message->body_buffer;
1763       }
1764     } else {
1765       /* just end headers */
1766       g_string_append (str, "\r\n");
1767     }
1768 
1769     serialized_message->data_size = str->len;
1770     serialized_message->data = (guint8 *) g_string_free (str, FALSE);
1771   }
1772 
1773   return TRUE;
1774 }
1775 
1776 /**
1777  * gst_rtsp_connection_send:
1778  * @conn: a #GstRTSPConnection
1779  * @message: the message to send
1780  * @timeout: a timeout value or %NULL
1781  *
1782  * Attempt to send @message to the connected @conn, blocking up to
1783  * the specified @timeout. @timeout can be %NULL, in which case this function
1784  * might block forever.
1785  *
1786  * This function can be cancelled with gst_rtsp_connection_flush().
1787  *
1788  * Returns: #GST_RTSP_OK on success.
1789  */
1790 GstRTSPResult
gst_rtsp_connection_send(GstRTSPConnection * conn,GstRTSPMessage * message,GTimeVal * timeout)1791 gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
1792     GTimeVal * timeout)
1793 {
1794   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1795   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
1796 
1797   return gst_rtsp_connection_send_messages (conn, message, 1, timeout);
1798 }
1799 
1800 /**
1801  * gst_rtsp_connection_send_messages:
1802  * @conn: a #GstRTSPConnection
1803  * @messages: (array length=n_messages): the messages to send
1804  * @n_messages: the number of messages to send
1805  * @timeout: a timeout value or %NULL
1806  *
1807  * Attempt to send @messages to the connected @conn, blocking up to
1808  * the specified @timeout. @timeout can be %NULL, in which case this function
1809  * might block forever.
1810  *
1811  * This function can be cancelled with gst_rtsp_connection_flush().
1812  *
1813  * Returns: #GST_RTSP_OK on success.
1814  *
1815  * Since: 1.16
1816  */
1817 GstRTSPResult
gst_rtsp_connection_send_messages(GstRTSPConnection * conn,GstRTSPMessage * messages,guint n_messages,GTimeVal * timeout)1818 gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
1819     GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
1820 {
1821   GstClockTime to;
1822   GstRTSPResult res;
1823   GstRTSPSerializedMessage *serialized_messages;
1824   GOutputVector *vectors;
1825   GstMapInfo *map_infos;
1826   guint n_vectors, n_memories;
1827   gint i, j, k;
1828   gsize bytes_to_write, bytes_written;
1829 
1830   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
1831   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
1832 
1833   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
1834   memset (serialized_messages, 0,
1835       sizeof (GstRTSPSerializedMessage) * n_messages);
1836 
1837   for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
1838       i++) {
1839     if (G_UNLIKELY (!serialize_message (conn, &messages[i],
1840                 &serialized_messages[i])))
1841       goto no_message;
1842 
1843     if (conn->tunneled) {
1844       gint state = 0, save = 0;
1845       gchar *base64_buffer, *out_buffer;
1846       gsize written = 0;
1847       gsize in_length;
1848 
1849       in_length = serialized_messages[i].data_size;
1850       if (serialized_messages[i].body_data)
1851         in_length += serialized_messages[i].body_data_size;
1852       else if (serialized_messages[i].body_buffer)
1853         in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
1854 
1855       in_length = (in_length / 3 + 1) * 4 + 4 + 1;
1856       base64_buffer = out_buffer = g_malloc0 (in_length);
1857 
1858       written =
1859           g_base64_encode_step (serialized_messages[i].data_is_data_header ?
1860           serialized_messages[i].data_header : serialized_messages[i].data,
1861           serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
1862       out_buffer += written;
1863 
1864       if (serialized_messages[i].body_data) {
1865         written =
1866             g_base64_encode_step (serialized_messages[i].body_data,
1867             serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
1868             &save);
1869         out_buffer += written;
1870       } else if (serialized_messages[i].body_buffer) {
1871         guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1872 
1873         for (j = 0; j < n; j++) {
1874           GstMemory *mem =
1875               gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
1876           GstMapInfo map;
1877 
1878           gst_memory_map (mem, &map, GST_MAP_READ);
1879 
1880           written = g_base64_encode_step (map.data, map.size,
1881               FALSE, out_buffer, &state, &save);
1882           out_buffer += written;
1883 
1884           gst_memory_unmap (mem, &map);
1885         }
1886       }
1887 
1888       written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
1889       out_buffer += written;
1890 
1891       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1892       memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
1893 
1894       serialized_messages[i].data = (guint8 *) base64_buffer;
1895       serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1;
1896       n_vectors++;
1897     } else {
1898       n_vectors++;
1899       if (serialized_messages[i].body_data) {
1900         n_vectors++;
1901       } else if (serialized_messages[i].body_buffer) {
1902         n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1903         n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
1904       }
1905     }
1906   }
1907 
1908   vectors = g_newa (GOutputVector, n_vectors);
1909   map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
1910 
1911   for (i = 0, j = 0, k = 0; i < n_messages; i++) {
1912     vectors[j].buffer = serialized_messages[i].data_is_data_header ?
1913         serialized_messages[i].data_header : serialized_messages[i].data;
1914     vectors[j].size = serialized_messages[i].data_size;
1915     bytes_to_write += vectors[j].size;
1916     j++;
1917 
1918     if (serialized_messages[i].body_data) {
1919       vectors[j].buffer = serialized_messages[i].body_data;
1920       vectors[j].size = serialized_messages[i].body_data_size;
1921       bytes_to_write += vectors[j].size;
1922       j++;
1923     } else if (serialized_messages[i].body_buffer) {
1924       gint l, n;
1925 
1926       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1927       for (l = 0; l < n; l++) {
1928         GstMemory *mem =
1929             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1930 
1931         gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
1932         vectors[j].buffer = map_infos[k].data;
1933         vectors[j].size = map_infos[k].size;
1934         bytes_to_write += vectors[j].size;
1935 
1936         k++;
1937         j++;
1938       }
1939     }
1940   }
1941 
1942   /* write request: this is synchronous */
1943   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
1944 
1945   g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
1946   res =
1947       writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
1948       TRUE, conn->cancellable);
1949   g_socket_set_timeout (conn->write_socket, 0);
1950 
1951   g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
1952 
1953   /* free everything */
1954   for (i = 0, k = 0; i < n_messages; i++) {
1955     if (serialized_messages[i].body_buffer) {
1956       gint l, n;
1957 
1958       n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
1959       for (l = 0; l < n; l++) {
1960         GstMemory *mem =
1961             gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
1962 
1963         gst_memory_unmap (mem, &map_infos[k]);
1964         k++;
1965       }
1966     }
1967 
1968     g_free (serialized_messages[i].data);
1969   }
1970 
1971   return res;
1972 
1973 no_message:
1974   {
1975     for (i = 0; i < n_messages; i++) {
1976       gst_rtsp_serialized_message_clear (&serialized_messages[i]);
1977     }
1978     g_warning ("Wrong message");
1979     return GST_RTSP_EINVAL;
1980   }
1981 }
1982 
1983 static GstRTSPResult
parse_string(gchar * dest,gint size,gchar ** src)1984 parse_string (gchar * dest, gint size, gchar ** src)
1985 {
1986   GstRTSPResult res = GST_RTSP_OK;
1987   gint idx;
1988 
1989   idx = 0;
1990   /* skip spaces */
1991   while (g_ascii_isspace (**src))
1992     (*src)++;
1993 
1994   while (!g_ascii_isspace (**src) && **src != '\0') {
1995     if (idx < size - 1)
1996       dest[idx++] = **src;
1997     else
1998       res = GST_RTSP_EPARSE;
1999     (*src)++;
2000   }
2001   if (size > 0)
2002     dest[idx] = '\0';
2003 
2004   return res;
2005 }
2006 
2007 static GstRTSPResult
parse_protocol_version(gchar * protocol,GstRTSPMsgType * type,GstRTSPVersion * version)2008 parse_protocol_version (gchar * protocol, GstRTSPMsgType * type,
2009     GstRTSPVersion * version)
2010 {
2011   GstRTSPVersion rversion;
2012   GstRTSPResult res = GST_RTSP_OK;
2013   gchar *ver;
2014 
2015   if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) {
2016     guint major;
2017     guint minor;
2018     gchar dummychar;
2019 
2020     *ver++ = '\0';
2021 
2022     /* the version number must be formatted as X.Y with nothing following */
2023     if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2)
2024       res = GST_RTSP_EPARSE;
2025 
2026     rversion = major * 0x10 + minor;
2027     if (g_ascii_strcasecmp (protocol, "RTSP") == 0) {
2028 
2029       if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) {
2030         *version = GST_RTSP_VERSION_INVALID;
2031         res = GST_RTSP_ERROR;
2032       }
2033     } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) {
2034       if (*type == GST_RTSP_MESSAGE_REQUEST)
2035         *type = GST_RTSP_MESSAGE_HTTP_REQUEST;
2036       else if (*type == GST_RTSP_MESSAGE_RESPONSE)
2037         *type = GST_RTSP_MESSAGE_HTTP_RESPONSE;
2038 
2039       if (rversion != GST_RTSP_VERSION_1_0 &&
2040           rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0)
2041         res = GST_RTSP_ERROR;
2042     } else
2043       res = GST_RTSP_EPARSE;
2044   } else
2045     res = GST_RTSP_EPARSE;
2046 
2047   if (res == GST_RTSP_OK)
2048     *version = rversion;
2049 
2050   return res;
2051 }
2052 
2053 static GstRTSPResult
parse_response_status(guint8 * buffer,GstRTSPMessage * msg)2054 parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
2055 {
2056   GstRTSPResult res = GST_RTSP_OK;
2057   GstRTSPResult res2;
2058   gchar versionstr[20];
2059   gchar codestr[4];
2060   gint code;
2061   gchar *bptr;
2062 
2063   bptr = (gchar *) buffer;
2064 
2065   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2066     res = GST_RTSP_EPARSE;
2067 
2068   if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK)
2069     res = GST_RTSP_EPARSE;
2070   code = atoi (codestr);
2071   if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600))
2072     res = GST_RTSP_EPARSE;
2073 
2074   while (g_ascii_isspace (*bptr))
2075     bptr++;
2076 
2077   if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr,
2078               NULL) != GST_RTSP_OK))
2079     res = GST_RTSP_EPARSE;
2080 
2081   res2 = parse_protocol_version (versionstr, &msg->type,
2082       &msg->type_data.response.version);
2083   if (G_LIKELY (res == GST_RTSP_OK))
2084     res = res2;
2085 
2086   return res;
2087 }
2088 
2089 static GstRTSPResult
parse_request_line(guint8 * buffer,GstRTSPMessage * msg)2090 parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
2091 {
2092   GstRTSPResult res = GST_RTSP_OK;
2093   GstRTSPResult res2;
2094   gchar versionstr[20];
2095   gchar methodstr[20];
2096   gchar urlstr[4096];
2097   gchar *bptr;
2098   GstRTSPMethod method;
2099 
2100   bptr = (gchar *) buffer;
2101 
2102   if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK)
2103     res = GST_RTSP_EPARSE;
2104   method = gst_rtsp_find_method (methodstr);
2105 
2106   if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK)
2107     res = GST_RTSP_EPARSE;
2108   if (G_UNLIKELY (*urlstr == '\0'))
2109     res = GST_RTSP_EPARSE;
2110 
2111   if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK)
2112     res = GST_RTSP_EPARSE;
2113 
2114   if (G_UNLIKELY (*bptr != '\0'))
2115     res = GST_RTSP_EPARSE;
2116 
2117   if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method,
2118               urlstr) != GST_RTSP_OK))
2119     res = GST_RTSP_EPARSE;
2120 
2121   res2 = parse_protocol_version (versionstr, &msg->type,
2122       &msg->type_data.request.version);
2123   if (G_LIKELY (res == GST_RTSP_OK))
2124     res = res2;
2125 
2126   if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) {
2127     /* GET and POST are not allowed as RTSP methods */
2128     if (msg->type_data.request.method == GST_RTSP_GET ||
2129         msg->type_data.request.method == GST_RTSP_POST) {
2130       msg->type_data.request.method = GST_RTSP_INVALID;
2131       if (res == GST_RTSP_OK)
2132         res = GST_RTSP_ERROR;
2133     }
2134   } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2135     /* only GET and POST are allowed as HTTP methods */
2136     if (msg->type_data.request.method != GST_RTSP_GET &&
2137         msg->type_data.request.method != GST_RTSP_POST) {
2138       msg->type_data.request.method = GST_RTSP_INVALID;
2139       if (res == GST_RTSP_OK)
2140         res = GST_RTSP_ERROR;
2141     }
2142   }
2143 
2144   return res;
2145 }
2146 
2147 /* parsing lines means reading a Key: Value pair */
2148 static GstRTSPResult
parse_line(guint8 * buffer,GstRTSPMessage * msg)2149 parse_line (guint8 * buffer, GstRTSPMessage * msg)
2150 {
2151   GstRTSPHeaderField field;
2152   gchar *line = (gchar *) buffer;
2153   gchar *field_name = NULL;
2154   gchar *value;
2155 
2156   if ((value = strchr (line, ':')) == NULL || value == line)
2157     goto parse_error;
2158 
2159   /* trim space before the colon */
2160   if (value[-1] == ' ')
2161     value[-1] = '\0';
2162 
2163   /* replace the colon with a NUL */
2164   *value++ = '\0';
2165 
2166   /* find the header */
2167   field = gst_rtsp_find_header_field (line);
2168   /* custom header not present in the list of pre-defined headers */
2169   if (field == GST_RTSP_HDR_INVALID)
2170     field_name = line;
2171 
2172   /* split up the value in multiple key:value pairs if it contains comma(s) */
2173   while (*value != '\0') {
2174     gchar *next_value;
2175     gchar *comma = NULL;
2176     gboolean quoted = FALSE;
2177     guint comment = 0;
2178 
2179     /* trim leading space */
2180     if (*value == ' ')
2181       value++;
2182 
2183     /* for headers which may not appear multiple times, and thus may not
2184      * contain multiple values on the same line, we can short-circuit the loop
2185      * below and the entire value results in just one key:value pair*/
2186     if (!gst_rtsp_header_allow_multiple (field))
2187       next_value = value + strlen (value);
2188     else
2189       next_value = value;
2190 
2191     /* find the next value, taking special care of quotes and comments */
2192     while (*next_value != '\0') {
2193       if ((quoted || comment != 0) && *next_value == '\\' &&
2194           next_value[1] != '\0')
2195         next_value++;
2196       else if (comment == 0 && *next_value == '"')
2197         quoted = !quoted;
2198       else if (!quoted && *next_value == '(')
2199         comment++;
2200       else if (comment != 0 && *next_value == ')')
2201         comment--;
2202       else if (!quoted && comment == 0) {
2203         /* To quote RFC 2068: "User agents MUST take special care in parsing
2204          * the WWW-Authenticate field value if it contains more than one
2205          * challenge, or if more than one WWW-Authenticate header field is
2206          * provided, since the contents of a challenge may itself contain a
2207          * comma-separated list of authentication parameters."
2208          *
2209          * What this means is that we cannot just look for an unquoted comma
2210          * when looking for multiple values in Proxy-Authenticate and
2211          * WWW-Authenticate headers. Instead we need to look for the sequence
2212          * "comma [space] token space token" before we can split after the
2213          * comma...
2214          */
2215         if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE ||
2216             field == GST_RTSP_HDR_WWW_AUTHENTICATE) {
2217           if (*next_value == ',') {
2218             if (next_value[1] == ' ') {
2219               /* skip any space following the comma so we do not mistake it for
2220                * separating between two tokens */
2221               next_value++;
2222             }
2223             comma = next_value;
2224           } else if (*next_value == ' ' && next_value[1] != ',' &&
2225               next_value[1] != '=' && comma != NULL) {
2226             next_value = comma;
2227             comma = NULL;
2228             break;
2229           }
2230         } else if (*next_value == ',')
2231           break;
2232       }
2233 
2234       next_value++;
2235     }
2236 
2237     if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) {
2238       /* The timeout parameter is only allowed in a session response header
2239        * but some clients send it as part of the session request header.
2240        * Ignore everything from the semicolon to the end of the line. */
2241       next_value = value;
2242       while (*next_value != '\0') {
2243         if (*next_value == ';') {
2244           break;
2245         }
2246         next_value++;
2247       }
2248     }
2249 
2250     /* trim space */
2251     if (value != next_value && next_value[-1] == ' ')
2252       next_value[-1] = '\0';
2253 
2254     if (*next_value != '\0')
2255       *next_value++ = '\0';
2256 
2257     /* add the key:value pair */
2258     if (*value != '\0') {
2259       if (field != GST_RTSP_HDR_INVALID)
2260         gst_rtsp_message_add_header (msg, field, value);
2261       else
2262         gst_rtsp_message_add_header_by_name (msg, field_name, value);
2263     }
2264 
2265     value = next_value;
2266   }
2267 
2268   return GST_RTSP_OK;
2269 
2270   /* ERRORS */
2271 parse_error:
2272   {
2273     return GST_RTSP_EPARSE;
2274   }
2275 }
2276 
2277 /* convert all consecutive whitespace to a single space */
2278 static void
normalize_line(guint8 * buffer)2279 normalize_line (guint8 * buffer)
2280 {
2281   while (*buffer) {
2282     if (g_ascii_isspace (*buffer)) {
2283       guint8 *tmp;
2284 
2285       *buffer++ = ' ';
2286       for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) {
2287       }
2288       if (buffer != tmp)
2289         memmove (buffer, tmp, strlen ((gchar *) tmp) + 1);
2290     } else {
2291       buffer++;
2292     }
2293   }
2294 }
2295 
2296 static gboolean
cseq_validation(GstRTSPConnection * conn,GstRTSPMessage * message)2297 cseq_validation (GstRTSPConnection * conn, GstRTSPMessage * message)
2298 {
2299   gchar *cseq_header;
2300   gint64 cseq = 0;
2301   GstRTSPResult res;
2302 
2303   if (message->type == GST_RTSP_MESSAGE_RESPONSE ||
2304       message->type == GST_RTSP_MESSAGE_REQUEST) {
2305     if ((res = gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ,
2306                 &cseq_header, 0)) != GST_RTSP_OK) {
2307       /* rfc2326 This field MUST be present in all RTSP req and resp */
2308       goto invalid_format;
2309     }
2310 
2311     errno = 0;
2312     cseq = g_ascii_strtoll (cseq_header, NULL, 10);
2313     if (errno != 0 || cseq < 0) {
2314       /* CSeq has no valid value */
2315       goto invalid_format;
2316     }
2317 
2318     if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2319         (conn->cseq == 0 || conn->cseq < cseq)) {
2320       /* Response CSeq can't be higher than the number of outgoing requests
2321        * neither is a response valid if no request has been made */
2322       goto invalid_format;
2323     }
2324   }
2325   return GST_RTSP_OK;
2326 
2327 invalid_format:
2328   {
2329     return GST_RTSP_EPARSE;
2330   }
2331 }
2332 
2333 /* returns:
2334  *  GST_RTSP_OK when a complete message was read.
2335  *  GST_RTSP_EEOF: when the read socket is closed
2336  *  GST_RTSP_EINTR: when more data is needed.
2337  *  GST_RTSP_..: some other error occured.
2338  */
2339 static GstRTSPResult
build_next(GstRTSPBuilder * builder,GstRTSPMessage * message,GstRTSPConnection * conn,gboolean block)2340 build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
2341     GstRTSPConnection * conn, gboolean block)
2342 {
2343   GstRTSPResult res;
2344 
2345   while (TRUE) {
2346     switch (builder->state) {
2347       case STATE_START:
2348       {
2349         guint8 c;
2350 
2351         builder->offset = 0;
2352         res =
2353             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
2354             block);
2355         if (res != GST_RTSP_OK)
2356           goto done;
2357 
2358         c = builder->buffer[0];
2359 
2360         /* we have 1 bytes now and we can see if this is a data message or
2361          * not */
2362         if (c == '$') {
2363           /* data message, prepare for the header */
2364           builder->state = STATE_DATA_HEADER;
2365           conn->may_cancel = FALSE;
2366         } else if (c == '\n' || c == '\r') {
2367           /* skip \n and \r */
2368           builder->offset = 0;
2369         } else {
2370           builder->line = 0;
2371           builder->state = STATE_READ_LINES;
2372           conn->may_cancel = FALSE;
2373         }
2374         break;
2375       }
2376       case STATE_DATA_HEADER:
2377       {
2378         res =
2379             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
2380             block);
2381         if (res != GST_RTSP_OK)
2382           goto done;
2383 
2384         gst_rtsp_message_init_data (message, builder->buffer[1]);
2385 
2386         builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
2387         builder->body_data = g_malloc (builder->body_len + 1);
2388         builder->body_data[builder->body_len] = '\0';
2389         builder->offset = 0;
2390         builder->state = STATE_DATA_BODY;
2391         break;
2392       }
2393       case STATE_DATA_BODY:
2394       {
2395         res =
2396             read_bytes (conn, builder->body_data, &builder->offset,
2397             builder->body_len, block);
2398         if (res != GST_RTSP_OK)
2399           goto done;
2400 
2401         /* we have the complete body now, store in the message adjusting the
2402          * length to include the trailing '\0' */
2403         gst_rtsp_message_take_body (message,
2404             (guint8 *) builder->body_data, builder->body_len + 1);
2405         builder->body_data = NULL;
2406         builder->body_len = 0;
2407 
2408         builder->state = STATE_END;
2409         break;
2410       }
2411       case STATE_READ_LINES:
2412       {
2413         res = read_line (conn, builder->buffer, &builder->offset,
2414             sizeof (builder->buffer), block);
2415         if (res != GST_RTSP_OK)
2416           goto done;
2417 
2418         /* we have a regular response */
2419         if (builder->buffer[0] == '\0') {
2420           gchar *hdrval;
2421 
2422           /* empty line, end of message header */
2423           /* see if there is a Content-Length header, but ignore it if this
2424            * is a POST request with an x-sessioncookie header */
2425           if (gst_rtsp_message_get_header (message,
2426                   GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK &&
2427               (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST ||
2428                   message->type_data.request.method != GST_RTSP_POST ||
2429                   gst_rtsp_message_get_header (message,
2430                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
2431             /* there is, prepare to read the body */
2432             builder->body_len = atol (hdrval);
2433             builder->body_data = g_try_malloc (builder->body_len + 1);
2434             /* we can't do much here, we need the length to know how many bytes
2435              * we need to read next and when allocation fails, something is
2436              * probably wrong with the length. */
2437             if (builder->body_data == NULL)
2438               goto invalid_body_len;
2439 
2440             builder->body_data[builder->body_len] = '\0';
2441             builder->offset = 0;
2442             builder->state = STATE_DATA_BODY;
2443           } else {
2444             builder->state = STATE_END;
2445           }
2446           break;
2447         }
2448 
2449         /* we have a line */
2450         normalize_line (builder->buffer);
2451         if (builder->line == 0) {
2452           /* first line, check for response status */
2453           if (memcmp (builder->buffer, "RTSP", 4) == 0 ||
2454               memcmp (builder->buffer, "HTTP", 4) == 0) {
2455             builder->status = parse_response_status (builder->buffer, message);
2456           } else {
2457             builder->status = parse_request_line (builder->buffer, message);
2458           }
2459         } else {
2460           /* else just parse the line */
2461           res = parse_line (builder->buffer, message);
2462           if (res != GST_RTSP_OK)
2463             builder->status = res;
2464         }
2465         if (builder->status != GST_RTSP_OK) {
2466           res = builder->status;
2467           goto invalid_format;
2468         }
2469 
2470         builder->line++;
2471         builder->offset = 0;
2472         break;
2473       }
2474       case STATE_END:
2475       {
2476         gchar *session_cookie;
2477         gchar *session_id;
2478 
2479         conn->may_cancel = TRUE;
2480 
2481         if ((res = cseq_validation (conn, message)) != GST_RTSP_OK) {
2482           /* message don't comply with rfc2326 regarding CSeq */
2483           goto invalid_format;
2484         }
2485 
2486         if (message->type == GST_RTSP_MESSAGE_DATA) {
2487           /* data messages don't have headers */
2488           res = GST_RTSP_OK;
2489           goto done;
2490         }
2491 
2492         /* save the tunnel session in the connection */
2493         if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST &&
2494             !conn->manual_http &&
2495             conn->tstate == TUNNEL_STATE_NONE &&
2496             gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE,
2497                 &session_cookie, 0) == GST_RTSP_OK) {
2498           strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN);
2499           conn->tunnelid[TUNNELID_LEN - 1] = '\0';
2500           conn->tunneled = TRUE;
2501         }
2502 
2503         /* save session id in the connection for further use */
2504         if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
2505             gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
2506                 &session_id, 0) == GST_RTSP_OK) {
2507           gint maxlen, i;
2508 
2509           maxlen = sizeof (conn->session_id) - 1;
2510           /* the sessionid can have attributes marked with ;
2511            * Make sure we strip them */
2512           for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
2513             if (session_id[i] == ';') {
2514               maxlen = i;
2515               /* parse timeout */
2516               do {
2517                 i++;
2518               } while (g_ascii_isspace (session_id[i]));
2519               if (g_str_has_prefix (&session_id[i], "timeout=")) {
2520                 gint to;
2521 
2522                 /* if we parsed something valid, configure */
2523                 if ((to = atoi (&session_id[i + 8])) > 0)
2524                   conn->timeout = to;
2525               }
2526               break;
2527             }
2528           }
2529 
2530           /* make sure to not overflow */
2531           if (conn->remember_session_id) {
2532             strncpy (conn->session_id, session_id, maxlen);
2533             conn->session_id[maxlen] = '\0';
2534           }
2535         }
2536         res = builder->status;
2537         goto done;
2538       }
2539       default:
2540         res = GST_RTSP_ERROR;
2541         goto done;
2542     }
2543   }
2544 done:
2545   conn->may_cancel = TRUE;
2546   return res;
2547 
2548   /* ERRORS */
2549 invalid_body_len:
2550   {
2551     conn->may_cancel = TRUE;
2552     GST_DEBUG ("could not allocate body");
2553     return GST_RTSP_ERROR;
2554   }
2555 invalid_format:
2556   {
2557     conn->may_cancel = TRUE;
2558     GST_DEBUG ("could not parse");
2559     return res;
2560   }
2561 }
2562 
2563 /**
2564  * gst_rtsp_connection_read:
2565  * @conn: a #GstRTSPConnection
2566  * @data: the data to read
2567  * @size: the size of @data
2568  * @timeout: a timeout value or %NULL
2569  *
2570  * Attempt to read @size bytes into @data from the connected @conn, blocking up to
2571  * the specified @timeout. @timeout can be %NULL, in which case this function
2572  * might block forever.
2573  *
2574  * This function can be cancelled with gst_rtsp_connection_flush().
2575  *
2576  * Returns: #GST_RTSP_OK on success.
2577  */
2578 GstRTSPResult
gst_rtsp_connection_read(GstRTSPConnection * conn,guint8 * data,guint size,GTimeVal * timeout)2579 gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
2580     GTimeVal * timeout)
2581 {
2582   guint offset;
2583   GstClockTime to;
2584   GstRTSPResult res;
2585 
2586   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2587   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
2588   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2589 
2590   if (G_UNLIKELY (size == 0))
2591     return GST_RTSP_OK;
2592 
2593   offset = 0;
2594 
2595   /* configure timeout if any */
2596   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
2597 
2598   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2599   res = read_bytes (conn, data, &offset, size, TRUE);
2600   g_socket_set_timeout (conn->read_socket, 0);
2601 
2602   return res;
2603 }
2604 
2605 static GstRTSPMessage *
gen_tunnel_reply(GstRTSPConnection * conn,GstRTSPStatusCode code,const GstRTSPMessage * request)2606 gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
2607     const GstRTSPMessage * request)
2608 {
2609   GstRTSPMessage *msg;
2610   GstRTSPResult res;
2611 
2612   if (gst_rtsp_status_as_text (code) == NULL)
2613     code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
2614 
2615   GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
2616       no_message);
2617 
2618   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
2619       "GStreamer RTSP Server");
2620   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
2621   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
2622   gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
2623 
2624   if (code == GST_RTSP_STS_OK) {
2625     /* add the local ip address to the tunnel reply, this is where the client
2626      * should send the POST request to */
2627     if (conn->local_ip)
2628       gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
2629           conn->local_ip);
2630     gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
2631         "application/x-rtsp-tunnelled");
2632   }
2633 
2634   return msg;
2635 
2636   /* ERRORS */
2637 no_message:
2638   {
2639     return NULL;
2640   }
2641 }
2642 
2643 /**
2644  * gst_rtsp_connection_receive:
2645  * @conn: a #GstRTSPConnection
2646  * @message: the message to read
2647  * @timeout: a timeout value or %NULL
2648  *
2649  * Attempt to read into @message from the connected @conn, blocking up to
2650  * the specified @timeout. @timeout can be %NULL, in which case this function
2651  * might block forever.
2652  *
2653  * This function can be cancelled with gst_rtsp_connection_flush().
2654  *
2655  * Returns: #GST_RTSP_OK on success.
2656  */
2657 GstRTSPResult
gst_rtsp_connection_receive(GstRTSPConnection * conn,GstRTSPMessage * message,GTimeVal * timeout)2658 gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
2659     GTimeVal * timeout)
2660 {
2661   GstRTSPResult res;
2662   GstRTSPBuilder builder;
2663   GstClockTime to;
2664 
2665   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2666   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
2667   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2668 
2669   /* configure timeout if any */
2670   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
2671 
2672   g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
2673   memset (&builder, 0, sizeof (GstRTSPBuilder));
2674   res = build_next (&builder, message, conn, TRUE);
2675   g_socket_set_timeout (conn->read_socket, 0);
2676 
2677   if (G_UNLIKELY (res != GST_RTSP_OK))
2678     goto read_error;
2679 
2680   if (!conn->manual_http) {
2681     if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
2682       if (conn->tstate == TUNNEL_STATE_NONE &&
2683           message->type_data.request.method == GST_RTSP_GET) {
2684         GstRTSPMessage *response;
2685 
2686         conn->tstate = TUNNEL_STATE_GET;
2687 
2688         /* tunnel GET request, we can reply now */
2689         response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
2690         res = gst_rtsp_connection_send (conn, response, timeout);
2691         gst_rtsp_message_free (response);
2692         if (res == GST_RTSP_OK)
2693           res = GST_RTSP_ETGET;
2694         goto cleanup;
2695       } else if (conn->tstate == TUNNEL_STATE_NONE &&
2696           message->type_data.request.method == GST_RTSP_POST) {
2697         conn->tstate = TUNNEL_STATE_POST;
2698 
2699         /* tunnel POST request, the caller now has to link the two
2700          * connections. */
2701         res = GST_RTSP_ETPOST;
2702         goto cleanup;
2703       } else {
2704         res = GST_RTSP_EPARSE;
2705         goto cleanup;
2706       }
2707     } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
2708       res = GST_RTSP_EPARSE;
2709       goto cleanup;
2710     }
2711   }
2712 
2713   /* we have a message here */
2714   build_reset (&builder);
2715 
2716   return GST_RTSP_OK;
2717 
2718   /* ERRORS */
2719 read_error:
2720 cleanup:
2721   {
2722     build_reset (&builder);
2723     gst_rtsp_message_unset (message);
2724     return res;
2725   }
2726 }
2727 
2728 /**
2729  * gst_rtsp_connection_close:
2730  * @conn: a #GstRTSPConnection
2731  *
2732  * Close the connected @conn. After this call, the connection is in the same
2733  * state as when it was first created.
2734  *
2735  * Returns: #GST_RTSP_OK on success.
2736  */
2737 GstRTSPResult
gst_rtsp_connection_close(GstRTSPConnection * conn)2738 gst_rtsp_connection_close (GstRTSPConnection * conn)
2739 {
2740   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2741 
2742   /* last unref closes the connection we don't want to explicitly close here
2743    * because these sockets might have been provided at construction */
2744   if (conn->stream0) {
2745     g_object_unref (conn->stream0);
2746     conn->stream0 = NULL;
2747     conn->socket0 = NULL;
2748   }
2749   if (conn->stream1) {
2750     g_object_unref (conn->stream1);
2751     conn->stream1 = NULL;
2752     conn->socket1 = NULL;
2753   }
2754 
2755   /* these were owned by the stream */
2756   conn->input_stream = NULL;
2757   conn->output_stream = NULL;
2758   conn->control_stream = NULL;
2759 
2760   g_free (conn->remote_ip);
2761   conn->remote_ip = NULL;
2762   g_free (conn->local_ip);
2763   conn->local_ip = NULL;
2764 
2765   conn->read_ahead = 0;
2766 
2767   g_free (conn->initial_buffer);
2768   conn->initial_buffer = NULL;
2769   conn->initial_buffer_offset = 0;
2770 
2771   conn->write_socket = NULL;
2772   conn->read_socket = NULL;
2773   conn->tunneled = FALSE;
2774   conn->tstate = TUNNEL_STATE_NONE;
2775   conn->ctxp = NULL;
2776   g_free (conn->username);
2777   conn->username = NULL;
2778   g_free (conn->passwd);
2779   conn->passwd = NULL;
2780   gst_rtsp_connection_clear_auth_params (conn);
2781   conn->timeout = 60;
2782   conn->cseq = 0;
2783   conn->session_id[0] = '\0';
2784 
2785   return GST_RTSP_OK;
2786 }
2787 
2788 /**
2789  * gst_rtsp_connection_free:
2790  * @conn: a #GstRTSPConnection
2791  *
2792  * Close and free @conn.
2793  *
2794  * Returns: #GST_RTSP_OK on success.
2795  */
2796 GstRTSPResult
gst_rtsp_connection_free(GstRTSPConnection * conn)2797 gst_rtsp_connection_free (GstRTSPConnection * conn)
2798 {
2799   GstRTSPResult res;
2800 
2801   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2802 
2803   res = gst_rtsp_connection_close (conn);
2804 
2805   if (conn->cancellable)
2806     g_object_unref (conn->cancellable);
2807   if (conn->client)
2808     g_object_unref (conn->client);
2809   if (conn->tls_database)
2810     g_object_unref (conn->tls_database);
2811   if (conn->tls_interaction)
2812     g_object_unref (conn->tls_interaction);
2813   if (conn->accept_certificate_destroy_notify)
2814     conn->
2815         accept_certificate_destroy_notify (conn->accept_certificate_user_data);
2816 
2817   g_timer_destroy (conn->timer);
2818   gst_rtsp_url_free (conn->url);
2819   g_free (conn->proxy_host);
2820   g_free (conn);
2821 
2822   return res;
2823 }
2824 
2825 /**
2826  * gst_rtsp_connection_poll:
2827  * @conn: a #GstRTSPConnection
2828  * @events: a bitmask of #GstRTSPEvent flags to check
2829  * @revents: location for result flags
2830  * @timeout: a timeout
2831  *
2832  * Wait up to the specified @timeout for the connection to become available for
2833  * at least one of the operations specified in @events. When the function returns
2834  * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
2835  * @conn.
2836  *
2837  * @timeout can be %NULL, in which case this function might block forever.
2838  *
2839  * This function can be cancelled with gst_rtsp_connection_flush().
2840  *
2841  * Returns: #GST_RTSP_OK on success.
2842  */
2843 GstRTSPResult
gst_rtsp_connection_poll(GstRTSPConnection * conn,GstRTSPEvent events,GstRTSPEvent * revents,GTimeVal * timeout)2844 gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
2845     GstRTSPEvent * revents, GTimeVal * timeout)
2846 {
2847   GMainContext *ctx;
2848   GSource *rs, *ws, *ts;
2849   GIOCondition condition;
2850 
2851   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2852   g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
2853   g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
2854   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
2855   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
2856 
2857   ctx = g_main_context_new ();
2858 
2859   /* configure timeout if any */
2860   if (timeout) {
2861     ts = g_timeout_source_new (GST_TIMEVAL_TO_TIME (*timeout) / GST_MSECOND);
2862     g_source_set_dummy_callback (ts);
2863     g_source_attach (ts, ctx);
2864     g_source_unref (ts);
2865   }
2866 
2867   if (events & GST_RTSP_EV_READ) {
2868     rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
2869         conn->cancellable);
2870     g_source_set_dummy_callback (rs);
2871     g_source_attach (rs, ctx);
2872     g_source_unref (rs);
2873   }
2874 
2875   if (events & GST_RTSP_EV_WRITE) {
2876     ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
2877         conn->cancellable);
2878     g_source_set_dummy_callback (ws);
2879     g_source_attach (ws, ctx);
2880     g_source_unref (ws);
2881   }
2882 
2883   /* Returns after handling all pending events */
2884   while (!g_main_context_iteration (ctx, TRUE));
2885 
2886   g_main_context_unref (ctx);
2887 
2888   *revents = 0;
2889   if (events & GST_RTSP_EV_READ) {
2890     condition = g_socket_condition_check (conn->read_socket,
2891         G_IO_IN | G_IO_PRI);
2892     if ((condition & G_IO_IN) || (condition & G_IO_PRI))
2893       *revents |= GST_RTSP_EV_READ;
2894   }
2895   if (events & GST_RTSP_EV_WRITE) {
2896     condition = g_socket_condition_check (conn->write_socket, G_IO_OUT);
2897     if ((condition & G_IO_OUT))
2898       *revents |= GST_RTSP_EV_WRITE;
2899   }
2900 
2901   if (*revents == 0)
2902     return GST_RTSP_ETIMEOUT;
2903 
2904   return GST_RTSP_OK;
2905 }
2906 
2907 /**
2908  * gst_rtsp_connection_next_timeout:
2909  * @conn: a #GstRTSPConnection
2910  * @timeout: a timeout
2911  *
2912  * Calculate the next timeout for @conn, storing the result in @timeout.
2913  *
2914  * Returns: #GST_RTSP_OK.
2915  */
2916 GstRTSPResult
gst_rtsp_connection_next_timeout(GstRTSPConnection * conn,GTimeVal * timeout)2917 gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
2918 {
2919   gdouble elapsed;
2920   glong sec;
2921   gulong usec;
2922   gint ctimeout;
2923 
2924   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2925   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
2926 
2927   ctimeout = conn->timeout;
2928   if (ctimeout >= 20) {
2929     /* Because we should act before the timeout we timeout 5
2930      * seconds in advance. */
2931     ctimeout -= 5;
2932   } else if (ctimeout >= 5) {
2933     /* else timeout 20% earlier */
2934     ctimeout -= ctimeout / 5;
2935   } else if (ctimeout >= 1) {
2936     /* else timeout 1 second earlier */
2937     ctimeout -= 1;
2938   }
2939 
2940   elapsed = g_timer_elapsed (conn->timer, &usec);
2941   if (elapsed >= ctimeout) {
2942     sec = 0;
2943     usec = 0;
2944   } else {
2945     sec = ctimeout - elapsed;
2946     if (usec <= G_USEC_PER_SEC)
2947       usec = G_USEC_PER_SEC - usec;
2948     else
2949       usec = 0;
2950   }
2951 
2952   timeout->tv_sec = sec;
2953   timeout->tv_usec = usec;
2954 
2955   return GST_RTSP_OK;
2956 }
2957 
2958 /**
2959  * gst_rtsp_connection_reset_timeout:
2960  * @conn: a #GstRTSPConnection
2961  *
2962  * Reset the timeout of @conn.
2963  *
2964  * Returns: #GST_RTSP_OK.
2965  */
2966 GstRTSPResult
gst_rtsp_connection_reset_timeout(GstRTSPConnection * conn)2967 gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
2968 {
2969   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2970 
2971   g_timer_start (conn->timer);
2972 
2973   return GST_RTSP_OK;
2974 }
2975 
2976 /**
2977  * gst_rtsp_connection_flush:
2978  * @conn: a #GstRTSPConnection
2979  * @flush: start or stop the flush
2980  *
2981  * Start or stop the flushing action on @conn. When flushing, all current
2982  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
2983  * is set to non-flushing mode again.
2984  *
2985  * Returns: #GST_RTSP_OK.
2986  */
2987 GstRTSPResult
gst_rtsp_connection_flush(GstRTSPConnection * conn,gboolean flush)2988 gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
2989 {
2990   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
2991 
2992   if (flush) {
2993     g_cancellable_cancel (conn->cancellable);
2994   } else {
2995     g_object_unref (conn->cancellable);
2996     conn->cancellable = g_cancellable_new ();
2997   }
2998 
2999   return GST_RTSP_OK;
3000 }
3001 
3002 /**
3003  * gst_rtsp_connection_set_proxy:
3004  * @conn: a #GstRTSPConnection
3005  * @host: the proxy host
3006  * @port: the proxy port
3007  *
3008  * Set the proxy host and port.
3009  *
3010  * Returns: #GST_RTSP_OK.
3011  */
3012 GstRTSPResult
gst_rtsp_connection_set_proxy(GstRTSPConnection * conn,const gchar * host,guint port)3013 gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
3014     const gchar * host, guint port)
3015 {
3016   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3017 
3018   g_free (conn->proxy_host);
3019   conn->proxy_host = g_strdup (host);
3020   conn->proxy_port = port;
3021 
3022   return GST_RTSP_OK;
3023 }
3024 
3025 /**
3026  * gst_rtsp_connection_set_auth:
3027  * @conn: a #GstRTSPConnection
3028  * @method: authentication method
3029  * @user: the user
3030  * @pass: the password
3031  *
3032  * Configure @conn for authentication mode @method with @user and @pass as the
3033  * user and password respectively.
3034  *
3035  * Returns: #GST_RTSP_OK.
3036  */
3037 GstRTSPResult
gst_rtsp_connection_set_auth(GstRTSPConnection * conn,GstRTSPAuthMethod method,const gchar * user,const gchar * pass)3038 gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
3039     GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
3040 {
3041   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3042 
3043   if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
3044           || g_strrstr (user, ":") != NULL))
3045     return GST_RTSP_EINVAL;
3046 
3047   /* Make sure the username and passwd are being set for authentication */
3048   if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
3049     return GST_RTSP_EINVAL;
3050 
3051   /* ":" chars are not allowed in usernames for basic auth */
3052   if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
3053     return GST_RTSP_EINVAL;
3054 
3055   g_free (conn->username);
3056   g_free (conn->passwd);
3057 
3058   conn->auth_method = method;
3059   conn->username = g_strdup (user);
3060   conn->passwd = g_strdup (pass);
3061 
3062   return GST_RTSP_OK;
3063 }
3064 
3065 /**
3066  * str_case_hash:
3067  * @key: ASCII string to hash
3068  *
3069  * Hashes @key in a case-insensitive manner.
3070  *
3071  * Returns: the hash code.
3072  **/
3073 static guint
str_case_hash(gconstpointer key)3074 str_case_hash (gconstpointer key)
3075 {
3076   const char *p = key;
3077   guint h = g_ascii_toupper (*p);
3078 
3079   if (h)
3080     for (p += 1; *p != '\0'; p++)
3081       h = (h << 5) - h + g_ascii_toupper (*p);
3082 
3083   return h;
3084 }
3085 
3086 /**
3087  * str_case_equal:
3088  * @v1: an ASCII string
3089  * @v2: another ASCII string
3090  *
3091  * Compares @v1 and @v2 in a case-insensitive manner
3092  *
3093  * Returns: %TRUE if they are equal (modulo case)
3094  **/
3095 static gboolean
str_case_equal(gconstpointer v1,gconstpointer v2)3096 str_case_equal (gconstpointer v1, gconstpointer v2)
3097 {
3098   const char *string1 = v1;
3099   const char *string2 = v2;
3100 
3101   return g_ascii_strcasecmp (string1, string2) == 0;
3102 }
3103 
3104 /**
3105  * gst_rtsp_connection_set_auth_param:
3106  * @conn: a #GstRTSPConnection
3107  * @param: authentication directive
3108  * @value: value
3109  *
3110  * Setup @conn with authentication directives. This is not necesary for
3111  * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
3112  * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
3113  * in the WWW-Authenticate response header and can include realm, domain,
3114  * nonce, opaque, stale, algorithm, qop as per RFC2617.
3115  */
3116 void
gst_rtsp_connection_set_auth_param(GstRTSPConnection * conn,const gchar * param,const gchar * value)3117 gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
3118     const gchar * param, const gchar * value)
3119 {
3120   g_return_if_fail (conn != NULL);
3121   g_return_if_fail (param != NULL);
3122 
3123   if (conn->auth_params == NULL) {
3124     conn->auth_params =
3125         g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
3126   }
3127   g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
3128 }
3129 
3130 /**
3131  * gst_rtsp_connection_clear_auth_params:
3132  * @conn: a #GstRTSPConnection
3133  *
3134  * Clear the list of authentication directives stored in @conn.
3135  */
3136 void
gst_rtsp_connection_clear_auth_params(GstRTSPConnection * conn)3137 gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
3138 {
3139   g_return_if_fail (conn != NULL);
3140 
3141   if (conn->auth_params != NULL) {
3142     g_hash_table_destroy (conn->auth_params);
3143     conn->auth_params = NULL;
3144   }
3145 }
3146 
3147 static GstRTSPResult
set_qos_dscp(GSocket * socket,guint qos_dscp)3148 set_qos_dscp (GSocket * socket, guint qos_dscp)
3149 {
3150 #ifndef IP_TOS
3151   GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
3152   return GST_RTSP_OK;
3153 #else
3154   gint fd;
3155   union gst_sockaddr sa;
3156   socklen_t slen = sizeof (sa);
3157   gint af;
3158   gint tos;
3159 
3160   if (!socket)
3161     return GST_RTSP_OK;
3162 
3163   fd = g_socket_get_fd (socket);
3164   if (getsockname (fd, &sa.sa, &slen) < 0)
3165     goto no_getsockname;
3166 
3167   af = sa.sa.sa_family;
3168 
3169   /* if this is an IPv4-mapped address then do IPv4 QoS */
3170   if (af == AF_INET6) {
3171     if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
3172       af = AF_INET;
3173   }
3174 
3175   /* extract and shift 6 bits of the DSCP */
3176   tos = (qos_dscp & 0x3f) << 2;
3177 
3178 #ifdef G_OS_WIN32
3179 #  define SETSOCKOPT_ARG4_TYPE const char *
3180 #else
3181 #  define SETSOCKOPT_ARG4_TYPE const void *
3182 #endif
3183 
3184   switch (af) {
3185     case AF_INET:
3186       if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos,
3187               sizeof (tos)) < 0)
3188         goto no_setsockopt;
3189       break;
3190     case AF_INET6:
3191 #ifdef IPV6_TCLASS
3192       if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS,
3193               (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0)
3194         goto no_setsockopt;
3195       break;
3196 #endif
3197     default:
3198       goto wrong_family;
3199   }
3200 
3201   return GST_RTSP_OK;
3202 
3203   /* ERRORS */
3204 no_getsockname:
3205 no_setsockopt:
3206   {
3207     return GST_RTSP_ESYS;
3208   }
3209 wrong_family:
3210   {
3211     return GST_RTSP_ERROR;
3212   }
3213 #endif
3214 }
3215 
3216 /**
3217  * gst_rtsp_connection_set_qos_dscp:
3218  * @conn: a #GstRTSPConnection
3219  * @qos_dscp: DSCP value
3220  *
3221  * Configure @conn to use the specified DSCP value.
3222  *
3223  * Returns: #GST_RTSP_OK on success.
3224  */
3225 GstRTSPResult
gst_rtsp_connection_set_qos_dscp(GstRTSPConnection * conn,guint qos_dscp)3226 gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
3227 {
3228   GstRTSPResult res;
3229 
3230   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3231   g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
3232   g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
3233 
3234   res = set_qos_dscp (conn->socket0, qos_dscp);
3235   if (res == GST_RTSP_OK)
3236     res = set_qos_dscp (conn->socket1, qos_dscp);
3237 
3238   return res;
3239 }
3240 
3241 
3242 /**
3243  * gst_rtsp_connection_get_url:
3244  * @conn: a #GstRTSPConnection
3245  *
3246  * Retrieve the URL of the other end of @conn.
3247  *
3248  * Returns: The URL. This value remains valid until the
3249  * connection is freed.
3250  */
3251 GstRTSPUrl *
gst_rtsp_connection_get_url(const GstRTSPConnection * conn)3252 gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
3253 {
3254   g_return_val_if_fail (conn != NULL, NULL);
3255 
3256   return conn->url;
3257 }
3258 
3259 /**
3260  * gst_rtsp_connection_get_ip:
3261  * @conn: a #GstRTSPConnection
3262  *
3263  * Retrieve the IP address of the other end of @conn.
3264  *
3265  * Returns: The IP address as a string. this value remains valid until the
3266  * connection is closed.
3267  */
3268 const gchar *
gst_rtsp_connection_get_ip(const GstRTSPConnection * conn)3269 gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
3270 {
3271   g_return_val_if_fail (conn != NULL, NULL);
3272 
3273   return conn->remote_ip;
3274 }
3275 
3276 /**
3277  * gst_rtsp_connection_set_ip:
3278  * @conn: a #GstRTSPConnection
3279  * @ip: an ip address
3280  *
3281  * Set the IP address of the server.
3282  */
3283 void
gst_rtsp_connection_set_ip(GstRTSPConnection * conn,const gchar * ip)3284 gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
3285 {
3286   g_return_if_fail (conn != NULL);
3287 
3288   g_free (conn->remote_ip);
3289   conn->remote_ip = g_strdup (ip);
3290 }
3291 
3292 /**
3293  * gst_rtsp_connection_get_read_socket:
3294  * @conn: a #GstRTSPConnection
3295  *
3296  * Get the file descriptor for reading.
3297  *
3298  * Returns: (transfer none): the file descriptor used for reading or %NULL on
3299  * error. The file descriptor remains valid until the connection is closed.
3300  */
3301 GSocket *
gst_rtsp_connection_get_read_socket(const GstRTSPConnection * conn)3302 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
3303 {
3304   g_return_val_if_fail (conn != NULL, NULL);
3305   g_return_val_if_fail (conn->read_socket != NULL, NULL);
3306 
3307   return conn->read_socket;
3308 }
3309 
3310 /**
3311  * gst_rtsp_connection_get_write_socket:
3312  * @conn: a #GstRTSPConnection
3313  *
3314  * Get the file descriptor for writing.
3315  *
3316  * Returns: (transfer none): the file descriptor used for writing or NULL on
3317  * error. The file descriptor remains valid until the connection is closed.
3318  */
3319 GSocket *
gst_rtsp_connection_get_write_socket(const GstRTSPConnection * conn)3320 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
3321 {
3322   g_return_val_if_fail (conn != NULL, NULL);
3323   g_return_val_if_fail (conn->write_socket != NULL, NULL);
3324 
3325   return conn->write_socket;
3326 }
3327 
3328 /**
3329  * gst_rtsp_connection_set_http_mode:
3330  * @conn: a #GstRTSPConnection
3331  * @enable: %TRUE to enable manual HTTP mode
3332  *
3333  * By setting the HTTP mode to %TRUE the message parsing will support HTTP
3334  * messages in addition to the RTSP messages. It will also disable the
3335  * automatic handling of setting up an HTTP tunnel.
3336  */
3337 void
gst_rtsp_connection_set_http_mode(GstRTSPConnection * conn,gboolean enable)3338 gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
3339 {
3340   g_return_if_fail (conn != NULL);
3341 
3342   conn->manual_http = enable;
3343 }
3344 
3345 /**
3346  * gst_rtsp_connection_set_tunneled:
3347  * @conn: a #GstRTSPConnection
3348  * @tunneled: the new state
3349  *
3350  * Set the HTTP tunneling state of the connection. This must be configured before
3351  * the @conn is connected.
3352  */
3353 void
gst_rtsp_connection_set_tunneled(GstRTSPConnection * conn,gboolean tunneled)3354 gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
3355 {
3356   g_return_if_fail (conn != NULL);
3357   g_return_if_fail (conn->read_socket == NULL);
3358   g_return_if_fail (conn->write_socket == NULL);
3359 
3360   conn->tunneled = tunneled;
3361 }
3362 
3363 /**
3364  * gst_rtsp_connection_is_tunneled:
3365  * @conn: a #GstRTSPConnection
3366  *
3367  * Get the tunneling state of the connection.
3368  *
3369  * Returns: if @conn is using HTTP tunneling.
3370  */
3371 gboolean
gst_rtsp_connection_is_tunneled(const GstRTSPConnection * conn)3372 gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
3373 {
3374   g_return_val_if_fail (conn != NULL, FALSE);
3375 
3376   return conn->tunneled;
3377 }
3378 
3379 /**
3380  * gst_rtsp_connection_get_tunnelid:
3381  * @conn: a #GstRTSPConnection
3382  *
3383  * Get the tunnel session id the connection.
3384  *
3385  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
3386  */
3387 const gchar *
gst_rtsp_connection_get_tunnelid(const GstRTSPConnection * conn)3388 gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
3389 {
3390   g_return_val_if_fail (conn != NULL, NULL);
3391 
3392   if (!conn->tunneled)
3393     return NULL;
3394 
3395   return conn->tunnelid;
3396 }
3397 
3398 /**
3399  * gst_rtsp_connection_do_tunnel:
3400  * @conn: a #GstRTSPConnection
3401  * @conn2: a #GstRTSPConnection or %NULL
3402  *
3403  * If @conn received the first tunnel connection and @conn2 received
3404  * the second tunnel connection, link the two connections together so that
3405  * @conn manages the tunneled connection.
3406  *
3407  * After this call, @conn2 cannot be used anymore and must be freed with
3408  * gst_rtsp_connection_free().
3409  *
3410  * If @conn2 is %NULL then only the base64 decoding context will be setup for
3411  * @conn.
3412  *
3413  * Returns: return GST_RTSP_OK on success.
3414  */
3415 GstRTSPResult
gst_rtsp_connection_do_tunnel(GstRTSPConnection * conn,GstRTSPConnection * conn2)3416 gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
3417     GstRTSPConnection * conn2)
3418 {
3419   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
3420 
3421   if (conn2 != NULL) {
3422     GstRTSPTunnelState ts1 = conn->tstate;
3423     GstRTSPTunnelState ts2 = conn2->tstate;
3424 
3425     g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST)
3426         || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET),
3427         GST_RTSP_EINVAL);
3428     g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid,
3429             TUNNELID_LEN), GST_RTSP_EINVAL);
3430 
3431     /* both connections have socket0 as the read/write socket */
3432     if (ts1 == TUNNEL_STATE_GET) {
3433       /* conn2 is the HTTP POST channel. take its socket and set it as read
3434        * socket in conn */
3435       conn->socket1 = conn2->socket0;
3436       conn->stream1 = conn2->stream0;
3437       conn->input_stream = conn2->input_stream;
3438       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3439       conn2->output_stream = NULL;
3440     } else {
3441       /* conn2 is the HTTP GET channel. take its socket and set it as write
3442        * socket in conn */
3443       conn->socket1 = conn->socket0;
3444       conn->stream1 = conn->stream0;
3445       conn->socket0 = conn2->socket0;
3446       conn->stream0 = conn2->stream0;
3447       conn->output_stream = conn2->output_stream;
3448       conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
3449     }
3450 
3451     /* clean up some of the state of conn2 */
3452     g_cancellable_cancel (conn2->cancellable);
3453     conn2->write_socket = conn2->read_socket = NULL;
3454     conn2->socket0 = NULL;
3455     conn2->stream0 = NULL;
3456     conn2->socket1 = NULL;
3457     conn2->stream1 = NULL;
3458     conn2->input_stream = NULL;
3459     conn2->control_stream = NULL;
3460     g_object_unref (conn2->cancellable);
3461     conn2->cancellable = NULL;
3462 
3463     /* We make socket0 the write socket and socket1 the read socket. */
3464     conn->write_socket = conn->socket0;
3465     conn->read_socket = conn->socket1;
3466 
3467     conn->tstate = TUNNEL_STATE_COMPLETE;
3468 
3469     g_free (conn->initial_buffer);
3470     conn->initial_buffer = conn2->initial_buffer;
3471     conn2->initial_buffer = NULL;
3472     conn->initial_buffer_offset = conn2->initial_buffer_offset;
3473   }
3474 
3475   /* we need base64 decoding for the readfd */
3476   conn->ctx.state = 0;
3477   conn->ctx.save = 0;
3478   conn->ctx.cout = 0;
3479   conn->ctx.coutl = 0;
3480   conn->ctxp = &conn->ctx;
3481 
3482   return GST_RTSP_OK;
3483 }
3484 
3485 /**
3486  * gst_rtsp_connection_set_remember_session_id:
3487  * @conn: a #GstRTSPConnection
3488  * @remember: %TRUE if the connection should remember the session id
3489  *
3490  * Sets if the #GstRTSPConnection should remember the session id from the last
3491  * response received and force it onto any further requests.
3492  *
3493  * The default value is %TRUE
3494  */
3495 
3496 void
gst_rtsp_connection_set_remember_session_id(GstRTSPConnection * conn,gboolean remember)3497 gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
3498     gboolean remember)
3499 {
3500   conn->remember_session_id = remember;
3501   if (!remember)
3502     conn->session_id[0] = '\0';
3503 }
3504 
3505 /**
3506  * gst_rtsp_connection_get_remember_session_id:
3507  * @conn: a #GstRTSPConnection
3508  *
3509  * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
3510  * last response to set it on any further request.
3511  */
3512 
3513 gboolean
gst_rtsp_connection_get_remember_session_id(GstRTSPConnection * conn)3514 gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
3515 {
3516   return conn->remember_session_id;
3517 }
3518 
3519 
3520 #define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3521 #define READ_COND   (G_IO_IN | READ_ERR)
3522 #define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
3523 #define WRITE_COND  (G_IO_OUT | WRITE_ERR)
3524 
3525 /* async functions */
3526 struct _GstRTSPWatch
3527 {
3528   GSource source;
3529 
3530   GstRTSPConnection *conn;
3531 
3532   GstRTSPBuilder builder;
3533   GstRTSPMessage message;
3534 
3535   GSource *readsrc;
3536   GSource *writesrc;
3537   GSource *controlsrc;
3538 
3539   gboolean keep_running;
3540 
3541   /* queued message for transmission */
3542   guint id;
3543   GMutex mutex;
3544   GstQueueArray *messages;
3545   gsize messages_bytes;
3546   guint messages_count;
3547 
3548   gsize max_bytes;
3549   guint max_messages;
3550   GCond queue_not_full;
3551   gboolean flushing;
3552 
3553   GstRTSPWatchFuncs funcs;
3554 
3555   gpointer user_data;
3556   GDestroyNotify notify;
3557 };
3558 
3559 #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
3560       ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
3561 
3562 static gboolean
gst_rtsp_source_prepare(GSource * source,gint * timeout)3563 gst_rtsp_source_prepare (GSource * source, gint * timeout)
3564 {
3565   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3566 
3567   if (watch->conn->initial_buffer != NULL)
3568     return TRUE;
3569 
3570   *timeout = (watch->conn->timeout * 1000);
3571 
3572   return FALSE;
3573 }
3574 
3575 static gboolean
gst_rtsp_source_check(GSource * source)3576 gst_rtsp_source_check (GSource * source)
3577 {
3578   return FALSE;
3579 }
3580 
3581 static gboolean
gst_rtsp_source_dispatch_read_get_channel(GPollableInputStream * stream,GstRTSPWatch * watch)3582 gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
3583     GstRTSPWatch * watch)
3584 {
3585   gssize count;
3586   guint8 buffer[1024];
3587   GError *error = NULL;
3588 
3589   /* try to read in order to be able to detect errors, we read 1k in case some
3590    * client actually decides to send data on the GET channel */
3591   count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
3592       &error);
3593   if (count == 0) {
3594     /* other end closed the socket */
3595     goto eof;
3596   }
3597 
3598   if (count < 0) {
3599     GST_DEBUG ("%s", error->message);
3600     if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
3601         g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
3602       g_clear_error (&error);
3603       goto done;
3604     }
3605     g_clear_error (&error);
3606     goto read_error;
3607   }
3608 
3609   /* client sent data on the GET channel, ignore it */
3610 
3611 done:
3612   return TRUE;
3613 
3614   /* ERRORS */
3615 eof:
3616   {
3617     if (watch->funcs.closed)
3618       watch->funcs.closed (watch, watch->user_data);
3619 
3620     /* the read connection was closed, stop the watch now */
3621     watch->keep_running = FALSE;
3622 
3623     return FALSE;
3624   }
3625 read_error:
3626   {
3627     if (watch->funcs.error_full)
3628       watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
3629           0, watch->user_data);
3630     else if (watch->funcs.error)
3631       watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
3632 
3633     goto eof;
3634   }
3635 }
3636 
3637 static gboolean
gst_rtsp_source_dispatch_read(GPollableInputStream * stream,GstRTSPWatch * watch)3638 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
3639     GstRTSPWatch * watch)
3640 {
3641   GstRTSPResult res = GST_RTSP_ERROR;
3642   GstRTSPConnection *conn = watch->conn;
3643 
3644   /* if this connection was already closed, stop now */
3645   if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
3646     goto eof;
3647 
3648   res = build_next (&watch->builder, &watch->message, conn, FALSE);
3649   if (res == GST_RTSP_EINTR)
3650     goto done;
3651   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
3652     g_mutex_lock (&watch->mutex);
3653     if (watch->readsrc) {
3654       if (!g_source_is_destroyed ((GSource *) watch))
3655         g_source_remove_child_source ((GSource *) watch, watch->readsrc);
3656       g_source_unref (watch->readsrc);
3657       watch->readsrc = NULL;
3658     }
3659 
3660     if (conn->stream1) {
3661       g_object_unref (conn->stream1);
3662       conn->stream1 = NULL;
3663       conn->socket1 = NULL;
3664       conn->input_stream = NULL;
3665     }
3666     g_mutex_unlock (&watch->mutex);
3667 
3668     /* When we are in tunnelled mode, the read socket can be closed and we
3669      * should be prepared for a new POST method to reopen it */
3670     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
3671       /* remove the read connection for the tunnel */
3672       /* we accept a new POST request */
3673       conn->tstate = TUNNEL_STATE_GET;
3674       /* and signal that we lost our tunnel */
3675       if (watch->funcs.tunnel_lost)
3676         res = watch->funcs.tunnel_lost (watch, watch->user_data);
3677       /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */
3678       g_mutex_lock (&watch->mutex);
3679       if (watch->conn->control_stream && !watch->controlsrc) {
3680         watch->controlsrc =
3681             g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3682             (watch->conn->control_stream), NULL);
3683         g_source_set_callback (watch->controlsrc,
3684             (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3685             NULL);
3686         g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3687       }
3688       g_mutex_unlock (&watch->mutex);
3689       goto read_done;
3690     } else
3691       goto eof;
3692   } else if (G_LIKELY (res == GST_RTSP_OK)) {
3693     if (!conn->manual_http &&
3694         watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3695       if (conn->tstate == TUNNEL_STATE_NONE &&
3696           watch->message.type_data.request.method == GST_RTSP_GET) {
3697         GstRTSPMessage *response;
3698         GstRTSPStatusCode code;
3699 
3700         conn->tstate = TUNNEL_STATE_GET;
3701 
3702         if (watch->funcs.tunnel_start)
3703           code = watch->funcs.tunnel_start (watch, watch->user_data);
3704         else
3705           code = GST_RTSP_STS_OK;
3706 
3707         /* queue the response */
3708         response = gen_tunnel_reply (conn, code, &watch->message);
3709         if (watch->funcs.tunnel_http_response)
3710           watch->funcs.tunnel_http_response (watch, &watch->message, response,
3711               watch->user_data);
3712         gst_rtsp_watch_send_message (watch, response, NULL);
3713         gst_rtsp_message_free (response);
3714         goto read_done;
3715       } else if (conn->tstate == TUNNEL_STATE_NONE &&
3716           watch->message.type_data.request.method == GST_RTSP_POST) {
3717         conn->tstate = TUNNEL_STATE_POST;
3718 
3719         /* in the callback the connection should be tunneled with the
3720          * GET connection */
3721         if (watch->funcs.tunnel_complete) {
3722           watch->funcs.tunnel_complete (watch, watch->user_data);
3723         }
3724         goto read_done;
3725       }
3726     }
3727   } else
3728     goto read_error;
3729 
3730   if (!conn->manual_http) {
3731     /* if manual HTTP support is not enabled, then restore the message to
3732      * what it would have looked like without the support for parsing HTTP
3733      * messages being present */
3734     if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
3735       watch->message.type = GST_RTSP_MESSAGE_REQUEST;
3736       watch->message.type_data.request.method = GST_RTSP_INVALID;
3737       if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
3738         watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
3739       res = GST_RTSP_EPARSE;
3740     } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
3741       watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
3742       if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
3743         watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
3744       res = GST_RTSP_EPARSE;
3745     }
3746   }
3747   if (G_LIKELY (res != GST_RTSP_OK))
3748     goto read_error;
3749 
3750   if (watch->funcs.message_received)
3751     watch->funcs.message_received (watch, &watch->message, watch->user_data);
3752 
3753 read_done:
3754   gst_rtsp_message_unset (&watch->message);
3755   build_reset (&watch->builder);
3756 
3757 done:
3758   return TRUE;
3759 
3760   /* ERRORS */
3761 eof:
3762   {
3763     if (watch->funcs.closed)
3764       watch->funcs.closed (watch, watch->user_data);
3765 
3766     /* we closed the read connection, stop the watch now */
3767     watch->keep_running = FALSE;
3768 
3769     /* always stop when the input returns EOF in non-tunneled mode */
3770     return FALSE;
3771   }
3772 read_error:
3773   {
3774     if (watch->funcs.error_full)
3775       watch->funcs.error_full (watch, res, &watch->message,
3776           0, watch->user_data);
3777     else if (watch->funcs.error)
3778       watch->funcs.error (watch, res, watch->user_data);
3779 
3780     goto eof;
3781   }
3782 }
3783 
3784 static gboolean
gst_rtsp_source_dispatch(GSource * source,GSourceFunc callback G_GNUC_UNUSED,gpointer user_data G_GNUC_UNUSED)3785 gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
3786     gpointer user_data G_GNUC_UNUSED)
3787 {
3788   GstRTSPWatch *watch = (GstRTSPWatch *) source;
3789   GstRTSPConnection *conn = watch->conn;
3790 
3791   if (conn->initial_buffer != NULL) {
3792     gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
3793         watch);
3794   }
3795   return watch->keep_running;
3796 }
3797 
3798 static gboolean
gst_rtsp_source_dispatch_write(GPollableOutputStream * stream,GstRTSPWatch * watch)3799 gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
3800     GstRTSPWatch * watch)
3801 {
3802   GstRTSPResult res = GST_RTSP_ERROR;
3803   GstRTSPConnection *conn = watch->conn;
3804 
3805   /* if this connection was already closed, stop now */
3806   if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3807       !watch->messages)
3808     goto eof;
3809 
3810   g_mutex_lock (&watch->mutex);
3811   do {
3812     guint n_messages = gst_queue_array_get_length (watch->messages);
3813     GOutputVector *vectors;
3814     GstMapInfo *map_infos;
3815     guint *ids;
3816     gsize bytes_to_write, bytes_written;
3817     guint n_vectors, n_memories, n_ids, drop_messages;
3818     gint i, j, l, n_mmap;
3819     GstRTSPSerializedMessage *msg;
3820 
3821     /* if this connection was already closed, stop now */
3822     if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
3823         !watch->messages) {
3824       g_mutex_unlock (&watch->mutex);
3825       goto eof;
3826     }
3827 
3828     if (n_messages == 0) {
3829       if (watch->writesrc) {
3830         if (!g_source_is_destroyed ((GSource *) watch))
3831           g_source_remove_child_source ((GSource *) watch, watch->writesrc);
3832         g_source_unref (watch->writesrc);
3833         watch->writesrc = NULL;
3834         /* we create and add the write source again when we actually have
3835          * something to write */
3836 
3837         /* since write source is now removed we add read source on the write
3838          * socket instead to be able to detect when client closes get channel
3839          * in tunneled mode */
3840         if (watch->conn->control_stream) {
3841           watch->controlsrc =
3842               g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
3843               (watch->conn->control_stream), NULL);
3844           g_source_set_callback (watch->controlsrc,
3845               (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
3846               NULL);
3847           g_source_add_child_source ((GSource *) watch, watch->controlsrc);
3848         } else {
3849           watch->controlsrc = NULL;
3850         }
3851       }
3852       break;
3853     }
3854 
3855     for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
3856       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3857       if (msg->id != 0)
3858         n_ids++;
3859 
3860       if (msg->data_offset < msg->data_size)
3861         n_vectors++;
3862 
3863       if (msg->body_data && msg->body_offset < msg->body_data_size) {
3864         n_vectors++;
3865       } else if (msg->body_buffer) {
3866         guint m, n;
3867         guint offset = 0;
3868 
3869         n = gst_buffer_n_memory (msg->body_buffer);
3870         for (m = 0; m < n; m++) {
3871           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3872 
3873           /* Skip all memories we already wrote */
3874           if (offset + mem->size <= msg->body_offset) {
3875             offset += mem->size;
3876             continue;
3877           }
3878           offset += mem->size;
3879 
3880           n_memories++;
3881           n_vectors++;
3882         }
3883       }
3884     }
3885 
3886     vectors = g_newa (GOutputVector, n_vectors);
3887     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
3888     ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
3889     if (ids)
3890       memset (ids, 0, sizeof (guint) * (n_ids + 1));
3891 
3892     for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages;
3893         i++) {
3894       msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3895 
3896       if (msg->data_offset < msg->data_size) {
3897         vectors[j].buffer = (msg->data_is_data_header ?
3898             msg->data_header : msg->data) + msg->data_offset;
3899         vectors[j].size = msg->data_size - msg->data_offset;
3900         bytes_to_write += vectors[j].size;
3901         j++;
3902       }
3903 
3904       if (msg->body_data) {
3905         if (msg->body_offset < msg->body_data_size) {
3906           vectors[j].buffer = msg->body_data + msg->body_offset;
3907           vectors[j].size = msg->body_data_size - msg->body_offset;
3908           bytes_to_write += vectors[j].size;
3909           j++;
3910         }
3911       } else if (msg->body_buffer) {
3912         guint m, n;
3913         guint offset = 0;
3914         n = gst_buffer_n_memory (msg->body_buffer);
3915         for (m = 0; m < n; m++) {
3916           GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
3917           guint off;
3918 
3919           /* Skip all memories we already wrote */
3920           if (offset + mem->size <= msg->body_offset) {
3921             offset += mem->size;
3922             continue;
3923           }
3924 
3925           if (offset < msg->body_offset)
3926             off = msg->body_offset - offset;
3927           else
3928             off = 0;
3929 
3930           offset += mem->size;
3931 
3932           g_assert (off < mem->size);
3933 
3934           gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ);
3935           vectors[j].buffer = map_infos[n_mmap].data + off;
3936           vectors[j].size = map_infos[n_mmap].size - off;
3937           bytes_to_write += vectors[j].size;
3938 
3939           n_mmap++;
3940           j++;
3941         }
3942       }
3943     }
3944 
3945     res =
3946         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
3947         &bytes_written, FALSE, watch->conn->cancellable);
3948     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
3949 
3950     /* First unmap all memories here, this simplifies the code below
3951      * as we don't have to skip all memories that were already written
3952      * before */
3953     for (i = 0; i < n_mmap; i++) {
3954       gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
3955     }
3956 
3957     if (bytes_written == bytes_to_write) {
3958       /* fast path, just unmap all memories, free memory, drop all messages and notify them */
3959       l = 0;
3960       while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
3961         if (msg->id) {
3962           ids[l] = msg->id;
3963           l++;
3964         }
3965 
3966         gst_rtsp_serialized_message_clear (msg);
3967       }
3968 
3969       g_assert (watch->messages_bytes >= bytes_written);
3970       watch->messages_bytes -= bytes_written;
3971     } else if (bytes_written > 0) {
3972       /* not done, let's skip all messages that were sent already and free them */
3973       for (i = 0, drop_messages = 0; i < n_messages; i++) {
3974         msg = gst_queue_array_peek_nth_struct (watch->messages, i);
3975 
3976         if (bytes_written >= msg->data_size - msg->data_offset) {
3977           guint body_size;
3978 
3979           /* all data of this message is sent, check body and otherwise
3980            * skip the whole message for next time */
3981           bytes_written -= (msg->data_size - msg->data_offset);
3982           watch->messages_bytes -= (msg->data_size - msg->data_offset);
3983           msg->data_offset = msg->data_size;
3984 
3985           if (msg->body_data) {
3986             body_size = msg->body_data_size;
3987           } else if (msg->body_buffer) {
3988             body_size = gst_buffer_get_size (msg->body_buffer);
3989           } else {
3990             body_size = 0;
3991           }
3992 
3993           if (bytes_written + msg->body_offset >= body_size) {
3994             /* body written, drop this message */
3995             bytes_written -= body_size - msg->body_offset;
3996             watch->messages_bytes -= body_size - msg->body_offset;
3997             msg->body_offset = body_size;
3998             drop_messages++;
3999 
4000             if (msg->id) {
4001               ids[l] = msg->id;
4002               l++;
4003             }
4004 
4005             gst_rtsp_serialized_message_clear (msg);
4006           } else {
4007             msg->body_offset += bytes_written;
4008             watch->messages_bytes -= bytes_written;
4009             bytes_written = 0;
4010           }
4011         } else {
4012           /* Need to continue sending from the data of this message */
4013           msg->data_offset += bytes_written;
4014           watch->messages_bytes -= bytes_written;
4015           bytes_written = 0;
4016         }
4017       }
4018 
4019       while (drop_messages > 0) {
4020         msg = gst_queue_array_pop_head_struct (watch->messages);
4021         g_assert (msg);
4022         drop_messages--;
4023       }
4024 
4025       g_assert (watch->messages_bytes >= bytes_written);
4026       watch->messages_bytes -= bytes_written;
4027     }
4028 
4029     if (!IS_BACKLOG_FULL (watch))
4030       g_cond_signal (&watch->queue_not_full);
4031     g_mutex_unlock (&watch->mutex);
4032 
4033     /* notify all messages that were successfully written */
4034     if (ids) {
4035       while (*ids) {
4036         /* only decrease the counter for messages that have an id. Only
4037          * the last message of a messages chunk is counted */
4038         watch->messages_count--;
4039 
4040         if (watch->funcs.message_sent)
4041           watch->funcs.message_sent (watch, *ids, watch->user_data);
4042         ids++;
4043       }
4044     }
4045 
4046     if (res == GST_RTSP_EINTR) {
4047       goto write_blocked;
4048     } else if (G_UNLIKELY (res != GST_RTSP_OK)) {
4049       goto write_error;
4050     }
4051     g_mutex_lock (&watch->mutex);
4052   } while (TRUE);
4053   g_mutex_unlock (&watch->mutex);
4054 
4055 write_blocked:
4056   return TRUE;
4057 
4058   /* ERRORS */
4059 eof:
4060   {
4061     return FALSE;
4062   }
4063 write_error:
4064   {
4065     if (watch->funcs.error_full) {
4066       guint i, n_messages;
4067 
4068       n_messages = gst_queue_array_get_length (watch->messages);
4069       for (i = 0; i < n_messages; i++) {
4070         GstRTSPSerializedMessage *msg =
4071             gst_queue_array_peek_nth_struct (watch->messages, i);
4072         if (msg->id)
4073           watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data);
4074       }
4075     } else if (watch->funcs.error) {
4076       watch->funcs.error (watch, res, watch->user_data);
4077     }
4078 
4079     return FALSE;
4080   }
4081 }
4082 
4083 static void
gst_rtsp_source_finalize(GSource * source)4084 gst_rtsp_source_finalize (GSource * source)
4085 {
4086   GstRTSPWatch *watch = (GstRTSPWatch *) source;
4087   GstRTSPSerializedMessage *msg;
4088 
4089   if (watch->notify)
4090     watch->notify (watch->user_data);
4091 
4092   build_reset (&watch->builder);
4093   gst_rtsp_message_unset (&watch->message);
4094 
4095   while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4096     gst_rtsp_serialized_message_clear (msg);
4097   }
4098   gst_queue_array_free (watch->messages);
4099   watch->messages = NULL;
4100   watch->messages_bytes = 0;
4101   watch->messages_count = 0;
4102 
4103   g_cond_clear (&watch->queue_not_full);
4104 
4105   if (watch->readsrc)
4106     g_source_unref (watch->readsrc);
4107   if (watch->writesrc)
4108     g_source_unref (watch->writesrc);
4109   if (watch->controlsrc)
4110     g_source_unref (watch->controlsrc);
4111 
4112   g_mutex_clear (&watch->mutex);
4113 }
4114 
4115 static GSourceFuncs gst_rtsp_source_funcs = {
4116   gst_rtsp_source_prepare,
4117   gst_rtsp_source_check,
4118   gst_rtsp_source_dispatch,
4119   gst_rtsp_source_finalize,
4120   NULL,
4121   NULL
4122 };
4123 
4124 /**
4125  * gst_rtsp_watch_new: (skip)
4126  * @conn: a #GstRTSPConnection
4127  * @funcs: watch functions
4128  * @user_data: user data to pass to @funcs
4129  * @notify: notify when @user_data is not referenced anymore
4130  *
4131  * Create a watch object for @conn. The functions provided in @funcs will be
4132  * called with @user_data when activity happened on the watch.
4133  *
4134  * The new watch is usually created so that it can be attached to a
4135  * maincontext with gst_rtsp_watch_attach().
4136  *
4137  * @conn must exist for the entire lifetime of the watch.
4138  *
4139  * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
4140  * communication. Free with gst_rtsp_watch_unref () after usage.
4141  */
4142 GstRTSPWatch *
gst_rtsp_watch_new(GstRTSPConnection * conn,GstRTSPWatchFuncs * funcs,gpointer user_data,GDestroyNotify notify)4143 gst_rtsp_watch_new (GstRTSPConnection * conn,
4144     GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
4145 {
4146   GstRTSPWatch *result;
4147 
4148   g_return_val_if_fail (conn != NULL, NULL);
4149   g_return_val_if_fail (funcs != NULL, NULL);
4150   g_return_val_if_fail (conn->read_socket != NULL, NULL);
4151   g_return_val_if_fail (conn->write_socket != NULL, NULL);
4152 
4153   result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
4154       sizeof (GstRTSPWatch));
4155 
4156   result->conn = conn;
4157   result->builder.state = STATE_START;
4158 
4159   g_mutex_init (&result->mutex);
4160   result->messages =
4161       gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10);
4162   g_cond_init (&result->queue_not_full);
4163 
4164   gst_rtsp_watch_reset (result);
4165   result->keep_running = TRUE;
4166   result->flushing = FALSE;
4167 
4168   result->funcs = *funcs;
4169   result->user_data = user_data;
4170   result->notify = notify;
4171 
4172   return result;
4173 }
4174 
4175 /**
4176  * gst_rtsp_watch_reset:
4177  * @watch: a #GstRTSPWatch
4178  *
4179  * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
4180  * when the file descriptors of the connection might have changed.
4181  */
4182 void
gst_rtsp_watch_reset(GstRTSPWatch * watch)4183 gst_rtsp_watch_reset (GstRTSPWatch * watch)
4184 {
4185   g_mutex_lock (&watch->mutex);
4186   if (watch->readsrc) {
4187     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
4188     g_source_unref (watch->readsrc);
4189   }
4190   if (watch->writesrc) {
4191     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
4192     g_source_unref (watch->writesrc);
4193     watch->writesrc = NULL;
4194   }
4195   if (watch->controlsrc) {
4196     g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4197     g_source_unref (watch->controlsrc);
4198     watch->controlsrc = NULL;
4199   }
4200 
4201   if (watch->conn->input_stream) {
4202     watch->readsrc =
4203         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4204         (watch->conn->input_stream), NULL);
4205     g_source_set_callback (watch->readsrc,
4206         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
4207     g_source_add_child_source ((GSource *) watch, watch->readsrc);
4208   } else {
4209     watch->readsrc = NULL;
4210   }
4211 
4212   /* we create and add the write source when we actually have something to
4213    * write */
4214 
4215   /* when write source is not added we add read source on the write socket
4216    * instead to be able to detect when client closes get channel in tunneled
4217    * mode */
4218   if (watch->conn->control_stream) {
4219     watch->controlsrc =
4220         g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
4221         (watch->conn->control_stream), NULL);
4222     g_source_set_callback (watch->controlsrc,
4223         (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
4224     g_source_add_child_source ((GSource *) watch, watch->controlsrc);
4225   } else {
4226     watch->controlsrc = NULL;
4227   }
4228   g_mutex_unlock (&watch->mutex);
4229 }
4230 
4231 /**
4232  * gst_rtsp_watch_attach:
4233  * @watch: a #GstRTSPWatch
4234  * @context: a GMainContext (if NULL, the default context will be used)
4235  *
4236  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
4237  *
4238  * Returns: the ID (greater than 0) for the watch within the GMainContext.
4239  */
4240 guint
gst_rtsp_watch_attach(GstRTSPWatch * watch,GMainContext * context)4241 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
4242 {
4243   g_return_val_if_fail (watch != NULL, 0);
4244 
4245   return g_source_attach ((GSource *) watch, context);
4246 }
4247 
4248 /**
4249  * gst_rtsp_watch_unref:
4250  * @watch: a #GstRTSPWatch
4251  *
4252  * Decreases the reference count of @watch by one. If the resulting reference
4253  * count is zero the watch and associated memory will be destroyed.
4254  */
4255 void
gst_rtsp_watch_unref(GstRTSPWatch * watch)4256 gst_rtsp_watch_unref (GstRTSPWatch * watch)
4257 {
4258   g_return_if_fail (watch != NULL);
4259 
4260   g_source_unref ((GSource *) watch);
4261 }
4262 
4263 /**
4264  * gst_rtsp_watch_set_send_backlog:
4265  * @watch: a #GstRTSPWatch
4266  * @bytes: maximum bytes
4267  * @messages: maximum messages
4268  *
4269  * Set the maximum amount of bytes and messages that will be queued in @watch.
4270  * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
4271  * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
4272  *
4273  * A value of 0 for @bytes or @messages means no limits.
4274  *
4275  * Since: 1.2
4276  */
4277 void
gst_rtsp_watch_set_send_backlog(GstRTSPWatch * watch,gsize bytes,guint messages)4278 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
4279     gsize bytes, guint messages)
4280 {
4281   g_return_if_fail (watch != NULL);
4282 
4283   g_mutex_lock (&watch->mutex);
4284   watch->max_bytes = bytes;
4285   watch->max_messages = messages;
4286   if (!IS_BACKLOG_FULL (watch))
4287     g_cond_signal (&watch->queue_not_full);
4288   g_mutex_unlock (&watch->mutex);
4289 
4290   GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
4291       bytes, messages);
4292 }
4293 
4294 /**
4295  * gst_rtsp_watch_get_send_backlog:
4296  * @watch: a #GstRTSPWatch
4297  * @bytes: (out) (allow-none): maximum bytes
4298  * @messages: (out) (allow-none): maximum messages
4299  *
4300  * Get the maximum amount of bytes and messages that will be queued in @watch.
4301  * See gst_rtsp_watch_set_send_backlog().
4302  *
4303  * Since: 1.2
4304  */
4305 void
gst_rtsp_watch_get_send_backlog(GstRTSPWatch * watch,gsize * bytes,guint * messages)4306 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
4307     gsize * bytes, guint * messages)
4308 {
4309   g_return_if_fail (watch != NULL);
4310 
4311   g_mutex_lock (&watch->mutex);
4312   if (bytes)
4313     *bytes = watch->max_bytes;
4314   if (messages)
4315     *messages = watch->max_messages;
4316   g_mutex_unlock (&watch->mutex);
4317 }
4318 
4319 static GstRTSPResult
gst_rtsp_watch_write_serialized_messages(GstRTSPWatch * watch,GstRTSPSerializedMessage * messages,guint n_messages,guint * id)4320 gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
4321     GstRTSPSerializedMessage * messages, guint n_messages, guint * id)
4322 {
4323   GstRTSPResult res;
4324   GMainContext *context = NULL;
4325   gint i;
4326 
4327   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4328   g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL);
4329 
4330   g_mutex_lock (&watch->mutex);
4331   if (watch->flushing)
4332     goto flushing;
4333 
4334   /* try to send the message synchronously first */
4335   if (gst_queue_array_get_length (watch->messages) == 0) {
4336     gint j, k;
4337     GOutputVector *vectors;
4338     GstMapInfo *map_infos;
4339     gsize bytes_to_write, bytes_written;
4340     guint n_vectors, n_memories, drop_messages;
4341 
4342     for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) {
4343       n_vectors++;
4344       if (messages[i].body_data) {
4345         n_vectors++;
4346       } else if (messages[i].body_buffer) {
4347         n_vectors += gst_buffer_n_memory (messages[i].body_buffer);
4348         n_memories += gst_buffer_n_memory (messages[i].body_buffer);
4349       }
4350     }
4351 
4352     vectors = g_newa (GOutputVector, n_vectors);
4353     map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
4354 
4355     for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) {
4356       vectors[j].buffer = messages[i].data_is_data_header ?
4357           messages[i].data_header : messages[i].data;
4358       vectors[j].size = messages[i].data_size;
4359       bytes_to_write += vectors[j].size;
4360       j++;
4361 
4362       if (messages[i].body_data) {
4363         vectors[j].buffer = messages[i].body_data;
4364         vectors[j].size = messages[i].body_data_size;
4365         bytes_to_write += vectors[j].size;
4366         j++;
4367       } else if (messages[i].body_buffer) {
4368         gint l, n;
4369 
4370         n = gst_buffer_n_memory (messages[i].body_buffer);
4371         for (l = 0; l < n; l++) {
4372           GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l);
4373 
4374           gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
4375           vectors[j].buffer = map_infos[k].data;
4376           vectors[j].size = map_infos[k].size;
4377           bytes_to_write += vectors[j].size;
4378 
4379           k++;
4380           j++;
4381         }
4382       }
4383     }
4384 
4385     res =
4386         writev_bytes (watch->conn->output_stream, vectors, n_vectors,
4387         &bytes_written, FALSE, watch->conn->cancellable);
4388     g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
4389 
4390     /* At this point we sent everything we could without blocking or
4391      * error and updated the offsets inside the message accordingly */
4392 
4393     /* First of all unmap all memories. This simplifies the code below */
4394     for (k = 0; k < n_memories; k++) {
4395       gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
4396     }
4397 
4398     if (res != GST_RTSP_EINTR) {
4399       /* actual error or done completely */
4400       if (id != NULL)
4401         *id = 0;
4402 
4403       /* free everything */
4404       for (i = 0, k = 0; i < n_messages; i++) {
4405         gst_rtsp_serialized_message_clear (&messages[i]);
4406       }
4407 
4408       goto done;
4409     }
4410 
4411     /* not done, let's skip all messages that were sent already and free them */
4412     for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
4413       if (bytes_written >= messages[i].data_size) {
4414         guint body_size;
4415 
4416         /* all data of this message is sent, check body and otherwise
4417          * skip the whole message for next time */
4418         messages[i].data_offset = messages[i].data_size;
4419         bytes_written -= messages[i].data_size;
4420 
4421         if (messages[i].body_data) {
4422           body_size = messages[i].body_data_size;
4423 
4424         } else if (messages[i].body_buffer) {
4425           body_size = gst_buffer_get_size (messages[i].body_buffer);
4426         } else {
4427           body_size = 0;
4428         }
4429 
4430         if (bytes_written >= body_size) {
4431           /* body written, drop this message */
4432           messages[i].body_offset = body_size;
4433           bytes_written -= body_size;
4434           drop_messages++;
4435 
4436           gst_rtsp_serialized_message_clear (&messages[i]);
4437         } else {
4438           messages[i].body_offset = bytes_written;
4439           bytes_written = 0;
4440         }
4441       } else {
4442         /* Need to continue sending from the data of this message */
4443         messages[i].data_offset = bytes_written;
4444         bytes_written = 0;
4445       }
4446     }
4447 
4448     g_assert (n_messages > drop_messages);
4449 
4450     messages += drop_messages;
4451     n_messages -= drop_messages;
4452   }
4453 
4454   /* check limits */
4455   if (IS_BACKLOG_FULL (watch))
4456     goto too_much_backlog;
4457 
4458   for (i = 0; i < n_messages; i++) {
4459     GstRTSPSerializedMessage local_message;
4460 
4461     /* make a record with the data and id for sending async */
4462     local_message = messages[i];
4463 
4464     /* copy the body data or take an additional reference to the body buffer
4465      * we don't own them here */
4466     if (local_message.body_data) {
4467       local_message.body_data =
4468           g_memdup (local_message.body_data, local_message.body_data_size);
4469     } else if (local_message.body_buffer) {
4470       gst_buffer_ref (local_message.body_buffer);
4471     }
4472     local_message.borrowed = FALSE;
4473 
4474     /* set an id for the very last message */
4475     if (i == n_messages - 1) {
4476       do {
4477         /* make sure rec->id is never 0 */
4478         local_message.id = ++watch->id;
4479       } while (G_UNLIKELY (local_message.id == 0));
4480 
4481       if (id != NULL)
4482         *id = local_message.id;
4483     } else {
4484       local_message.id = 0;
4485     }
4486 
4487     /* add the record to a queue. */
4488     gst_queue_array_push_tail_struct (watch->messages, &local_message);
4489     watch->messages_bytes +=
4490         (local_message.data_size - local_message.data_offset);
4491     if (local_message.body_data)
4492       watch->messages_bytes +=
4493           (local_message.body_data_size - local_message.body_offset);
4494     else if (local_message.body_buffer)
4495       watch->messages_bytes +=
4496           (gst_buffer_get_size (local_message.body_buffer) -
4497           local_message.body_offset);
4498   }
4499   /* each message chunks is one unit */
4500   watch->messages_count++;
4501 
4502   /* make sure the main context will now also check for writability on the
4503    * socket */
4504   context = ((GSource *) watch)->context;
4505   if (!watch->writesrc) {
4506     /* remove the read source on the write socket, we will be able to detect
4507      * errors while writing */
4508     if (watch->controlsrc) {
4509       g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
4510       g_source_unref (watch->controlsrc);
4511       watch->controlsrc = NULL;
4512     }
4513 
4514     watch->writesrc =
4515         g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
4516         (watch->conn->output_stream), NULL);
4517     g_source_set_callback (watch->writesrc,
4518         (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
4519     g_source_add_child_source ((GSource *) watch, watch->writesrc);
4520   }
4521   res = GST_RTSP_OK;
4522 
4523 done:
4524   g_mutex_unlock (&watch->mutex);
4525 
4526   if (context)
4527     g_main_context_wakeup (context);
4528 
4529   return res;
4530 
4531   /* ERRORS */
4532 flushing:
4533   {
4534     GST_DEBUG ("we are flushing");
4535     g_mutex_unlock (&watch->mutex);
4536     for (i = 0; i < n_messages; i++) {
4537       gst_rtsp_serialized_message_clear (&messages[i]);
4538     }
4539     return GST_RTSP_EINTR;
4540   }
4541 too_much_backlog:
4542   {
4543     GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
4544         G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
4545         watch->messages_bytes, watch->max_messages, watch->messages_count);
4546     g_mutex_unlock (&watch->mutex);
4547     for (i = 0; i < n_messages; i++) {
4548       gst_rtsp_serialized_message_clear (&messages[i]);
4549     }
4550     return GST_RTSP_ENOMEM;
4551   }
4552 
4553   return GST_RTSP_OK;
4554 }
4555 
4556 /**
4557  * gst_rtsp_watch_write_data:
4558  * @watch: a #GstRTSPWatch
4559  * @data: (array length=size) (transfer full): the data to queue
4560  * @size: the size of @data
4561  * @id: (out) (allow-none): location for a message ID or %NULL
4562  *
4563  * Write @data using the connection of the @watch. If it cannot be sent
4564  * immediately, it will be queued for transmission in @watch. The contents of
4565  * @message will then be serialized and transmitted when the connection of the
4566  * @watch becomes writable. In case the @message is queued, the ID returned in
4567  * @id will be non-zero and used as the ID argument in the message_sent
4568  * callback.
4569  *
4570  * This function will take ownership of @data and g_free() it after use.
4571  *
4572  * If the amount of queued data exceeds the limits set with
4573  * gst_rtsp_watch_set_send_backlog(), this function will return
4574  * #GST_RTSP_ENOMEM.
4575  *
4576  * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
4577  * are reached. #GST_RTSP_EINTR when @watch was flushing.
4578  */
4579 /* FIXME 2.0: This should've been static! */
4580 GstRTSPResult
gst_rtsp_watch_write_data(GstRTSPWatch * watch,const guint8 * data,guint size,guint * id)4581 gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
4582     guint size, guint * id)
4583 {
4584   GstRTSPSerializedMessage serialized_message;
4585 
4586   memset (&serialized_message, 0, sizeof (serialized_message));
4587   serialized_message.data = (guint8 *) data;
4588   serialized_message.data_size = size;
4589 
4590   return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message,
4591       1, id);
4592 }
4593 
4594 /**
4595  * gst_rtsp_watch_send_message:
4596  * @watch: a #GstRTSPWatch
4597  * @message: a #GstRTSPMessage
4598  * @id: (out) (allow-none): location for a message ID or %NULL
4599  *
4600  * Send a @message using the connection of the @watch. If it cannot be sent
4601  * immediately, it will be queued for transmission in @watch. The contents of
4602  * @message will then be serialized and transmitted when the connection of the
4603  * @watch becomes writable. In case the @message is queued, the ID returned in
4604  * @id will be non-zero and used as the ID argument in the message_sent
4605  * callback.
4606  *
4607  * Returns: #GST_RTSP_OK on success.
4608  */
4609 GstRTSPResult
gst_rtsp_watch_send_message(GstRTSPWatch * watch,GstRTSPMessage * message,guint * id)4610 gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
4611     guint * id)
4612 {
4613   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4614   g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
4615 
4616   return gst_rtsp_watch_send_messages (watch, message, 1, id);
4617 }
4618 
4619 /**
4620  * gst_rtsp_watch_send_messages:
4621  * @watch: a #GstRTSPWatch
4622  * @messages: (array length=n_messages): the messages to send
4623  * @n_messages: the number of messages to send
4624  * @id: (out) (allow-none): location for a message ID or %NULL
4625  *
4626  * Sends @messages using the connection of the @watch. If they cannot be sent
4627  * immediately, they will be queued for transmission in @watch. The contents of
4628  * @messages will then be serialized and transmitted when the connection of the
4629  * @watch becomes writable. In case the @messages are queued, the ID returned in
4630  * @id will be non-zero and used as the ID argument in the message_sent
4631  * callback once the last message is sent. The callback will only be called
4632  * once for the last message.
4633  *
4634  * Returns: #GST_RTSP_OK on success.
4635  *
4636  * Since: 1.16
4637  */
4638 GstRTSPResult
gst_rtsp_watch_send_messages(GstRTSPWatch * watch,GstRTSPMessage * messages,guint n_messages,guint * id)4639 gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages,
4640     guint n_messages, guint * id)
4641 {
4642   GstRTSPSerializedMessage *serialized_messages;
4643   gint i;
4644 
4645   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4646   g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
4647 
4648   serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
4649   memset (serialized_messages, 0,
4650       sizeof (GstRTSPSerializedMessage) * n_messages);
4651 
4652   for (i = 0; i < n_messages; i++) {
4653     if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i]))
4654       goto error;
4655   }
4656 
4657   return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages,
4658       n_messages, id);
4659 
4660 error:
4661   for (i = 0; i < n_messages; i++) {
4662     gst_rtsp_serialized_message_clear (&serialized_messages[i]);
4663   }
4664 
4665   return GST_RTSP_EINVAL;
4666 }
4667 
4668 /**
4669  * gst_rtsp_watch_wait_backlog:
4670  * @watch: a #GstRTSPWatch
4671  * @timeout: a #GTimeVal timeout
4672  *
4673  * Wait until there is place in the backlog queue, @timeout is reached
4674  * or @watch is set to flushing.
4675  *
4676  * If @timeout is %NULL this function can block forever. If @timeout
4677  * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT
4678  * after the timeout expired.
4679  *
4680  * The typically use of this function is when gst_rtsp_watch_write_data
4681  * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for
4682  * free space in the backlog queue and try again.
4683  *
4684  * Returns: %GST_RTSP_OK when if there is room in queue.
4685  *          %GST_RTSP_ETIMEOUT when @timeout was reached.
4686  *          %GST_RTSP_EINTR when @watch is flushing
4687  *          %GST_RTSP_EINVAL when called with invalid parameters.
4688  *
4689  * Since: 1.4
4690  */
4691 GstRTSPResult
gst_rtsp_watch_wait_backlog(GstRTSPWatch * watch,GTimeVal * timeout)4692 gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout)
4693 {
4694   gint64 end_time;
4695   GstClockTime to;
4696 
4697   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
4698 
4699   to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
4700   end_time = g_get_monotonic_time () + GST_TIME_AS_USECONDS (to);
4701 
4702   g_mutex_lock (&watch->mutex);
4703   if (watch->flushing)
4704     goto flushing;
4705 
4706   while (IS_BACKLOG_FULL (watch)) {
4707     gboolean res;
4708 
4709     res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time);
4710     if (watch->flushing)
4711       goto flushing;
4712 
4713     if (!res)
4714       goto timeout;
4715   }
4716   g_mutex_unlock (&watch->mutex);
4717 
4718   return GST_RTSP_OK;
4719 
4720   /* ERRORS */
4721 flushing:
4722   {
4723     GST_DEBUG ("we are flushing");
4724     g_mutex_unlock (&watch->mutex);
4725     return GST_RTSP_EINTR;
4726   }
4727 timeout:
4728   {
4729     GST_DEBUG ("we timed out");
4730     g_mutex_unlock (&watch->mutex);
4731     return GST_RTSP_ETIMEOUT;
4732   }
4733 }
4734 
4735 /**
4736  * gst_rtsp_watch_set_flushing:
4737  * @watch: a #GstRTSPWatch
4738  * @flushing: new flushing state
4739  *
4740  * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog()
4741  * and make sure gst_rtsp_watch_write_data() returns immediately with
4742  * #GST_RTSP_EINTR. And empty the queue.
4743  *
4744  * Since: 1.4
4745  */
4746 void
gst_rtsp_watch_set_flushing(GstRTSPWatch * watch,gboolean flushing)4747 gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing)
4748 {
4749   g_return_if_fail (watch != NULL);
4750 
4751   g_mutex_lock (&watch->mutex);
4752   watch->flushing = flushing;
4753   g_cond_signal (&watch->queue_not_full);
4754   if (flushing) {
4755     GstRTSPSerializedMessage *msg;
4756 
4757     while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
4758       gst_rtsp_serialized_message_clear (msg);
4759     }
4760   }
4761   g_mutex_unlock (&watch->mutex);
4762 }
4763