# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import abc
import base64
import os
import time

from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet.error import ProcessExitedAlready
from twisted.python.failure import Failure

from buildbot import config
from buildbot.util import asyncSleep
from buildbot.util.httpclientservice import HTTPClientService
from buildbot.util.logger import Logger
from buildbot.util.protocol import LineProcessProtocol
from buildbot.util.service import BuildbotService

log = Logger()


# this is a BuildbotService, so that it can be started and destroyed.
# this is needed to implement kubectl proxy lifecycle
class KubeConfigLoaderBase(BuildbotService):
    """Base class for services that supply Kubernetes connection settings.

    Subclasses implement getConfig() and may override getAuthorization().
    """
    name = "KubeConfig"

    @abc.abstractmethod
    def getConfig(self):
        """
        @return dictionary with optional params
        {
            'master_url': 'https://kube_master.url',
            'namespace': 'default_namespace',
            'headers': {
                'Authorization': XXX
            }
            # todo (quite hard to implement with treq):
            'cert': 'optional client certificate used to connect to ssl'
            'verify': 'kube master certificate authority to use to connect'
        }
        """

    def getAuthorization(self):
        """Return the value for the 'Authorization' header, or None.

        The base implementation provides no authorization. Overrides may
        return a Deferred (KubeClientService yields on the result).
        """
        return None

    def __str__(self):
        """return unique str for SharedService"""
        # hash is implemented from ComparableMixin
        return "{}({})".format(self.__class__.__name__, hash(self))


class KubeHardcodedConfig(KubeConfigLoaderBase):
    """Config loader whose settings are given directly as arguments."""

    def reconfigService(self,
                        master_url=None,
                        bearerToken=None,
                        basicAuth=None,
                        headers=None,
                        cert=None,
                        verify=None,
                        namespace="default"):
        """Store the given connection settings.

        @param master_url: URL of the kube master
        @param bearerToken: token for 'Bearer' authorization (renderable)
        @param basicAuth: mapping with 'user' and 'password' keys for 'Basic'
            authorization (renderable)
        @param headers: extra HTTP headers sent with every request
        @param cert: optional client certificate, passed through to the
            HTTP request
        @param verify: optional certificate authority, passed through to the
            HTTP request
        @param namespace: namespace reported by getConfig()
        @raise Exception: if both basicAuth and bearerToken are set
        """
        self.config = {'master_url': master_url, 'namespace': namespace, 'headers': {}}
        if headers is not None:
            self.config['headers'] = headers
        if basicAuth and bearerToken:
            raise Exception("set one of basicAuth and bearerToken, not both")
        self.basicAuth = basicAuth
        self.bearerToken = bearerToken
        # 'cert' and 'verify' are only present in the config when provided
        if cert is not None:
            self.config['cert'] = cert
        if verify is not None:
            self.config['verify'] = verify

    # the same code validates the arguments at config-check time
    checkConfig = reconfigService

    @defer.inlineCallbacks
    def getAuthorization(self):
        """Render the configured credentials into an 'Authorization' value.

        @return: Deferred firing with "Basic <base64>" when basicAuth is set,
            "Bearer <token>" when bearerToken is set, otherwise None.
        """
        if self.basicAuth is not None:
            basicAuth = yield self.renderSecrets(self.basicAuth)
            authstring = "{user}:{password}".format(**basicAuth).encode('utf-8')
            # b64encode returns bytes; decode it so that the header does not
            # end up containing the bytes repr ("Basic b'...'")
            encoded = base64.b64encode(authstring).decode('ascii')
            return "Basic {0}".format(encoded)

        if self.bearerToken is not None:
            bearerToken = yield self.renderSecrets(self.bearerToken)
            return "Bearer {0}".format(bearerToken)

        return None

    def getConfig(self):
        """Return the configuration dictionary built by reconfigService."""
        return self.config


class KubeCtlProxyConfigLoader(KubeConfigLoaderBase):
    """ We use kubectl proxy to connect to kube master.
    Parsing the config and setting up SSL is complex.
    So for now, we use kubectl proxy to load the config and connect to master.
    This will run the kube proxy as a subprocess, and return configuration with
    http://localhost:PORT
    """
    kube_ctl_proxy_cmd = ['kubectl', 'proxy']  # for tests override

    class LocalPP(LineProcessProtocol):
        """Process protocol tracking first output and termination of the proxy.

        got_output_deferred fires with the first stdout line, or fails with a
        RuntimeError wrapping the first stderr line, whichever is received
        first. terminated_deferred fires with None from processEnded().
        """

        def __init__(self):
            super().__init__()
            self.got_output_deferred = defer.Deferred()
            self.terminated_deferred = defer.Deferred()
            self.first_line = b""

        def outLineReceived(self, line):
            # only the first line of output is reported
            if not self.got_output_deferred.called:
                self.got_output_deferred.callback(line)

        def errLineReceived(self, line):
            # an error line arriving before any output is reported as a failure
            if not self.got_output_deferred.called:
                self.got_output_deferred.errback(Failure(RuntimeError(line)))

        def processEnded(self, status):
            super().processEnded(status)
            self.terminated_deferred.callback(None)

    def checkConfig(self, proxy_port=8001, namespace="default"):
        """Initialize the subprocess state; no process is running yet."""
        self.pp = None
        self.process = None

    @defer.inlineCallbacks
    def ensureSubprocessKilled(self):
        """Send TERM to the proxy subprocess, if any, and wait for it to end.

        @return: Deferred firing once the process protocol saw processEnded
        """
        if self.pp is not None:
            try:
                self.process.signalProcess("TERM")
            except ProcessExitedAlready:
                pass  # oh well
            yield self.pp.terminated_deferred

    @defer.inlineCallbacks
    def reconfigService(self, proxy_port=8001, namespace="default"):
        """(Re)start the proxy subprocess on the given port.

        Any previously spawned subprocess is terminated first. The returned
        Deferred fires once the new subprocess produced its first line of
        output, and fails if an error line is received first.

        @param proxy_port: port passed to the proxy command via "-p"
        @param namespace: namespace reported by getConfig()
        """
        self.proxy_port = proxy_port
        self.namespace = namespace
        yield self.ensureSubprocessKilled()
        self.pp = self.LocalPP()
        self.process = reactor.spawnProcess(
            self.pp,
            self.kube_ctl_proxy_cmd[0],
            self.kube_ctl_proxy_cmd + ["-p", str(self.proxy_port)],
            env=None)
        self.kube_proxy_output = yield self.pp.got_output_deferred

    def stopService(self):
        """Terminate the proxy subprocess when the service stops."""
        return self.ensureSubprocessKilled()

    def getConfig(self):
        """Return a configuration pointing at the local proxy."""
        return {
            'master_url': "http://localhost:{}".format(self.proxy_port),
            'namespace': self.namespace
        }


class KubeInClusterConfigLoader(KubeConfigLoaderBase):
    """Config loader reading the service account files mounted under kube_dir."""
    kube_dir = '/var/run/secrets/kubernetes.io/serviceaccount/'

    kube_namespace_file = os.path.join(kube_dir, 'namespace')
    kube_token_file = os.path.join(kube_dir, 'token')
    kube_cert_file = os.path.join(kube_dir, 'ca.crt')

    def checkConfig(self):
        """Report a configuration error if kube_dir does not exist."""
        if not os.path.exists(self.kube_dir):
            config.error(
                "Not in kubernetes cluster (kube_dir not found: {})".format(
                    self.kube_dir))

    def reconfigService(self):
        """Build the configuration from the environment and kube_dir files.

        @raise KeyError: if the KUBERNETES_PORT environment variable is unset
        """
        self.config = {}
        # the master URL is KUBERNETES_PORT with 'tcp' replaced by 'https'
        self.config['master_url'] = os.environ['KUBERNETES_PORT'].replace(
            'tcp', 'https')
        self.config['verify'] = self.kube_cert_file
        with open(self.kube_token_file, encoding="utf-8") as token_content:
            token = token_content.read().strip()
            self.config['headers'] = {
                'Authorization': 'Bearer {0}'.format(token)
            }
        with open(self.kube_namespace_file, encoding="utf-8") as namespace_content:
            self.config['namespace'] = namespace_content.read().strip()

    def getConfig(self):
        """Return the configuration dictionary built by reconfigService."""
        return self.config


class KubeError(RuntimeError):
    """Error raised from a JSON error response of the kube master.

    The exception message is the response's 'message' entry; the full
    response is kept in `json` and its optional 'reason' entry in `reason`.
    """

    def __init__(self, response_json):
        super().__init__(response_json['message'])
        self.json = response_json
        self.reason = response_json.get('reason')


class KubeClientService(HTTPClientService):
    """HTTP client service for the kube API, configured by a config loader."""

    def __init__(self, kube_config=None):
        """
        @param kube_config: config loader service (see KubeConfigLoaderBase);
            it becomes a child service of this one
        """
        self.config = kube_config
        super().__init__('')
        self._namespace = None
        kube_config.setServiceParent(self)

    @defer.inlineCallbacks
    def _prepareRequest(self, ep, kwargs):
        """Add the loader's base URL, headers and authorization to a request.

        @return: Deferred firing with a (url, request kwargs) tuple
        """
        config = self.config.getConfig()
        # the base URL is re-read from the loader on every request
        self._base_url = config['master_url']
        url, req_kwargs = super()._prepareRequest(ep, kwargs)

        if 'headers' not in req_kwargs:
            req_kwargs['headers'] = {}
        if 'headers' in config:
            req_kwargs['headers'].update(config['headers'])

        # an explicit authorization from the loader overrides any
        # 'Authorization' header set above
        auth = yield self.config.getAuthorization()
        if auth is not None:
            req_kwargs['headers']['Authorization'] = auth

        # warning: this only works with txrequests! not treq
        for arg in ['cert', 'verify']:
            if arg in config:
                req_kwargs[arg] = config[arg]

        return (url, req_kwargs)

    @defer.inlineCallbacks
    def createPod(self, namespace, spec):
        """POST a pod spec to the given namespace.

        @return: Deferred firing with the decoded JSON response
        @raise KubeError: if the response code is not 200, 201 or 202
        """
        url = '/api/v1/namespaces/{namespace}/pods'.format(namespace=namespace)
        res = yield self.post(url, json=spec)
        res_json = yield res.json()
        if res.code not in (200, 201, 202):
            raise KubeError(res_json)
        return res_json

    @defer.inlineCallbacks
    def deletePod(self, namespace, name, graceperiod=0):
        """DELETE the named pod in the given namespace.

        @param graceperiod: sent as the 'graceperiod' query parameter
        @return: Deferred firing with the decoded JSON response
        @raise KubeError: if the response code is not 200
        """
        url = '/api/v1/namespaces/{namespace}/pods/{name}'.format(
            namespace=namespace, name=name)
        # NOTE(review): the Kubernetes API reference names this query
        # parameter 'gracePeriodSeconds' -- confirm that 'graceperiod' is
        # actually honored by the server.
        res = yield self.delete(url, params={'graceperiod': graceperiod})
        res_json = yield res.json()
        if res.code != 200:
            raise KubeError(res_json)
        return res_json

    @defer.inlineCallbacks
    def waitForPodDeletion(self, namespace, name, timeout):
        """Poll the pod status once per second until it answers 404.

        @param timeout: maximum time to wait, in seconds
        @return: Deferred firing with the decoded JSON of the 404 response
        @raise TimeoutError: if the pod is still present after `timeout`
        @raise KubeError: if a response code is neither 200 nor 404
        """
        t1 = time.time()
        url = '/api/v1/namespaces/{namespace}/pods/{name}/status'.format(
            namespace=namespace, name=name)
        while True:
            # the deadline is checked before each request
            if time.time() - t1 > timeout:
                raise TimeoutError(
                    "Did not see pod {name} terminate after {timeout}s".format(
                        name=name, timeout=timeout))
            res = yield self.get(url)
            res_json = yield res.json()
            if res.code == 404:
                break  # 404 means the pod has terminated
            if res.code != 200:
                raise KubeError(res_json)
            yield asyncSleep(1)
        return res_json

    @property
    def namespace(self):
        """Namespace from the loader's config, cached after the first read."""
        if self._namespace is None:
            self._namespace = self.config.getConfig()['namespace']
        return self._namespace