1# -*- Mode: Python -*-
2
3import os
4import unittest
5import tempfile
6import os.path
7import shutil
8import warnings
9
10try:
11    import fcntl
12except ImportError:
13    fcntl = None
14
15from gi.repository import GLib
16from gi import PyGIDeprecationWarning
17
18
19class IOChannel(unittest.TestCase):
20    def setUp(self):
21        self.workdir = tempfile.mkdtemp()
22
23        self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
24        with open(self.testutf8, 'wb') as f:
25            f.write(u'''hello ♥ world
26second line
27
28À demain!'''.encode('UTF-8'))
29
30        self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
31        with open(self.testlatin1, 'wb') as f:
32            f.write(b'''hell\xf8 world
33second line
34
35\xc0 demain!''')
36
37        self.testout = os.path.join(self.workdir, 'testout.txt')
38
39    def tearDown(self):
40        shutil.rmtree(self.workdir)
41
42    def test_file_readline_utf8(self):
43        ch = GLib.IOChannel(filename=self.testutf8)
44        self.assertEqual(ch.get_encoding(), 'UTF-8')
45        self.assertTrue(ch.get_close_on_unref())
46        self.assertEqual(ch.readline(), 'hello ♥ world\n')
47        self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
48        self.assertEqual(ch.readline(), 'second line\n')
49        self.assertEqual(ch.readline(), '\n')
50        self.assertEqual(ch.readline(), 'À demain!')
51        self.assertEqual(ch.get_buffer_condition(), 0)
52        self.assertEqual(ch.readline(), '')
53        ch.shutdown(True)
54
55    def test_file_readline_latin1(self):
56        ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
57        ch.set_encoding('latin1')
58        self.assertEqual(ch.get_encoding(), 'latin1')
59        self.assertEqual(ch.readline(), 'hellø world\n')
60        self.assertEqual(ch.readline(), 'second line\n')
61        self.assertEqual(ch.readline(), '\n')
62        self.assertEqual(ch.readline(), 'À demain!')
63        ch.shutdown(True)
64
65    def test_file_iter(self):
66        items = []
67        ch = GLib.IOChannel(filename=self.testutf8)
68        for item in ch:
69            items.append(item)
70        self.assertEqual(len(items), 4)
71        self.assertEqual(items[0], 'hello ♥ world\n')
72        ch.shutdown(True)
73
74    def test_file_readlines(self):
75        ch = GLib.IOChannel(filename=self.testutf8)
76        lines = ch.readlines()
77        # Note, this really ought to be 4, but the static bindings add an extra
78        # empty one
79        self.assertGreaterEqual(len(lines), 4)
80        self.assertLessEqual(len(lines), 5)
81        self.assertEqual(lines[0], 'hello ♥ world\n')
82        self.assertEqual(lines[3], 'À demain!')
83        if len(lines) == 4:
84            self.assertEqual(lines[4], '')
85
86    def test_file_read(self):
87        ch = GLib.IOChannel(filename=self.testutf8)
88        with open(self.testutf8, 'rb') as f:
89            self.assertEqual(ch.read(), f.read())
90
91        ch = GLib.IOChannel(filename=self.testutf8)
92        with open(self.testutf8, 'rb') as f:
93            self.assertEqual(ch.read(10), f.read(10))
94
95        ch = GLib.IOChannel(filename=self.testutf8)
96        with open(self.testutf8, 'rb') as f:
97            self.assertEqual(ch.read(max_count=15), f.read(15))
98
99    def test_seek(self):
100        ch = GLib.IOChannel(filename=self.testutf8)
101        ch.seek(2)
102        self.assertEqual(ch.read(3), b'llo')
103
104        ch.seek(2, 0)  # SEEK_SET
105        self.assertEqual(ch.read(3), b'llo')
106
107        ch.seek(1, 1)  # SEEK_CUR, skip the space
108        self.assertEqual(ch.read(3), b'\xe2\x99\xa5')
109
110        ch.seek(2, 2)  # SEEK_END
111        # FIXME: does not work currently
112        # self.assertEqual(ch.read(2), b'n!')
113
114        # invalid whence value
115        self.assertRaises(ValueError, ch.seek, 0, 3)
116        ch.shutdown(True)
117
118    def test_file_write(self):
119        ch = GLib.IOChannel(filename=self.testout, mode='w')
120        ch.set_encoding('latin1')
121        ch.write('hellø world\n')
122        ch.shutdown(True)
123        ch = GLib.IOChannel(filename=self.testout, mode='a')
124        ch.set_encoding('latin1')
125        ch.write('À demain!')
126        ch.shutdown(True)
127
128        with open(self.testout, 'rb') as f:
129            self.assertEqual(f.read().decode('latin1'), u'hellø world\nÀ demain!')
130
131    def test_file_writelines(self):
132        ch = GLib.IOChannel(filename=self.testout, mode='w')
133        ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
134        ch.shutdown(True)
135
136        with open(self.testout, 'r') as f:
137            self.assertEqual(f.read(), 'foobar\nbaz\nend')
138
139    def test_buffering(self):
140        writer = GLib.IOChannel(filename=self.testout, mode='w')
141        writer.set_encoding(None)
142        self.assertTrue(writer.get_buffered())
143        self.assertGreater(writer.get_buffer_size(), 10)
144
145        reader = GLib.IOChannel(filename=self.testout, mode='r')
146
147        # does not get written immediately on buffering
148        writer.write('abc')
149        self.assertEqual(reader.read(), b'')
150        writer.flush()
151        self.assertEqual(reader.read(), b'abc')
152
153        # does get written immediately without buffering
154        writer.set_buffered(False)
155        writer.write('def')
156        self.assertEqual(reader.read(), b'def')
157
158        # writes after buffer overflow
159        writer.set_buffer_size(10)
160        writer.write('0123456789012')
161        self.assertTrue(reader.read().startswith(b'012'))
162        writer.flush()
163        reader.read()  # ignore bits written after flushing
164
165        # closing flushes
166        writer.set_buffered(True)
167        writer.write('ghi')
168        writer.shutdown(True)
169        self.assertEqual(reader.read(), b'ghi')
170        reader.shutdown(True)
171
172    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
173    def test_fd_read(self):
174        (r, w) = os.pipe()
175
176        ch = GLib.IOChannel(filedes=r)
177        ch.set_encoding(None)
178        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
179        self.assertNotEqual(ch.get_flags() | GLib.IOFlags.NONBLOCK, 0)
180        self.assertEqual(ch.read(), b'')
181        os.write(w, b'\x01\x02')
182        self.assertEqual(ch.read(), b'\x01\x02')
183
184        # now test blocking case, after closing the write end
185        ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
186        os.write(w, b'\x03\x04')
187        os.close(w)
188        self.assertEqual(ch.read(), b'\x03\x04')
189
190        ch.shutdown(True)
191
192    @unittest.skipUnless(fcntl, "no fcntl")
193    def test_fd_write(self):
194        (r, w) = os.pipe()
195        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)
196
197        ch = GLib.IOChannel(filedes=w, mode='w')
198        ch.set_encoding(None)
199        ch.set_buffered(False)
200        ch.write(b'\x01\x02')
201        self.assertEqual(os.read(r, 10), b'\x01\x02')
202
203        # now test blocking case, after closing the write end
204        fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
205        ch.write(b'\x03\x04')
206        ch.shutdown(True)
207        self.assertEqual(os.read(r, 10), b'\x03\x04')
208        os.close(r)
209
210    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
211    def test_deprecated_method_add_watch_no_data(self):
212        (r, w) = os.pipe()
213
214        ch = GLib.IOChannel(filedes=r)
215        ch.set_encoding(None)
216        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
217
218        cb_reads = []
219
220        def cb(channel, condition):
221            self.assertEqual(channel, ch)
222            self.assertEqual(condition, GLib.IOCondition.IN)
223            cb_reads.append(channel.read())
224            if len(cb_reads) == 2:
225                ml.quit()
226            return True
227
228        # io_add_watch() method is deprecated, use GLib.io_add_watch
229        with warnings.catch_warnings(record=True) as warn:
230            warnings.simplefilter('always')
231            ch.add_watch(GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
232            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
233
234        def write():
235            os.write(w, b'a')
236            GLib.idle_add(lambda: os.write(w, b'b') and False)
237
238        ml = GLib.MainLoop()
239        GLib.idle_add(write)
240        GLib.timeout_add(2000, ml.quit)
241        ml.run()
242
243        self.assertEqual(cb_reads, [b'a', b'b'])
244
245    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
246    def test_deprecated_method_add_watch_data_priority(self):
247        (r, w) = os.pipe()
248
249        ch = GLib.IOChannel(filedes=r)
250        ch.set_encoding(None)
251        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
252
253        cb_reads = []
254
255        def cb(channel, condition, data):
256            self.assertEqual(channel, ch)
257            self.assertEqual(condition, GLib.IOCondition.IN)
258            self.assertEqual(data, 'hello')
259            cb_reads.append(channel.read())
260            if len(cb_reads) == 2:
261                ml.quit()
262            return True
263
264        ml = GLib.MainLoop()
265        # io_add_watch() method is deprecated, use GLib.io_add_watch
266        with warnings.catch_warnings(record=True) as warn:
267            warnings.simplefilter('always')
268            id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello', priority=GLib.PRIORITY_HIGH)
269            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
270
271        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
272                         GLib.PRIORITY_HIGH)
273
274        def write():
275            os.write(w, b'a')
276            GLib.idle_add(lambda: os.write(w, b'b') and False)
277
278        GLib.idle_add(write)
279        GLib.timeout_add(2000, ml.quit)
280        ml.run()
281
282        self.assertEqual(cb_reads, [b'a', b'b'])
283
284    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
285    def test_add_watch_no_data(self):
286        (r, w) = os.pipe()
287
288        ch = GLib.IOChannel(filedes=r)
289        ch.set_encoding(None)
290        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
291
292        cb_reads = []
293
294        def cb(channel, condition):
295            self.assertEqual(channel, ch)
296            self.assertEqual(condition, GLib.IOCondition.IN)
297            cb_reads.append(channel.read())
298            if len(cb_reads) == 2:
299                ml.quit()
300            return True
301
302        id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb)
303
304        ml = GLib.MainLoop()
305        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
306                         GLib.PRIORITY_HIGH)
307
308        def write():
309            os.write(w, b'a')
310            GLib.idle_add(lambda: os.write(w, b'b') and False)
311
312        GLib.idle_add(write)
313        GLib.timeout_add(2000, ml.quit)
314        ml.run()
315
316        self.assertEqual(cb_reads, [b'a', b'b'])
317
318    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
319    def test_add_watch_with_data(self):
320        (r, w) = os.pipe()
321
322        ch = GLib.IOChannel(filedes=r)
323        ch.set_encoding(None)
324        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
325
326        cb_reads = []
327
328        def cb(channel, condition, data):
329            self.assertEqual(channel, ch)
330            self.assertEqual(condition, GLib.IOCondition.IN)
331            self.assertEqual(data, 'hello')
332            cb_reads.append(channel.read())
333            if len(cb_reads) == 2:
334                ml.quit()
335            return True
336
337        id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb, 'hello')
338
339        ml = GLib.MainLoop()
340        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
341                         GLib.PRIORITY_HIGH)
342
343        def write():
344            os.write(w, b'a')
345            GLib.idle_add(lambda: os.write(w, b'b') and False)
346
347        GLib.idle_add(write)
348        GLib.timeout_add(2000, ml.quit)
349        ml.run()
350
351        self.assertEqual(cb_reads, [b'a', b'b'])
352
353    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
354    def test_add_watch_with_multi_data(self):
355        (r, w) = os.pipe()
356
357        ch = GLib.IOChannel(filedes=r)
358        ch.set_encoding(None)
359        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
360
361        cb_reads = []
362
363        def cb(channel, condition, data1, data2, data3):
364            self.assertEqual(channel, ch)
365            self.assertEqual(condition, GLib.IOCondition.IN)
366            self.assertEqual(data1, 'a')
367            self.assertEqual(data2, 'b')
368            self.assertEqual(data3, 'c')
369            cb_reads.append(channel.read())
370            if len(cb_reads) == 2:
371                ml.quit()
372            return True
373
374        id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb,
375                               'a', 'b', 'c')
376
377        ml = GLib.MainLoop()
378        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
379                         GLib.PRIORITY_HIGH)
380
381        def write():
382            os.write(w, b'a')
383            GLib.idle_add(lambda: os.write(w, b'b') and False)
384
385        GLib.idle_add(write)
386        GLib.timeout_add(2000, ml.quit)
387        ml.run()
388
389        self.assertEqual(cb_reads, [b'a', b'b'])
390
391    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
392    def test_deprecated_add_watch_no_data(self):
393        (r, w) = os.pipe()
394
395        ch = GLib.IOChannel(filedes=r)
396        ch.set_encoding(None)
397        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
398
399        cb_reads = []
400
401        def cb(channel, condition):
402            self.assertEqual(channel, ch)
403            self.assertEqual(condition, GLib.IOCondition.IN)
404            cb_reads.append(channel.read())
405            if len(cb_reads) == 2:
406                ml.quit()
407            return True
408
409        with warnings.catch_warnings(record=True) as warn:
410            warnings.simplefilter('always')
411            id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
412            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
413
414        ml = GLib.MainLoop()
415        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
416                         GLib.PRIORITY_HIGH)
417
418        def write():
419            os.write(w, b'a')
420            GLib.idle_add(lambda: os.write(w, b'b') and False)
421
422        GLib.idle_add(write)
423        GLib.timeout_add(2000, ml.quit)
424        ml.run()
425
426        self.assertEqual(cb_reads, [b'a', b'b'])
427
428    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
429    def test_deprecated_add_watch_with_data(self):
430        (r, w) = os.pipe()
431
432        ch = GLib.IOChannel(filedes=r)
433        ch.set_encoding(None)
434        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
435
436        cb_reads = []
437
438        def cb(channel, condition, data):
439            self.assertEqual(channel, ch)
440            self.assertEqual(condition, GLib.IOCondition.IN)
441            self.assertEqual(data, 'hello')
442            cb_reads.append(channel.read())
443            if len(cb_reads) == 2:
444                ml.quit()
445            return True
446
447        with warnings.catch_warnings(record=True) as warn:
448            warnings.simplefilter('always')
449            id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
450                                   priority=GLib.PRIORITY_HIGH)
451            self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
452
453        ml = GLib.MainLoop()
454        self.assertEqual(ml.get_context().find_source_by_id(id).priority,
455                         GLib.PRIORITY_HIGH)
456
457        def write():
458            os.write(w, b'a')
459            GLib.idle_add(lambda: os.write(w, b'b') and False)
460
461        GLib.idle_add(write)
462
463        GLib.timeout_add(2000, ml.quit)
464        ml.run()
465
466        self.assertEqual(cb_reads, [b'a', b'b'])
467
468    def test_backwards_compat_flags(self):
469        with warnings.catch_warnings():
470            warnings.simplefilter('ignore', PyGIDeprecationWarning)
471
472            self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
473            self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
474            self.assertEqual(GLib.IOFlags.IS_SEEKABLE, GLib.IO_FLAG_IS_SEEKABLE)
475            self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)
476