# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.python.systemd}.
"""


import os

from twisted.python.systemd import ListenFDs
from twisted.trial.unittest import TestCase


class InheritedDescriptorsMixin:
    """
    Mixin for a L{TestCase} subclass which defines test methods for some kind of
    systemd sd-daemon class.  In particular, it defines tests for a
    C{inheritedDescriptors} method.

    Classes using this mixin must provide a C{getDaemon(start, count)} method
    returning the object under test.
    """

    def test_inheritedDescriptors(self):
        """
        C{inheritedDescriptors} returns a list of integers giving the file
        descriptors which were inherited from systemd.
        """
        sddaemon = self.getDaemon(7, 3)
        self.assertEqual([7, 8, 9], sddaemon.inheritedDescriptors())

    def test_repeated(self):
        """
        Any subsequent calls to C{inheritedDescriptors} return the same list.
        """
        sddaemon = self.getDaemon(7, 3)
        self.assertEqual(
            sddaemon.inheritedDescriptors(), sddaemon.inheritedDescriptors()
        )


class MemoryOnlyMixin:
    """
    Mixin for a L{TestCase} subclass which creates a fake, in-memory
    implementation of C{inheritedDescriptors}.  This provides verification that
    the fake behaves in a compatible way with the real implementation.
    """

    def getDaemon(self, start, count):
        """
        Invent C{count} new I{file descriptors} (actually integers, attached to
        no real file description), starting at C{start}.  Construct and return a
        new L{ListenFDs} which will claim those integers represent inherited
        file descriptors.
        """
        return ListenFDs(range(start, start + count))


class EnvironmentMixin:
    """
    Mixin for a L{TestCase} subclass which creates a real implementation of
    C{inheritedDescriptors} which is based on the environment variables set by
    systemd.  To facilitate testing, this mixin will also create a fake
    environment dictionary and add keys to it to make it look as if some
    descriptors have been inherited.
    """

    def initializeEnvironment(self, count, pid):
        """
        Create a copy of the process environment and add I{LISTEN_FDS} and
        I{LISTEN_PID} (the environment variables set by systemd) to it.

        C{count} and C{pid} are converted with C{str}, so tests may also pass
        non-numeric values to exercise the parsing failure paths.
        """
        result = os.environ.copy()
        result["LISTEN_FDS"] = str(count)
        result["LISTEN_PID"] = str(pid)
        return result

    def getDaemon(self, start, count):
        """
        Create a new L{ListenFDs} instance, initialized with a fake environment
        dictionary which will be set up as systemd would have set it up if
        C{count} descriptors were being inherited.  The descriptors will also
        start at C{start}.
        """
        fakeEnvironment = self.initializeEnvironment(count, os.getpid())
        return ListenFDs.fromEnvironment(environ=fakeEnvironment, start=start)


class MemoryOnlyTests(MemoryOnlyMixin, InheritedDescriptorsMixin, TestCase):
    """
    Apply tests to L{ListenFDs}, explicitly constructed with some fake file
    descriptors.
    """


class EnvironmentTests(EnvironmentMixin, InheritedDescriptorsMixin, TestCase):
    """
    Apply tests to L{ListenFDs}, constructed based on an environment dictionary.
    """

    def test_secondEnvironment(self):
        """
        Only the first L{ListenFDs} created from a given environment
        dictionary reports inherited file descriptors; a second one created
        from the same dictionary reports none.
        """
        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
        first = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        second = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual(list(range(3, 6)), first.inheritedDescriptors())
        self.assertEqual([], second.inheritedDescriptors())

    def test_mismatchedPID(self):
        """
        If the current process PID does not match the PID in the environment, no
        inherited descriptors are reported.
        """
        fakeEnvironment = self.initializeEnvironment(3, os.getpid() + 1)
        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual([], sddaemon.inheritedDescriptors())

    def test_missingPIDVariable(self):
        """
        If the I{LISTEN_PID} environment variable is not present, no inherited
        descriptors are reported.
        """
        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
        del fakeEnvironment["LISTEN_PID"]
        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual([], sddaemon.inheritedDescriptors())

    def test_nonIntegerPIDVariable(self):
        """
        If the I{LISTEN_PID} environment variable is set to a string that cannot
        be parsed as an integer, no inherited descriptors are reported.
        """
        fakeEnvironment = self.initializeEnvironment(3, "hello, world")
        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual([], sddaemon.inheritedDescriptors())

    def test_missingFDSVariable(self):
        """
        If the I{LISTEN_FDS} environment variable is not present, no inherited
        descriptors are reported.
        """
        fakeEnvironment = self.initializeEnvironment(3, os.getpid())
        del fakeEnvironment["LISTEN_FDS"]
        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual([], sddaemon.inheritedDescriptors())

    def test_nonIntegerFDSVariable(self):
        """
        If the I{LISTEN_FDS} environment variable is set to a string that cannot
        be parsed as an integer, no inherited descriptors are reported.
        """
        fakeEnvironment = self.initializeEnvironment("hello, world", os.getpid())
        sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
        self.assertEqual([], sddaemon.inheritedDescriptors())

    def test_defaultEnviron(self):
        """
        If the process environment is not explicitly passed to
        L{ListenFDs.fromEnvironment}, the real process environment dictionary
        is used.
        """
        # Replace os.environ for the duration of the test so that the
        # no-argument call below has known values to read.
        self.patch(os, "environ", {"LISTEN_PID": str(os.getpid()), "LISTEN_FDS": "5"})
        sddaemon = ListenFDs.fromEnvironment()
        self.assertEqual(list(range(3, 3 + 5)), sddaemon.inheritedDescriptors())