"""Tests for coroutine (async) sinks and ``logger.complete()``."""
import asyncio
import sys
import pytest
import logging
import loguru
import time
import threading
import os
import re
import multiprocessing
from loguru import logger


async def async_writer(msg):
    """Coroutine sink: yield to the event loop briefly, then print ``msg`` to stdout."""
    await asyncio.sleep(0.01)
    print(msg, end="")


class AsyncWriter:
    """Callable object whose ``__call__`` is a coroutine printing the message."""

    async def __call__(self, msg):
        """Yield to the event loop briefly, then print ``msg`` to stdout."""
        await asyncio.sleep(0.01)
        print(msg, end="")


def test_coroutine_function(capsys):
    """A coroutine function used as sink prints the message once completed."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    logger.add(async_writer, format="{message}")

    asyncio.run(worker())

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"


def test_async_callable_sink(capsys):
    """An object with an async ``__call__`` used as sink prints the message."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    logger.add(AsyncWriter(), format="{message}")

    asyncio.run(worker())

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"


def test_concurrent_execution(capsys):
    """Ten gathered tasks each log once; all ten messages are printed."""

    async def task(i):
        logger.debug("=> {}", i)

    async def main():
        tasks = [task(i) for i in range(10)]
        await asyncio.gather(*tasks)
        await logger.complete()

    logger.add(async_writer, format="{message}")

    asyncio.run(main())

    out, err = capsys.readouterr()
    assert err == ""
    # Order of output is not asserted, only the set of lines.
    assert sorted(out.splitlines()) == sorted("=> %d" % i for i in range(10))


def test_recursive_coroutine(capsys):
    """A coroutine awaiting itself recursively logs 9..1, then completes at 0."""

    async def task(i):
        if i == 0:
            await logger.complete()
            return
        logger.info("{}!", i)
        await task(i - 1)

    logger.add(async_writer, format="{message}")

    asyncio.run(task(9))

    out, err = capsys.readouterr()
    assert err == ""
    assert sorted(out.splitlines()) == sorted("%d!" % i for i in range(1, 10))


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_using_another_event_loop(capsys):
    """The sink is added with an explicit ``loop`` that is then used to run the worker."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker())

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"


def test_using_another_event_loop_set_global_before_add(capsys):
    """Same as above, with the loop set as the global loop before adding the sink."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker())

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"


def test_using_another_event_loop_set_global_after_add(capsys):
    """Same as above, with the loop set as the global loop after adding the sink."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()

    logger.add(async_writer, format="{message}", loop=loop)

    asyncio.set_event_loop(loop)
    loop.run_until_complete(worker())

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"


def test_run_mutiple_different_loops(capsys):
    """With ``loop=None``, two successive ``asyncio.run`` calls each print their message."""

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    logger.add(async_writer, format="{message}", loop=None)

    asyncio.run(worker(1))
    asyncio.run(worker(2))

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "Message 1\nMessage 2\n"


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_run_multiple_same_loop(capsys):
    """Two successive runs on the same explicit loop each print their message."""

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    loop = asyncio.new_event_loop()

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker(1))
    loop.run_until_complete(worker(2))

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "Message 1\nMessage 2\n"


def test_run_multiple_same_loop_set_global(capsys):
    """Same as above, with the explicit loop also set as the global loop."""

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker(1))
    loop.run_until_complete(worker(2))

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "Message 1\nMessage 2\n"


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_complete_in_another_run(capsys):
    """A message logged in a first run is printed once a later run on the same loop completes."""

    async def worker_1():
        logger.debug("A")

    async def worker_2():
        logger.debug("B")
        await logger.complete()

    loop = asyncio.new_event_loop()

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker_1())
    loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""


def test_complete_in_another_run_set_global(capsys):
    """Same as above, with the explicit loop also set as the global loop."""

    async def worker_1():
        logger.debug("A")

    async def worker_2():
        logger.debug("B")
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, format="{message}", loop=loop)

    loop.run_until_complete(worker_1())
    loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""


def test_tasks_cancelled_on_remove(capsys):
    """Removing the handler right after logging leaves nothing printed."""
    logger.add(async_writer, format="{message}", catch=False)

    async def foo():
        logger.info("A")
        logger.info("B")
        logger.info("C")
        logger.remove()
        await logger.complete()

    asyncio.run(foo())

    out, err = capsys.readouterr()
    assert out == err == ""


def test_remove_without_tasks(capsys):
    """Logging after the handler was removed prints nothing."""
    logger.add(async_writer, format="{message}", catch=False)
    logger.remove()

    async def foo():
        logger.info("!")
        await logger.complete()

    asyncio.run(foo())

    out, err = capsys.readouterr()
    assert out == err == ""


def test_complete_without_tasks(capsys):
    """Awaiting ``logger.complete()`` when nothing was logged prints nothing."""
    logger.add(async_writer, catch=False)

    async def worker():
        await logger.complete()

    asyncio.run(worker())

    out, err = capsys.readouterr()
    assert out == err == ""


def test_complete_stream_noop(capsys):
    """With a stream sink, messages logged around ``complete()`` appear in order on stderr."""
    logger.add(sys.stderr, format="{message}", catch=False)
    logger.info("A")

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    asyncio.run(worker())

    logger.info("D")

    out, err = capsys.readouterr()
    assert out == ""
    assert err == "A\nB\nC\nD\n"


def test_complete_file_noop(tmpdir):
    """With a file sink, messages logged around ``complete()`` appear in order in the file."""
    filepath = tmpdir.join("test.log")

    logger.add(str(filepath), format="{message}", catch=False)
    logger.info("A")

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    asyncio.run(worker())

    logger.info("D")

    assert filepath.read() == "A\nB\nC\nD\n"


def test_complete_function_noop():
    """With a plain function sink, messages logged around ``complete()`` arrive in order."""
    out = ""

    def write(msg):
        nonlocal out
        out += msg

    logger.add(write, format="{message}", catch=False)
    logger.info("A")

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    asyncio.run(worker())

    logger.info("D")

    assert out == "A\nB\nC\nD\n"


def test_complete_standard_noop(capsys):
    """With a standard ``logging`` handler sink, messages around ``complete()`` arrive in order."""
    logger.add(logging.StreamHandler(sys.stderr), format="{message}", catch=False)
    logger.info("A")

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    asyncio.run(worker())

    logger.info("D")

    out, err = capsys.readouterr()
    assert out == ""
    assert err == "A\nB\nC\nD\n"


def test_exception_in_coroutine_caught(capsys):
    """With ``catch=True``, a raising coroutine sink yields a Loguru error report on stderr."""

    async def sink(msg):
        raise Exception("Oh no")

    async def main():
        logger.add(sink, catch=True)
        logger.info("Hello world")
        # Sleep before completing so the sink coroutine runs ahead of complete().
        await asyncio.sleep(0.1)
        await logger.complete()

    asyncio.run(main())

    out, err = capsys.readouterr()
    lines = err.strip().splitlines()

    assert out == ""
    assert lines[0] == "--- Logging error in Loguru Handler #0 ---"
    assert re.match(r"Record was: \{.*Hello world.*\}", lines[1])
    assert lines[-2] == "Exception: Oh no"
    assert lines[-1] == "--- End of logging error ---"


def test_exception_in_coroutine_not_caught(capsys, caplog):
    """With ``catch=False``, the sink's ValueError shows up as one captured ``logging`` record."""

    async def sink(msg):
        raise ValueError("Oh no")

    async def main():
        logger.add(sink, catch=False)
        logger.info("Hello world")
        # Sleep before completing so the sink coroutine runs ahead of complete().
        await asyncio.sleep(0.1)
        await logger.complete()

    asyncio.run(main())

    out, err = capsys.readouterr()
    assert out == err == ""

    records = caplog.records
    assert len(records) == 1
    record = records[0]

    message = record.getMessage()
    assert "Logging error in Loguru Handler" not in message
    assert "was never retrieved" not in message

    exc_type, exc_value, _ = record.exc_info
    assert exc_type == ValueError
    assert str(exc_value) == "Oh no"


def test_exception_in_coroutine_during_complete_caught(capsys):
    """Like the caught case, but the sink is still sleeping when ``complete()`` is awaited."""

    async def sink(msg):
        await asyncio.sleep(0.1)
        raise Exception("Oh no")

    async def main():
        logger.add(sink, catch=True)
        logger.info("Hello world")
        await logger.complete()

    asyncio.run(main())

    out, err = capsys.readouterr()
    lines = err.strip().splitlines()

    assert out == ""
    assert lines[0] == "--- Logging error in Loguru Handler #0 ---"
    assert re.match(r"Record was: \{.*Hello world.*\}", lines[1])
    assert lines[-2] == "Exception: Oh no"
    assert lines[-1] == "--- End of logging error ---"


def test_exception_in_coroutine_during_complete_not_caught(capsys, caplog):
    """Like the not-caught case, but the sink is still sleeping when ``complete()`` is awaited."""

    async def sink(msg):
        await asyncio.sleep(0.1)
        raise ValueError("Oh no")

    async def main():
        logger.add(sink, catch=False)
        logger.info("Hello world")
        await logger.complete()

    asyncio.run(main())

    out, err = capsys.readouterr()
    assert out == err == ""

    records = caplog.records
    assert len(records) == 1
    record = records[0]

    message = record.getMessage()
    assert "Logging error in Loguru Handler" not in message
    assert "was never retrieved" not in message

    exc_type, exc_value, _ = record.exc_info
    assert exc_type == ValueError
    assert str(exc_value) == "Oh no"


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_enqueue_coroutine_loop_not_none(capsys):
    """An enqueued coroutine sink bound to an explicit loop prints when run on that loop."""
    loop = asyncio.new_event_loop()
    logger.add(async_writer, enqueue=True, loop=loop, format="{message}", catch=False)

    async def worker():
        logger.info("A")
        await logger.complete()

    loop.run_until_complete(worker())

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""


def test_enqueue_coroutine_loop_not_none_set_global(capsys):
    """Same as above, with the explicit loop also set as the global loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, enqueue=True, loop=loop, format="{message}", catch=False)

    async def worker():
        logger.info("A")
        await logger.complete()

    loop.run_until_complete(worker())

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_enqueue_coroutine_loop_is_none(capsys):
    """Enqueued sink with ``loop=None``: output appears only once the global loop is run."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, enqueue=True, loop=None, format="{message}", catch=False)

    async def worker(msg):
        logger.info(msg)
        await logger.complete()

    # Running under asyncio.run is expected to print nothing...
    asyncio.run(worker("A"))

    out, err = capsys.readouterr()
    assert out == err == ""

    # ...whereas running on the loop that was set globally prints both messages.
    loop.run_until_complete(worker("B"))

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""


def test_enqueue_coroutine_loop_is_none_set_global(capsys):
    """Enqueued sink with ``loop=None`` prints when run on the loop set globally before add."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    logger.add(async_writer, enqueue=True, loop=None, format="{message}", catch=False)

    async def worker(msg):
        logger.info(msg)
        await logger.complete()

    loop.run_until_complete(worker("A"))

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""


def test_custom_complete_function(capsys):
    """A sink object exposing an async ``complete`` method has it awaited by ``logger.complete()``."""
    awaited = False

    class Handler:
        """Sink with a synchronous ``write`` and a coroutine ``complete``."""

        def write(self, message):
            print(message, end="")

        async def complete(self):
            nonlocal awaited
            awaited = True

    async def worker():
        logger.info("A")
        await logger.complete()

    logger.add(Handler(), catch=False, format="{message}")

    asyncio.run(worker())

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
    assert awaited


@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
@pytest.mark.parametrize("loop_is_none", [True, False])
def test_complete_from_another_loop(capsys, loop_is_none):
    """Completing from a second loop prints nothing; completing from the logging loop prints."""
    main_loop = asyncio.new_event_loop()
    second_loop = asyncio.new_event_loop()

    loop = None if loop_is_none else main_loop
    logger.add(async_writer, loop=loop, format="{message}")

    async def worker_1():
        logger.info("A")

    async def worker_2():
        await logger.complete()

    main_loop.run_until_complete(worker_1())
    second_loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == err == ""

    main_loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""


@pytest.mark.parametrize("loop_is_none", [True, False])
def test_complete_from_another_loop_set_global(capsys, loop_is_none):
    """Same as above, setting each loop as the global loop before running it."""
    main_loop = asyncio.new_event_loop()
    second_loop = asyncio.new_event_loop()

    loop = None if loop_is_none else main_loop
    logger.add(async_writer, loop=loop, format="{message}")

    async def worker_1():
        logger.info("A")

    async def worker_2():
        await logger.complete()

    asyncio.set_event_loop(main_loop)
    main_loop.run_until_complete(worker_1())

    asyncio.set_event_loop(second_loop)
    second_loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == err == ""

    asyncio.set_event_loop(main_loop)
    main_loop.run_until_complete(worker_2())

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""


def test_complete_from_multiple_threads_loop_is_none(capsys):
    """Ten threads each run their own loop and log 100 times; all 1000 lines are printed."""

    async def worker(i):
        for _ in range(100):
            await asyncio.sleep(0)
            logger.info("{:03}", i)
        await logger.complete()

    async def sink(msg):
        print(msg, end="")

    def worker_(i):
        asyncio.run(worker(i))

    logger.add(sink, catch=False, format="{message}")

    threads = [threading.Thread(target=worker_, args=(i,)) for i in range(10)]

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    out, err = capsys.readouterr()
    assert sorted(out.splitlines()) == ["{:03}".format(i) for i in range(10) for _ in range(100)]
    assert err == ""


def test_complete_from_multiple_threads_loop_is_not_none(capsys):
    """Same as above with the sink bound to an explicit loop, which is run once at the end."""

    async def worker(i):
        for _ in range(100):
            await asyncio.sleep(0)
            logger.info("{:03}", i)
        await logger.complete()

    async def sink(msg):
        print(msg, end="")

    def worker_(i):
        asyncio.run(worker(i))

    loop = asyncio.new_event_loop()
    logger.add(sink, catch=False, format="{message}", loop=loop)

    threads = [threading.Thread(target=worker_, args=(i,)) for i in range(10)]

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    async def complete():
        await logger.complete()

    # The threads ran their own loops; run the sink's loop before checking output.
    loop.run_until_complete(complete())

    out, err = capsys.readouterr()
    assert sorted(out.splitlines()) == ["{:03}".format(i) for i in range(10) for _ in range(100)]
    assert err == ""


async def async_subworker(logger_):
    """Log "Child" through ``logger_`` and await its completion."""
    logger_.info("Child")
    await logger_.complete()


async def async_mainworker(logger_):
    """Log "Main" through ``logger_`` and await its completion."""
    logger_.info("Main")
    await logger_.complete()


def subworker(logger_):
    """Run ``async_subworker`` to completion on a fresh event loop.

    Used as the target of a child process. The loop is created explicitly
    rather than through ``asyncio.get_event_loop()``: with no current loop set,
    that call is deprecated since Python 3.12 and raises ``RuntimeError`` on
    Python 3.14.
    """
    loop = asyncio.new_event_loop()
    # Install it as the current loop, as the implicit creation used to do.
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_subworker(logger_))
    finally:
        loop.close()


class Writer:
    """Coroutine-based sink accumulating every message into ``self.output``."""

    def __init__(self):
        self.output = ""

    async def write(self, message):
        """Append ``message`` to the accumulated output."""
        self.output += message


def test_complete_with_sub_processes(monkeypatch, capsys):
    """A message logged from a spawned child process reaches the parent's enqueued coroutine sink."""
    ctx = multiprocessing.get_context("spawn")
    monkeypatch.setattr(loguru._handler, "multiprocessing", ctx)

    loop = asyncio.new_event_loop()
    writer = Writer()
    logger.add(writer.write, format="{message}", enqueue=True, loop=loop)

    process = ctx.Process(target=subworker, args=[logger])
    process.start()
    process.join()

    async def complete():
        await logger.complete()

    loop.run_until_complete(complete())

    out, err = capsys.readouterr()
    assert out == err == ""
    assert writer.output == "Child\n"