1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16import copy
17import datetime
18import json
19
20from twisted.internet import defer
21from twisted.internet import reactor
22from twisted.python import log
23
24from buildbot import config
25from buildbot import util
26from buildbot.changes import base
27from buildbot.changes.filter import ChangeFilter
28from buildbot.util import bytes2unicode
29from buildbot.util import httpclientservice
30from buildbot.util import runprocess
31from buildbot.util.protocol import LineProcessProtocol
32from buildbot.util.pullrequest import PullRequestMixin
33
34
35def _canonicalize_event(event):
36    """
37    Return an event dictionary which is consistent between the gerrit
38    event stream and the gerrit event log formats.
39    """
40    # For "patchset-created" the events-log JSON looks like:
41    #   "project": {"name": "buildbot"}
42    # while the stream-events JSON looks like:
43    #   "project": "buildbot"
44    # so we canonicalize them to the latter
45    if "change" not in event:
46        return event
47
48    change = event["change"]
49    if "project" not in change:
50        return event
51
52    project = change["project"]
53    if not isinstance(project, dict):
54        return event
55
56    if "name" not in project:
57        return event
58
59    event = copy.deepcopy(event)
60    event["change"]["project"] = project["name"]
61    return event
62
63
class GerritChangeFilter(ChangeFilter):

    """Gerrit-aware change filter, useful to set up pre-commit and
    post-commit builders.

    On top of the regular ``ChangeFilter`` arguments it accepts
    ``eventtype``, ``eventtype_re`` and ``eventtype_fn``, which are matched
    against the ``event.type`` property of a change."""

    def __init__(self,
                 eventtype=None, eventtype_re=None, eventtype_fn=None, **kw):
        super().__init__(**kw)

        event_type_checks = self.createChecks(
            (eventtype, eventtype_re, eventtype_fn, "prop:event.type"),
        )
        self.checks.update(event_type_checks)

        # A branch filter has to look at the real gerrit branch rather than
        # at the change's branch (which is also used as a grouping key), so
        # the check is re-registered under the event property.
        if "branch" in self.checks:
            self.checks["prop:event.change.branch"] = self.checks.pop("branch")
81
82
83def _gerrit_user_to_author(props, username="unknown"):
84    """
85    Convert Gerrit account properties to Buildbot format
86
87    Take into account missing values
88    """
89    username = props.get("username", username)
90    username = props.get("name", username)
91    if "email" in props:
92        username += " <%(email)s>" % props
93    return username
94
95
class GerritChangeSourceBase(base.ChangeSource, PullRequestMixin):

    """Shared event handling for the Gerrit change sources.

    Subclasses feed JSON-encoded Gerrit events to ``lineReceived``; events
    whose type is listed in ``handled_events`` are converted into Buildbot
    changes.  ``getFiles`` is used when ``get_files`` is enabled but is not
    defined here: subclasses are expected to provide it."""

    compare_attrs = ("gerritserver", "gerritport")
    name = None
    # list of properties that are of no use to be put in the event dict
    external_property_denylist = ["event.eventCreatedOn"]
    external_property_whitelist = ['*']
    property_basename = 'event'

    def checkConfig(self,
                    gitBaseURL=None,
                    handled_events=("patchset-created", "ref-updated"),
                    debug=False,
                    get_files=False):
        """Report a configuration error when ``gitBaseURL`` is missing."""

        if gitBaseURL is None:
            config.error("gitBaseURL must be specified")

    def reconfigService(self,
                        gitBaseURL=None,
                        handled_events=("patchset-created", "ref-updated"),
                        debug=False,
                        get_files=False):
        """Store the settings used to turn events into changes.

        ``gitBaseURL`` is the prefix of the repository URL of each change,
        ``handled_events`` the event types that are processed, ``debug``
        enables extra logging and ``get_files`` makes ``addChangeFromEvent``
        call ``getFiles`` instead of reporting ``["unknown"]``.
        """
        self.gitBaseURL = gitBaseURL
        self.handled_events = list(handled_events)
        self._get_files = get_files
        self.debug = debug

    def lineReceived(self, line):
        """Decode one JSON event line and hand it to ``eventReceived``.

        Lines that are not valid JSON, or whose JSON value is not a
        dictionary with a ``type`` key, are dropped.  Always returns a
        Deferred.
        """
        try:
            event = json.loads(bytes2unicode(line))
        except ValueError:
            log.msg("bad json line: {}".format(line))
            return defer.succeed(None)

        if not (isinstance(event, dict) and "type" in event):
            if self.debug:
                log.msg("no type in event {}".format(line))
            return defer.succeed(None)

        return self.eventReceived(event)

    def build_properties(self, event):
        """Return the properties attached to the change built from ``event``.

        These are the properties produced by ``extractProperties`` plus
        ``event.source`` (the name of this class) and, for change-related
        ``patchset-created`` / ``comment-added`` events, ``target_branch``.
        """
        properties = self.extractProperties(event)
        properties["event.source"] = self.__class__.__name__
        if event['type'] in ('patchset-created', 'comment-added') and 'change' in event:
            properties['target_branch'] = event["change"]["branch"]
        return properties

    def eventReceived(self, event):
        """Dispatch ``event`` to its handler.

        Events whose type is not in ``handled_events`` are ignored.  An event
        of type ``foo-bar`` is handled by ``eventReceived_foo_bar`` when such
        a method exists, and by ``addChangeFromEvent`` otherwise.
        """
        if event['type'] not in self.handled_events:
            if self.debug:
                log.msg("the event type '{}' is not setup to handle".format(event['type']))
            return defer.succeed(None)

        properties = self.build_properties(event)
        func_name = "eventReceived_{}".format(event["type"].replace("-", "_"))
        func = getattr(self, func_name, None)
        if func is None:
            return self.addChangeFromEvent(properties, event)

        return func(properties, event)

    @defer.inlineCallbacks
    def addChange(self, event_type, chdict):
        """Submit the change described by ``chdict`` to the master.

        A source stamp is looked up (or created) for the change first; when
        it already existed and ``event_type`` is ``patchset-created`` or
        ``ref-updated`` the event is considered a duplicate and no change is
        added.  Failures while adding the change are logged, not raised.
        """
        stampdict = {
            "branch": chdict["branch"],
            "revision": chdict["revision"],
            "patch_author": chdict["author"],
            "patch_comment": chdict["comments"],
            "repository": chdict["repository"],
            "project": chdict["project"],
            "codebase": '',
        }

        _stampid, found_existing = yield (
            self.master.db.sourcestamps.findOrCreateId(**stampdict))

        if found_existing and event_type in ("patchset-created", "ref-updated"):
            if self.debug:
                eventstr = "{}/{} -- {}:{}".format(
                    self.gitBaseURL, chdict["project"], chdict["branch"],
                    chdict["revision"])
                message = (
                    "gerrit: duplicate change event {} by {}"
                    .format(eventstr, self.__class__.__name__))
                log.msg(message.encode("utf-8"))
            return

        if self.debug:
            eventstr = "{} -- {}:{}".format(
                chdict["repository"], chdict["branch"], chdict["revision"])
            message = (
                "gerrit: adding change from {} in {}"
                .format(eventstr, self.__class__.__name__))
            log.msg(message.encode("utf-8"))

        try:
            yield self.master.data.updates.addChange(**chdict)
        except Exception:
            # eat failures, but keep the exception and its traceback in the
            # log: the first argument of log.err is the failure (None means
            # "the exception being handled"), the second one is the reason
            log.err(None, 'error adding change from GerritChangeSource')

    def get_branch_from_event(self, event):
        """Return the branch of the change created for ``event``.

        This is the patch set ref for ``patchset-created`` and
        ``comment-added`` events and the branch of the Gerrit change for
        every other event type.
        """
        if event['type'] in ('patchset-created', 'comment-added'):
            return event["patchSet"]["ref"]
        return event["change"]["branch"]

    @defer.inlineCallbacks
    def addChangeFromEvent(self, properties, event):
        """Generic handler: add a change for an event describing a patch set.

        Events without both a ``change`` and a ``patchSet`` entry are not
        supported and are ignored.
        """
        if "change" not in event or "patchSet" not in event:
            if self.debug:
                log.msg("unsupported event {}".format(event["type"]))
            return None

        event = _canonicalize_event(event)
        event_change = event["change"]

        files = ["unknown"]
        if self._get_files:
            files = yield self.getFiles(
                change=event_change["number"],
                patchset=event["patchSet"]["number"]
            )

        yield self.addChange(event['type'], {
            'author': _gerrit_user_to_author(event_change["owner"]),
            'project': util.bytes2unicode(event_change["project"]),
            'repository': "{}/{}".format(
                self.gitBaseURL, event_change["project"]),
            'branch': self.get_branch_from_event(event),
            'revision': event["patchSet"]["revision"],
            'revlink': event_change["url"],
            'comments': event_change["subject"],
            'files': files,
            'category': event["type"],
            'properties': properties})
        return None

    def eventReceived_ref_updated(self, properties, event):
        """Handler of ``ref-updated`` events: add a change for the new
        revision of the updated ref.

        Returns ``None`` when the event is ignored, otherwise the Deferred of
        ``addChange``.
        """
        ref = event["refUpdate"]
        author = "gerrit"

        if "submitter" in event:
            author = _gerrit_user_to_author(event["submitter"], author)

        # Ignore ref-updated events if patchset-created events are expected for this push.
        # ref-updated events may arrive before patchset-created events and cause problems, as
        # builds would be using properties from ref-updated event and not from patchset-created.
        # As a result it may appear that the change was not related to a Gerrit change and cause
        # reporters to not submit reviews for example.
        if 'patchset-created' in self.handled_events and ref['refName'].startswith('refs/changes/'):
            return None

        return self.addChange(event['type'], dict(
            author=author,
            project=ref["project"],
            repository="{}/{}".format(self.gitBaseURL, ref["project"]),
            branch=ref["refName"],
            revision=ref["newRev"],
            comments="Gerrit: commit(s) pushed.",
            files=["unknown"],
            category=event["type"],
            properties=properties))
268
269
class GerritChangeSource(GerritChangeSourceBase):

    """This source will maintain a connection to gerrit ssh server
    that will provide us gerrit events in json format.

    The events are read from an ``ssh ... gerrit stream-events`` child
    process which is restarted, with an increasing delay, when it exits."""

    compare_attrs = ("gerritserver", "gerritport")

    STREAM_GOOD_CONNECTION_TIME = 120
    "(seconds) connections longer than this are considered good, and reset the backoff timer"

    STREAM_BACKOFF_MIN = 0.5
    "(seconds) minimum, but nonzero, time to wait before retrying a failed connection"

    STREAM_BACKOFF_EXPONENT = 1.5
    "multiplier used to increase the backoff from MIN to MAX on repeated failures"

    STREAM_BACKOFF_MAX = 60
    "(seconds) maximum time to wait before retrying a failed connection"

    name = None

    def checkConfig(self,
                    gerritserver,
                    username,
                    gerritport=29418,
                    identity_file=None,
                    **kwargs):
        """Derive a default service name and validate the base settings."""
        if self.name is None:
            self.name = "GerritChangeSource:{}@{}:{}".format(username, gerritserver, gerritport)
        if 'gitBaseURL' not in kwargs:
            # placeholder so that the base class check passes; the real
            # default is computed in reconfigService
            kwargs['gitBaseURL'] = "automatic at reconfigure"
        super().checkConfig(**kwargs)

    def reconfigService(self,
                        gerritserver,
                        username,
                        gerritport=29418,
                        identity_file=None,
                        name=None,
                        **kwargs):
        """Store the ssh connection settings and reset the stream state.

        ``gitBaseURL`` defaults to ``ssh://username@gerritserver:gerritport``.
        ``name`` is accepted but not used here.
        """
        if 'gitBaseURL' not in kwargs:
            kwargs['gitBaseURL'] = "ssh://{}@{}:{}".format(username, gerritserver, gerritport)
        self.gerritserver = gerritserver
        self.gerritport = gerritport
        self.username = username
        self.identity_file = identity_file
        # NOTE(review): the process handle and wantProcess are reset
        # unconditionally -- confirm this cannot happen while a stream
        # process is still running.
        self.process = None
        self.wantProcess = False
        self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
        return super().reconfigService(**kwargs)

    class LocalPP(LineProcessProtocol):

        """Process protocol forwarding the output lines of the stream
        process to the owning change source."""

        def __init__(self, change_source):
            super().__init__()
            self.change_source = change_source

        @defer.inlineCallbacks
        def outLineReceived(self, line):
            if self.change_source.debug:
                log.msg(b"gerrit: " + line)
            yield self.change_source.lineReceived(line)

        def errLineReceived(self, line):
            if self.change_source.debug:
                log.msg(b"gerrit stderr: " + line)

        def processEnded(self, status):
            super().processEnded(status)
            self.change_source.streamProcessStopped()

    def streamProcessStopped(self):
        """React to the end of the stream process, restarting it if wanted."""
        self.process = None

        # if the service is stopped, don't try to restart the process
        if not self.wantProcess or not self.running:
            return

        now = util.now()
        if now - self.lastStreamProcessStart < \
           self.STREAM_GOOD_CONNECTION_TIME:
            # bad startup; start the stream process again after a timeout,
            # and then increase the timeout
            log.msg(
                "'gerrit stream-events' failed; restarting after %ds"
                % round(self.streamProcessTimeout))
            self.master.reactor.callLater(
                self.streamProcessTimeout, self.startStreamProcess)
            self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT
            if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX:
                self.streamProcessTimeout = self.STREAM_BACKOFF_MAX
        else:
            # good startup, but lost connection; restart immediately,
            # and set the timeout to its minimum

            # make sure we log the reconnection, so that it might be detected
            # and network connectivity fixed
            log.msg("gerrit stream-events lost connection. Reconnecting...")
            self.startStreamProcess()
            self.streamProcessTimeout = self.STREAM_BACKOFF_MIN

    def _buildGerritCommand(self, *gerrit_args):
        '''Get an ssh command list which invokes gerrit with the given args on the
        remote host'''

        cmd = [
            "ssh",
            "-o", "BatchMode=yes",
            "{}@{}".format(self.username, self.gerritserver),
            "-p", str(self.gerritport)
        ]

        if self.identity_file is not None:
            cmd.extend(["-i", self.identity_file])

        cmd.append("gerrit")
        cmd.extend(gerrit_args)
        return cmd

    def startStreamProcess(self):
        """Spawn the ``gerrit stream-events`` process and record its start time."""
        if self.debug:
            log.msg("starting 'gerrit stream-events'")

        cmd = self._buildGerritCommand("stream-events")
        self.lastStreamProcessStart = util.now()
        self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", cmd, env=None)

    @defer.inlineCallbacks
    def getFiles(self, change, patchset):
        """Return the list of files touched by ``patchset`` of ``change``.

        The information comes from ``gerrit query``; ``["unknown"]`` is
        returned when the command fails, prints nothing or finds no change.
        """
        cmd = self._buildGerritCommand("query", str(change), "--format", "JSON",
                                       "--files", "--patch-sets")

        if self.debug:
            log.msg("querying gerrit for changed files in change {}/{}: {}".format(change, patchset,
                                                                                   cmd))

        rc, out = yield runprocess.run_process(self.master.reactor, cmd, env=None,
                                               collect_stderr=False)
        if rc != 0:
            return ["unknown"]

        # only the first output line is parsed; a successful command that
        # prints nothing gives no file information
        out_lines = out.splitlines()
        if not out_lines:
            return ["unknown"]
        res = json.loads(bytes2unicode(out_lines[0]))

        if res.get("rowCount") == 0:
            return ["unknown"]

        # the requested patch set is looked up as an integer, so normalize
        # the reported numbers the same way in case they are strings
        patchsets = {int(i["number"]): i["files"] for i in res["patchSets"]}
        return [i["file"] for i in patchsets[int(patchset)]]

    def activate(self):
        """Start streaming events."""
        self.wantProcess = True
        self.startStreamProcess()

    def deactivate(self):
        """Stop streaming events, killing the stream process if any."""
        self.wantProcess = False
        if self.process:
            self.process.signalProcess("KILL")
        # TODO: if this occurs while the process is restarting, some exceptions
        # may be logged, although things will settle down normally

    def describe(self):
        """Return a one-line description including the connection status."""
        status = ""
        if not self.process:
            status = "[NOT CONNECTED - check log]"
        return (("GerritChangeSource watching the remote "
                 "Gerrit repository {}@{} {}").format(self.username, self.gerritserver, status))
437
438
class GerritEventLogPoller(GerritChangeSourceBase):

    """Change source that periodically fetches events from the
    ``/plugins/events-log/events/`` HTTP endpoint of a Gerrit server.

    The ``eventCreatedOn`` value of the last received event is persisted in
    the master's state database and used as the start time of the next
    poll."""

    POLL_INTERVAL_SEC = 30
    FIRST_FETCH_LOOKBACK_DAYS = 30

    def checkConfig(self,
                    baseURL,
                    auth,
                    pollInterval=POLL_INTERVAL_SEC,
                    pollAtLaunch=True,
                    firstFetchLookback=FIRST_FETCH_LOOKBACK_DAYS,
                    **kwargs):
        """Derive a default service name and validate the base settings."""
        if self.name is None:
            self.name = "GerritEventLogPoller:{}".format(baseURL)
        super().checkConfig(**kwargs)

    @defer.inlineCallbacks
    def reconfigService(self,
                        baseURL,
                        auth,
                        pollInterval=POLL_INTERVAL_SEC,
                        pollAtLaunch=True,
                        firstFetchLookback=FIRST_FETCH_LOOKBACK_DAYS,
                        **kwargs):
        """Store the polling settings and set up the state id and HTTP client.

        ``pollInterval`` is passed as the interval of the polling loop,
        ``pollAtLaunch`` tells whether to poll as soon as the source is
        activated and ``firstFetchLookback`` is the number of days to look
        back when no last event time has been stored yet.
        """

        yield super().reconfigService(**kwargs)
        if baseURL.endswith('/'):
            baseURL = baseURL[:-1]

        self._pollInterval = pollInterval
        self._pollAtLaunch = pollAtLaunch
        self._oid = yield self.master.db.state.getObjectId(self.name, self.__class__.__name__)
        self._http = yield httpclientservice.HTTPClientService.getService(
            self.master, baseURL, auth=auth)

        self._first_fetch_lookback = firstFetchLookback
        self._last_event_time = None

    @staticmethod
    def now():
        """patchable now (datetime is not patchable as builtin)"""
        # naive UTC datetime, obtained without the deprecated utcnow()
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    @defer.inlineCallbacks
    def poll(self):
        """Fetch the events logged since the last known event and process
        them one line at a time."""
        last_event_ts = yield self.master.db.state.getState(self._oid, 'last_event_ts', None)
        if last_event_ts is None:
            # If there is not last event time stored in the database, then set
            # the last event time to some historical look-back
            last_event = self.now() - datetime.timedelta(days=self._first_fetch_lookback)
        else:
            # UTC date of the stored timestamp (only used for formatting)
            last_event = datetime.datetime.fromtimestamp(last_event_ts, datetime.timezone.utc)
        last_event_formatted = last_event.strftime("%Y-%m-%d %H:%M:%S")

        if self.debug:
            log.msg("Polling gerrit: {}".format(last_event_formatted).encode("utf-8"))

        res = yield self._http.get("/plugins/events-log/events/",
                                   params=dict(t1=last_event_formatted))
        lines = yield res.content()
        for line in lines.splitlines():
            yield self.lineReceived(line)

    @defer.inlineCallbacks
    def eventReceived(self, event):
        """Process ``event`` and remember its creation time, if it has one,
        as the start point of the next poll."""
        res = yield super().eventReceived(event)
        if 'eventCreatedOn' in event:
            yield self.master.db.state.setState(self._oid, 'last_event_ts', event['eventCreatedOn'])
        return res

    @defer.inlineCallbacks
    def getFiles(self, change, patchset):
        """Return the file names reported by the ``files`` REST endpoint for
        ``patchset`` of ``change`` (the keys of the returned JSON object)."""
        res = yield self._http.get("/changes/{}/revisions/{}/files/".format(change, patchset))
        content = yield res.content()

        # the first line of every response is `)]}'`; everything after it is
        # the JSON document, whether it is compact or spread over several lines
        body = b"\n".join(content.splitlines()[1:])
        return list(json.loads(body.decode('utf8')))

    # FIXME this copy the code from PollingChangeSource
    # but as PollingChangeSource and its subclasses need to be ported to reconfigurability
    # we can't use it right now
    @base.poll_method
    def doPoll(self):
        d = defer.maybeDeferred(self.poll)
        d.addErrback(log.err, 'while polling for changes')
        return d

    def force(self):
        """Trigger a poll right away."""
        self.doPoll()

    def activate(self):
        """Start the polling loop."""
        self.doPoll.start(interval=self._pollInterval, now=self._pollAtLaunch)

    def deactivate(self):
        """Stop the polling loop."""
        return self.doPoll.stop()

    def describe(self):
        """Return a one-line description of this poller."""
        msg = ("GerritEventLogPoller watching the remote "
               "Gerrit repository {}")
        return msg.format(self.name)
539