1"""
2Create and delete FILES_PER_THREAD temp files (via tempfile.TemporaryFile)
3in each of NUM_THREADS threads, recording the number of successes and
4failures.  A failure is a bug in tempfile, and may be due to:
5
6+ Trying to create more than one tempfile with the same name.
7+ Trying to delete a tempfile that doesn't still exist.
8+ Something we've never seen before.
9
10By default, NUM_THREADS == 20 and FILES_PER_THREAD == 50.  This is enough to
11create about 150 failures per run under Win98SE in 2.0, and runs pretty
12quickly. Guido reports needing to boost FILES_PER_THREAD to 500 before
13provoking a 2.0 failure under Linux.
14"""
15
16import tempfile
17
18from test.support import threading_helper
19import unittest
20import io
21import threading
22from traceback import print_exc
23
24
25NUM_THREADS = 20
26FILES_PER_THREAD = 50
27
28
29startEvent = threading.Event()
30
31
32class TempFileGreedy(threading.Thread):
33    error_count = 0
34    ok_count = 0
35
36    def run(self):
37        self.errors = io.StringIO()
38        startEvent.wait()
39        for i in range(FILES_PER_THREAD):
40            try:
41                f = tempfile.TemporaryFile("w+b")
42                f.close()
43            except:
44                self.error_count += 1
45                print_exc(file=self.errors)
46            else:
47                self.ok_count += 1
48
49
50class ThreadedTempFileTest(unittest.TestCase):
51    def test_main(self):
52        threads = [TempFileGreedy() for i in range(NUM_THREADS)]
53        with threading_helper.start_threads(threads, startEvent.set):
54            pass
55        ok = sum(t.ok_count for t in threads)
56        errors = [str(t.name) + str(t.errors.getvalue())
57                  for t in threads if t.error_count]
58
59        msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
60            '\n'.join(errors))
61        self.assertEqual(errors, [], msg)
62        self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
63
64if __name__ == "__main__":
65    unittest.main()
66