1import pipes
2import os
3import string
4import unittest
5import shutil
6from test.support import reap_children, unix_shell
7from test.support.os_helper import TESTFN, unlink
8
9
# The pipes module drives a POSIX shell, so the whole test module is skipped
# on other platforms or when no usable shell binary can be found.
if os.name != 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

if not unix_shell or not os.path.exists(unix_shell):
    raise unittest.SkipTest('pipes module requires a shell')

# Second scratch file name, derived from the shared TESTFN.
TESTFN2 = TESTFN + "2"

# "tr a-z A-Z" is not portable, so spell out both alphabets in full.
s_command = 'tr {} {}'.format(string.ascii_lowercase, string.ascii_uppercase)
20
class SimplePipeTests(unittest.TestCase):
    """Tests for pipes.Template.

    Covers running pipelines (with and without steps), repr() and debug
    state, clone(), and the argument validation done by append(), prepend()
    and open().  Tests that really execute a command use the module-level
    ``s_command`` (a portable ``tr`` invocation that upper-cases text) and
    are skipped when ``tr`` cannot be found.
    """

    def tearDown(self):
        # Remove the scratch files a test may have created.
        for path in (TESTFN, TESTFN2):
            unlink(path)

    def _require_tr(self):
        # Shared guard for the tests that shell out to ``tr``; the check is
        # made when the test runs, not when the module is imported.
        if shutil.which('tr') is None:
            self.skipTest('tr is not available')

    def testSimplePipe1(self):
        # write through a STDIN_STDOUT step
        self._require_tr()
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy through a FILEIN_FILEOUT step
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read through a FILEIN_STDOUT step
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        # The object returned by Template.open() is a context manager (see
        # testSimplePipe1), so let ``with`` take care of closing it.
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        # Start from an existing, empty destination file.
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(repr(t),
                         "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe beginning with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # any mode other than 'r' or 'w' is expected to be rejected
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        # The clone must be a distinct object holding an equal but separate
        # list of steps, so that changing one template leaves the other alone.
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
201
202
def tearDownModule():
    # unittest module-level fixture: runs once after all tests in this module
    # have finished.  The pipeline tests run shell commands, so
    # test.support.reap_children() is called to collect any child processes
    # that are still outstanding (see its documentation for details).
    reap_children()
205
206
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
209