1# This file is part of Buildbot. Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16import abc
17import base64
18import os
19import time
20
21from twisted.internet import defer
22from twisted.internet import reactor
23from twisted.internet.error import ProcessExitedAlready
24from twisted.python.failure import Failure
25
26from buildbot import config
27from buildbot.util import asyncSleep
28from buildbot.util.httpclientservice import HTTPClientService
29from buildbot.util.logger import Logger
30from buildbot.util.protocol import LineProcessProtocol
31from buildbot.util.service import BuildbotService
32
# Module-level logger instance.
log = Logger()
34
35
# This is a BuildbotService so that it can be started and destroyed;
# that lifecycle is needed to manage the kubectl proxy subprocess.
class KubeConfigLoaderBase(BuildbotService):
    """Common interface for services that supply Kubernetes connection settings.

    Subclasses provide getConfig() and may override getAuthorization().
    """
    name = "KubeConfig"

    @abc.abstractmethod
    def getConfig(self):
        """
        @return dictionary with optional params
        {
            'master_url': 'https://kube_master.url',
            'namespace': 'default_namespace',
            'headers': {
                'Authorization': XXX
            },
            # todo (quite hard to implement with treq):
            'cert': 'optional client certificate used to connect to ssl',
            'verify': 'kube master certificate authority to use to connect'
        }
        """

    def getAuthorization(self):
        """Return the Authorization header value to use, or None for none.

        The base implementation never supplies one.
        """
        return None

    def __str__(self):
        """Return a unique str for SharedService."""
        # hash() comes from ComparableMixin
        class_name = self.__class__.__name__
        return f"{class_name}({hash(self)})"
65
class KubeHardcodedConfig(KubeConfigLoaderBase):
    """Kubernetes configuration given explicitly as keyword arguments."""

    def reconfigService(self,
                        master_url=None,
                        bearerToken=None,
                        basicAuth=None,
                        headers=None,
                        cert=None,
                        verify=None,
                        namespace="default"):
        """Store the connection settings that getConfig() will return.

        @param master_url: base URL of the kube master
        @param bearerToken: token used to build a "Bearer" Authorization value
        @param basicAuth: mapping with 'user' and 'password' entries used to
            build a "Basic" Authorization value
        @param headers: extra HTTP headers stored under config['headers']
        @param cert: stored under config['cert'] when not None
        @param verify: stored under config['verify'] when not None
        @param namespace: stored under config['namespace']
        @raise Exception: when both basicAuth and bearerToken are given
        """
        self.config = {'master_url': master_url, 'namespace': namespace, 'headers': {}}
        if headers is not None:
            self.config['headers'] = headers
        if basicAuth and bearerToken:
            raise Exception("set one of basicAuth and bearerToken, not both")
        self.basicAuth = basicAuth
        self.bearerToken = bearerToken
        if cert is not None:
            self.config['cert'] = cert
        if verify is not None:
            self.config['verify'] = verify

    # the same argument validation applies at config-check time
    checkConfig = reconfigService

    @defer.inlineCallbacks
    def getAuthorization(self):
        """Build the Authorization header value from the configured credentials.

        Credentials are passed through self.renderSecrets() first (not defined
        in this class; presumably inherited from the service base class).

        @return: Deferred firing with "Basic <base64>" for basicAuth,
            "Bearer <token>" for bearerToken, or None when neither is set
        """
        if self.basicAuth is not None:
            basicAuth = yield self.renderSecrets(self.basicAuth)
            authstring = "{user}:{password}".format(**basicAuth).encode('utf-8')
            # b64encode() returns bytes; decode it so the header value is
            # "Basic dXNlcjpwYXNz" rather than "Basic b'dXNlcjpwYXNz'"
            encoded = base64.b64encode(authstring).decode('ascii')
            return "Basic {0}".format(encoded)

        if self.bearerToken is not None:
            bearerToken = yield self.renderSecrets(self.bearerToken)
            return "Bearer {0}".format(bearerToken)

        return None

    def getConfig(self):
        """Return the dictionary built by reconfigService()."""
        return self.config
105
106
class KubeCtlProxyConfigLoader(KubeConfigLoaderBase):
    """Reach the kube master through a local ``kubectl proxy`` subprocess.

    Parsing the kube config and setting up SSL is complex, so for now the
    proxy is run as a subprocess and the returned configuration points at
    http://localhost:PORT
    """
    kube_ctl_proxy_cmd = ['kubectl', 'proxy']  # for tests override

    class LocalPP(LineProcessProtocol):
        """Process protocol exposing first output and termination as Deferreds."""

        def __init__(self):
            super().__init__()
            self.got_output_deferred = defer.Deferred()
            self.terminated_deferred = defer.Deferred()
            self.first_line = b""

        def outLineReceived(self, line):
            # only the first line seen settles the deferred
            if self.got_output_deferred.called:
                return
            self.got_output_deferred.callback(line)

        def errLineReceived(self, line):
            # a first line on stderr is reported as a failure
            if self.got_output_deferred.called:
                return
            failure = Failure(RuntimeError(line))
            self.got_output_deferred.errback(failure)

        def processEnded(self, status):
            super().processEnded(status)
            self.terminated_deferred.callback(None)

    def checkConfig(self, proxy_port=8001, namespace="default"):
        """Initialise the subprocess bookkeeping; no process exists yet."""
        self.process = None
        self.pp = None

    @defer.inlineCallbacks
    def ensureSubprocessKilled(self):
        """Send TERM to the proxy process, if any, and wait for it to end."""
        protocol = self.pp
        if protocol is None:
            return
        try:
            self.process.signalProcess("TERM")
        except ProcessExitedAlready:
            pass  # oh well
        yield protocol.terminated_deferred

    @defer.inlineCallbacks
    def reconfigService(self, proxy_port=8001, namespace="default"):
        """(Re)start ``kubectl proxy`` on proxy_port and wait for its first line."""
        self.proxy_port = proxy_port
        self.namespace = namespace
        # any previously started proxy must be gone before a new one is spawned
        yield self.ensureSubprocessKilled()
        self.pp = self.LocalPP()
        executable = self.kube_ctl_proxy_cmd[0]
        argv = self.kube_ctl_proxy_cmd + ["-p", str(self.proxy_port)]
        self.process = reactor.spawnProcess(self.pp, executable, argv, env=None)
        self.kube_proxy_output = yield self.pp.got_output_deferred

    def stopService(self):
        """Terminate the proxy subprocess when the service stops."""
        return self.ensureSubprocessKilled()

    def getConfig(self):
        """Return the config pointing at the local proxy port."""
        master_url = f"http://localhost:{self.proxy_port}"
        return {'master_url': master_url, 'namespace': self.namespace}
169
170
class KubeInClusterConfigLoader(KubeConfigLoaderBase):
    """Build the configuration from the service-account files under kube_dir."""
    kube_dir = '/var/run/secrets/kubernetes.io/serviceaccount/'

    kube_namespace_file = os.path.join(kube_dir, 'namespace')
    kube_token_file = os.path.join(kube_dir, 'token')
    kube_cert_file = os.path.join(kube_dir, 'ca.crt')

    def checkConfig(self):
        """Report a configuration error when kube_dir does not exist."""
        if os.path.exists(self.kube_dir):
            return
        config.error(
            f"Not in kubernetes cluster (kube_dir not found: {self.kube_dir})")

    def reconfigService(self):
        """Populate self.config from KUBERNETES_PORT and the kube_dir files."""
        settings = {}
        self.config = settings

        # KUBERNETES_PORT is read from the environment, with 'tcp' swapped
        # for 'https' to form the master URL
        kube_port = os.environ['KUBERNETES_PORT']
        settings['master_url'] = kube_port.replace('tcp', 'https')
        settings['verify'] = self.kube_cert_file

        with open(self.kube_token_file, encoding="utf-8") as token_file:
            bearer = token_file.read().strip()
        settings['headers'] = {'Authorization': f'Bearer {bearer}'}

        with open(self.kube_namespace_file, encoding="utf-8") as namespace_file:
            settings['namespace'] = namespace_file.read().strip()

    def getConfig(self):
        """Return the dictionary built by reconfigService()."""
        return self.config
199
200
class KubeError(RuntimeError):
    """Error built from the JSON body of a failed Kubernetes API response.

    Attributes:
        json: the decoded response body passed to the constructor
        reason: the body's 'reason' entry, or None when absent
    """

    def __init__(self, response_json):
        """
        @param response_json: decoded JSON body (a dict) of the error response
        """
        reason = response_json.get('reason')
        message = response_json.get('message')
        if message is None:
            # A body without 'message' must not raise KeyError here: that
            # would hide the API error being reported. Fall back to the
            # reason, then to the raw payload.
            message = reason if reason else repr(response_json)
        super().__init__(message)
        self.json = response_json
        self.reason = reason
206
207
class KubeClientService(HTTPClientService):
    """HTTP client for the Kubernetes API, driven by a kube config loader."""

    def __init__(self, kube_config=None):
        """
        @param kube_config: config loader providing getConfig() and
            getAuthorization(); it is attached as a child service
        """
        self.config = kube_config
        super().__init__('')
        self._namespace = None
        kube_config.setServiceParent(self)

    @defer.inlineCallbacks
    def _prepareRequest(self, ep, kwargs):
        """Extend the parent's request preparation with the kube settings.

        The base URL is refreshed from the loader on every request, the
        loader's headers and Authorization value are merged into the request
        headers, and 'cert'/'verify' are forwarded when configured.

        @return: Deferred firing with (url, request kwargs)
        """
        config = self.config.getConfig()
        self._base_url = config['master_url']
        url, req_kwargs = super()._prepareRequest(ep, kwargs)

        headers = req_kwargs.setdefault('headers', {})
        if 'headers' in config:
            headers.update(config['headers'])

        # an explicit authorization from the loader overrides any
        # Authorization entry coming from the configured headers
        auth = yield self.config.getAuthorization()
        if auth is not None:
            headers['Authorization'] = auth

        # warning: this only works with txrequests! not treq
        for arg in ['cert', 'verify']:
            if arg in config:
                req_kwargs[arg] = config[arg]

        return (url, req_kwargs)

    @defer.inlineCallbacks
    def createPod(self, namespace, spec):
        """POST a pod spec to the given namespace.

        @return: Deferred firing with the decoded JSON response
        @raise KubeError: when the response code is not 200, 201 or 202
        """
        url = '/api/v1/namespaces/{namespace}/pods'.format(namespace=namespace)
        res = yield self.post(url, json=spec)
        res_json = yield res.json()
        if res.code not in (200, 201, 202):
            raise KubeError(res_json)
        return res_json

    @defer.inlineCallbacks
    def deletePod(self, namespace, name, graceperiod=0):
        """DELETE the named pod.

        @param graceperiod: grace period in seconds, sent to the API as the
            'gracePeriodSeconds' query parameter
        @return: Deferred firing with the decoded JSON response
        @raise KubeError: when the response code is not 200
        """
        url = '/api/v1/namespaces/{namespace}/pods/{name}'.format(
            namespace=namespace, name=name)
        # The Kubernetes API names this query parameter 'gracePeriodSeconds';
        # 'graceperiod' is not a recognised option.
        res = yield self.delete(url, params={'gracePeriodSeconds': graceperiod})
        res_json = yield res.json()
        if res.code != 200:
            raise KubeError(res_json)
        return res_json

    @defer.inlineCallbacks
    def waitForPodDeletion(self, namespace, name, timeout):
        """Poll the pod status until the API answers 404.

        @param timeout: maximum time to wait, in seconds
        @return: Deferred firing with the decoded JSON body of the 404 response
        @raise TimeoutError: when the pod is still present after timeout
        @raise KubeError: on any response code other than 200 or 404
        """
        t1 = time.time()
        url = '/api/v1/namespaces/{namespace}/pods/{name}/status'.format(
            namespace=namespace, name=name)
        while True:
            if time.time() - t1 > timeout:
                raise TimeoutError(
                    "Did not see pod {name} terminate after {timeout}s".format(
                        name=name, timeout=timeout))
            res = yield self.get(url)
            res_json = yield res.json()
            if res.code == 404:
                break  # 404 means the pod has terminated
            if res.code != 200:
                raise KubeError(res_json)
            # pod still exists: wait before polling again
            yield asyncSleep(1)
        return res_json

    @property
    def namespace(self):
        """Namespace from the loader's config, cached after the first access."""
        if self._namespace is None:
            self._namespace = self.config.getConfig()['namespace']
        return self._namespace
280