1 /*
2  * This file is part of the Nice GLib ICE library.
3  *
4  * (C) 2014 Collabora Ltd.
5  *  Contact: Philip Withnall
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is the Nice GLib ICE library.
18  *
19  * The Initial Developers of the Original Code are Collabora Ltd and Nokia
20  * Corporation. All Rights Reserved.
21  *
22  * Contributors:
23  *   Philip Withnall, Collabora Ltd.
24  *
25  * Alternatively, the contents of this file may be used under the terms of the
26  * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
27  * case the provisions of LGPL are applicable instead of those above. If you
28  * wish to allow use of your version of this file only under the terms of the
29  * LGPL and not to allow others to use your version of this file under the
30  * MPL, indicate your decision by deleting the provisions above and replace
31  * them with the notice and other provisions required by the LGPL. If you do
32  * not delete the provisions above, a recipient may use your version of this
33  * file under either the MPL or the LGPL.
34  */
35 #ifdef HAVE_CONFIG_H
36 # include <config.h>
37 #endif
38 
39 #include "agent.h"
40 #include "test-io-stream-common.h"
41 
42 #include <stdlib.h>
43 #include <string.h>
44 #ifndef G_OS_WIN32
45 #include <unistd.h>
46 #endif
47 
/* File-scope start-up synchronisation objects.
 * NOTE(review): none of these three is referenced by the code visible in this
 * file, which synchronises thread start-up through the mutex, condition and
 * counter pointers stored in TestIOStreamThreadData instead. Presumably they
 * are kept for external users of this helper — confirm before removing. */
GMutex start_mutex;
GCond start_cond;
gboolean started;
51 
/* Polls until @var becomes NULL/FALSE, running one non-blocking iteration of
 * @context after each of up to 13 exponentially growing sleeps (1 ms, 2 ms,
 * ... 4096 ms, i.e. about 8 seconds in total), then asserts that @var is
 * unset.
 *
 * @var is evaluated several times, so it must not have side effects.
 *
 * The body is wrapped in do { } while (0) so that the expansion followed by
 * its semicolon forms exactly one statement; the macro can therefore be used
 * as the body of an unbraced if/else without changing how the else binds. */
#define WAIT_UNTIL_UNSET(var, context) \
  do \
    { \
      int wait_round_; \
 \
      for (wait_round_ = 0; wait_round_ < 13 && (var); wait_round_++) \
        { \
          g_usleep (1000 * (1 << wait_round_)); \
          g_main_context_iteration ((context), FALSE); \
        } \
 \
      g_assert (!(var)); \
    } \
  while (0)
66 
timer_cb(gpointer pointer)67 static gboolean timer_cb (gpointer pointer)
68 {
69   g_debug ("test-thread:%s: %p", G_STRFUNC, pointer);
70 
71   /* note: should not be reached, abort */
72   g_debug ("ERROR: test has got stuck, aborting...");
73   abort();
74   exit (-1);
75 }
76 
77 static void
wait_for_start(TestIOStreamThreadData * data)78 wait_for_start (TestIOStreamThreadData *data)
79 {
80   g_mutex_lock (data->start_mutex);
81   (*data->start_count)--;
82   g_cond_broadcast (data->start_cond);
83   while (*data->start_count > 0)
84     g_cond_wait (data->start_cond, data->start_mutex);
85   g_mutex_unlock (data->start_mutex);
86 }
87 
88 static gpointer
write_thread_cb(gpointer user_data)89 write_thread_cb (gpointer user_data)
90 {
91   TestIOStreamThreadData *data = user_data;
92   GMainContext *main_context;
93   GOutputStream *output_stream = NULL;
94 
95   main_context = g_main_context_new ();
96   g_main_context_push_thread_default (main_context);
97 
98   /* Synchronise thread starting. */
99   wait_for_start (data);
100 
101   /* Wait for the stream to be writeable. */
102   g_mutex_lock (&data->write_mutex);
103   while (!(data->stream_open && data->stream_ready))
104     g_cond_wait (&data->write_cond, &data->write_mutex);
105   g_mutex_unlock (&data->write_mutex);
106 
107   if (data->reliable)
108     output_stream = g_io_stream_get_output_stream (data->io_stream);
109   data->callbacks->write_thread (output_stream, data);
110 
111   g_main_context_pop_thread_default (main_context);
112   g_main_context_unref (main_context);
113 
114   return NULL;
115 }
116 
117 static gpointer
read_thread_cb(gpointer user_data)118 read_thread_cb (gpointer user_data)
119 {
120   TestIOStreamThreadData *data = user_data;
121   GMainContext *main_context;
122   GInputStream *input_stream = NULL;
123 
124   main_context = g_main_context_new ();
125   g_main_context_push_thread_default (main_context);
126 
127   /* Synchronise thread starting. */
128   wait_for_start (data);
129 
130   if (data->reliable)
131     input_stream = g_io_stream_get_input_stream (data->io_stream);
132   data->callbacks->read_thread (input_stream, data);
133 
134   g_main_context_pop_thread_default (main_context);
135   g_main_context_unref (main_context);
136 
137   return NULL;
138 }
139 
140 static gpointer
main_thread_cb(gpointer user_data)141 main_thread_cb (gpointer user_data)
142 {
143   TestIOStreamThreadData *data = user_data;
144 
145   g_main_context_push_thread_default (data->main_context);
146 
147   /* Synchronise thread starting. */
148   wait_for_start (data);
149 
150   /* Run the main context. */
151   g_main_loop_run (data->main_loop);
152 
153   g_main_context_pop_thread_default (data->main_context);
154 
155   return NULL;
156 }
157 
158 static void
candidate_gathering_done_cb(NiceAgent * agent,guint stream_id,gpointer user_data)159 candidate_gathering_done_cb (NiceAgent *agent, guint stream_id,
160     gpointer user_data)
161 {
162   NiceAgent *other = g_object_get_data (G_OBJECT (agent), "other-agent");
163   gchar *ufrag = NULL, *password = NULL;
164   GSList *cands, *i;
165   guint id, other_id;
166   gpointer tmp;
167 
168   tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
169   id = GPOINTER_TO_UINT (tmp);
170   tmp = g_object_get_data (G_OBJECT (other), "stream-id");
171   other_id = GPOINTER_TO_UINT (tmp);
172 
173   nice_agent_get_local_credentials (agent, id, &ufrag, &password);
174   nice_agent_set_remote_credentials (other,
175       other_id, ufrag, password);
176   g_free (ufrag);
177   g_free (password);
178 
179   cands = nice_agent_get_local_candidates (agent, id, 1);
180   g_assert (cands != NULL);
181 
182   nice_agent_set_remote_candidates (other, other_id, 1, cands);
183 
184   for (i = cands; i; i = i->next)
185     nice_candidate_free ((NiceCandidate *) i->data);
186   g_slist_free (cands);
187 }
188 
189 static void
reliable_transport_writable_cb(NiceAgent * agent,guint stream_id,guint component_id,gpointer user_data)190 reliable_transport_writable_cb (NiceAgent *agent, guint stream_id,
191     guint component_id, gpointer user_data)
192 {
193   TestIOStreamThreadData *data = user_data;
194 
195   g_assert (data->reliable);
196 
197   /* Signal writeability. */
198   g_mutex_lock (&data->write_mutex);
199   data->stream_open = TRUE;
200   g_cond_broadcast (&data->write_cond);
201   g_mutex_unlock (&data->write_mutex);
202 
203   if (data->callbacks->reliable_transport_writable != NULL) {
204     GIOStream *io_stream;
205     GOutputStream *output_stream;
206 
207     io_stream = g_object_get_data (G_OBJECT (agent), "io-stream");
208     g_assert (io_stream != NULL);
209     output_stream = g_io_stream_get_output_stream (io_stream);
210 
211     data->callbacks->reliable_transport_writable (output_stream, agent,
212        stream_id, component_id, data);
213   }
214 }
215 
216 static void
component_state_changed_cb(NiceAgent * agent,guint stream_id,guint component_id,guint state,gpointer user_data)217 component_state_changed_cb (NiceAgent *agent, guint stream_id,
218     guint component_id, guint state, gpointer user_data)
219 {
220   TestIOStreamThreadData *data = user_data;
221 
222   if (state != NICE_COMPONENT_STATE_READY)
223     return;
224 
225   /* Signal stream state. */
226   g_mutex_lock (&data->write_mutex);
227   data->stream_ready = TRUE;
228   g_cond_broadcast (&data->write_cond);
229   g_mutex_unlock (&data->write_mutex);
230 }
231 
232 static void
new_selected_pair_cb(NiceAgent * agent,guint stream_id,guint component_id,gchar * lfoundation,gchar * rfoundation,gpointer user_data)233 new_selected_pair_cb (NiceAgent *agent, guint stream_id, guint component_id,
234     gchar *lfoundation, gchar *rfoundation, gpointer user_data)
235 {
236   TestIOStreamThreadData *data = user_data;
237 
238   if (data->callbacks->new_selected_pair != NULL) {
239     data->callbacks->new_selected_pair (agent, stream_id, component_id,
240         lfoundation, rfoundation, data);
241   }
242 }
243 
244 static NiceAgent *
create_agent(gboolean controlling_mode,TestIOStreamThreadData * data,GMainContext ** main_context,GMainLoop ** main_loop)245 create_agent (gboolean controlling_mode, TestIOStreamThreadData *data,
246     GMainContext **main_context, GMainLoop **main_loop)
247 {
248   NiceAgent *agent;
249   NiceAddress base_addr;
250   const gchar *stun_server, *stun_server_port;
251 
252   /* Create main contexts. */
253   *main_context = g_main_context_new ();
254   *main_loop = g_main_loop_new (*main_context, FALSE);
255 
256   /* Use Google compatibility to ignore credentials. */
257   if (data->reliable)
258     agent = nice_agent_new_reliable (*main_context, NICE_COMPATIBILITY_GOOGLE);
259   else
260     agent = nice_agent_new (*main_context, NICE_COMPATIBILITY_GOOGLE);
261 
262   g_object_set (G_OBJECT (agent),
263       "controlling-mode", controlling_mode,
264       "upnp", FALSE,
265       NULL);
266 
267   /* Specify which local interface to use. */
268   g_assert (nice_address_set_from_string (&base_addr, "127.0.0.1"));
269   nice_agent_add_local_address (agent, &base_addr);
270 
271   /* Hook up signals. */
272   g_signal_connect (G_OBJECT (agent), "candidate-gathering-done",
273       (GCallback) candidate_gathering_done_cb,
274       GUINT_TO_POINTER (controlling_mode));
275   g_signal_connect (G_OBJECT (agent), "new-selected-pair",
276       (GCallback) new_selected_pair_cb, data);
277   g_signal_connect (G_OBJECT (agent), "component-state-changed",
278     (GCallback) component_state_changed_cb, data);
279 
280   if (data->reliable) {
281     g_signal_connect (G_OBJECT (agent), "reliable-transport-writable",
282       (GCallback) reliable_transport_writable_cb, data);
283   } else {
284     data->stream_open = TRUE;
285   }
286 
287   /* Configure the STUN server. */
288   stun_server = g_getenv ("NICE_STUN_SERVER");
289   stun_server_port = g_getenv ("NICE_STUN_SERVER_PORT");
290 
291   if (stun_server != NULL) {
292     g_object_set (G_OBJECT (agent),
293         "stun-server", stun_server,
294         "stun-server-port", atoi (stun_server_port),
295         NULL);
296   }
297 
298   return agent;
299 }
300 
301 static void
add_stream(NiceAgent * agent)302 add_stream (NiceAgent *agent)
303 {
304   guint stream_id;
305 
306   stream_id = nice_agent_add_stream (agent, 2);
307   g_assert_cmpuint (stream_id, >, 0);
308 
309   g_object_set_data (G_OBJECT (agent), "stream-id",
310       GUINT_TO_POINTER (stream_id));
311 }
312 
313 static void
run_agent(TestIOStreamThreadData * data,NiceAgent * agent)314 run_agent (TestIOStreamThreadData *data, NiceAgent *agent)
315 {
316   guint stream_id;
317   gpointer tmp;
318 
319   tmp = g_object_get_data (G_OBJECT (agent), "stream-id");
320   stream_id = GPOINTER_TO_UINT (tmp);
321 
322   nice_agent_gather_candidates (agent, stream_id);
323 
324   if (data->reliable) {
325     data->io_stream =
326         G_IO_STREAM (nice_agent_get_io_stream (agent, stream_id, 1));
327     g_object_set_data (G_OBJECT (agent), "io-stream", data->io_stream);
328   } else {
329     data->io_stream = NULL;
330   }
331 }
332 
333 GThread *
spawn_thread(const gchar * thread_name,GThreadFunc thread_func,gpointer user_data)334 spawn_thread (const gchar *thread_name, GThreadFunc thread_func,
335     gpointer user_data)
336 {
337   GThread *thread;
338 
339   thread = g_thread_new (thread_name, thread_func, user_data);
340   g_assert (thread);
341 
342   return thread;
343 }
344 
345 void
run_io_stream_test(guint deadlock_timeout,gboolean reliable,const TestIOStreamCallbacks * callbacks,gpointer l_user_data,GDestroyNotify l_user_data_free,gpointer r_user_data,GDestroyNotify r_user_data_free)346 run_io_stream_test (guint deadlock_timeout, gboolean reliable,
347     const TestIOStreamCallbacks *callbacks,
348     gpointer l_user_data, GDestroyNotify l_user_data_free,
349     gpointer r_user_data, GDestroyNotify r_user_data_free)
350 {
351   GMainLoop *error_loop;
352   GThread *l_main_thread, *r_main_thread;
353   GThread *l_write_thread, *l_read_thread, *r_write_thread, *r_read_thread;
354   TestIOStreamThreadData l_data = { NULL }, r_data = { NULL };
355   GMutex mutex;
356   GCond cond;
357   guint start_count = 6;
358   guint stream_id;
359 
360   g_mutex_init (&mutex);
361   g_cond_init (&cond);
362 
363   error_loop = g_main_loop_new (NULL, FALSE);
364 
365   /* Set up data structures. */
366   l_data.reliable = reliable;
367   l_data.error_loop = error_loop;
368   l_data.callbacks = callbacks;
369   l_data.user_data = l_user_data;
370   l_data.user_data_free = l_user_data_free;
371 
372   g_cond_init (&l_data.write_cond);
373   g_mutex_init (&l_data.write_mutex);
374   l_data.stream_open = FALSE;
375   l_data.stream_ready = FALSE;
376   l_data.start_mutex = &mutex;
377   l_data.start_cond = &cond;
378   l_data.start_count = &start_count;
379 
380   r_data.reliable = reliable;
381   r_data.error_loop = error_loop;
382   r_data.callbacks = callbacks;
383   r_data.user_data = r_user_data;
384   r_data.user_data_free = r_user_data_free;
385 
386   g_cond_init (&r_data.write_cond);
387   g_mutex_init (&r_data.write_mutex);
388   r_data.stream_open = FALSE;
389   r_data.stream_ready = FALSE;
390   r_data.start_mutex = &mutex;
391   r_data.start_cond = &cond;
392   r_data.start_count = &start_count;
393 
394   l_data.other = &r_data;
395   r_data.other = &l_data;
396 
397   /* Create the L and R agents. */
398   l_data.agent = create_agent (TRUE, &l_data,
399       &l_data.main_context, &l_data.main_loop);
400   r_data.agent = create_agent (FALSE, &r_data,
401       &r_data.main_context, &r_data.main_loop);
402 
403   g_object_set_data (G_OBJECT (l_data.agent), "other-agent", r_data.agent);
404   g_object_set_data (G_OBJECT (r_data.agent), "other-agent", l_data.agent);
405 
406   /* Add a timer to catch deadlocks. */
407   g_timeout_add_seconds (deadlock_timeout, timer_cb, NULL);
408 
409   l_main_thread = spawn_thread ("libnice L main", main_thread_cb, &l_data);
410   r_main_thread = spawn_thread ("libnice R main", main_thread_cb, &r_data);
411 
412   add_stream (l_data.agent);
413   add_stream (r_data.agent);
414   run_agent (&l_data, l_data.agent);
415   run_agent (&r_data, r_data.agent);
416 
417   l_read_thread = spawn_thread ("libnice L read", read_thread_cb, &l_data);
418   r_read_thread = spawn_thread ("libnice R read", read_thread_cb, &r_data);
419 
420   if (callbacks->write_thread != NULL) {
421     l_write_thread = spawn_thread ("libnice L write", write_thread_cb, &l_data);
422     r_write_thread = spawn_thread ("libnice R write", write_thread_cb, &r_data);
423   } else {
424     g_mutex_lock (&mutex);
425     start_count -= 2;
426     g_cond_broadcast (&cond);
427     g_mutex_unlock (&mutex);
428 
429     l_write_thread = NULL;
430     r_write_thread = NULL;
431   }
432 
433   /* Run loop for error timer */
434   g_main_loop_run (error_loop);
435 
436   /* Clean up the main loops and threads. */
437   stop_main_loop (l_data.main_loop);
438   stop_main_loop (r_data.main_loop);
439 
440   g_thread_join (l_read_thread);
441   g_thread_join (r_read_thread);
442   if (l_write_thread != NULL)
443     g_thread_join (l_write_thread);
444   if (r_write_thread != NULL)
445     g_thread_join (r_write_thread);
446   g_thread_join (l_main_thread);
447   g_thread_join (r_main_thread);
448 
449   /* Free things. */
450   if (r_data.user_data_free != NULL)
451     r_data.user_data_free (r_data.user_data);
452 
453   if (l_data.user_data_free != NULL)
454     l_data.user_data_free (l_data.user_data);
455 
456   g_cond_clear (&r_data.write_cond);
457   g_mutex_clear (&r_data.write_mutex);
458   g_cond_clear (&l_data.write_cond);
459   g_mutex_clear (&l_data.write_mutex);
460 
461   if (r_data.io_stream != NULL)
462     g_object_unref (r_data.io_stream);
463   if (l_data.io_stream != NULL)
464     g_object_unref (l_data.io_stream);
465 
466   stream_id =
467     GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (r_data.agent), "stream-id"));
468   if (stream_id != 0)
469     nice_agent_remove_stream (r_data.agent, stream_id);
470   stream_id =
471     GPOINTER_TO_UINT (g_object_get_data (G_OBJECT (l_data.agent), "stream-id"));
472   if (stream_id != 0)
473     nice_agent_remove_stream (l_data.agent, stream_id);
474 
475   g_object_add_weak_pointer (G_OBJECT (r_data.agent),
476                              (gpointer *) &r_data.agent);
477   g_object_add_weak_pointer (G_OBJECT (l_data.agent),
478                              (gpointer *) &l_data.agent);
479 
480   g_object_unref (r_data.agent);
481   g_object_unref (l_data.agent);
482 
483   WAIT_UNTIL_UNSET (r_data.agent, r_data.main_context);
484   WAIT_UNTIL_UNSET (l_data.agent, l_data.main_context);
485 
486   g_main_loop_unref (r_data.main_loop);
487   g_main_loop_unref (l_data.main_loop);
488 
489   g_main_context_unref (r_data.main_context);
490   g_main_context_unref (l_data.main_context);
491 
492   g_main_loop_unref (error_loop);
493 
494   g_mutex_clear (&mutex);
495   g_cond_clear (&cond);
496 }
497 
498 /* Once we’ve received all the expected bytes, wait to finish sending all bytes,
499  * then send and wait for the close message. Finally, remove the stream.
500  *
501  * This must only be called from the read thread implementation. */
502 void
check_for_termination(TestIOStreamThreadData * data,gsize * recv_count,gsize * other_recv_count,volatile gsize * send_count,gsize expected_recv_count)503 check_for_termination (TestIOStreamThreadData *data, gsize *recv_count,
504     gsize *other_recv_count, volatile gsize *send_count, gsize expected_recv_count)
505 {
506   guint stream_id;
507   gpointer tmp;
508   GError *error = NULL;
509 
510   /* Wait for transmission to complete. */
511   while (*send_count < expected_recv_count) {
512     if (data->callbacks->wait_transmission_cb) {
513       data->callbacks->wait_transmission_cb (data->agent);
514     }
515   }
516 
517   /* Send a close message. */
518   tmp = g_object_get_data (G_OBJECT (data->agent), "stream-id");
519   stream_id = GPOINTER_TO_UINT (tmp);
520 
521   /* Can't be certain enough to test for termination on non-reliable streams.
522    * There may be packet losses, etc
523    */
524   if (data->io_stream) {
525     gssize len;
526 
527     g_output_stream_close (g_io_stream_get_output_stream (data->io_stream),
528             NULL, &error);
529 
530     g_assert_no_error (error);
531 
532     len = g_input_stream_skip (g_io_stream_get_input_stream (data->io_stream),
533         1024 * 1024, NULL, &error);
534     g_assert_no_error (error);
535     g_assert_cmpint (len, ==, 0);
536   }
537 
538   /* Remove the stream and run away. */
539   nice_agent_remove_stream (data->agent, stream_id);
540   g_object_set_data (G_OBJECT (data->agent), "stream-id", GUINT_TO_POINTER (0));
541   g_clear_object (&data->io_stream);
542 
543   data->done = TRUE;
544   if (data->other->done)
545     g_main_loop_quit (data->error_loop);
546 
547   /* If both sides have finished, quit the test main loop. */
548   if (*recv_count > expected_recv_count &&
549       *other_recv_count > expected_recv_count) {
550     g_main_loop_quit (data->error_loop);
551   }
552 }
553 
554 void
stop_main_loop(GMainLoop * loop)555 stop_main_loop (GMainLoop *loop)
556 {
557   GSource *src = g_idle_source_new ();
558   g_source_set_callback (src, G_SOURCE_FUNC (g_main_loop_quit),
559       g_main_loop_ref (loop), (GDestroyNotify) g_main_loop_unref);
560   g_source_attach (src, g_main_loop_get_context (loop));
561   g_source_unref (src);
562 }
563