# -*- Mode: Python -*-

import os
import unittest
import tempfile
import os.path
import shutil
import warnings

try:
    import fcntl
except ImportError:
    fcntl = None

from gi.repository import GLib
from gi import PyGIDeprecationWarning


class IOChannel(unittest.TestCase):
    """Tests for the GLib.IOChannel overrides.

    Covers file-backed channels (reading, iterating, seeking, writing,
    buffering), file-descriptor-backed channels built on os.pipe(), and the
    current and deprecated call forms for adding an IO watch.
    """

    def setUp(self):
        # Every test gets a private scratch directory holding the same text
        # in two encodings, plus a path for tests that write output.
        self.workdir = tempfile.mkdtemp()

        self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
        with open(self.testutf8, 'wb') as f:
            f.write(u'''hello ♥ world
second line

À demain!'''.encode('UTF-8'))

        self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
        with open(self.testlatin1, 'wb') as f:
            f.write(b'''hell\xf8 world
second line

\xc0 demain!''')

        self.testout = os.path.join(self.workdir, 'testout.txt')

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def _make_nonblocking_reader(self):
        """Wrap the read end of a fresh pipe in a raw, non-blocking channel.

        Returns:
            A ``(channel, write_fd)`` tuple: the channel reads from the pipe
            with encoding disabled and the NONBLOCK flag set; ``write_fd`` is
            the pipe's write end as a plain file descriptor.
        """
        (r, w) = os.pipe()

        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        return ch, w

    def _write_twice_and_run(self, ml, w):
        """Write b'a' then b'b' to *w* from idle callbacks and run *ml*.

        The second write is scheduled from within the first idle callback so
        that a watch on the read end gets dispatched once per byte. A two
        second timeout quits the loop in case the watch never fires twice.

        Args:
            ml: the GLib.MainLoop to run.
            w: file descriptor of the pipe's write end.
        """
        def write():
            os.write(w, b'a')
            # "and False" makes the idle callback one-shot: os.write()
            # returns the (truthy) number of bytes written.
            GLib.idle_add(lambda: os.write(w, b'b') and False)

        GLib.idle_add(write)
        GLib.timeout_add(2000, ml.quit)
        ml.run()

    def test_file_readline_utf8(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        self.assertEqual(ch.get_encoding(), 'UTF-8')
        self.assertTrue(ch.get_close_on_unref())
        self.assertEqual(ch.readline(), 'hello ♥ world\n')
        self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(ch.readline(), 'À demain!')
        self.assertEqual(ch.get_buffer_condition(), 0)
        self.assertEqual(ch.readline(), '')
        ch.shutdown(True)

    def test_file_readline_latin1(self):
        ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
        ch.set_encoding('latin1')
        self.assertEqual(ch.get_encoding(), 'latin1')
        self.assertEqual(ch.readline(), 'hellø world\n')
        self.assertEqual(ch.readline(), 'second line\n')
        self.assertEqual(ch.readline(), '\n')
        self.assertEqual(ch.readline(), 'À demain!')
        ch.shutdown(True)

    def test_file_iter(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        items = [item for item in ch]
        self.assertEqual(len(items), 4)
        self.assertEqual(items[0], 'hello ♥ world\n')
        ch.shutdown(True)

    def test_file_readlines(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        lines = ch.readlines()
        # Note, this really ought to be 4, but the static bindings add an extra
        # empty one
        self.assertGreaterEqual(len(lines), 4)
        self.assertLessEqual(len(lines), 5)
        self.assertEqual(lines[0], 'hello ♥ world\n')
        self.assertEqual(lines[3], 'À demain!')
        # Only index the fifth entry when it exists; it is the extra empty
        # line mentioned above.
        if len(lines) == 5:
            self.assertEqual(lines[4], '')

    def test_file_read(self):
        # Whole file, positional count, and keyword count must all agree
        # with a plain binary read of the same file.
        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(), f.read())

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(10), f.read(10))

        ch = GLib.IOChannel(filename=self.testutf8)
        with open(self.testutf8, 'rb') as f:
            self.assertEqual(ch.read(max_count=15), f.read(15))

    def test_seek(self):
        ch = GLib.IOChannel(filename=self.testutf8)
        ch.seek(2)
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(2, 0)  # SEEK_SET
        self.assertEqual(ch.read(3), b'llo')

        ch.seek(1, 1)  # SEEK_CUR, skip the space
        self.assertEqual(ch.read(3), b'\xe2\x99\xa5')

        ch.seek(2, 2)  # SEEK_END
        # FIXME: does not work currently
        # self.assertEqual(ch.read(2), b'n!')

        # invalid whence value
        self.assertRaises(ValueError, ch.seek, 0, 3)
        ch.shutdown(True)

    def test_file_write(self):
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.set_encoding('latin1')
        ch.write('hellø world\n')
        ch.shutdown(True)
        ch = GLib.IOChannel(filename=self.testout, mode='a')
        ch.set_encoding('latin1')
        ch.write('À demain!')
        ch.shutdown(True)

        with open(self.testout, 'rb') as f:
            self.assertEqual(f.read().decode('latin1'),
                             u'hellø world\nÀ demain!')

    def test_file_writelines(self):
        ch = GLib.IOChannel(filename=self.testout, mode='w')
        ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
        ch.shutdown(True)

        with open(self.testout, 'r') as f:
            self.assertEqual(f.read(), 'foobar\nbaz\nend')

    def test_buffering(self):
        writer = GLib.IOChannel(filename=self.testout, mode='w')
        writer.set_encoding(None)
        self.assertTrue(writer.get_buffered())
        self.assertGreater(writer.get_buffer_size(), 10)

        reader = GLib.IOChannel(filename=self.testout, mode='r')

        # does not get written immediately on buffering
        writer.write('abc')
        self.assertEqual(reader.read(), b'')
        writer.flush()
        self.assertEqual(reader.read(), b'abc')

        # does get written immediately without buffering
        writer.set_buffered(False)
        writer.write('def')
        self.assertEqual(reader.read(), b'def')

        # writes after buffer overflow
        writer.set_buffer_size(10)
        writer.write('0123456789012')
        self.assertTrue(reader.read().startswith(b'012'))
        writer.flush()
        reader.read()  # ignore bits written after flushing

        # closing flushes
        writer.set_buffered(True)
        writer.write('ghi')
        writer.shutdown(True)
        self.assertEqual(reader.read(), b'ghi')
        reader.shutdown(True)

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_fd_read(self):
        (r, w) = os.pipe()

        ch = GLib.IOChannel(filedes=r)
        ch.set_encoding(None)
        ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
        # Mask with "&": OR-ing the flag in would be non-zero regardless of
        # whether set_flags() took effect.
        self.assertNotEqual(ch.get_flags() & GLib.IOFlags.NONBLOCK, 0)
        self.assertEqual(ch.read(), b'')
        os.write(w, b'\x01\x02')
        self.assertEqual(ch.read(), b'\x01\x02')

        # now test blocking case, after closing the write end
        ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
        os.write(w, b'\x03\x04')
        os.close(w)
        self.assertEqual(ch.read(), b'\x03\x04')

        ch.shutdown(True)

    @unittest.skipUnless(fcntl, "no fcntl")
    def test_fd_write(self):
        (r, w) = os.pipe()
        fcntl.fcntl(r, fcntl.F_SETFL,
                    fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

        ch = GLib.IOChannel(filedes=w, mode='w')
        ch.set_encoding(None)
        ch.set_buffered(False)
        ch.write(b'\x01\x02')
        self.assertEqual(os.read(r, 10), b'\x01\x02')

        # now test blocking case, after closing the write end
        fcntl.fcntl(r, fcntl.F_SETFL,
                    fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
        ch.write(b'\x03\x04')
        ch.shutdown(True)
        self.assertEqual(os.read(r, 10), b'\x03\x04')
        os.close(r)

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_method_add_watch_no_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            ch.add_watch(GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category,
                                       PyGIDeprecationWarning))

        ml = GLib.MainLoop()
        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_method_add_watch_data_priority(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        ml = GLib.MainLoop()
        # io_add_watch() method is deprecated, use GLib.io_add_watch
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello',
                                     priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category,
                                       PyGIDeprecationWarning))

        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_no_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb)

        ml = GLib.MainLoop()
        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_with_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb, 'hello')

        ml = GLib.MainLoop()
        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_add_watch_with_multi_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition, data1, data2, data3):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data1, 'a')
            self.assertEqual(data2, 'b')
            self.assertEqual(data3, 'c')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        source_id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH,
                                      GLib.IOCondition.IN, cb,
                                      'a', 'b', 'c')

        ml = GLib.MainLoop()
        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_add_watch_no_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        # Passing the priority as a keyword (instead of as the second
        # positional argument) is the call form expected to warn here.
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb,
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category,
                                       PyGIDeprecationWarning))

        ml = GLib.MainLoop()
        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    @unittest.skipIf(os.name == "nt", "NONBLOCK not implemented on Windows")
    def test_deprecated_add_watch_with_data(self):
        ch, w = self._make_nonblocking_reader()

        cb_reads = []

        def cb(channel, condition, data):
            self.assertEqual(channel, ch)
            self.assertEqual(condition, GLib.IOCondition.IN)
            self.assertEqual(data, 'hello')
            cb_reads.append(channel.read())
            if len(cb_reads) == 2:
                ml.quit()
            return True

        # Same keyword-priority call form as above, with user data.
        with warnings.catch_warnings(record=True) as warn:
            warnings.simplefilter('always')
            source_id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
                                          priority=GLib.PRIORITY_HIGH)
            self.assertTrue(issubclass(warn[0].category,
                                       PyGIDeprecationWarning))

        ml = GLib.MainLoop()
        self.assertEqual(
            ml.get_context().find_source_by_id(source_id).priority,
            GLib.PRIORITY_HIGH)

        self._write_twice_and_run(ml, w)

        self.assertEqual(cb_reads, [b'a', b'b'])

    def test_backwards_compat_flags(self):
        # The old module-level constants must still compare equal to the
        # enum/flags members; silence the deprecation warnings they raise.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', PyGIDeprecationWarning)

            self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
            self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
            self.assertEqual(GLib.IOFlags.IS_SEEKABLE,
                             GLib.IO_FLAG_IS_SEEKABLE)
            self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)