1#!/usr/bin/env python
2# encoding: UTF-8
3#
4# Licensed to the Apache Software Foundation (ASF) under one or more
5# contributor license agreements.  See the NOTICE file distributed with
6# this work for additional information regarding copyright ownership.
7# The ASF licenses this file to You under the Apache License, Version 2.0
8# (the "License"); you may not use this file except in compliance with
9# the License.  You may obtain a copy of the License at
10#
11#     http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19
20#
21# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy
22# paths in sync
23#
24# Example:
25#  svnwcsub.py svnwcsub.conf
26#
27# On startup svnwcsub checks the working copy's path, runs a single svn update
28# and then watches for changes to that path.
29#
30# See svnwcsub.conf for more information on its contents.
31#
32
33# TODO:
34# - bulk update at startup time to avoid backlog warnings
35# - fold BigDoEverythingClasss ("BDEC") into Daemon
36# - fold WorkingCopy._get_match() into __init__
37# - remove wc_ready(). assume all WorkingCopy instances are usable.
38#   place the instances into .watch at creation. the .update_applies()
39#   just returns if the wc is disabled (eg. could not find wc dir)
40# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER
41#   (a base path exclusion list should work for the ASF)
42# - add support for SIGHUP to reread the config and reinitialize working copies
43# - joes will write documentation for svnpubsub as these items become fulfilled
44# - make LOGLEVEL configurable
45
46import errno
47import subprocess
48import threading
49import sys
50import stat
51import os
52import re
53import posixpath
54try:
55  import ConfigParser
56except ImportError:
57  import configparser as ConfigParser
58import time
59import logging.handlers
60try:
61  import Queue
62except ImportError:
63  import queue as Queue
64import optparse
65import functools
66try:
67  import urlparse
68except ImportError:
69  import urllib.parse as urlparse
70
71import daemonize
72import svnpubsub.client
73import svnpubsub.util
74
assert hasattr(subprocess, 'check_call')
def check_call(*args, **kwds):
    """Wrapper around subprocess.check_call() that logs stderr upon failure,
    with an optional list of exit codes to consider non-failure.

    All arguments are passed through to subprocess.Popen(), except the
    special keyword '__okayexits': a collection of return codes that count
    as success (default: only 0).  stderr is always captured, so callers
    must not pass 'stderr'; stdout is inherited.

    Returns the process's return code.  Raises subprocess.CalledProcessError
    (with .cmd set to the command that was run) for any other return code.
    """
    assert 'stderr' not in kwds
    okay_exits = kwds.pop('__okayexits', (0,))  # default: EXIT_SUCCESS only
    kwds['stderr'] = subprocess.PIPE
    pipe = subprocess.Popen(*args, **kwds)
    _, errput = pipe.communicate()
    if pipe.returncode not in okay_exits:
        cmd = args[0] if args else kwds.get('args', '(no command)')
        # TODO: log stdout too?
        logging.error('Command failed: returncode=%d command=%r stderr=%r',
                      pipe.returncode, cmd, errput)
        # Report the command itself, not the tuple of positional arguments.
        raise subprocess.CalledProcessError(pipe.returncode, cmd)
    return pipe.returncode
95
### note: this runs synchronously.  It is called from
### WorkingCopy._get_match() while the BDEC starts up, and from
### BackgroundWorker._update() on the worker thread.
def svn_info(svnbin, env, path):
    """Run 'svn info' on the target path, returning a dict of info data.

    Each "Key: value" output line becomes info['Key'] = 'value' (the value
    stripped of surrounding whitespace).  Lines without a colon, such as
    blank lines or continuation lines of multi-line fields, are skipped
    instead of raising ValueError.
    """
    args = [svnbin, "info", "--non-interactive", "--", path]
    output = svnpubsub.util.check_output(args, env=env)
    # NOTE(review): the helper may return bytes on Python 3 (can't tell from
    # here); decode so the text parsing below works either way.
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')
    info = {}
    for line in output.strip().splitlines():
        key, sep, value = line.partition(':')
        if sep:
            info[key] = value.strip()
    return info
108
def is_emptydir(path):
    """Return True if directory *path* has no entries (dotfiles included).

    A missing or unreadable *path* also counts as empty, matching the
    glob-based check this replaces.  The path is never interpreted as a
    pattern, so names containing '[', '*' or '?' are handled correctly.
    """
    scandir = getattr(os, 'scandir', None)
    try:
        if scandir is None:
            # Python < 3.5: no lazy iterator, so read the whole listing.
            return not os.listdir(path)
        entries = scandir(path)
    except OSError:
        return True
    try:
        # Stop at the first entry instead of reading the whole directory.
        for _ in entries:
            return False
        return True
    finally:
        # ScandirIterator.close() only exists on Python 3.6+.
        close = getattr(entries, 'close', None)
        if close is not None:
            close()
125
class WorkingCopy(object):
    """A tracked working copy: local *path* kept in sync with *url*.

    On construction the WC is checked out if needed, and its repository
    path (self.match) and repository UUID (self.uuid) are looked up.  On
    success it registers itself with the BDEC via bdec.wc_ready(); on
    failure the problem is logged and the WC is never registered.
    """

    def __init__(self, bdec, path, url):
        self.path = path
        self.url = url

        try:
            self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
            bdec.wc_ready(self)
        except Exception:
            # One broken WC must not prevent the others from being set up.
            logging.exception('problem with working copy: %s', path)

    def update_applies(self, uuid, path):
        """Return True if a commit touching repository *path* affects this WC.

        *uuid* is the commit's repository UUID.  *path* may be a character-wise
        common prefix of the changed paths (posixpath.commonprefix()), so it
        can end in the middle of a path component.
        """
        if self.uuid != uuid:
            return False

        path = str(path)
        match = self.match
        if path == match:
            # easy case. woo.
            return True
        if len(path) < len(match):
            # *path* is potentially a parent of match.  It may be a partial
            # component (e.g. '/foo/ba' for '/foo/bar/x' + '/foo/baz/y'), so a
            # plain string-prefix test is needed to avoid missing updates.
            return match.startswith(path)
        # *path* is longer: it lies inside this WC only if it continues at a
        # component boundary, so '/foo/barbaz' does not match '/foo/bar'.
        return (path.startswith(match)
                and (not match
                     or match.endswith('/')
                     or path[len(match)] == '/'))

    def _get_match(self, svnbin, env):
        """Return (repository-relative path, repository UUID) for this WC.

        If self.path has no usable .svn directory, self.url is checked out
        into it first.
        """
        ### quick little hack to auto-checkout missing working copies
        dotsvn = os.path.join(self.path, ".svn")
        if not os.path.isdir(dotsvn) or is_emptydir(dotsvn):
            logging.info("autopopulate %s from %s", self.path, self.url)
            check_call([svnbin, 'co', '-q',
                        '--force',
                        '--non-interactive',
                        '--config-option',
                        'config:miscellany:use-commit-times=on',
                        '--', self.url, self.path],
                       env=env)

        # Fetch the info for matching dirs_changed against this WC
        info = svn_info(svnbin, env, self.path)
        root = info['Repository Root']
        url = info['URL']
        # 'svn info' prints URI-encoded URLs (e.g. %20 for a space), while
        # the changed paths in commit notifications are presumably plain
        # repository paths, so decode before comparing.  The result has a
        # leading '/' unless the WC is the repository root ('').
        relpath = urlparse.unquote(url[len(root):])
        uuid = info['Repository UUID']
        return str(relpath), uuid
178
179
# Matches the leading "/websites/production/<site>/" part of a repository
# path; BigDoEverythingClasss.commit() uses it to narrow a commit whose
# common prefix is just "/websites/" down to a single site.
PRODUCTION_RE_FILTER = re.compile(r"/websites/production/[^/]+/")
181
class BigDoEverythingClasss(object):
    """Owns the tracked working copies and routes incoming commits to them.

    Built from a ReloadableConfig.  start() creates one WorkingCopy per
    tracked path; commit() is the callback for commits received from the
    svnpubsub streams and queues updates on the background worker.
    """

    def __init__(self, config):
        self.svnbin = config.get_value('svnbin')
        self.env = config.get_env()
        self.tracking = config.get_track()
        self.hook = config.get_optional_value('hook')
        self.streams = config.get_value('streams').split()
        self.worker = BackgroundWorker(self.svnbin, self.env, self.hook)
        self.watch = []  # WorkingCopy objects registered through wc_ready()

    def start(self):
        """Create a WorkingCopy for every tracked path."""
        for path, url in self.tracking.items():
            # working copies auto-register with the BDEC when they are ready.
            WorkingCopy(self, path, url)

    def wc_ready(self, wc):
        """Start watching *wc* and queue its initial (boot) update."""
        logging.info("Watching WC at %s <-> %s", wc.path, wc.url)
        self.watch.append(wc)
        self.worker.add_work(OP_BOOT, wc)

    def _normalize_path(self, path):
        """Return *path* as a repository path with a leading '/'.

        Relative paths only get the slash prepended (a trailing '/' is kept);
        absolute paths are normalized with posixpath.abspath().
        """
        if not path.startswith('/'):
            return "/" + path
        return posixpath.abspath(path)

    def commit(self, url, commit):
        """Queue an update of every watched WC affected by *commit*.

        *url* is the stream the commit arrived on (used for logging only).
        Commits that are not svn format 1 are skipped.
        """
        if commit.type != 'svn' or commit.format != 1:
            logging.info("SKIP unknown commit format (%s.%d)",
                         commit.type, commit.format)
            return
        logging.info("COMMIT r%d (%d paths) from %s",
                     commit.id, len(commit.changed), url)

        # Build a real list: on Python 3 map() returns an iterator without len().
        paths = [self._normalize_path(p) for p in commit.changed]
        if not paths:
            return

        pre = posixpath.commonprefix(paths)
        if pre == "/websites/":
            # special case for svnmucc "dynamic content" buildbot commits
            # just take the first production path to avoid updating all cms working copies
            for p in paths:
                m = PRODUCTION_RE_FILTER.match(p)
                if m:
                    pre = m.group(0)
                    break

        wcs = [wc for wc in self.watch
               if wc.update_applies(commit.repository, pre)]
        logging.info("Updating %d WC for r%d", len(wcs), commit.id)
        for wc in wcs:
            self.worker.add_work(OP_UPDATE, wc)
234
235
# Operations understood by BackgroundWorker.run():
OP_BOOT = 'boot'        # first update of a WC, queued by wc_ready()
OP_UPDATE = 'update'    # update queued for a WC affected by a commit
OP_CLEANUP = 'cleanup'  # only run 'svn cleanup' on the WC

# The worker logs a warning once its backlog exceeds this many items
# (boot operations excepted).
BACKLOG_TOO_HIGH = 20
241
class BackgroundWorker(threading.Thread):
    """Thread that runs svn operations on working copies, one at a time.

    Work items are (operation, WorkingCopy) pairs queued by add_work(); the
    thread itself is started lazily by the first add_work() call.
    """

    def __init__(self, svnbin, env, hook):
        threading.Thread.__init__(self)

        # The main thread/process should not wait for this thread to exit.
        # The 'daemon' attribute exists since Python 2.6; setDaemon() is the
        # Python 2.5 spelling and is deprecated since Python 3.10.
        if hasattr(threading.Thread, 'daemon'):
            self.daemon = True
        else:
            self.setDaemon(True)

        self.svnbin = svnbin
        self.env = env
        self.hook = hook
        self.q = Queue.Queue()

        self.has_started = False

    def run(self):
        """Thread body: process queued items forever, logging any failure."""
        while True:
            # This will block until something arrives
            operation, wc = self.q.get()

            # Warn if the queue is too long.
            # (Note: the other thread might have added entries to self.q
            # after the .get() and before the .qsize().)
            qsize = self.q.qsize() + 1
            if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH:
                # logging.warn() is only a deprecated alias of warning().
                logging.warning('worker backlog is at %d', qsize)

            try:
                if operation == OP_UPDATE:
                    self._update(wc)
                elif operation == OP_BOOT:
                    self._update(wc, boot=True)
                elif operation == OP_CLEANUP:
                    self._cleanup(wc)
                else:
                    logging.critical('unknown operation: %s', operation)
            except Exception:
                # Keep the worker alive: log and move on to the next item.
                logging.exception('exception in worker')
            finally:
                # In case we ever want to .join() against the work queue
                self.q.task_done()

    def add_work(self, operation, wc):
        """Queue *operation* (an OP_* constant) for working copy *wc*."""
        # Start the thread when work first arrives. Thread-start needs to
        # be delayed in case the process forks itself to become a daemon.
        if not self.has_started:
            self.start()
            self.has_started = True

        self.q.put((operation, wc))

    def _update(self, wc, boot=False):
        """Switch *wc* to the current HEAD of its URL, running the hook.

        The hook runs before and after the switch, with modes 'pre-update'
        and 'post-update', or 'pre-boot' and 'boot' when *boot* is true.  If the
        pre-hook exits with 1 the update is skipped.
        """

        # For giggles, let's clean up the working copy in case something
        # happened earlier.
        self._cleanup(wc)

        logging.info("updating: %s", wc.path)

        # Pin HEAD once so the hook, the switch and the final check all
        # use the same revision.
        HEAD = svn_info(self.svnbin, self.env, wc.url)['Revision']

        ## Run the pre-update hook
        if self.hook:
            hook_mode = 'pre-boot' if boot else 'pre-update'
            logging.info('running hook: %s at %s',
                         wc.path, hook_mode)
            args = [self.hook, hook_mode, wc.path, HEAD, wc.url]
            rc = check_call(args, env=self.env, __okayexits=[0, 1])
            if rc == 1:
                # TODO: log stderr
                logging.warning('hook denied update of %s at %s',
                                wc.path, hook_mode)
                return

        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'switch',
                '--quiet',
                '--non-interactive',
                '--trust-server-cert',
                '--ignore-externals',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                '--',
                wc.url + '@' + HEAD,
                wc.path]
        check_call(args, env=self.env)

        ### check the loglevel before running 'svn info'?
        info = svn_info(self.svnbin, self.env, wc.path)
        # An explicit check instead of 'assert', which 'python -O' strips.
        if info['Revision'] != HEAD:
            raise RuntimeError('%s is at r%s after switching to r%s'
                               % (wc.path, info['Revision'], HEAD))
        logging.info("updated: %s now at r%s", wc.path, info['Revision'])

        ## Run the post-update hook
        if self.hook:
            hook_mode = 'boot' if boot else 'post-update'
            logging.info('running hook: %s at revision %s due to %s',
                         wc.path, info['Revision'], hook_mode)
            args = [self.hook, hook_mode,
                    wc.path, info['Revision'], wc.url]
            check_call(args, env=self.env)

    def _cleanup(self, wc):
        "Run a cleanup on the specified working copy."

        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'cleanup',
                '--non-interactive',
                '--trust-server-cert',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                wc.path]
        check_call(args, env=self.env)
357
358
# Python 3.2 renamed SafeConfigParser to ConfigParser and kept the old name
# only as a deprecated alias, which Python 3.12 removed.  Python 2 keeps
# using SafeConfigParser as before.
if sys.version_info >= (3, 2):
    _ConfigParserBase = ConfigParser.ConfigParser
else:
    _ConfigParserBase = ConfigParser.SafeConfigParser


class ReloadableConfig(_ConfigParserBase):
    """The svnwcsub configuration file, re-readable via reload().

    Used options: svnbin, streams and (optional) hook in [DEFAULT]; the
    [env] section (extra environment variables for svn and the hook); and
    the [track] section (working-copy path -> URL).  Option names keep
    their case.
    """

    def __init__(self, fname):
        _ConfigParserBase.__init__(self)

        self.fname = fname
        self.read(fname)

        ### install a signal handler to set SHOULD_RELOAD. BDEC should
        ### poll this flag, and then adjust its internal structures after
        ### the reload.
        self.should_reload = False

    def reload(self):
        """Re-read self.fname, dropping sections that no longer exist."""
        # Delete everything. Just re-reading would overlay, and would not
        # remove sections/options. Note that [DEFAULT] will not be removed.
        for section in self.sections():
            self.remove_section(section)

        # Now re-read the configuration file.
        self.read(self.fname)

    def get_value(self, which):
        """Return option *which* from [DEFAULT]; raises if it is missing."""
        return self.get(ConfigParser.DEFAULTSECT, which)

    def get_optional_value(self, which, default=None):
        """Return option *which* from [DEFAULT], or *default* if unset."""
        if self.has_option(ConfigParser.DEFAULTSECT, which):
            return self.get(ConfigParser.DEFAULTSECT, which)
        else:
            return default

    def get_env(self):
        """Return a copy of os.environ updated with the [env] section."""
        env = os.environ.copy()
        default_options = self.defaults().keys()
        # items() also yields the [DEFAULT] options; leave those out.
        for name, value in self.items('env'):
            if name not in default_options:
                env[name] = value
        return env

    def get_track(self):
        "Return the {PATH: URL} dictionary of working copies to track."
        default_options = self.defaults().keys()
        # items() also yields the [DEFAULT] options; leave those out.
        return dict((name, url) for name, url in self.items('track')
                    if name not in default_options)

    def optionxform(self, option):
        # Do not lowercase the option name.
        return str(option)
407
408
class Daemon(daemonize.Daemon):
    """Runs the BDEC plus the svnpubsub client loop (foreground or daemon).

    *umask*, when not None, is an octal string applied in run().
    """

    def __init__(self, logfile, pidfile, umask, bdec):
        daemonize.Daemon.__init__(self, logfile, pidfile)

        self.bdec = bdec
        self.umask = umask

    def setup(self):
        # Nothing needs to happen before the parent process may exit.
        pass

    def run(self):
        logging.info('svnwcsub started, pid=%d', os.getpid())

        # Daemonized processes start with a umask of 000, while foreground
        # ones inherit their parent's; apply the configured one if any.
        if self.umask is not None:
            new_umask = int(self.umask, 8)
            os.umask(new_umask)
            logging.info('umask set to %03o', new_umask)

        # Register the working copies on this (main) thread first, then
        # hand control to the pubsub client.
        self.bdec.start()

        client = svnpubsub.client.MultiClient(self.bdec.streams,
                                              self.bdec.commit,
                                              self._event)
        client.run_forever()

    def _event(self, url, event_name, event_arg):
        # Callback for non-commit events from the client streams.
        if event_name == 'ping':
            logging.debug('ping from %s', url)
        elif event_name == 'error':
            # NOTE(review): logging.exception() assumes this is invoked while
            # an exception is being handled; confirm in svnpubsub.client.
            logging.exception('from %s', url)
        else:
            logging.info('"%s" from %s', event_name, url)
446
447
def prepare_logging(logfile):
    """Route log records to *logfile* (rotated at midnight) or, if it is
    empty/None, to stdout; the root logger is set to INFO."""

    if not logfile:
        handler = logging.StreamHandler(sys.stdout)
    else:
        # A new file every midnight; the seven previous ones are kept.
        handler = logging.handlers.TimedRotatingFileHandler(
            logfile, when='midnight', backupCount=7)

    # Prefix each record with a timestamp and its level name.
    handler.setFormatter(
        logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
                          '%Y-%m-%d %H:%M:%S'))

    # Install on the root logger so every logging.* call in this module uses it.
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)

    ### use logging.INFO for now. switch to cmdline option or a config?
    root_logger.setLevel(logging.INFO)
470
471
def handle_options(options):
    """Apply the command-line *options*: logging, pidfile, then gid and uid.

    In foreground mode with --pidfile, this process's PID is written there.
    --gid/--uid accept a number or a group/user name.  The gid is set
    before the uid, so the process still has the privilege to change it.
    """
    # Set up the logging, then process the rest of the options.
    prepare_logging(options.logfile)

    # In daemon mode, we let the daemonize module handle the pidfile.
    # Otherwise, we should write this (foreground) PID into the file.
    if options.pidfile and not options.daemon:
        pid = os.getpid()
        # Be wary of symlink attacks: remove any existing entry, then create
        # the file exclusively (O_EXCL) so an attacker's file can't be reused.
        try:
            os.remove(options.pidfile)
        except OSError:
            pass
        fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                     stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        try:
            # os.write() requires bytes on Python 3; on Python 2 encode()
            # still returns a plain str.
            os.write(fd, ('%d\n' % pid).encode('ascii'))
        finally:
            os.close(fd)
        logging.info('pid %d written to %s', pid, options.pidfile)

    if options.gid:
        try:
            gid = int(options.gid)
        except ValueError:
            import grp
            gid = grp.getgrnam(options.gid).gr_gid
        logging.info('setting gid %d', gid)
        os.setgid(gid)

    if options.uid:
        try:
            uid = int(options.uid)
        except ValueError:
            import pwd
            uid = pwd.getpwnam(options.uid).pw_uid
        logging.info('setting uid %d', uid)
        os.setuid(uid)
508
509
def main(args):
    """Entry point: parse *args* (argv without the program name) and run
    svnwcsub in the foreground or, with --daemon, as a background daemon."""
    parser = optparse.OptionParser(
        description='An SvnPubSub client to keep working copies synchronized '
                    'with a repository.',
        usage='Usage: %prog [options] CONFIG_FILE',
        )
    parser.add_option('--logfile',
                      help='filename for logging')
    parser.add_option('--pidfile',
                      help="the process' PID will be written to this file")
    parser.add_option('--uid',
                      help='switch to this UID before running')
    parser.add_option('--gid',
                      help='switch to this GID before running')
    parser.add_option('--umask',
                      help='set this (octal) umask before running')
    parser.add_option('--daemon', action='store_true',
                      help='run as a background daemon')

    options, extra = parser.parse_args(args)

    if len(extra) != 1:
        parser.error('CONFIG_FILE is required')
    # Absolute, so ReloadableConfig.reload() still finds the file if the
    # process later changes directory (e.g. when daemonizing).
    config_file = os.path.abspath(extra[0])

    if options.daemon and not options.logfile:
        parser.error('LOGFILE is required when running as a daemon')
    if options.daemon and not options.pidfile:
        parser.error('PIDFILE is required when running as a daemon')

    # Process any provided options.
    handle_options(options)

    c = ReloadableConfig(config_file)
    bdec = BigDoEverythingClasss(c)

    # --pidfile is optional in the foreground, and os.path.abspath(None)
    # raises, so only resolve it when it was given.
    # NOTE(review): assumes daemonize.Daemon.foreground() tolerates a None
    # pidfile; daemon mode always has one (checked above).
    pidfile = os.path.abspath(options.pidfile) if options.pidfile else None

    # We manage the logfile ourselves (along with possible rotation). The
    # daemon process can just drop stdout/stderr into /dev/null.
    d = Daemon('/dev/null', pidfile, options.umask, bdec)
    if options.daemon:
        # Daemonize the process and call sys.exit() with appropriate code
        d.daemonize_exit()
    else:
        # Just run in the foreground (the default)
        d.foreground()
556
557
if __name__ == '__main__':
    # Invoked as a script: pass main() everything after the program name.
    cli_args = sys.argv[1:]
    main(cli_args)
560