1# Copyright 2015 Christoph Reiter 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 2 of the License, or 6# (at your option) any later version. 7 8import threading 9 10from tests import TestCase 11 12from gi.repository import Gtk 13 14from quodlibet.util.thread import call_async, call_async_background, \ 15 Cancellable, terminate_all 16 17 18class Tcall_async(TestCase): 19 20 def test_main(self): 21 cancel = Cancellable() 22 23 data = [] 24 25 def func(): 26 data.append(threading.current_thread().name) 27 28 def callback(result): 29 data.append(threading.current_thread().name) 30 31 call_async(func, cancel, callback) 32 Gtk.main_iteration() 33 while Gtk.events_pending(): 34 Gtk.main_iteration() 35 36 call_async_background(func, cancel, callback) 37 Gtk.main_iteration() 38 while Gtk.events_pending(): 39 Gtk.main_iteration() 40 41 main_name = threading.current_thread().name 42 self.assertEqual(len(data), 4) 43 self.assertNotEqual(data[0], main_name) 44 self.assertEqual(data[1], main_name) 45 self.assertNotEqual(data[2], main_name) 46 self.assertEqual(data[3], main_name) 47 48 def test_cancel(self): 49 def func(): 50 assert 0 51 52 def callback(result): 53 assert 0 54 55 cancel = Cancellable() 56 cancel.cancel() 57 call_async(func, cancel, callback) 58 Gtk.main_iteration() 59 while Gtk.events_pending(): 60 Gtk.main_iteration() 61 62 def test_terminate_all(self): 63 terminate_all() 64