#!/usr/bin/env python
# encoding: UTF-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy
# paths in sync
#
# Example:
#  svnwcsub.py svnwcsub.conf
#
# On startup svnwcsub checks the working copy's path, runs a single svn update
# and then watches for changes to that path.
#
# See svnwcsub.conf for more information on its contents.
#

# TODO:
# - bulk update at startup time to avoid backlog warnings
# - fold BigDoEverythingClasss ("BDEC") into Daemon
# - fold WorkingCopy._get_match() into __init__
# - remove wc_ready(). assume all WorkingCopy instances are usable.
#   place the instances into .watch at creation. the .update_applies()
#   just returns if the wc is disabled (eg.
#   could not find wc dir)
# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER
#   (a base path exclusion list should work for the ASF)
# - add support for SIGHUP to reread the config and reinitialize working copies
# - joes will write documentation for svnpubsub as these items become fulfilled
# - make LOGLEVEL configurable

import errno
import subprocess
import threading
import sys
import stat
import os
import re
import posixpath
try:
  import ConfigParser
except ImportError:
  import configparser as ConfigParser
import time
import logging.handlers
try:
  import Queue
except ImportError:
  import queue as Queue
import optparse
import functools
try:
  import urlparse
except ImportError:
  import urllib.parse as urlparse

import daemonize
import svnpubsub.client
import svnpubsub.util

assert hasattr(subprocess, 'check_call')
def check_call(*args, **kwds):
    """Wrapper around subprocess.check_call() that logs stderr upon failure,
    with an optional list of exit codes to consider non-failure.

    Positional and keyword arguments are handed to subprocess.Popen(),
    except:
      - 'stderr' must not be passed; stderr is always captured so it can
        be logged.
      - '__okayexits' (optional) is a collection of exit codes that count
        as success.  It defaults to {0}.

    Returns the child's exit code, which is one of the okay codes.
    Raises subprocess.CalledProcessError for any other exit code, after
    logging the exit code, the command and the captured stderr.
    """
    assert 'stderr' not in kwds
    okay_exits = kwds.pop('__okayexits', set([0]))  # default: EXIT_SUCCESS
    kwds.update(stderr=subprocess.PIPE)
    pipe = subprocess.Popen(*args, **kwds)
    # stdout is only captured if the caller asked for it; stderr always is.
    output, errput = pipe.communicate()
    if pipe.returncode not in okay_exits:
        cmd = args[0] if len(args) else kwds.get('args', '(no command)')
        # TODO: log stdout too?
        logging.error('Command failed: returncode=%d command=%r stderr=%r',
                      pipe.returncode, cmd, errput)
        # Report the command itself, not the whole positional-args tuple.
        raise subprocess.CalledProcessError(pipe.returncode, cmd)
    return pipe.returncode  # is EXIT_OK

### note: this runs synchronously. within the current Twisted environment,
### it is called from ._get_match() which is run on a thread so it won't
### block the Twisted main loop.
def svn_info(svnbin, env, path):
    """Run 'svn info' on the target path, returning a dict of info data.

    svnbin is the svn executable to run, env is the environment passed to
    the child process, and path is the target handed to 'svn info' (the
    callers in this file pass both working copy paths and URLs).

    Every "Key: value" line of the output becomes one entry in the
    returned dict; lines without a ':' are ignored.
    """
    args = [svnbin, "info", "--non-interactive", "--", path]
    output = svnpubsub.util.check_output(args, env=env).strip()
    info = {}
    for line in output.split('\n'):
        key, sep, value = line.partition(':')
        if not sep:
            # Not a "Key: value" line (e.g. a blank line); skip it rather
            # than failing the whole parse.
            continue
        info[key] = value.strip()
    return info

try:
    import glob
    glob.iglob  # raises AttributeError where iglob() is unavailable

    def is_emptydir(path):
        """Return True if no entry under path matches '*' or '.*'."""
        # ### If the directory contains only dotfile children, this will
        # ### readdir() the entire directory.  But os.readdir() is not
        # ### exposed to us...
        for pattern in ('%s/*', '%s/.*'):
            for _ in glob.iglob(pattern % path):
                return False
        return True
except (ImportError, AttributeError):
    # Python ≤2.4
    def is_emptydir(path):
        """Return True if the directory at path has no entries."""
        # This will read the entire directory list to memory.
        return not os.listdir(path)


class WorkingCopy(object):
    """A working copy directory (path) that tracks a repository URL (url).

    On construction the working copy is checked out if missing, its
    repository-relative path and repository UUID are recorded in .match
    and .uuid, and it registers itself through bdec.wc_ready().  If any of
    that fails the problem is logged, the instance is not registered, and
    .match/.uuid may be unset.
    """

    def __init__(self, bdec, path, url):
        self.path = path
        self.url = url

        try:
            self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
            bdec.wc_ready(self)
        except Exception:
            # One broken working copy must not prevent the others from
            # being set up; log it and carry on.
            logging.exception('problem with working copy: %s', path)

    def update_applies(self, uuid, path):
        """Return True if a change at path in repository uuid affects us.

        The change applies when the repository matches and either string
        is a prefix of the other, i.e. path is this working copy's
        location, something above it, or something below it.
        """
        if self.uuid != uuid:
            return False

        path = str(path)
        # This is a character-wise prefix test on purpose: commit() passes
        # the result of posixpath.commonprefix(), which is itself computed
        # character by character and may end in the middle of a component.
        return path.startswith(self.match) or self.match.startswith(path)

    def _get_match(self, svnbin, env):
        """Return (relpath, uuid) for this working copy.

        relpath is the working copy's URL with the repository root
        stripped off; uuid is the repository UUID.  Both come from
        'svn info' on self.path.  A missing or empty .svn directory
        triggers a fresh checkout of self.url into self.path first.
        """
        ### quick little hack to auto-checkout missing working copies
        dotsvn = os.path.join(self.path, ".svn")
        if not os.path.isdir(dotsvn) or is_emptydir(dotsvn):
            logging.info("autopopulate %s from %s", self.path, self.url)
            check_call([svnbin, 'co', '-q',
                        '--force',
                        '--non-interactive',
                        '--config-option',
                        'config:miscellany:use-commit-times=on',
                        '--', self.url, self.path],
                       env=env)

        # Fetch the info for matching dirs_changed against this WC
        info = svn_info(svnbin, env, self.path)
        root = info['Repository Root']
        url = info['URL']
        relpath = url[len(root):]  # also has leading '/'
        uuid = info['Repository UUID']
        return str(relpath), uuid


PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")


class BigDoEverythingClasss(object):
    """Ties the configuration, working copies and background worker together.

    Holds the list of ready working copies (.watch) and turns each
    incoming commit notification into update jobs for the worker.
    """

    def __init__(self, config):
        self.svnbin = config.get_value('svnbin')
        self.env = config.get_env()
        self.tracking = config.get_track()
        self.hook = config.get_optional_value('hook')
        self.streams = config.get_value('streams').split()
        self.worker = BackgroundWorker(self.svnbin, self.env, self.hook)
        self.watch = []

    def start(self):
        """Create a WorkingCopy for every tracked {path: url} pair."""
        for path, url in self.tracking.items():
            # working copies auto-register with the BDEC when they are ready.
            WorkingCopy(self, path, url)

    def wc_ready(self, wc):
        """Start watching wc and queue its initial (boot) update."""
        # called when a working copy object has its basic info/url,
        # Add it to our watchers, and trigger an svn update.
        logging.info("Watching WC at %s <-> %s", wc.path, wc.url)
        self.watch.append(wc)
        self.worker.add_work(OP_BOOT, wc)

    def _normalize_path(self, path):
        """Return path with a leading '/'.

        Paths that already start with '/' are additionally passed through
        posixpath.abspath(); others only get the '/' prepended.
        """
        if not path.startswith('/'):
            # startswith() also copes with an empty path.
            return "/" + path
        return posixpath.abspath(path)

    def commit(self, url, commit):
        """Queue an update for every watched working copy a commit touches.

        url is the stream the notification arrived on.  commit must
        provide .type, .format, .id, .changed and .repository; anything
        other than type 'svn' with format 1 is logged and skipped.
        """
        if commit.type != 'svn' or commit.format != 1:
            logging.info("SKIP unknown commit format (%s.%d)",
                         commit.type, commit.format)
            return
        logging.info("COMMIT r%d (%d paths) from %s",
                     commit.id, len(commit.changed), url)

        # Build a real list: on Python 3 map() returns an iterator, which
        # has no len() and which commonprefix() cannot index.
        paths = [self._normalize_path(p) for p in commit.changed]
        if not paths:
            # No changed paths means there is no prefix to match against.
            logging.info("Updating %d WC for r%d", 0, commit.id)
            return

        pre = posixpath.commonprefix(paths)
        if pre == "/websites/":
            # special case for svnmucc "dynamic content" buildbot commits
            # just take the first production path to avoid updating all
            # cms working copies
            for p in paths:
                m = PRODUCTION_RE_FILTER.match(p)
                if m:
                    pre = m.group(0)
                    break

        wcs = [wc for wc in self.watch
               if wc.update_applies(commit.repository, pre)]
        logging.info("Updating %d WC for r%d", len(wcs), commit.id)
        for wc in wcs:
            self.worker.add_work(OP_UPDATE, wc)


# Start logging warnings if the work backlog reaches this many items
BACKLOG_TOO_HIGH = 20
OP_BOOT = 'boot'
OP_UPDATE = 'update'
OP_CLEANUP = 'cleanup'


class BackgroundWorker(threading.Thread):
    """Thread that runs queued svn operations one at a time.

    Work items are (operation, working copy) pairs added via add_work();
    the thread itself is started lazily by the first add_work() call.
    """

    def __init__(self, svnbin, env, hook):
        threading.Thread.__init__(self)

        # The main thread/process should not wait for this thread to exit.
        ### compat with Python 2.5
        self.setDaemon(True)

        self.svnbin = svnbin
        self.env = env
        self.hook = hook
        self.q = Queue.Queue()

        self.has_started = False

    def run(self):
        """Process work items forever, logging (not raising) failures."""
        while True:
            # This will block until something arrives
            operation, wc = self.q.get()

            # Warn if the queue is too long.
            # (Note: the other thread might have added entries to self.q
            # after the .get() and before the .qsize().)
            qsize = self.q.qsize() + 1
            if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH:
                logging.warning('worker backlog is at %d', qsize)

            try:
                if operation == OP_UPDATE:
                    self._update(wc)
                elif operation == OP_BOOT:
                    self._update(wc, boot=True)
                elif operation == OP_CLEANUP:
                    self._cleanup(wc)
                else:
                    logging.critical('unknown operation: %s', operation)
            except Exception:
                # A failed job must not kill the worker thread.
                logging.exception('exception in worker')
            finally:
                # In case we ever want to .join() against the work queue
                self.q.task_done()

    def add_work(self, operation, wc):
        """Queue operation (one of the OP_* constants) for working copy wc."""
        # Start the thread when work first arrives. Thread-start needs to
        # be delayed in case the process forks itself to become a daemon.
        if not self.has_started:
            self.start()
            self.has_started = True

        self.q.put((operation, wc))

    def _update(self, wc, boot=False):
        """Update the specified working copy.

        Runs 'svn cleanup', then the configured hook in 'pre-update' mode
        ('pre-boot' when boot is true), then 'svn switch' to the revision
        reported by 'svn info' on the URL, then the hook in 'post-update'
        mode ('boot' when boot is true).  A pre hook exiting with status 1
        vetoes the update.
        """
        # For giggles, let's clean up the working copy in case something
        # happened earlier.
        self._cleanup(wc)

        logging.info("updating: %s", wc.path)

        ## Run the hook
        HEAD = svn_info(self.svnbin, self.env, wc.url)['Revision']
        if self.hook:
            hook_mode = 'pre-boot' if boot else 'pre-update'
            logging.info('running hook: %s at %s',
                         wc.path, hook_mode)
            args = [self.hook, hook_mode, wc.path, HEAD, wc.url]
            rc = check_call(args, env=self.env, __okayexits=[0, 1])
            if rc == 1:
                # TODO: log stderr
                logging.warning('hook denied update of %s at %s',
                                wc.path, hook_mode)
                return
            del rc

        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'switch',
                '--quiet',
                '--non-interactive',
                '--trust-server-cert',
                '--ignore-externals',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                '--',
                wc.url + '@' + HEAD,
                wc.path]
        check_call(args, env=self.env)

        ### check the loglevel before running 'svn info'?
        info = svn_info(self.svnbin, self.env, wc.path)
        # Sanity check: the switch above targeted exactly HEAD.
        assert info['Revision'] == HEAD
        logging.info("updated: %s now at r%s", wc.path, info['Revision'])

        ## Run the hook
        if self.hook:
            hook_mode = 'boot' if boot else 'post-update'
            logging.info('running hook: %s at revision %s due to %s',
                         wc.path, info['Revision'], hook_mode)
            args = [self.hook, hook_mode,
                    wc.path, info['Revision'], wc.url]
            check_call(args, env=self.env)

    def _cleanup(self, wc):
        """Run a cleanup on the specified working copy."""
        ### we need to move some of these args into the config. these are
        ### still specific to the ASF setup.
        args = [self.svnbin, 'cleanup',
                '--non-interactive',
                '--trust-server-cert',
                '--config-option',
                'config:miscellany:use-commit-times=on',
                wc.path]
        check_call(args, env=self.env)


# configparser.SafeConfigParser was only a deprecated alias of ConfigParser
# on Python 3 and was removed in Python 3.12; only Python 2 needs it.
if sys.version_info[0] >= 3:
    _ConfigParserBase = ConfigParser.ConfigParser
else:
    _ConfigParserBase = ConfigParser.SafeConfigParser


class ReloadableConfig(_ConfigParserBase):
    """Config file reader with accessors for the settings used here.

    Plain values are read from the [DEFAULT] section; [env] and [track]
    supply the child-process environment and the working copies to track.
    """

    def __init__(self, fname):
        _ConfigParserBase.__init__(self)

        self.fname = fname
        self.read(fname)

        ### install a signal handler to set SHOULD_RELOAD. BDEC should
        ### poll this flag, and then adjust its internal structures after
        ### the reload.
        self.should_reload = False

    def reload(self):
        """Drop all sections and read the configuration file again."""
        # Delete everything. Just re-reading would overlay, and would not
        # remove sections/options. Note that [DEFAULT] will not be removed.
        for section in self.sections():
            self.remove_section(section)

        # Now re-read the configuration file.
        self.read(self.fname)

    def get_value(self, which):
        """Return the required option 'which' from [DEFAULT]."""
        return self.get(ConfigParser.DEFAULTSECT, which)

    def get_optional_value(self, which, default=None):
        """Return option 'which' from [DEFAULT], or default if it is unset."""
        if self.has_option(ConfigParser.DEFAULTSECT, which):
            return self.get(ConfigParser.DEFAULTSECT, which)
        return default

    def get_env(self):
        """Return a copy of os.environ overlaid with the [env] section."""
        env = os.environ.copy()
        default_options = self.defaults().keys()
        for name, value in self.items('env'):
            # items() also yields the [DEFAULT] options; those are not
            # environment variables.
            if name not in default_options:
                env[name] = value
        return env

    def get_track(self):
        "Return the {PATH: URL} dictionary of working copies to track."
        track = dict(self.items('track'))
        # items() also yields the [DEFAULT] options; drop them.
        for name in self.defaults().keys():
            del track[name]
        return track

    def optionxform(self, option):
        # Do not lowercase the option name.
        return str(option)


class Daemon(daemonize.Daemon):
    """daemonize.Daemon subclass that runs the BDEC and the pubsub client."""

    def __init__(self, logfile, pidfile, umask, bdec):
        daemonize.Daemon.__init__(self, logfile, pidfile)

        self.umask = umask
        self.bdec = bdec

    def setup(self):
        # There is no setup which the parent needs to wait for.
        pass

    def run(self):
        """Apply the umask, start the BDEC and run the client loop."""
        logging.info('svnwcsub started, pid=%d', os.getpid())

        # Set the umask in the daemon process. Defaults to 000 for
        # daemonized processes. Foreground processes simply inherit
        # the value from the parent process.
        if self.umask is not None:
            umask = int(self.umask, 8)
            os.umask(umask)
            logging.info('umask set to %03o', umask)

        # Start the BDEC (on the main thread), then start the client
        self.bdec.start()

        mc = svnpubsub.client.MultiClient(self.bdec.streams,
                                          self.bdec.commit,
                                          self._event)
        mc.run_forever()

    def _event(self, url, event_name, event_arg):
        """Log a client event for stream url; event_arg is not used."""
        if event_name == 'error':
            # NOTE(review): logging.exception() only includes a traceback
            # when called while an exception is being handled; whether the
            # client invokes this callback from an except block is not
            # visible here -- confirm.
            logging.exception('from %s', url)
        elif event_name == 'ping':
            logging.debug('ping from %s', url)
        else:
            logging.info('"%s" from %s', event_name, url)


def prepare_logging(logfile):
    "Log to the specified file, or to stdout if None."

    if logfile:
        # Rotate logs daily, keeping 7 days worth.
        handler = logging.handlers.TimedRotatingFileHandler(
            logfile, when='midnight', backupCount=7,
        )
    else:
        handler = logging.StreamHandler(sys.stdout)

    # Add a timestamp to the log records
    formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
                                  '%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)

    # Apply the handler to the root logger
    root = logging.getLogger()
    root.addHandler(handler)

    ### use logging.INFO for now. switch to cmdline option or a config?
    root.setLevel(logging.INFO)


def handle_options(options):
    """Apply the parsed command-line options to this process.

    Sets up logging, writes the pidfile when running in the foreground,
    and switches to the requested gid and uid (each given either as a
    number or as a name).
    """
    # Set up the logging, then process the rest of the options.
    prepare_logging(options.logfile)

    # In daemon mode, we let the daemonize module handle the pidfile.
    # Otherwise, we should write this (foreground) PID into the file.
    if options.pidfile and not options.daemon:
        pid = os.getpid()
        # Be wary of symlink attacks
        try:
            os.remove(options.pidfile)
        except OSError:
            # Best effort: if the file could not be removed, the O_EXCL
            # open below fails instead of following it.
            pass
        fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL,
                     stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
        try:
            # os.write() needs bytes on Python 3.
            os.write(fd, ('%d\n' % pid).encode('ascii'))
        finally:
            os.close(fd)
        logging.info('pid %d written to %s', pid, options.pidfile)

    # Change the group first: after dropping the uid we may no longer be
    # allowed to.
    if options.gid:
        try:
            gid = int(options.gid)
        except ValueError:
            import grp
            gid = grp.getgrnam(options.gid)[2]
        logging.info('setting gid %d', gid)
        os.setgid(gid)

    if options.uid:
        try:
            uid = int(options.uid)
        except ValueError:
            import pwd
            uid = pwd.getpwnam(options.uid)[2]
        logging.info('setting uid %d', uid)
        os.setuid(uid)


def main(args):
    """Parse the command line in args and run svnwcsub.

    args excludes the program name.  Exactly one positional argument, the
    configuration file, is required; --logfile and --pidfile are required
    together with --daemon.
    """
    parser = optparse.OptionParser(
        description='An SvnPubSub client to keep working copies synchronized '
                    'with a repository.',
        usage='Usage: %prog [options] CONFIG_FILE',
        )
    parser.add_option('--logfile',
                      help='filename for logging')
    parser.add_option('--pidfile',
                      help="the process' PID will be written to this file")
    parser.add_option('--uid',
                      help='switch to this UID before running')
    parser.add_option('--gid',
                      help='switch to this GID before running')
    parser.add_option('--umask',
                      help='set this (octal) umask before running')
    parser.add_option('--daemon', action='store_true',
                      help='run as a background daemon')

    options, extra = parser.parse_args(args)

    if len(extra) != 1:
        parser.error('CONFIG_FILE is required')
    config_file = extra[0]

    if options.daemon and not options.logfile:
        parser.error('LOGFILE is required when running as a daemon')
    if options.daemon and not options.pidfile:
        parser.error('PIDFILE is required when running as a daemon')

    # Process any provided options.
    handle_options(options)

    c = ReloadableConfig(config_file)
    bdec = BigDoEverythingClasss(c)

    # --pidfile is optional in foreground mode; os.path.abspath(None)
    # would raise, so only resolve it when one was given.
    pidfile = options.pidfile
    if pidfile:
        pidfile = os.path.abspath(pidfile)

    # We manage the logfile ourselves (along with possible rotation). The
    # daemon process can just drop stdout/stderr into /dev/null.
    d = Daemon('/dev/null', pidfile, options.umask, bdec)
    if options.daemon:
        # Daemonize the process and call sys.exit() with appropriate code
        d.daemonize_exit()
    else:
        # Just run in the foreground (the default)
        d.foreground()


if __name__ == "__main__":
    main(sys.argv[1:])