1from __future__ import unicode_literals 2from builtins import str 3from builtins import object 4 5import copy 6import multiprocessing 7import time 8 9from pkg_resources import iter_entry_points 10 11from dateutil.parser import parse as parse_date 12from dateutil.tz import tzlocal 13from jinja2 import Template 14import pytz 15import six 16 17from taskw.task import Task 18 19from bugwarrior.config import asbool, asint, aslist, die, get_service_password, ServiceConfig 20from bugwarrior.db import MARKUP, URLShortener 21 22import logging 23log = logging.getLogger(__name__) 24 25 26# Sentinels for process completion status 27SERVICE_FINISHED_OK = 0 28SERVICE_FINISHED_ERROR = 1 29 30# Used by `parse_date` as a timezone when you would like a naive 31# date string to be parsed as if it were in your local timezone 32LOCAL_TIMEZONE = 'LOCAL_TIMEZONE' 33 34def get_service(service_name): 35 epoint = iter_entry_points(group='bugwarrior.service', name=service_name) 36 try: 37 epoint = next(epoint) 38 except StopIteration: 39 return None 40 41 return epoint.load() 42 43 44class IssueService(object): 45 """ Abstract base class for each service """ 46 # Which class should this service instantiate for holding these issues? 47 ISSUE_CLASS = None 48 # What prefix should we use for this service's configuration values 49 CONFIG_PREFIX = '' 50 51 def __init__(self, main_config, main_section, target): 52 self.config = ServiceConfig(self.CONFIG_PREFIX, main_config, target) 53 self.main_section = main_section 54 self.main_config = main_config 55 self.target = target 56 57 self.desc_len = self._get_config_or_default('description_length', 35, asint); 58 self.anno_len = self._get_config_or_default('annotation_length', 45, asint); 59 self.inline_links = self._get_config_or_default('inline_links', True, asbool); 60 self.annotation_links = self._get_config_or_default('annotation_links', not self.inline_links, asbool) 61 self.annotation_comments = self._get_config_or_default('annotation_comments', True, asbool) 62 self.annotation_newlines = self._get_config_or_default('annotation_newlines', False, asbool) 63 self.shorten = self._get_config_or_default('shorten', False, asbool) 64 65 self.default_priority = self.config.get('default_priority', 'M') 66 67 self.add_tags = [] 68 for raw_option in aslist(self.config.get('add_tags', '')): 69 option = raw_option.strip(' +;') 70 if option: 71 self.add_tags.append(option) 72 73 log.info("Working on [%s]", self.target) 74 75 76 def _get_config_or_default(self, key, default, as_type=lambda x: x): 77 """Return a main config value, or default if it does not exist.""" 78 79 if self.main_config.has_option(self.main_section, key): 80 return as_type(self.main_config.get(self.main_section, key)) 81 return default 82 83 84 def get_templates(self): 85 """ Get any defined templates for configuration values. 86 87 Users can override the value of any Taskwarrior field using 88 this feature on a per-key basis. The key should be the name of 89 the field to you would like to configure the value of, followed 90 by '_template', and the value should be a Jinja template 91 generating the field's value. As context variables, all fields 92 on the taskwarrior record are available. 93 94 For example, to prefix the returned 95 project name for tickets returned by a service with 'workproject_', 96 you could add an entry reading: 97 98 project_template = workproject_{{project}} 99 100 Or, if you'd simply like to override the returned project name 101 for all tickets incoming from a specific service, you could add 102 an entry like: 103 104 project_template = myprojectname 105 106 The above would cause all issues to recieve a project name 107 of 'myprojectname', regardless of what the project name of the 108 generated issue was. 109 110 """ 111 templates = {} 112 for key in six.iterkeys(Task.FIELDS): 113 template_key = '%s_template' % key 114 if template_key in self.config: 115 templates[key] = self.config.get(template_key) 116 return templates 117 118 def get_password(self, key, login='nousername'): 119 password = self.config.get(key) 120 keyring_service = self.get_keyring_service(self.config) 121 if not password or password.startswith("@oracle:"): 122 password = get_service_password( 123 keyring_service, login, oracle=password, 124 interactive=self.config.interactive) 125 return password 126 127 def get_service_metadata(self): 128 return {} 129 130 def get_issue_for_record(self, record, extra=None): 131 origin = { 132 'annotation_length': self.anno_len, 133 'default_priority': self.default_priority, 134 'description_length': self.desc_len, 135 'templates': self.get_templates(), 136 'target': self.target, 137 'shorten': self.shorten, 138 'inline_links': self.inline_links, 139 'add_tags': self.add_tags, 140 } 141 origin.update(self.get_service_metadata()) 142 return self.ISSUE_CLASS(record, origin=origin, extra=extra) 143 144 def build_annotations(self, annotations, url): 145 final = [] 146 if self.annotation_links: 147 final.append(url) 148 if self.annotation_comments: 149 for author, message in annotations: 150 message = message.strip() 151 if not message or not author: 152 continue 153 154 if not self.annotation_newlines: 155 message = message.replace('\n', '').replace('\r', '') 156 157 if self.anno_len: 158 message = '%s%s' % ( 159 message[:self.anno_len], 160 '...' if len(message) > self.anno_len else '' 161 ) 162 final.append('@%s - %s' % (author, message)) 163 return final 164 165 @classmethod 166 def validate_config(cls, service_config, target): 167 """ Validate generic options for a particular target """ 168 if service_config.has_option(target, 'only_if_assigned'): 169 die("[%s] has an 'only_if_assigned' option. Should be " 170 "'%s.only_if_assigned'." % (target, cls.CONFIG_PREFIX)) 171 if service_config.has_option(target, 'also_unassigned'): 172 die("[%s] has an 'also_unassigned' option. Should be " 173 "'%s.also_unassigned'." % (target, cls.CONFIG_PREFIX)) 174 if service_config.has_option(target, 'default_priority'): 175 die("[%s] has a 'default_priority' option. Should be " 176 "'%s.default_priority'." % (target, cls.CONFIG_PREFIX)) 177 if service_config.has_option(target, 'add_tags'): 178 die("[%s] has an 'add_tags' option. Should be " 179 "'%s.add_tags'." % (target, cls.CONFIG_PREFIX)) 180 181 def include(self, issue): 182 """ Return true if the issue in question should be included """ 183 only_if_assigned = self.config.get('only_if_assigned', None) 184 185 if only_if_assigned: 186 owner = self.get_owner(issue) 187 include_owners = [only_if_assigned] 188 189 if self.config.get('also_unassigned', None, asbool): 190 include_owners.append(None) 191 192 return owner in include_owners 193 194 only_if_author = self.config.get('only_if_author', None) 195 196 if only_if_author: 197 return self.get_author(issue) == only_if_author 198 199 return True 200 201 def get_owner(self, issue): 202 """ Override this for filtering on tickets """ 203 raise NotImplementedError() 204 205 def get_author(self, issue): 206 """ Override this for filtering on tickets """ 207 raise NotImplementedError() 208 209 def issues(self): 210 """ Returns a list of dicts representing issues from a remote service. 211 212 This is the main place to begin if you are implementing a new service 213 for bugwarrior. Override this to gather issues for each service. 214 215 Each item in the list should be a dict that looks something like this: 216 217 { 218 "description": "Some description of the issue", 219 "project": "some_project", 220 "priority": "H", 221 "annotations": [ 222 "This is an annotation", 223 "This is another annotation", 224 ] 225 } 226 227 228 The description can be 'anything' but must be consistent and unique for 229 issues you're pulling from a remote service. You can and should use 230 the ``.description(...)`` method to help format your descriptions. 231 232 The project should be a string and may be anything you like. 233 234 The priority should be one of "H", "M", or "L". 235 """ 236 raise NotImplementedError() 237 238 @staticmethod 239 def get_keyring_service(service_config): 240 """ Given the keyring service name for this service. """ 241 raise NotImplementedError 242 243 244@six.python_2_unicode_compatible 245class Issue(object): 246 # Set to a dictionary mapping UDA short names with type and long name. 247 # 248 # Example:: 249 # 250 # { 251 # 'project_id': { 252 # 'type': 'string', 253 # 'label': 'Project ID', 254 # }, 255 # 'ticket_number': { 256 # 'type': 'number', 257 # 'label': 'Ticket Number', 258 # }, 259 # } 260 # 261 # Note: For best results, dictionary keys should be unique! 262 UDAS = {} 263 # Should be a tuple of field names (can be UDA names) that are usable for 264 # uniquely identifying an issue in the foreign system. 265 UNIQUE_KEY = [] 266 # Should be a dictionary of value-to-level mappings between the foreign 267 # system and the string values 'H', 'M' or 'L'. 268 PRIORITY_MAP = {} 269 270 def __init__(self, foreign_record, origin=None, extra=None): 271 self._foreign_record = foreign_record 272 self._origin = origin if origin else {} 273 self._extra = extra if extra else {} 274 275 def update_extra(self, extra): 276 self._extra.update(extra) 277 278 def to_taskwarrior(self): 279 """ Transform a foreign record into a taskwarrior dictionary.""" 280 raise NotImplementedError() 281 282 def get_default_description(self): 283 """ Return the old-style verbose description from bugwarrior. 284 285 This is useful for two purposes: 286 287 * Finding and linking historically-created records. 288 * Allowing people to keep using the historical description 289 for taskwarrior. 290 291 """ 292 raise NotImplementedError() 293 294 def get_added_tags(self): 295 added_tags = [] 296 for tag in self.origin['add_tags']: 297 tag = Template(tag).render(self.get_template_context()) 298 if tag: 299 added_tags.append(tag) 300 301 return added_tags 302 303 def get_taskwarrior_record(self, refined=True): 304 if not getattr(self, '_taskwarrior_record', None): 305 self._taskwarrior_record = self.to_taskwarrior() 306 record = copy.deepcopy(self._taskwarrior_record) 307 if refined: 308 record = self.refine_record(record) 309 if not 'tags' in record: 310 record['tags'] = [] 311 if refined: 312 record['tags'].extend(self.get_added_tags()) 313 return record 314 315 def get_priority(self): 316 return self.PRIORITY_MAP.get( 317 self.record.get('priority'), 318 self.origin['default_priority'] 319 ) 320 321 def get_processed_url(self, url): 322 """ Returns a URL with conditional processing. 323 324 If the following config key are set: 325 326 - [general]shorten 327 328 returns a shortened URL; otherwise returns the URL unaltered. 329 330 """ 331 if self.origin['shorten']: 332 return URLShortener().shorten(url) 333 return url 334 335 def parse_date(self, date, timezone='UTC'): 336 """ Parse a date string into a datetime object. 337 338 :param `date`: A time string parseable by `dateutil.parser.parse` 339 :param `timezone`: The string timezone name (from `pytz.all_timezones`) 340 to use as a default should the parsed time string not include 341 timezone information. 342 343 """ 344 if date: 345 date = parse_date(date) 346 if not date.tzinfo: 347 if timezone == LOCAL_TIMEZONE: 348 tzinfo = tzlocal() 349 else: 350 tzinfo = pytz.timezone(timezone) 351 date = date.replace(tzinfo=tzinfo) 352 return date 353 return None 354 355 def build_default_description( 356 self, title='', url='', number='', cls="issue" 357 ): 358 cls_markup = { 359 'issue': 'Is', 360 'pull_request': 'PR', 361 'merge_request': 'MR', 362 'todo': '', 363 'task': '', 364 'subtask': 'Subtask #', 365 } 366 url_separator = ' .. ' 367 url = url if self.origin['inline_links'] else '' 368 desc_len = self.origin['description_length'] 369 return u"%s%s#%s - %s%s%s" % ( 370 MARKUP, 371 cls_markup[cls], 372 number, 373 title[:desc_len] if desc_len else title, 374 url_separator if url else '', 375 url, 376 ) 377 378 def _get_unique_identifier(self): 379 record = self.get_taskwarrior_record() 380 return dict([ 381 (key, record[key],) for key in self.UNIQUE_KEY 382 ]) 383 384 def get_template_context(self): 385 context = ( 386 self.get_taskwarrior_record(refined=False).copy() 387 ) 388 context.update(self.extra) 389 context.update({ 390 'description': self.get_default_description(), 391 }) 392 return context 393 394 def refine_record(self, record): 395 for field in six.iterkeys(Task.FIELDS): 396 if field in self.origin['templates']: 397 template = Template(self.origin['templates'][field]) 398 record[field] = template.render(self.get_template_context()) 399 elif hasattr(self, 'get_default_%s' % field): 400 record[field] = getattr(self, 'get_default_%s' % field)() 401 return record 402 403 def __iter__(self): 404 record = self.get_taskwarrior_record() 405 for key in six.iterkeys(record): 406 yield key 407 408 def keys(self): 409 return list(self.__iter__()) 410 411 def iterkeys(self): 412 return self.__iter__() 413 414 def items(self): 415 record = self.get_taskwarrior_record() 416 return list(six.iteritems(record)) 417 418 def iteritems(self): 419 record = self.get_taskwarrior_record() 420 for item in six.iteritems(record): 421 yield item 422 423 def update(self, *args): 424 raise AttributeError( 425 "You cannot set attributes on issues." 426 ) 427 428 def get(self, attribute, default=None): 429 try: 430 return self[attribute] 431 except KeyError: 432 return default 433 434 def __getitem__(self, attribute): 435 record = self.get_taskwarrior_record() 436 return record[attribute] 437 438 def __setitem__(self, attribute, value): 439 raise AttributeError( 440 "You cannot set attributes on issues." 441 ) 442 443 def __delitem__(self, attribute): 444 raise AttributeError( 445 "You cannot delete attributes from issues." 446 ) 447 448 @property 449 def record(self): 450 return self._foreign_record 451 452 @property 453 def extra(self): 454 return self._extra 455 456 @property 457 def origin(self): 458 return self._origin 459 460 def __str__(self): 461 return '%s: %s' % ( 462 self.origin['target'], 463 self.get_taskwarrior_record()['description'] 464 ) 465 466 def __repr__(self): 467 return '<%s>' % str(self) 468 469 470class ServiceClient(object): 471 """ Abstract class responsible for making requests to service API's. """ 472 @staticmethod 473 def json_response(response): 474 # If we didn't get good results, just bail. 475 if response.status_code != 200: 476 raise IOError( 477 "Non-200 status code %r; %r; %r" % ( 478 response.status_code, response.url, response.text, 479 )) 480 if callable(response.json): 481 # Newer python-requests 482 return response.json() 483 else: 484 # Older python-requests 485 return response.json 486 487 488def _aggregate_issues(conf, main_section, target, queue, service_name): 489 """ This worker function is separated out from the main 490 :func:`aggregate_issues` func only so that we can use multiprocessing 491 on it for speed reasons. 492 """ 493 494 start = time.time() 495 496 try: 497 service = get_service(service_name)(conf, main_section, target) 498 issue_count = 0 499 for issue in service.issues(): 500 queue.put(issue) 501 issue_count += 1 502 except SystemExit as e: 503 log.critical(str(e)) 504 queue.put((SERVICE_FINISHED_ERROR, (target, e))) 505 except BaseException as e: 506 if hasattr(e, 'request') and e.request: 507 # Exceptions raised by requests library have the HTTP request 508 # object stored as attribute. The request can have hooks attached 509 # to it, and we need to remove them, as there can be unpickleable 510 # methods. There is no one left to call these hooks anyway. 511 e.request.hooks = {} 512 log.exception("Worker for [%s] failed: %s" % (target, e)) 513 queue.put((SERVICE_FINISHED_ERROR, (target, e))) 514 else: 515 queue.put((SERVICE_FINISHED_OK, (target, issue_count, ))) 516 finally: 517 duration = time.time() - start 518 log.info("Done with [%s] in %fs" % (target, duration)) 519 520 521def aggregate_issues(conf, main_section, debug): 522 """ Return all issues from every target. """ 523 log.info("Starting to aggregate remote issues.") 524 525 # Create and call service objects for every target in the config 526 targets = aslist(conf.get(main_section, 'targets')) 527 528 queue = multiprocessing.Queue() 529 530 log.info("Spawning %i workers." % len(targets)) 531 processes = [] 532 533 if debug: 534 for target in targets: 535 _aggregate_issues( 536 conf, 537 main_section, 538 target, 539 queue, 540 conf.get(target, 'service') 541 ) 542 else: 543 for target in targets: 544 proc = multiprocessing.Process( 545 target=_aggregate_issues, 546 args=(conf, main_section, target, queue, conf.get(target, 'service')) 547 ) 548 proc.start() 549 processes.append(proc) 550 551 # Sleep for 1 second here to try and avoid a race condition where 552 # all N workers start up and ask the gpg-agent process for 553 # information at the same time. This causes gpg-agent to fumble 554 # and tell some of our workers some incomplete things. 555 time.sleep(1) 556 557 currently_running = len(targets) 558 while currently_running > 0: 559 issue = queue.get(True) 560 if isinstance(issue, tuple): 561 completion_type, args = issue 562 if completion_type == SERVICE_FINISHED_ERROR: 563 target, e = args 564 log.info("Terminating workers") 565 for process in processes: 566 process.terminate() 567 raise RuntimeError( 568 "critical error in target '{}'".format(target)) 569 currently_running -= 1 570 continue 571 yield issue 572 573 log.info("Done aggregating remote issues.") 574