1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for L{twisted.python.systemd}.
6"""
7
8
9import os
10
11from twisted.python.systemd import ListenFDs
12from twisted.trial.unittest import TestCase
13
14
15class InheritedDescriptorsMixin:
16    """
17    Mixin for a L{TestCase} subclass which defines test methods for some kind of
18    systemd sd-daemon class.  In particular, it defines tests for a
19    C{inheritedDescriptors} method.
20    """
21
22    def test_inheritedDescriptors(self):
23        """
24        C{inheritedDescriptors} returns a list of integers giving the file
25        descriptors which were inherited from systemd.
26        """
27        sddaemon = self.getDaemon(7, 3)
28        self.assertEqual([7, 8, 9], sddaemon.inheritedDescriptors())
29
30    def test_repeated(self):
31        """
32        Any subsequent calls to C{inheritedDescriptors} return the same list.
33        """
34        sddaemon = self.getDaemon(7, 3)
35        self.assertEqual(
36            sddaemon.inheritedDescriptors(), sddaemon.inheritedDescriptors()
37        )
38
39
40class MemoryOnlyMixin:
41    """
42    Mixin for a L{TestCase} subclass which creates creating a fake, in-memory
43    implementation of C{inheritedDescriptors}.  This provides verification that
44    the fake behaves in a compatible way with the real implementation.
45    """
46
47    def getDaemon(self, start, count):
48        """
49        Invent C{count} new I{file descriptors} (actually integers, attached to
50        no real file description), starting at C{start}.  Construct and return a
51        new L{ListenFDs} which will claim those integers represent inherited
52        file descriptors.
53        """
54        return ListenFDs(range(start, start + count))
55
56
57class EnvironmentMixin:
58    """
59    Mixin for a L{TestCase} subclass which creates a real implementation of
60    C{inheritedDescriptors} which is based on the environment variables set by
61    systemd.  To facilitate testing, this mixin will also create a fake
62    environment dictionary and add keys to it to make it look as if some
63    descriptors have been inherited.
64    """
65
66    def initializeEnvironment(self, count, pid):
67        """
68        Create a copy of the process environment and add I{LISTEN_FDS} and
69        I{LISTEN_PID} (the environment variables set by systemd) to it.
70        """
71        result = os.environ.copy()
72        result["LISTEN_FDS"] = str(count)
73        result["LISTEN_PID"] = str(pid)
74        return result
75
76    def getDaemon(self, start, count):
77        """
78        Create a new L{ListenFDs} instance, initialized with a fake environment
79        dictionary which will be set up as systemd would have set it up if
80        C{count} descriptors were being inherited.  The descriptors will also
81        start at C{start}.
82        """
83        fakeEnvironment = self.initializeEnvironment(count, os.getpid())
84        return ListenFDs.fromEnvironment(environ=fakeEnvironment, start=start)
85
86
87class MemoryOnlyTests(MemoryOnlyMixin, InheritedDescriptorsMixin, TestCase):
88    """
89    Apply tests to L{ListenFDs}, explicitly constructed with some fake file
90    descriptors.
91    """
92
93
94class EnvironmentTests(EnvironmentMixin, InheritedDescriptorsMixin, TestCase):
95    """
96    Apply tests to L{ListenFDs}, constructed based on an environment dictionary.
97    """
98
99    def test_secondEnvironment(self):
100        """
101        Only a single L{Environment} can extract inherited file descriptors.
102        """
103        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
104        first = ListenFDs.fromEnvironment(environ=fakeEnvironment)
105        second = ListenFDs.fromEnvironment(environ=fakeEnvironment)
106        self.assertEqual(list(range(3, 6)), first.inheritedDescriptors())
107        self.assertEqual([], second.inheritedDescriptors())
108
109    def test_mismatchedPID(self):
110        """
111        If the current process PID does not match the PID in the environment, no
112        inherited descriptors are reported.
113        """
114        fakeEnvironment = self.initializeEnvironment(3, os.getpid() + 1)
115        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
116        self.assertEqual([], sddaemon.inheritedDescriptors())
117
118    def test_missingPIDVariable(self):
119        """
120        If the I{LISTEN_PID} environment variable is not present, no inherited
121        descriptors are reported.
122        """
123        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
124        del fakeEnvironment["LISTEN_PID"]
125        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
126        self.assertEqual([], sddaemon.inheritedDescriptors())
127
128    def test_nonIntegerPIDVariable(self):
129        """
130        If the I{LISTEN_PID} environment variable is set to a string that cannot
131        be parsed as an integer, no inherited descriptors are reported.
132        """
133        fakeEnvironment = self.initializeEnvironment(3, "hello, world")
134        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
135        self.assertEqual([], sddaemon.inheritedDescriptors())
136
137    def test_missingFDSVariable(self):
138        """
139        If the I{LISTEN_FDS} environment variable is not present, no inherited
140        descriptors are reported.
141        """
142        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
143        del fakeEnvironment["LISTEN_FDS"]
144        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
145        self.assertEqual([], sddaemon.inheritedDescriptors())
146
147    def test_nonIntegerFDSVariable(self):
148        """
149        If the I{LISTEN_FDS} environment variable is set to a string that cannot
150        be parsed as an integer, no inherited descriptors are reported.
151        """
152        fakeEnvironment = self.initializeEnvironment("hello, world", os.getpid())
153        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
154        self.assertEqual([], sddaemon.inheritedDescriptors())
155
156    def test_defaultEnviron(self):
157        """
158        If the process environment is not explicitly passed to
159        L{Environment.__init__}, the real process environment dictionary is
160        used.
161        """
162        self.patch(os, "environ", {"LISTEN_PID": str(os.getpid()), "LISTEN_FDS": "5"})
163        sddaemon = ListenFDs.fromEnvironment()
164        self.assertEqual(list(range(3, 3 + 5)), sddaemon.inheritedDescriptors())
165