1import asyncio
2import sys
3import pytest
4import logging
5import loguru
6import time
7import threading
8import os
9import re
10import multiprocessing
11from loguru import logger
12
13
async def async_writer(msg):
    """Coroutine sink: pause briefly on the event loop, then print ``msg`` to stdout."""
    pause = 0.01
    await asyncio.sleep(pause)
    # ``end=""`` prevents print from appending a newline of its own.
    print(msg, end="")
17
18
class AsyncWriter:
    """Callable object whose ``__call__`` is a coroutine printing the message to stdout."""

    async def __call__(self, msg):
        """Pause briefly on the event loop, then print ``msg`` without adding a newline."""
        pause = 0.01
        await asyncio.sleep(pause)
        print(msg, end="")
23
24
def test_coroutine_function(capsys):
    """A coroutine function added as sink prints the message once ``complete()`` is awaited."""
    logger.add(async_writer, format="{message}")

    async def emit_then_flush():
        logger.debug("A message")
        await logger.complete()

    asyncio.run(emit_then_flush())

    captured = capsys.readouterr()
    assert captured.err == ""
    assert captured.out == "A message\n"
37
38
def test_async_callable_sink(capsys):
    """An object with a coroutine ``__call__`` added as sink prints the logged message."""
    logger.add(AsyncWriter(), format="{message}")

    async def emit_then_flush():
        logger.debug("A message")
        await logger.complete()

    asyncio.run(emit_then_flush())

    captured = capsys.readouterr()
    assert captured.err == ""
    assert captured.out == "A message\n"
51
52
def test_concurrent_execution(capsys):
    """Messages logged by ten gathered coroutines all reach the sink (order is not checked)."""
    count = 10

    async def task(i):
        logger.debug("=> {}", i)

    async def main():
        await asyncio.gather(*(task(i) for i in range(count)))
        await logger.complete()

    logger.add(async_writer, format="{message}")
    asyncio.run(main())

    captured = capsys.readouterr()
    expected = sorted("=> %d" % i for i in range(count))
    assert captured.err == ""
    # Compare sorted lines: the output order is not asserted.
    assert sorted(captured.out.splitlines()) == expected
69
70
def test_recursive_coroutine(capsys):
    """A coroutine that logs, recurses, and awaits ``complete()`` at the bottom emits all lines."""

    async def task(i):
        if i != 0:
            logger.info("{}!", i)
            await task(i - 1)
            return
        # Deepest level of the recursion: wait for the scheduled sink coroutines.
        await logger.complete()

    logger.add(async_writer, format="{message}")
    asyncio.run(task(9))

    captured = capsys.readouterr()
    expected = sorted("%d!" % i for i in range(1, 10))
    assert captured.err == ""
    assert sorted(captured.out.splitlines()) == expected
86
87
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_using_another_event_loop(capsys):
    """A sink bound to an explicitly created loop prints the message when that loop is run."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(async_writer, format="{message}", loop=loop)
        loop.run_until_complete(worker())
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"
103
104
def test_using_another_event_loop_set_global_before_add(capsys):
    """A sink bound to a loop that was made current before ``logger.add`` prints the message."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, format="{message}", loop=loop)
        loop.run_until_complete(worker())
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"
120
121
def test_using_another_event_loop_set_global_after_add(capsys):
    """A sink bound to a loop that becomes current only after ``logger.add`` prints the message."""

    async def worker():
        logger.debug("A message")
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(async_writer, format="{message}", loop=loop)

        # The loop is deliberately installed as current only once the sink is registered.
        asyncio.set_event_loop(loop)
        loop.run_until_complete(worker())
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "A message\n"
137
138
def test_run_mutiple_different_loops(capsys):
    """With ``loop=None``, two successive ``asyncio.run`` calls each emit their own message."""
    logger.add(async_writer, format="{message}", loop=None)

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    for index in (1, 2):
        asyncio.run(worker(index))

    captured = capsys.readouterr()
    assert captured.err == ""
    assert captured.out == "Message 1\nMessage 2\n"
152
153
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_run_multiple_same_loop(capsys):
    """Running the sink's explicit loop twice emits the message logged during each run."""

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(async_writer, format="{message}", loop=loop)

        loop.run_until_complete(worker(1))
        loop.run_until_complete(worker(2))
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "Message 1\nMessage 2\n"
170
171
def test_run_multiple_same_loop_set_global(capsys):
    """Running a loop that is both current and bound to the sink twice emits both messages."""

    async def worker(i):
        logger.debug("Message {}", i)
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, format="{message}", loop=loop)

        loop.run_until_complete(worker(1))
        loop.run_until_complete(worker(2))
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert err == ""
    assert out == "Message 1\nMessage 2\n"
188
189
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_complete_in_another_run(capsys):
    """A message logged in one run of the loop is emitted by ``complete()`` in a later run."""

    async def worker_1():
        # Logs without awaiting ``complete()``.
        logger.debug("A")

    async def worker_2():
        logger.debug("B")
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(async_writer, format="{message}", loop=loop)

        loop.run_until_complete(worker_1())
        loop.run_until_complete(worker_2())
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""
209
210
def test_complete_in_another_run_set_global(capsys):
    """With the loop installed as current, ``complete()`` in a later run emits earlier messages."""

    async def worker_1():
        # Logs without awaiting ``complete()``.
        logger.debug("A")

    async def worker_2():
        logger.debug("B")
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, format="{message}", loop=loop)

        loop.run_until_complete(worker_1())
        loop.run_until_complete(worker_2())
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""
230
231
def test_tasks_cancelled_on_remove(capsys):
    """Removing the handler before ``complete()`` means none of the logged messages is printed."""

    async def foo():
        for text in ("A", "B", "C"):
            logger.info(text)
        logger.remove()
        await logger.complete()

    logger.add(async_writer, format="{message}", catch=False)
    asyncio.run(foo())

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == ""
246
247
def test_remove_without_tasks(capsys):
    """Removing a coroutine sink that never received a message leaves later logging silent."""

    async def foo():
        logger.info("!")
        await logger.complete()

    # Register and immediately drop the handler, before anything is logged.
    logger.add(async_writer, format="{message}", catch=False)
    logger.remove()

    asyncio.run(foo())

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == ""
260
261
def test_complete_without_tasks(capsys):
    """Awaiting ``complete()`` when nothing was logged produces no output at all."""

    async def worker():
        await logger.complete()

    logger.add(async_writer, catch=False)
    asyncio.run(worker())

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == ""
272
273
def test_complete_stream_noop(capsys):
    """With a stream sink, logging before, around and after ``complete()`` emits every message."""

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    logger.add(sys.stderr, format="{message}", catch=False)

    logger.info("A")
    asyncio.run(worker())
    logger.info("D")

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == "A\nB\nC\nD\n"
290
291
def test_complete_file_noop(tmpdir):
    """With a file sink, logging before, around and after ``complete()`` writes every message."""
    logfile = tmpdir.join("test.log")

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    logger.add(str(logfile), format="{message}", catch=False)

    logger.info("A")
    asyncio.run(worker())
    logger.info("D")

    content = logfile.read()
    assert content == "A\nB\nC\nD\n"
308
309
def test_complete_function_noop():
    """With a plain function sink, logging around ``complete()`` delivers every message."""
    chunks = []

    def write(msg):
        # Collect each formatted message; they are joined for the final comparison.
        chunks.append(msg)

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    logger.add(write, format="{message}", catch=False)

    logger.info("A")
    asyncio.run(worker())
    logger.info("D")

    assert "".join(chunks) == "A\nB\nC\nD\n"
330
331
def test_complete_standard_noop(capsys):
    """With a ``logging`` handler as sink, logging around ``complete()`` emits every message."""

    async def worker():
        logger.info("B")
        await logger.complete()
        logger.info("C")

    stream_handler = logging.StreamHandler(sys.stderr)
    logger.add(stream_handler, format="{message}", catch=False)

    logger.info("A")
    asyncio.run(worker())
    logger.info("D")

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == "A\nB\nC\nD\n"
348
349
def test_exception_in_coroutine_caught(capsys):
    """With ``catch=True``, an exception raised by the coroutine sink is reported on stderr."""

    async def sink(msg):
        raise Exception("Oh no")

    async def main():
        logger.add(sink, catch=True)
        logger.info("Hello world")
        # Sleep first so the sink coroutine gets to run before ``complete()`` is awaited.
        await asyncio.sleep(0.1)
        await logger.complete()

    asyncio.run(main())

    captured = capsys.readouterr()
    report = captured.err.strip().splitlines()

    assert captured.out == ""
    assert report[0] == "--- Logging error in Loguru Handler #0 ---"
    assert re.match(r"Record was: \{.*Hello world.*\}", report[1])
    assert report[-2] == "Exception: Oh no"
    assert report[-1] == "--- End of logging error ---"
370
371
def test_exception_in_coroutine_not_caught(capsys, caplog):
    """With ``catch=False``, the sink's exception yields one ``logging`` record and no output."""

    async def sink(msg):
        raise ValueError("Oh no")

    async def main():
        logger.add(sink, catch=False)
        logger.info("Hello world")
        # Sleep first so the sink coroutine gets to run before ``complete()`` is awaited.
        await asyncio.sleep(0.1)
        await logger.complete()

    asyncio.run(main())

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == ""

    assert len(caplog.records) == 1
    record = caplog.records[0]

    text = record.getMessage()
    for unwanted in ("Logging error in Loguru Handler", "was never retrieved"):
        assert unwanted not in text

    exc_info = record.exc_info
    assert exc_info[0] == ValueError
    assert str(exc_info[1]) == "Oh no"
398
399
def test_exception_in_coroutine_during_complete_caught(capsys):
    """With ``catch=True``, a sink that raises only after sleeping is still reported on stderr."""

    async def sink(msg):
        # ``main`` does not sleep, so this sleep is still pending when it awaits ``complete()``.
        await asyncio.sleep(0.1)
        raise Exception("Oh no")

    async def main():
        logger.add(sink, catch=True)
        logger.info("Hello world")
        await logger.complete()

    asyncio.run(main())

    captured = capsys.readouterr()
    report = captured.err.strip().splitlines()

    assert captured.out == ""
    assert report[0] == "--- Logging error in Loguru Handler #0 ---"
    assert re.match(r"Record was: \{.*Hello world.*\}", report[1])
    assert report[-2] == "Exception: Oh no"
    assert report[-1] == "--- End of logging error ---"
420
421
def test_exception_in_coroutine_during_complete_not_caught(capsys, caplog):
    """With ``catch=False``, a sink raising after a sleep yields one ``logging`` record."""

    async def sink(msg):
        # ``main`` does not sleep, so this sleep is still pending when it awaits ``complete()``.
        await asyncio.sleep(0.1)
        raise ValueError("Oh no")

    async def main():
        logger.add(sink, catch=False)
        logger.info("Hello world")
        await logger.complete()

    asyncio.run(main())

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == ""

    assert len(caplog.records) == 1
    record = caplog.records[0]

    text = record.getMessage()
    for unwanted in ("Logging error in Loguru Handler", "was never retrieved"):
        assert unwanted not in text

    exc_info = record.exc_info
    assert exc_info[0] == ValueError
    assert str(exc_info[1]) == "Oh no"
448
449
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_enqueue_coroutine_loop_not_none(capsys):
    """An enqueued coroutine sink bound to an explicit loop prints when that loop is run."""

    async def worker():
        logger.info("A")
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(async_writer, enqueue=True, loop=loop, format="{message}", catch=False)
        loop.run_until_complete(worker())
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
464
465
def test_enqueue_coroutine_loop_not_none_set_global(capsys):
    """An enqueued coroutine sink bound to a loop that is also the current loop prints."""

    async def worker():
        logger.info("A")
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, enqueue=True, loop=loop, format="{message}", catch=False)
        loop.run_until_complete(worker())
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
481
482
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
def test_enqueue_coroutine_loop_is_none(capsys):
    """Enqueued sink with ``loop=None``: output appears only once the loop current at add runs.

    A message logged under ``asyncio.run`` produces no output; after running the
    loop that was current when the sink was added, both messages are printed.
    """

    async def worker(msg):
        logger.info(msg)
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, enqueue=True, loop=None, format="{message}", catch=False)

        asyncio.run(worker("A"))

        out, err = capsys.readouterr()
        assert out == err == ""

        loop.run_until_complete(worker("B"))
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\nB\n"
    assert err == ""
504
505
def test_enqueue_coroutine_loop_is_none_set_global(capsys):
    """Enqueued sink with ``loop=None`` prints when the loop current at add time is run."""

    async def worker(msg):
        logger.info(msg)
        await logger.complete()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        logger.add(async_writer, enqueue=True, loop=None, format="{message}", catch=False)
        loop.run_until_complete(worker("A"))
    finally:
        # Undo the global registration and close the loop so neither leaks into other tests.
        asyncio.set_event_loop(None)
        loop.close()

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
521
522
def test_custom_complete_function(capsys):
    """A sink object exposing a coroutine ``complete`` has it awaited by ``logger.complete()``."""

    class Handler:
        def __init__(self):
            # Set to True once the ``complete`` coroutine body has run.
            self.awaited = False

        def write(self, message):
            print(message, end="")

        async def complete(self):
            self.awaited = True

    async def worker():
        logger.info("A")
        await logger.complete()

    handler = Handler()
    logger.add(handler, catch=False, format="{message}")

    asyncio.run(worker())

    captured = capsys.readouterr()
    assert captured.out == "A\n"
    assert captured.err == ""
    assert handler.awaited
546
547
@pytest.mark.skipif(sys.version_info < (3, 5, 3), reason="Coroutine can't access running loop")
@pytest.mark.parametrize("loop_is_none", [True, False])
def test_complete_from_another_loop(capsys, loop_is_none):
    """``complete()`` awaited on a different loop prints nothing; on the logging loop it does."""
    main_loop = asyncio.new_event_loop()
    second_loop = asyncio.new_event_loop()

    async def worker_1():
        logger.info("A")

    async def worker_2():
        await logger.complete()

    try:
        loop = None if loop_is_none else main_loop
        logger.add(async_writer, loop=loop, format="{message}")

        main_loop.run_until_complete(worker_1())
        second_loop.run_until_complete(worker_2())

        out, err = capsys.readouterr()
        assert out == err == ""

        main_loop.run_until_complete(worker_2())
    finally:
        # Release both privately created loops so they do not leak past this test.
        second_loop.close()
        main_loop.close()

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
574
575
@pytest.mark.parametrize("loop_is_none", [True, False])
def test_complete_from_another_loop_set_global(capsys, loop_is_none):
    """As the loops are switched globally, only ``complete()`` on the logging loop prints."""
    main_loop = asyncio.new_event_loop()
    second_loop = asyncio.new_event_loop()

    async def worker_1():
        logger.info("A")

    async def worker_2():
        await logger.complete()

    try:
        loop = None if loop_is_none else main_loop
        logger.add(async_writer, loop=loop, format="{message}")

        asyncio.set_event_loop(main_loop)
        main_loop.run_until_complete(worker_1())

        asyncio.set_event_loop(second_loop)
        second_loop.run_until_complete(worker_2())

        out, err = capsys.readouterr()
        assert out == err == ""

        asyncio.set_event_loop(main_loop)
        main_loop.run_until_complete(worker_2())
    finally:
        # Undo the global registration and close both loops so nothing leaks into other tests.
        asyncio.set_event_loop(None)
        second_loop.close()
        main_loop.close()

    out, err = capsys.readouterr()
    assert out == "A\n"
    assert err == ""
605
606
def test_complete_from_multiple_threads_loop_is_none(capsys):
    """Ten threads, each under its own ``asyncio.run``, log 100 messages that are all printed."""

    async def sink(msg):
        print(msg, end="")

    async def worker(i):
        for _ in range(100):
            await asyncio.sleep(0)
            logger.info("{:03}", i)
        await logger.complete()

    def worker_(i):
        asyncio.run(worker(i))

    logger.add(sink, catch=False, format="{message}")

    threads = []
    for index in range(10):
        threads.append(threading.Thread(target=worker_, args=(index,)))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    captured = capsys.readouterr()
    expected = ["{:03}".format(i) for i in range(10) for _ in range(100)]
    # Output lines are sorted before comparison; their order is not asserted.
    assert sorted(captured.out.splitlines()) == expected
    assert captured.err == ""
633
634
def test_complete_from_multiple_threads_loop_is_not_none(capsys):
    """Messages logged from ten threads to a sink bound to one loop print once that loop runs."""

    async def worker(i):
        for _ in range(100):
            await asyncio.sleep(0)
            logger.info("{:03}", i)
        await logger.complete()

    async def sink(msg):
        print(msg, end="")

    def worker_(i):
        asyncio.run(worker(i))

    async def complete():
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(sink, catch=False, format="{message}", loop=loop)

        threads = [threading.Thread(target=worker_, args=(i,)) for i in range(10)]

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        # All threads are done; now run the loop the sink was bound to.
        loop.run_until_complete(complete())
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert sorted(out.splitlines()) == ["{:03}".format(i) for i in range(10) for _ in range(100)]
    assert err == ""
667
668
async def async_subworker(logger_):
    """Log ``"Child"`` through ``logger_`` and then await its ``complete()``."""
    logger_.info("Child")
    pending = logger_.complete()
    await pending
672
673
async def async_mainworker(logger_):
    """Log ``"Main"`` through ``logger_`` and then await its ``complete()``."""
    logger_.info("Main")
    pending = logger_.complete()
    await pending
677
678
def subworker(logger_):
    """Process entry point: run ``async_subworker(logger_)`` to completion on a fresh loop.

    The loop is created explicitly instead of through ``asyncio.get_event_loop()``:
    calling that function when no loop is running or set is deprecated and raises
    ``RuntimeError`` on recent Python versions. The loop is installed as current
    while the coroutine runs, then unregistered and closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_subworker(logger_))
    finally:
        asyncio.set_event_loop(None)
        loop.close()
682
683
class Writer:
    """Sink object whose coroutine ``write`` accumulates received messages in ``output``."""

    def __init__(self):
        # Concatenation of every message written so far.
        self.output = ""

    async def write(self, message):
        """Append ``message`` to ``output``."""
        self.output = self.output + message
690
691
def test_complete_with_sub_processes(monkeypatch, capsys):
    """A message logged in a spawned child process reaches the parent's enqueued coroutine sink."""
    ctx = multiprocessing.get_context("spawn")
    monkeypatch.setattr(loguru._handler, "multiprocessing", ctx)

    writer = Writer()

    async def complete():
        await logger.complete()

    loop = asyncio.new_event_loop()
    try:
        logger.add(writer.write, format="{message}", enqueue=True, loop=loop)

        process = ctx.Process(target=subworker, args=[logger])
        process.start()
        process.join()
        # Surface a crashed child directly instead of as a confusing output mismatch below.
        assert process.exitcode == 0

        loop.run_until_complete(complete())
    finally:
        # Release the privately created loop so it does not leak past this test.
        loop.close()

    out, err = capsys.readouterr()
    assert out == err == ""
    assert writer.output == "Child\n"
712