# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import copy
import datetime
import json

from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log

from buildbot import config
from buildbot import util
from buildbot.changes import base
from buildbot.changes.filter import ChangeFilter
from buildbot.util import bytes2unicode
from buildbot.util import httpclientservice
from buildbot.util import runprocess
from buildbot.util.protocol import LineProcessProtocol
from buildbot.util.pullrequest import PullRequestMixin


def _canonicalize_event(event):
    """
    Return an event dictionary which is consistent between the gerrit
    event stream and the gerrit event log formats.

    The input is never mutated: when a rewrite is needed a deep copy is
    returned, otherwise the very same object is handed back.
    """
    # For "patchset-created" the events-log JSON looks like:
    #   "project": {"name": "buildbot"}
    # while the stream-events JSON looks like:
    #   "project": "buildbot"
    # so we canonicalize them to the latter
    if "change" not in event:
        return event

    change = event["change"]
    if "project" not in change:
        return event

    project = change["project"]
    if not isinstance(project, dict):
        return event

    if "name" not in project:
        return event

    event = copy.deepcopy(event)
    event["change"]["project"] = project["name"]
    return event


class GerritChangeFilter(ChangeFilter):

    """This gerrit specific change filter helps creating pre-commit and post-commit builders"""

    def __init__(self,
                 eventtype=None, eventtype_re=None, eventtype_fn=None, **kw):
        """
        Build the filter.

        ``eventtype``, ``eventtype_re`` and ``eventtype_fn`` are registered as
        checks against the ``event.type`` property; every other keyword is
        forwarded to ``ChangeFilter``.
        """
        super().__init__(**kw)

        self.checks.update(
            self.createChecks(
                (eventtype, eventtype_re, eventtype_fn, "prop:event.type"),
            ))
        # for branch change filter, we take the real gerrit branch
        # instead of the change's branch, which is also used as a grouping key
        if "branch" in self.checks:
            self.checks["prop:event.change.branch"] = self.checks["branch"]
            del self.checks["branch"]


def _gerrit_user_to_author(props, username="unknown"):
    """
    Convert Gerrit account properties to Buildbot format

    Take into account missing values: ``name`` wins over ``username``, which
    wins over the ``username`` argument; ``" <email>"`` is appended when an
    ``email`` key is present.
    """
    username = props.get("username", username)
    username = props.get("name", username)
    if "email" in props:
        username += " <%(email)s>" % props
    return username


class GerritChangeSourceBase(base.ChangeSource, PullRequestMixin):

    """This source will maintain a connection to gerrit ssh server
    that will provide us gerrit events in json format."""

    compare_attrs = ("gerritserver", "gerritport")
    name = None
    # list of properties that are of no use to be put in the event dict
    external_property_denylist = ["event.eventCreatedOn"]
    external_property_whitelist = ['*']
    property_basename = 'event'

    def checkConfig(self,
                    gitBaseURL=None,
                    handled_events=("patchset-created", "ref-updated"),
                    debug=False,
                    get_files=False):
        """Report a configuration error when ``gitBaseURL`` is missing."""

        if gitBaseURL is None:
            config.error("gitBaseURL must be specified")

    def reconfigService(self,
                        gitBaseURL=None,
                        handled_events=("patchset-created", "ref-updated"),
                        debug=False,
                        get_files=False):
        """Store the configuration values on the instance."""
        self.gitBaseURL = gitBaseURL
        self.handled_events = list(handled_events)
        self._get_files = get_files
        self.debug = debug

    def lineReceived(self, line):
        """
        Decode one JSON line and dispatch it to ``eventReceived``.

        Lines that are not valid JSON, or that do not decode to a dict with a
        ``"type"`` key, are dropped and an already-fired Deferred is returned.
        """
        try:
            event = json.loads(bytes2unicode(line))
        except ValueError:
            log.msg("bad json line: {}".format(line))
            return defer.succeed(None)

        if not(isinstance(event, dict) and "type" in event):
            if self.debug:
                log.msg("no type in event {}".format(line))
            return defer.succeed(None)

        return self.eventReceived(event)

    def build_properties(self, event):
        """
        Return the change properties for ``event``.

        Starts from ``extractProperties(event)``, records the concrete class
        name under ``event.source`` and, for change-related review events,
        exposes the change's branch as ``target_branch``.
        """
        properties = self.extractProperties(event)
        properties["event.source"] = self.__class__.__name__
        if event['type'] in ('patchset-created', 'comment-added') and 'change' in event:
            properties['target_branch'] = event["change"]["branch"]
        return properties

    def eventReceived(self, event):
        """
        Handle a decoded event.

        Events whose type is not in ``handled_events`` are ignored.  Otherwise
        a method named ``eventReceived_<type>`` (dashes replaced by
        underscores) is used when it exists, falling back to
        ``addChangeFromEvent``.
        """
        if event['type'] not in self.handled_events:
            if self.debug:
                log.msg("the event type '{}' is not setup to handle".format(event['type']))
            return defer.succeed(None)

        properties = self.build_properties(event)
        func_name = "eventReceived_{}".format(event["type"].replace("-", "_"))
        func = getattr(self, func_name, None)
        if func is None:
            return self.addChangeFromEvent(properties, event)

        return func(properties, event)

    @defer.inlineCallbacks
    def addChange(self, event_type, chdict):
        """
        Add the change described by ``chdict`` to the master.

        A source stamp is looked up (or created) first; when it already
        existed and the event is ``patchset-created`` or ``ref-updated`` the
        change is treated as a duplicate and skipped.  Failures while adding
        the change are logged and swallowed.
        """
        stampdict = {
            "branch": chdict["branch"],
            "revision": chdict["revision"],
            "patch_author": chdict["author"],
            "patch_comment": chdict["comments"],
            "repository": chdict["repository"],
            "project": chdict["project"],
            "codebase": '',
        }

        stampid, found_existing = yield(
            self.master.db.sourcestamps.findOrCreateId(**stampdict))

        if found_existing and event_type in ("patchset-created", "ref-updated"):
            if self.debug:
                eventstr = "{}/{} -- {}:{}".format(
                    self.gitBaseURL, chdict["project"], chdict["branch"],
                    chdict["revision"])
                message = (
                    "gerrit: duplicate change event {} by {}"
                    .format(eventstr, self.__class__.__name__))
                log.msg(message.encode("utf-8"))
            return

        if self.debug:
            eventstr = "{} -- {}:{}".format(
                chdict["repository"], chdict["branch"], chdict["revision"])
            message = (
                "gerrit: adding change from {} in {}"
                .format(eventstr, self.__class__.__name__))
            log.msg(message.encode("utf-8"))

        try:
            yield self.master.data.updates.addChange(**chdict)
        except Exception:
            # eat failures..
            # The message must be passed as ``_why``: given as the first
            # positional argument, twisted would only log the repr of the
            # string and the actual exception would be lost.
            log.err(None, 'error adding change from GerritChangeSource')

    def get_branch_from_event(self, event):
        """
        Return the branch to record for ``event``: the patch set ref for
        ``patchset-created``/``comment-added`` events, the change's branch
        otherwise.
        """
        if event['type'] in ('patchset-created', 'comment-added'):
            return event["patchSet"]["ref"]
        return event["change"]["branch"]

    @defer.inlineCallbacks
    def addChangeFromEvent(self, properties, event):
        """
        Generic handler: build a change from an event carrying both a
        ``change`` and a ``patchSet`` entry.  Other events are ignored.
        """
        if "change" not in event:
            if self.debug:
                log.msg("unsupported event {}".format(event["type"]))
            return None

        if "patchSet" not in event:
            if self.debug:
                log.msg("unsupported event {}".format(event["type"]))
            return None

        event = _canonicalize_event(event)
        event_change = event["change"]

        files = ["unknown"]
        if self._get_files:
            # getFiles is provided by the concrete subclasses
            files = yield self.getFiles(
                change=event_change["number"],
                patchset=event["patchSet"]["number"]
            )

        yield self.addChange(event['type'], {
            'author': _gerrit_user_to_author(event_change["owner"]),
            'project': util.bytes2unicode(event_change["project"]),
            'repository': "{}/{}".format(
                self.gitBaseURL, event_change["project"]),
            'branch': self.get_branch_from_event(event),
            'revision': event["patchSet"]["revision"],
            'revlink': event_change["url"],
            'comments': event_change["subject"],
            'files': files,
            'category': event["type"],
            'properties': properties})
        return None

    def eventReceived_ref_updated(self, properties, event):
        """Handler for ``ref-updated`` events."""
        ref = event["refUpdate"]
        author = "gerrit"

        if "submitter" in event:
            author = _gerrit_user_to_author(event["submitter"], author)

        # Ignore ref-updated events if patchset-created events are expected for this push.
        # ref-updated events may arrive before patchset-created events and cause problems, as
        # builds would be using properties from ref-updated event and not from patchset-created.
        # As a result it may appear that the change was not related to a Gerrit change and cause
        # reporters to not submit reviews for example.
        if 'patchset-created' in self.handled_events and ref['refName'].startswith('refs/changes/'):
            return None

        return self.addChange(event['type'], dict(
            author=author,
            project=ref["project"],
            repository="{}/{}".format(self.gitBaseURL, ref["project"]),
            branch=ref["refName"],
            revision=ref["newRev"],
            comments="Gerrit: commit(s) pushed.",
            files=["unknown"],
            category=event["type"],
            properties=properties))


class GerritChangeSource(GerritChangeSourceBase):

    """This source will maintain a connection to gerrit ssh server
    that will provide us gerrit events in json format."""

    compare_attrs = ("gerritserver", "gerritport")

    STREAM_GOOD_CONNECTION_TIME = 120
    "(seconds) connections longer than this are considered good, and reset the backoff timer"

    STREAM_BACKOFF_MIN = 0.5
    "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"

    STREAM_BACKOFF_EXPONENT = 1.5
    "multiplier used to increase the backoff from MIN to MAX on repeated failures"

    STREAM_BACKOFF_MAX = 60
    "(seconds) maximum time to wait before retrying a failed connection"

    name = None

    def checkConfig(self,
                    gerritserver,
                    username,
                    gerritport=29418,
                    identity_file=None,
                    **kwargs):
        """
        Derive a default service name and validate the remaining options
        through the base class.
        """
        if self.name is None:
            self.name = "GerritChangeSource:{}@{}:{}".format(username, gerritserver, gerritport)
        if 'gitBaseURL' not in kwargs:
            # placeholder so that the base check passes; the real value is
            # computed in reconfigService
            kwargs['gitBaseURL'] = "automatic at reconfigure"
        super().checkConfig(**kwargs)

    def reconfigService(self,
                        gerritserver,
                        username,
                        gerritport=29418,
                        identity_file=None,
                        name=None,
                        **kwargs):
        """
        Store the connection settings, reset the stream process state and
        default ``gitBaseURL`` to the ssh URL of the gerrit server.
        """
        if 'gitBaseURL' not in kwargs:
            kwargs['gitBaseURL'] = "ssh://{}@{}:{}".format(username, gerritserver, gerritport)
        self.gerritserver = gerritserver
        self.gerritport = gerritport
        self.username = username
        self.identity_file = identity_file
        self.process = None
        self.wantProcess = False
        self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
        return super().reconfigService(**kwargs)

    class LocalPP(LineProcessProtocol):
        """Process protocol forwarding the ssh process output to the change source."""

        def __init__(self, change_source):
            super().__init__()
            self.change_source = change_source

        @defer.inlineCallbacks
        def outLineReceived(self, line):
            if self.change_source.debug:
                log.msg(b"gerrit: " + line)
            yield self.change_source.lineReceived(line)

        def errLineReceived(self, line):
            if self.change_source.debug:
                log.msg(b"gerrit stderr: " + line)

        def processEnded(self, status):
            super().processEnded(status)
            self.change_source.streamProcessStopped()

    def streamProcessStopped(self):
        """
        Called when the stream process ends; restart it when still wanted,
        with a growing delay if the process was short-lived.
        """
        self.process = None

        # if the service is stopped, don't try to restart the process
        if not self.wantProcess or not self.running:
            return

        now = util.now()
        if now - self.lastStreamProcessStart < \
           self.STREAM_GOOD_CONNECTION_TIME:
            # bad startup; start the stream process again after a timeout,
            # and then increase the timeout
            log.msg(
                "'gerrit stream-events' failed; restarting after %ds"
                % round(self.streamProcessTimeout))
            self.master.reactor.callLater(
                self.streamProcessTimeout, self.startStreamProcess)
            self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT
            if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX:
                self.streamProcessTimeout = self.STREAM_BACKOFF_MAX
        else:
            # good startup, but lost connection; restart immediately,
            # and set the timeout to its minimum

            # make sure we log the reconnection, so that it might be detected
            # and network connectivity fixed
            log.msg("gerrit stream-events lost connection. Reconnecting...")
            self.startStreamProcess()
            self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    def _buildGerritCommand(self, *gerrit_args):
        '''Get an ssh command list which invokes gerrit with the given args on the
        remote host'''

        cmd = [
            "ssh",
            "-o", "BatchMode=yes",
            "{}@{}".format(self.username, self.gerritserver),
            "-p", str(self.gerritport)
        ]

        if self.identity_file is not None:
            cmd.extend(["-i", self.identity_file])

        cmd.append("gerrit")
        cmd.extend(gerrit_args)
        return cmd

    def startStreamProcess(self):
        """Spawn ``ssh ... gerrit stream-events`` and remember when it was started."""
        if self.debug:
            log.msg("starting 'gerrit stream-events'")

        cmd = self._buildGerritCommand("stream-events")
        self.lastStreamProcessStart = util.now()
        self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", cmd, env=None)

    @defer.inlineCallbacks
    def getFiles(self, change, patchset):
        """
        Query gerrit over ssh for the files touched by a patch set.

        Returns the list of file names, or ``["unknown"]`` when the query
        fails, produces no output or reports zero rows.
        """
        cmd = self._buildGerritCommand("query", str(change), "--format", "JSON",
                                       "--files", "--patch-sets")

        if self.debug:
            log.msg("querying gerrit for changed files in change {}/{}: {}".format(change, patchset,
                                                                                   cmd))

        rc, out = yield runprocess.run_process(self.master.reactor, cmd, env=None,
                                               collect_stderr=False)
        if rc != 0:
            return ["unknown"]

        lines = out.splitlines()
        if not lines:
            # successful exit status but nothing to parse
            return ["unknown"]

        # only the first output line is parsed
        res = json.loads(bytes2unicode(lines[0]))

        if res.get("rowCount") == 0:
            return ["unknown"]

        # Normalise the patch set numbers to int so that the lookup works
        # whether gerrit serialised them as JSON numbers or as strings.
        patchsets = {int(i["number"]): i["files"] for i in res["patchSets"]}
        return [i["file"] for i in patchsets[int(patchset)]]

    def activate(self):
        """Start streaming events."""
        self.wantProcess = True
        self.startStreamProcess()

    def deactivate(self):
        """Stop streaming events and kill the ssh process if one is running."""
        self.wantProcess = False
        if self.process:
            self.process.signalProcess("KILL")
        # TODO: if this occurs while the process is restarting, some exceptions
        # may be logged, although things will settle down normally

    def describe(self):
        """Return a human readable description including the connection status."""
        status = ""
        if not self.process:
            status = "[NOT CONNECTED - check log]"
        return (("GerritChangeSource watching the remote "
                 "Gerrit repository {}@{} {}").format(self.username, self.gerritserver, status))


class GerritEventLogPoller(GerritChangeSourceBase):
    """Change source polling gerrit's ``events-log`` plugin over HTTP."""

    POLL_INTERVAL_SEC = 30
    FIRST_FETCH_LOOKBACK_DAYS = 30

    def checkConfig(self,
                    baseURL,
                    auth,
                    pollInterval=POLL_INTERVAL_SEC,
                    pollAtLaunch=True,
                    firstFetchLookback=FIRST_FETCH_LOOKBACK_DAYS,
                    **kwargs):
        """
        Derive a default service name and validate the remaining options
        through the base class.
        """
        if self.name is None:
            self.name = "GerritEventLogPoller:{}".format(baseURL)
        super().checkConfig(**kwargs)

    @defer.inlineCallbacks
    def reconfigService(self,
                        baseURL,
                        auth,
                        pollInterval=POLL_INTERVAL_SEC,
                        pollAtLaunch=True,
                        firstFetchLookback=FIRST_FETCH_LOOKBACK_DAYS,
                        **kwargs):
        """
        Store the polling settings, fetch the state object id and create the
        HTTP client for ``baseURL`` (a trailing slash is stripped).
        """

        yield super().reconfigService(**kwargs)
        if baseURL.endswith('/'):
            baseURL = baseURL[:-1]

        self._pollInterval = pollInterval
        self._pollAtLaunch = pollAtLaunch
        self._oid = yield self.master.db.state.getObjectId(self.name, self.__class__.__name__)
        self._http = yield httpclientservice.HTTPClientService.getService(
            self.master, baseURL, auth=auth)

        self._first_fetch_lookback = firstFetchLookback
        self._last_event_time = None

    @staticmethod
    def now():
        """patchable now (datetime is not patchable as builtin)"""
        return datetime.datetime.utcnow()

    @defer.inlineCallbacks
    def poll(self):
        """
        Fetch the events logged since the last stored event timestamp and
        feed every returned line to ``lineReceived``.
        """
        last_event_ts = yield self.master.db.state.getState(self._oid, 'last_event_ts', None)
        if last_event_ts is None:
            # If there is no last event time stored in the database, then set
            # the last event time to some historical look-back
            last_event = self.now() - datetime.timedelta(days=self._first_fetch_lookback)
        else:
            last_event = datetime.datetime.utcfromtimestamp(last_event_ts)
        last_event_formatted = last_event.strftime("%Y-%m-%d %H:%M:%S")

        if self.debug:
            log.msg("Polling gerrit: {}".format(last_event_formatted).encode("utf-8"))

        res = yield self._http.get("/plugins/events-log/events/",
                                   params=dict(t1=last_event_formatted))
        lines = yield res.content()
        for line in lines.splitlines():
            yield self.lineReceived(line)

    @defer.inlineCallbacks
    def eventReceived(self, event):
        """
        Handle the event through the base class, then persist its
        ``eventCreatedOn`` value (when present) as the last seen timestamp.
        """
        res = yield super().eventReceived(event)
        if 'eventCreatedOn' in event:
            yield self.master.db.state.setState(self._oid, 'last_event_ts', event['eventCreatedOn'])
        return res

    @defer.inlineCallbacks
    def getFiles(self, change, patchset):
        """Return the files of a revision as listed by the gerrit REST API."""
        res = yield self._http.get("/changes/{}/revisions/{}/files/".format(change, patchset))
        res = yield res.content()

        res = res.splitlines()[1].decode('utf8')  # the first line of every response is `)]}'`
        return list(json.loads(res))

    # FIXME this copies the code from PollingChangeSource
    # but as PollingChangeSource and its subclasses need to be ported to reconfigurability
    # we can't use it right now
    @base.poll_method
    def doPoll(self):
        """Run ``poll`` and log any failure it produces."""
        d = defer.maybeDeferred(self.poll)
        d.addErrback(log.err, 'while polling for changes')
        return d

    def force(self):
        """Trigger a poll right away."""
        self.doPoll()

    def activate(self):
        """Start the periodic polling."""
        self.doPoll.start(interval=self._pollInterval, now=self._pollAtLaunch)

    def deactivate(self):
        """Stop the periodic polling."""
        return self.doPoll.stop()

    def describe(self):
        """Return a human readable description of this poller."""
        msg = ("GerritEventLogPoller watching the remote "
               "Gerrit repository {}")
        return msg.format(self.name)