1#    Copyright 2011 OpenStack Foundation
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15import configparser
16import logging
17import logging.handlers
18import os
19import tempfile
20from unittest import mock
21import uuid
22
23import fixtures
24import testtools
25
26from oslo_rootwrap import cmd
27from oslo_rootwrap import daemon
28from oslo_rootwrap import filters
29from oslo_rootwrap import subprocess
30from oslo_rootwrap import wrapper
31
32
33class RootwrapLoaderTestCase(testtools.TestCase):
34
35    def test_privsep_in_loader(self):
36        privsep = ["privsep-helper", "--context", "foo"]
37        filterlist = wrapper.load_filters([])
38
39        # mock out get_exec because
40        with mock.patch.object(filters.CommandFilter, 'get_exec') as ge:
41            ge.return_value = "/fake/privsep-helper"
42            filtermatch = wrapper.match_filter(filterlist, privsep)
43
44            self.assertIsNotNone(filtermatch)
45            self.assertEqual(["/fake/privsep-helper", "--context", "foo"],
46                             filtermatch.get_command(privsep))
47
48    def test_strict_switched_off_in_configparser(self):
49        temp_dir = self.useFixture(fixtures.TempDir()).path
50        os.mkdir(os.path.join(temp_dir, 'nested'))
51        temp_file = os.path.join(temp_dir, 'test.conf')
52        f = open(temp_file, 'w')
53        f.write("""[Filters]
54privsep: PathFilter, privsep-helper, root
55privsep: PathFilter, privsep-helper, root
56""")
57        f.close()
58        filterlist = wrapper.load_filters([temp_dir])
59        self.assertIsNotNone(filterlist)
60
61
62class RootwrapTestCase(testtools.TestCase):
63    if os.path.exists('/sbin/ip'):
64        _ip = '/sbin/ip'
65    else:
66        _ip = '/bin/ip'
67
68    def setUp(self):
69        super(RootwrapTestCase, self).setUp()
70        self.filters = [
71            filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
72            filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
73            filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
74            filters.CommandFilter("/nonexistent/cat", "root"),
75            filters.CommandFilter("/bin/cat", "root")  # Keep this one last
76        ]
77
78    def test_CommandFilter(self):
79        f = filters.CommandFilter("sleep", 'root', '10')
80        self.assertFalse(f.match(["sleep2"]))
81
82        # verify that any arguments are accepted
83        self.assertTrue(f.match(["sleep"]))
84        self.assertTrue(f.match(["sleep", "anything"]))
85        self.assertTrue(f.match(["sleep", "10"]))
86        f = filters.CommandFilter("sleep", 'root')
87        self.assertTrue(f.match(["sleep", "10"]))
88
89    def test_empty_commandfilter(self):
90        f = filters.CommandFilter("sleep", "root")
91        self.assertFalse(f.match([]))
92        self.assertFalse(f.match(None))
93
94    def test_empty_regexpfilter(self):
95        f = filters.RegExpFilter("sleep", "root", "sleep")
96        self.assertFalse(f.match([]))
97        self.assertFalse(f.match(None))
98
99    def test_empty_invalid_regexpfilter(self):
100        f = filters.RegExpFilter("sleep", "root")
101        self.assertFalse(f.match(["anything"]))
102        self.assertFalse(f.match([]))
103
104    def test_RegExpFilter_match(self):
105        usercmd = ["ls", "/root"]
106        filtermatch = wrapper.match_filter(self.filters, usercmd)
107        self.assertFalse(filtermatch is None)
108        self.assertEqual(["/bin/ls", "/root"],
109                         filtermatch.get_command(usercmd))
110
111    def test_RegExpFilter_reject(self):
112        usercmd = ["ls", "root"]
113        self.assertRaises(wrapper.NoFilterMatched,
114                          wrapper.match_filter, self.filters, usercmd)
115
116    def test_missing_command(self):
117        valid_but_missing = ["foo_bar_not_exist"]
118        invalid = ["foo_bar_not_exist_and_not_matched"]
119        self.assertRaises(wrapper.FilterMatchNotExecutable,
120                          wrapper.match_filter,
121                          self.filters, valid_but_missing)
122        self.assertRaises(wrapper.NoFilterMatched,
123                          wrapper.match_filter, self.filters, invalid)
124
125    def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
126        usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
127                   'dnsmasq', 'foo']
128        f = filters.EnvFilter("env", "root", config_file_arg + '=A',
129                              'NETWORK_ID=', "/usr/bin/dnsmasq")
130        self.assertTrue(f.match(usercmd))
131        self.assertEqual(['/usr/bin/dnsmasq', 'foo'], f.get_command(usercmd))
132        env = f.get_environment(usercmd)
133        self.assertEqual('A', env.get(config_file_arg))
134        self.assertEqual('foobar', env.get('NETWORK_ID'))
135
136    def test_EnvFilter(self):
137        envset = ['A=/some/thing', 'B=somethingelse']
138        envcmd = ['env'] + envset
139        realcmd = ['sleep', '10']
140        usercmd = envcmd + realcmd
141
142        f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
143        # accept with leading env
144        self.assertTrue(f.match(envcmd + ["sleep"]))
145        # accept without leading env
146        self.assertTrue(f.match(envset + ["sleep"]))
147
148        # any other command does not match
149        self.assertFalse(f.match(envcmd + ["sleep2"]))
150        self.assertFalse(f.match(envset + ["sleep2"]))
151
152        # accept any trailing arguments
153        self.assertTrue(f.match(usercmd))
154
155        # require given environment variables to match
156        self.assertFalse(f.match([envcmd, 'C=ELSE']))
157        self.assertFalse(f.match(['env', 'C=xx']))
158        self.assertFalse(f.match(['env', 'A=xx']))
159
160        # require env command to be given
161        # (otherwise CommandFilters should match
162        self.assertFalse(f.match(realcmd))
163        # require command to match
164        self.assertFalse(f.match(envcmd))
165        self.assertFalse(f.match(envcmd[1:]))
166
167        # ensure that the env command is stripped when executing
168        self.assertEqual(realcmd, f.exec_args(usercmd))
169        env = f.get_environment(usercmd)
170        # check that environment variables are set
171        self.assertEqual('/some/thing', env.get('A'))
172        self.assertEqual('somethingelse', env.get('B'))
173        self.assertNotIn('sleep', env.keys())
174
175    def test_EnvFilter_without_leading_env(self):
176        envset = ['A=/some/thing', 'B=somethingelse']
177        envcmd = ['env'] + envset
178        realcmd = ['sleep', '10']
179
180        f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")
181
182        # accept without leading env
183        self.assertTrue(f.match(envset + ["sleep"]))
184
185        self.assertEqual(realcmd, f.get_command(envcmd + realcmd))
186        self.assertEqual(realcmd, f.get_command(envset + realcmd))
187
188        env = f.get_environment(envset + realcmd)
189        # check that environment variables are set
190        self.assertEqual('/some/thing', env.get('A'))
191        self.assertEqual('somethingelse', env.get('B'))
192        self.assertNotIn('sleep', env.keys())
193
194    def test_KillFilter(self):
195        if not os.path.exists("/proc/%d" % os.getpid()):
196            self.skipTest("Test requires /proc filesystem (procfs)")
197        p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
198                             stdout=subprocess.PIPE,
199                             stderr=subprocess.STDOUT)
200        try:
201            f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
202            f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
203            usercmd = ['kill', '-ALRM', p.pid]
204            # Incorrect signal should fail
205            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
206            usercmd = ['kill', p.pid]
207            # Providing no signal should fail
208            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
209            # Providing matching signal should be allowed
210            usercmd = ['kill', '-9', p.pid]
211            self.assertTrue(f.match(usercmd) or f2.match(usercmd))
212
213            f = filters.KillFilter("root", "/bin/cat")
214            f2 = filters.KillFilter("root", "/usr/bin/cat")
215            usercmd = ['kill', os.getpid()]
216            # Our own PID does not match /bin/sleep, so it should fail
217            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
218            usercmd = ['kill', 999999]
219            # Nonexistent PID should fail
220            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
221            usercmd = ['kill', p.pid]
222            # Providing no signal should work
223            self.assertTrue(f.match(usercmd) or f2.match(usercmd))
224
225            # verify that relative paths are matched against $PATH
226            f = filters.KillFilter("root", "cat")
227            # Our own PID does not match so it should fail
228            usercmd = ['kill', os.getpid()]
229            self.assertFalse(f.match(usercmd))
230            # Filter should find cat in /bin or /usr/bin
231            usercmd = ['kill', p.pid]
232            self.assertTrue(f.match(usercmd))
233            # Filter shouldn't be able to find binary in $PATH, so fail
234            with fixtures.EnvironmentVariable("PATH", "/foo:/bar"):
235                self.assertFalse(f.match(usercmd))
236            # ensure that unset $PATH is not causing an exception
237            with fixtures.EnvironmentVariable("PATH"):
238                self.assertFalse(f.match(usercmd))
239        finally:
240            # Terminate the "cat" process and wait for it to finish
241            p.terminate()
242            p.wait()
243
244    def test_KillFilter_no_raise(self):
245        """Makes sure ValueError from bug 926412 is gone."""
246        f = filters.KillFilter("root", "")
247        # Providing anything other than kill should be False
248        usercmd = ['notkill', 999999]
249        self.assertFalse(f.match(usercmd))
250        # Providing something that is not a pid should be False
251        usercmd = ['kill', 'notapid']
252        self.assertFalse(f.match(usercmd))
253        # no arguments should also be fine
254        self.assertFalse(f.match([]))
255        self.assertFalse(f.match(None))
256
257    def test_KillFilter_deleted_exe(self):
258        """Makes sure deleted exe's are killed correctly."""
259        command = "/bin/commandddddd"
260        f = filters.KillFilter("root", command)
261        usercmd = ['kill', 1234]
262        # Providing no signal should work
263        with mock.patch('os.readlink') as readlink:
264            readlink.return_value = command + ' (deleted)'
265            with mock.patch('os.path.isfile') as exists:
266                def fake_exists(path):
267                    return path == command
268                exists.side_effect = fake_exists
269                self.assertTrue(f.match(usercmd))
270
271    @mock.patch('os.readlink')
272    @mock.patch('os.path.isfile')
273    def test_KillFilter_upgraded_exe(self, mock_isfile, mock_readlink):
274        """Makes sure upgraded exe's are killed correctly."""
275        f = filters.KillFilter("root", "/bin/commandddddd")
276        command = "/bin/commandddddd"
277        usercmd = ['kill', 1234]
278
279        def fake_exists(path):
280            return path == command
281
282        mock_readlink.return_value = command + '\0\05190bfb2 (deleted)'
283        mock_isfile.side_effect = fake_exists
284        self.assertTrue(f.match(usercmd))
285
286    @mock.patch('os.readlink')
287    @mock.patch('os.path.isfile')
288    @mock.patch('os.path.exists')
289    @mock.patch('os.access')
290    def test_KillFilter_renamed_exe(self, mock_access, mock_exists,
291                                    mock_isfile, mock_readlink):
292        """Makes sure renamed exe's are killed correctly."""
293        command = "/bin/commandddddd"
294        f = filters.KillFilter("root", command)
295        usercmd = ['kill', 1234]
296
297        def fake_os_func(path, *args):
298            return path == command
299
300        mock_readlink.return_value = command + ';90bfb2 (deleted)'
301        m = mock.mock_open(read_data=command)
302        with mock.patch("builtins.open", m, create=True):
303            mock_isfile.side_effect = fake_os_func
304            mock_exists.side_effect = fake_os_func
305            mock_access.side_effect = fake_os_func
306            self.assertTrue(f.match(usercmd))
307
308    def test_ReadFileFilter(self):
309        goodfn = '/good/file.name'
310        f = filters.ReadFileFilter(goodfn)
311        usercmd = ['cat', '/bad/file']
312        self.assertFalse(f.match(['cat', '/bad/file']))
313        usercmd = ['cat', goodfn]
314        self.assertEqual(['/bin/cat', goodfn], f.get_command(usercmd))
315        self.assertTrue(f.match(usercmd))
316
317    def test_IpFilter_non_netns(self):
318        f = filters.IpFilter(self._ip, 'root')
319        self.assertTrue(f.match(['ip', 'link', 'list']))
320        self.assertTrue(f.match(['ip', '-s', 'link', 'list']))
321        self.assertTrue(f.match(['ip', '-s', '-v', 'netns', 'add']))
322        self.assertTrue(f.match(['ip', 'link', 'set', 'interface',
323                                 'netns', 'somens']))
324
325    def test_IpFilter_netns(self):
326        f = filters.IpFilter(self._ip, 'root')
327        self.assertFalse(f.match(['ip', 'netns', 'exec', 'foo']))
328        self.assertFalse(f.match(['ip', 'netns', 'exec']))
329        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
330        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
331        self.assertFalse(f.match(['ip', 'net', 'exec', 'foo']))
332        self.assertFalse(f.match(['ip', 'netns', 'e', 'foo']))
333
334    def _test_IpFilter_netns_helper(self, action):
335        f = filters.IpFilter(self._ip, 'root')
336        self.assertTrue(f.match(['ip', 'link', action]))
337
338    def test_IpFilter_netns_add(self):
339        self._test_IpFilter_netns_helper('add')
340
341    def test_IpFilter_netns_delete(self):
342        self._test_IpFilter_netns_helper('delete')
343
344    def test_IpFilter_netns_list(self):
345        self._test_IpFilter_netns_helper('list')
346
347    def test_IpNetnsExecFilter_match(self):
348        f = filters.IpNetnsExecFilter(self._ip, 'root')
349        self.assertTrue(
350            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
351        self.assertTrue(f.match(['ip', 'net', 'exec', 'foo', 'bar']))
352        self.assertTrue(f.match(['ip', 'netn', 'e', 'foo', 'bar']))
353        self.assertTrue(f.match(['ip', 'net', 'e', 'foo', 'bar']))
354        self.assertTrue(f.match(['ip', 'net', 'exe', 'foo', 'bar']))
355
356    def test_IpNetnsExecFilter_nomatch(self):
357        f = filters.IpNetnsExecFilter(self._ip, 'root')
358        self.assertFalse(f.match(['ip', 'link', 'list']))
359        self.assertFalse(f.match(['ip', 'foo', 'bar', 'netns']))
360        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
361        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
362        self.assertFalse(f.match(['ip', 'netns exec', 'foo', 'bar', 'baz']))
363        self.assertFalse(f.match([]))
364
365        # verify that at least a NS is given
366        self.assertFalse(f.match(['ip', 'netns', 'exec']))
367
368    def test_IpNetnsExecFilter_nomatch_nonroot(self):
369        f = filters.IpNetnsExecFilter(self._ip, 'user')
370        self.assertFalse(
371            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
372
373    def test_match_filter_recurses_exec_command_filter_matches(self):
374        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
375                       filters.IpFilter(self._ip, 'root')]
376        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
377
378        self.assertIsNotNone(wrapper.match_filter(filter_list, args))
379
380    def test_match_filter_recurses_exec_command_matches_user(self):
381        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
382                       filters.IpFilter(self._ip, 'user')]
383        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
384
385        # Currently ip netns exec requires root, so verify that
386        # no non-root filter is matched, as that would escalate privileges
387        self.assertRaises(wrapper.NoFilterMatched,
388                          wrapper.match_filter, filter_list, args)
389
390    def test_match_filter_recurses_exec_command_filter_does_not_match(self):
391        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
392                       filters.IpFilter(self._ip, 'root')]
393        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
394                'ip', 'link', 'list']
395
396        self.assertRaises(wrapper.NoFilterMatched,
397                          wrapper.match_filter, filter_list, args)
398
399    def test_ChainingRegExpFilter_match(self):
400        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
401                                                    'nice', r'-?\d+'),
402                       filters.CommandFilter('cat', 'root')]
403        args = ['nice', '5', 'cat', '/a']
404        dirs = ['/bin', '/usr/bin']
405
406        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
407
408    def test_ChainingRegExpFilter_not_match(self):
409        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
410                                                    'nice', r'-?\d+'),
411                       filters.CommandFilter('cat', 'root')]
412        args_invalid = (['nice', '5', 'ls', '/a'],
413                        ['nice', '--5', 'cat', '/a'],
414                        ['nice2', '5', 'cat', '/a'],
415                        ['nice', 'cat', '/a'],
416                        ['nice', '5'])
417        dirs = ['/bin', '/usr/bin']
418
419        for args in args_invalid:
420            self.assertRaises(wrapper.NoFilterMatched,
421                              wrapper.match_filter, filter_list, args, dirs)
422
423    def test_ChainingRegExpFilter_multiple(self):
424        filter_list = [filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
425                                                    '-c[0-3]'),
426                       filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
427                                                    '-c[0-3]', '-n[0-7]'),
428                       filters.CommandFilter('cat', 'root')]
429        # both filters match to ['ionice', '-c2'], but only the second accepts
430        args = ['ionice', '-c2', '-n7', 'cat', '/a']
431        dirs = ['/bin', '/usr/bin']
432
433        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))
434
435    def test_ReadFileFilter_empty_args(self):
436        goodfn = '/good/file.name'
437        f = filters.ReadFileFilter(goodfn)
438        self.assertFalse(f.match([]))
439        self.assertFalse(f.match(None))
440
441    def test_exec_dirs_search(self):
442        # This test supposes you have /bin/cat or /usr/bin/cat locally
443        f = filters.CommandFilter("cat", "root")
444        usercmd = ['cat', '/f']
445        self.assertTrue(f.match(usercmd))
446        self.assertTrue(f.get_command(usercmd,
447                                      exec_dirs=['/bin', '/usr/bin'])
448                        in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))
449
450    def test_skips(self):
451        # Check that all filters are skipped and that the last matches
452        usercmd = ["cat", "/"]
453        filtermatch = wrapper.match_filter(self.filters, usercmd)
454        self.assertTrue(filtermatch is self.filters[-1])
455
456    def test_RootwrapConfig(self):
457        raw = configparser.RawConfigParser()
458
459        # Empty config should raise configparser.Error
460        self.assertRaises(configparser.Error,
461                          wrapper.RootwrapConfig, raw)
462
463        # Check default values
464        raw.set('DEFAULT', 'filters_path', '/a,/b')
465        config = wrapper.RootwrapConfig(raw)
466        self.assertEqual(['/a', '/b'], config.filters_path)
467        self.assertEqual(os.environ["PATH"].split(':'), config.exec_dirs)
468
469        with fixtures.EnvironmentVariable("PATH"):
470            c = wrapper.RootwrapConfig(raw)
471            self.assertEqual([], c.exec_dirs)
472
473        self.assertFalse(config.use_syslog)
474        self.assertEqual(logging.handlers.SysLogHandler.LOG_SYSLOG,
475                         config.syslog_log_facility)
476        self.assertEqual(logging.ERROR, config.syslog_log_level)
477
478        # Check general values
479        raw.set('DEFAULT', 'exec_dirs', '/a,/x')
480        config = wrapper.RootwrapConfig(raw)
481        self.assertEqual(['/a', '/x'], config.exec_dirs)
482
483        raw.set('DEFAULT', 'use_syslog', 'oui')
484        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
485        raw.set('DEFAULT', 'use_syslog', 'true')
486        config = wrapper.RootwrapConfig(raw)
487        self.assertTrue(config.use_syslog)
488
489        raw.set('DEFAULT', 'syslog_log_facility', 'moo')
490        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
491        raw.set('DEFAULT', 'syslog_log_facility', 'local0')
492        config = wrapper.RootwrapConfig(raw)
493        self.assertEqual(logging.handlers.SysLogHandler.LOG_LOCAL0,
494                         config.syslog_log_facility)
495        raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
496        config = wrapper.RootwrapConfig(raw)
497        self.assertEqual(logging.handlers.SysLogHandler.LOG_AUTH,
498                         config.syslog_log_facility)
499
500        raw.set('DEFAULT', 'syslog_log_level', 'bar')
501        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
502        raw.set('DEFAULT', 'syslog_log_level', 'INFO')
503        config = wrapper.RootwrapConfig(raw)
504        self.assertEqual(logging.INFO, config.syslog_log_level)
505
506    def test_getlogin(self):
507        with mock.patch('os.getlogin') as os_getlogin:
508            os_getlogin.return_value = 'foo'
509            self.assertEqual('foo', wrapper._getlogin())
510
511    def test_getlogin_bad(self):
512        with mock.patch('os.getenv') as os_getenv:
513            with mock.patch('os.getlogin') as os_getlogin:
514                os_getenv.side_effect = [None, None, 'bar']
515                os_getlogin.side_effect = OSError(
516                    '[Errno 22] Invalid argument')
517                self.assertEqual('bar', wrapper._getlogin())
518                os_getlogin.assert_called_once_with()
519                self.assertEqual(3, os_getenv.call_count)
520
521
522class PathFilterTestCase(testtools.TestCase):
523    def setUp(self):
524        super(PathFilterTestCase, self).setUp()
525
526        self.tmp_root_dir = tempfile.mkdtemp()
527        tmpdir = fixtures.TempDir(self.tmp_root_dir)
528        self.useFixture(tmpdir)
529
530        self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)
531
532        gen_name = lambda: str(uuid.uuid4())
533
534        self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
535        self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join(self.tmp_root_dir, 'some')
536        self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
537                                                 'some')
538        self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')
539
540        self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
541                                                         gen_name())
542        os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
543                   self.TRAVERSAL_SYMLINK_WITHIN_DIR)
544
545        self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
546                                                          gen_name())
547        os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
548                   self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)
549
550        self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
551        os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)
552
553        self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
554        os.symlink(os.path.join(self.tmp_root_dir, 'some_file'),
555                   self.SYMLINK_OUTSIDE_DIR)
556
557    def test_empty_args(self):
558        self.assertFalse(self.f.match([]))
559        self.assertFalse(self.f.match(None))
560
561    def test_argument_pass_constraint(self):
562        f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')
563
564        args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
565        self.assertTrue(f.match(args))
566
567    def test_argument_equality_constraint(self):
568        temp_file_path = os.path.join(self.tmp_root_dir, 'spam/eggs')
569        f = filters.PathFilter('/bin/chown', 'root', 'nova', temp_file_path)
570
571        args = ['chown', 'nova', temp_file_path]
572        self.assertTrue(f.match(args))
573
574        args = ['chown', 'quantum', temp_file_path]
575        self.assertFalse(f.match(args))
576
577    def test_wrong_arguments_number(self):
578        args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
579        self.assertFalse(self.f.match(args))
580
581    def test_wrong_exec_command(self):
582        args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
583        self.assertFalse(self.f.match(args))
584
585    def test_match(self):
586        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
587        self.assertTrue(self.f.match(args))
588
589    def test_match_traversal(self):
590        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
591        self.assertTrue(self.f.match(args))
592
593    def test_match_symlink(self):
594        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
595        self.assertTrue(self.f.match(args))
596
597    def test_match_traversal_symlink(self):
598        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
599        self.assertTrue(self.f.match(args))
600
601    def test_reject(self):
602        args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
603        self.assertFalse(self.f.match(args))
604
605    def test_reject_traversal(self):
606        args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
607        self.assertFalse(self.f.match(args))
608
609    def test_reject_symlink(self):
610        args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
611        self.assertFalse(self.f.match(args))
612
613    def test_reject_traversal_symlink(self):
614        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
615        self.assertFalse(self.f.match(args))
616
617    def test_get_command(self):
618        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
619        expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
620
621        self.assertEqual(expected, self.f.get_command(args))
622
623    def test_get_command_traversal(self):
624        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
625        expected = ['/bin/chown', 'nova',
626                    os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]
627
628        self.assertEqual(expected, self.f.get_command(args))
629
630    def test_get_command_symlink(self):
631        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
632        expected = ['/bin/chown', 'nova',
633                    os.path.realpath(self.SYMLINK_WITHIN_DIR)]
634
635        self.assertEqual(expected, self.f.get_command(args))
636
637    def test_get_command_traversal_symlink(self):
638        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
639        expected = ['/bin/chown', 'nova',
640                    os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]
641
642        self.assertEqual(expected, self.f.get_command(args))
643
644
645class RunOneCommandTestCase(testtools.TestCase):
646    def _test_returncode_helper(self, returncode, expected):
647        with mock.patch.object(wrapper, 'start_subprocess') as mock_start:
648            with mock.patch('sys.exit') as mock_exit:
649                mock_start.return_value.wait.return_value = returncode
650                cmd.run_one_command(None, mock.Mock(), None, None)
651        mock_exit.assert_called_once_with(expected)
652
653    def test_positive_returncode(self):
654        self._test_returncode_helper(1, 1)
655
656    def test_negative_returncode(self):
657        self._test_returncode_helper(-1, 129)
658
659
660class DaemonCleanupException(Exception):
661    pass
662
663
664class DaemonCleanupTestCase(testtools.TestCase):
665
666    @mock.patch('os.chmod')
667    @mock.patch('shutil.rmtree')
668    @mock.patch('tempfile.mkdtemp')
669    @mock.patch('multiprocessing.managers.BaseManager.get_server',
670                side_effect=DaemonCleanupException)
671    def test_daemon_no_cleanup_for_uninitialized_server(self, gs, mkd, *args):
672        mkd.return_value = '/just_dir/123'
673        self.assertRaises(DaemonCleanupException, daemon.daemon_start,
674                          config=None, filters=None)
675