###################################################################################################
#
# pyral.response - defines a class to hold the response info from a Rally REST operation
#                  with reasonably named accessors and an iterator interface to obtain
#                  entity instances from the results
#
# dependencies:
#    intra-package: hydrate.EntityHydrator, cargotruck.CargoTruck
#
###################################################################################################

__version__ = (1, 5, 2)

import sys
import re
import json
import copy
import time
import traceback
from pprint import pprint

from .hydrate    import EntityHydrator
from .cargotruck import CargoTruck

__all__ = ['RallyRESTResponse', 'ErrorResponse', 'RallyResponseError']

##################################################################################################

errout = sys.stderr.write

##################################################################################################

class RallyResponseError(Exception):
    """Raised when a Rally REST response cannot be interpreted or additional pages cannot be retrieved."""
    pass


class ErrorResponse(object):
    """
    A stand-in for an HTTP response object carrying a single problem.

    Mimics the status_code, headers and content attributes of a real response, with the
    problem recorded as the sole entry of content['OperationResult']['Errors'].
    Well-known authentication failures are collapsed to a canonical message.
    """

    SECURITY_ERROR      = 'An Authentication object was not found in the SecurityContext'
    INVALID_CREDENTIALS = "Error 401 The username or password you entered is incorrect."

    def __init__(self, status_code, problem):
        self.status_code = status_code
        self.headers = {'placeholder' : True}
        self.content = {'OperationResult' : {'Errors'  : [problem],
                                             'Warnings': [],
                                             'Results' : [],
                                            }
                       }

        prob_str = str(problem)
        # replace verbose server text with the recognizable canonical message
        if ErrorResponse.SECURITY_ERROR in prob_str:
            self.content['OperationResult']['Errors'] = [ErrorResponse.SECURITY_ERROR]

        if ErrorResponse.INVALID_CREDENTIALS in prob_str:
            self.content['OperationResult']['Errors'] = [ErrorResponse.INVALID_CREDENTIALS]

##################################################################################################

class RallyRESTResponse(object):
    """
    An instance of this class is used to wrap the response from the Rally REST endpoint.
    Part of the wrapping includes an iterator based interface to the collection of records
    returned by the request (for GET).
    """

    # (response key, request type) pairs, checked in this order
    _RESULT_KEYS = (('OperationResult', 'Operation'),
                    ('QueryResult',     'Query'),
                    ('CreateResult',    'Create'),
                    ('UpdateResult',    'Update'),
                    ('DeleteResult',    'Delete'),
                   )

    def __init__(self, session, context, request, response, hydration, limit, **kwargs):
        """
        A wrapper for the response received back from the REST API.
        The response has status_code, headers and content attributes which will be preserved.
        In addition, we augment with some easy access attributes into the data attribute
        and provide an iterator protocol to obtain hydrated instances with information
        contained in the response['QueryResult']['Results'] list.

        :param session:   object whose .get(url) is used to fetch further result pages
        :param context:   context whose .serviceURL() prefixes relative page URLs; also given to the hydrator
        :param request:   the request path/URL (may include a '?query' part)
        :param response:  the HTTP response, or an ErrorResponse
        :param hydration: hydration mode passed through to EntityHydrator
        :param limit:     optional cap on the number of items served (falsy means no cap)
        :param kwargs:    'threads' (int, default 0 = auto) and 'debug' (bool, default False)
        :raises RallyResponseError: if the content is not valid JSON, or an implied-query
                                    target is missing from the content
        """
        self.session  = session
        self.context  = context
        self.resource = request
        self.threads  = kwargs.get('threads', 0)
        self.debug    = kwargs.get('debug', False)
        self.data     = None
        request_path_elements = request.split('?')[0].split('/')

        self.target = request_path_elements[-1]
        if re.match(r'^\d+$', self.target):
            self.target = request_path_elements[-2]  # this happens on request for RevisionHistory
            if self.target.lower() == 'revisionhistory':
                self.target = 'RevisionHistory'

        self._served    = -1   # stays -1 for an ErrorResponse-backed (early returned) instance
        self.errors     = []
        self.warnings   = []
        self._item_type = self.target

        self.status_code = response.status_code
        self.headers     = response.headers

        if isinstance(response, ErrorResponse):
            if 'OperationResult' in response.content:
                if 'Errors' in response.content['OperationResult']:
                    self.errors = response.content['OperationResult']['Errors']
            return

        self._stdFormat = True
        try:
            self.content = response.json()
        except Exception:
            problem = "Response for request: {0} either was not JSON content or was an invalidly formed/incomplete JSON structure".format(self.resource)
            raise RallyResponseError(problem)

        self.request_type, self.data = self._determineRequestResponseType(request)

        if self.request_type == 'ImpliedQuery':
            # the request is against a Rally Type name, ie. 'Subscription', 'Workspace', 'UserProfile', etc.
            # or a Rally "sub-type" like PortfolioItem/Feature
            # which is context dependent and has a singleton result
            target = self.target
            if target.endswith('.x'):
                target = target[:-2]
            if target not in self.content:
                # check to see if there is a case-insensitive match before upchucking...
                content_keys = list(self.content.keys())
                ckls = [k.lower() for k in content_keys]
                if target.lower() not in ckls:
                    forensic_info = "%s\n%s\n" % (response.status_code, response.content)
                    problem = 'missing _Xx_Result specifier for target %s in following:' % target
                    raise RallyResponseError('%s\n%s' % (problem, forensic_info))
                else:
                    target = content_keys[ckls.index(target.lower())]
                    self.target = target
            self._stdFormat = False
            # fudge in the QueryResult.Results.<target> dict keychain
            self._item_type = target
            self.data = {'QueryResult': {'Results' : { target: self.content[target] }}}
            # move Errors/Warnings from the entity dict up to the synthesized result level
            self.data['Errors']   = self.content[target].pop('Errors')
            self.data['Warnings'] = self.content[target].pop('Warnings')
            self.data['PageSize'] = 1
            self.data['TotalResultCount'] = 1

        qr = self.data
        self.errors      = qr['Errors']
        self.warnings    = qr['Warnings']
        self.startIndex  = int(qr['StartIndex'])       if 'StartIndex'       in qr else 0
        self.pageSize    = int(qr['PageSize'])         if 'PageSize'         in qr else 0
        self.resultCount = int(qr['TotalResultCount']) if 'TotalResultCount' in qr else 0
        self._limit = min(limit, self.resultCount) if limit else self.resultCount
        self._page = []
        if self.request_type in ['Query', 'ImpliedQuery']:
            self.first_page = True
            if self.threads <= 0:  # 0 designates auto-threading, the 2 will be auto-adjusted later
                self.threads = 2
            self.max_threads = self.threads if self.threads <= 8 else 4  # readjust to sane if too big

            if 'Results' in qr:
                self._page = qr['Results']
            elif 'QueryResult' in qr and 'Results' in qr['QueryResult']:
                self._page = qr['QueryResult']['Results']

            # When the first page has content, resultCount > 1000 and pageSize is smaller than
            # reference_population (the lesser of the limit and resultCount), pick max_threads
            # from the largest threshold that reference_population exceeds:
            #   > 1*pageSize -> 1, > 4*pageSize -> 2, > 10*pageSize -> 4,
            #   > 20*pageSize -> 8, > 40*pageSize -> 10
            # otherwise max_threads is 1.
            if self.threads > 1:  # readjust to accommodate the resultCount
                self.max_threads = 1
                reference_population = min(self._limit, self.resultCount)
                if self._page and self.resultCount > 1000 and self.pageSize < reference_population:
                    pop_thread_limit = [( 1*self.pageSize,  1),
                                        ( 4*self.pageSize,  2),
                                        (10*self.pageSize,  4),
                                        (20*self.pageSize,  8),
                                        (40*self.pageSize, 10),
                                       ]
                    for page_size_multiple, num_threads in pop_thread_limit:
                        if reference_population > page_size_multiple:
                            self.max_threads = num_threads

        if qr.get('Object', None):
            self._page = qr['Object']['_ref']

        # for whatever reason, some queries where a start index is unspecified
        # result in the start index returned being a 0 or a 1, go figure ...
        # so we don't adjust the _servable value unless the start index is > 1
        self._servable = 0
        if self.resultCount > 0:
            self._servable = self.resultCount
            if self.startIndex > 1:
                self._servable = self.resultCount - self.startIndex + 1
        self._servable = min(self._servable, self._limit)
        self._served   = 0
        self._curIndex = 0
        self.hydrator  = EntityHydrator(context, hydration=hydration)
        if self.errors:
            # transform the status code to an error code indicating an Unprocessable Entity if not already an error code
            self.status_code = 422 if self.status_code == 200 else self.status_code

    def _determineRequestResponseType(self, request):
        """
        Classify the parsed JSON content.

        Returns a (request_type, data) tuple: the first of OperationResult / QueryResult /
        CreateResult / UpdateResult / DeleteResult found wins. Otherwise a bare entity whose
        _CreatedAt is 'just now' counts as a Create, and anything else is an 'ImpliedQuery'
        whose data is the whole content. The request argument is not used.
        """
        for result_key, request_type in self._RESULT_KEYS:
            if result_key in self.content:
                return request_type, self.content[result_key]
        if '_CreatedAt' in self.content and self.content['_CreatedAt'] == 'just now':
            return 'Create', self.content
        return 'ImpliedQuery', self.content

    def _item(self):
        """
        Special non-public method, meant to only be used from the __getattr__ method
        where a request has been issued for a specific entity OID.
        Return the single item or None if the status_code != 200
        """
        return self._page[0] if self.status_code == 200 else None

    def __bool__(self):
        """
        This is for evaluating any invalid response as False (any non-2xx status code).
        """
        return 200 <= self.status_code < 300

    def __iter__(self):
        return self

    def next(self):
        """Python 2 style alias for __next__."""
        return self.__next__()

    def __next__(self):
        """
        Return a hydrated instance from self._page until the page is exhausted,
        then retrieve the next page (or group of pages when multi-threading) with an
        advanced start index. Raise StopIteration when there are no more instances
        that can be served (servable count or limit reached, or no further data).
        """
        if (self._served >= self._servable) or (self._limit and self._served >= self._limit):
            raise StopIteration

        if self._stdFormat:
            if self._curIndex + 1 < len(self._page):
                pass  # possible when multi-threads return multiple pages
            elif self.max_threads > 1 and self._curIndex == len(self._page):
                # exhausted current "chapter" (group of pages fetched by threads)
                self._page[:] = self.__retrieveNextPage()
                self._curIndex = 0
            elif self.max_threads == 1 and self._curIndex == self.pageSize:
                # in single threaded mode, the page is exhausted
                self._page[:] = self.__retrieveNextPage()
                self._curIndex = 0
            if not self._page:
                raise StopIteration
            try:
                item = self._page[self._curIndex]
            except IndexError:
                item = None
                if self.target in ['Workspace', 'Workspaces', 'Project', 'Projects']:
                    # best-effort: try one more page fetch before giving up
                    try:
                        self._page[:] = self.__retrieveNextPage()
                        self._curIndex = 0
                        item = self._page[self._curIndex]
                    except Exception:
                        exception_type, value = sys.exc_info()[:2]
                        problem = '%s: %s for response from request to get next data page for %s' % \
                                  (exception_type.__name__, value, self.target)
                        errout("ERROR: %s\n" % problem)
                if item is None:
                    verbiage = "Unable to access item %d (%d items served so far from a " +\
                               "container purported to be %d items in length)"
                    problem = verbiage % (self._curIndex+1, self._served, self.resultCount)
                    errout("ERROR: %s\n" % problem)
                    raise StopIteration
        else:  # the Response had a non-std format
            # have to stuff the item type into the item dict like it is for the _stdFormat responses
            item = self._page[self._item_type]
            if '_type' not in item:
                item['_type'] = str(self._item_type)

        # these bookkeeping keys are stripped before hydration (tolerate their absence)
        item.pop('_rallyAPIMajor', None)
        item.pop('_rallyAPIMinor', None)

        if self.debug:
            self.showNextItem(item)

        entityInstance = self.hydrator.hydrateInstance(item)
        self._curIndex += 1
        self._served   += 1
        return entityInstance

    def showNextItem(self, item):
        """Debug helper: print the keys/values of a raw item dict before hydration."""
        print(" next item served is a %s" % self._item_type)
        print("RallyRESTResponse.next, item before call to to hydrator.hydrateInstance")
        all_item_keys = sorted(item.keys())
        underscore_prefix_keys = [key for key in all_item_keys if key.startswith('_')]
        std_underscore_prefix_keys = ['_type', '_ref', '_refObjectUUID', '_CreatedAt', '_objectVersion']
        for _key in std_underscore_prefix_keys:
            try:
                print(" %20.20s: %s" % (_key, item[_key]))
            except KeyError:
                pass
        other_prefix_keys = [key for key in underscore_prefix_keys if key not in std_underscore_prefix_keys]
        for _key in other_prefix_keys:
            print(" %20.20s: %s" % (_key, item[_key]))
        print("")
        regular_keys = [key for key in all_item_keys if not key.startswith('_')]
        std_regular_keys = ['ObjectID', 'ObjectUUID', 'CreationDate']
        for key in std_regular_keys:
            try:
                print(" %20.20s: %s" % (key, item[key]))
            except KeyError:
                pass
        other_regular_keys = [key for key in regular_keys if key not in std_regular_keys]
        for key in other_regular_keys:
            print(" %20.20s: %s" % (key, item[key]))
        print("+ " * 30)

    def __retrieveNextPage(self):
        """
        If multi-threading is to be used (self.max_threads > 1) then
        call the method to retrieve multiple pages, otherwise
        just call the self.session.get method for the next page (after adjusting the self.startIndex).
        On a failed single-page request the error and traceback go to stderr and the process exits (9).
        """
        if self.max_threads > 1:
            return self.__retrievePages()

        self.startIndex += self.pageSize
        nextPageUrl = re.sub(r'&start=\d+', '&start=%s' % self.startIndex, self.resource)
        if not nextPageUrl.startswith('http'):
            nextPageUrl = '%s/%s' % (self.context.serviceURL(), nextPageUrl)

        try:
            response = self.session.get(nextPageUrl)
        except Exception:
            exception_type, value = sys.exc_info()[:2]
            sys.stderr.write('%s: %s\n' % (exception_type, value))
            sys.stderr.write(traceback.format_exc())  # format_exc gives text; a traceback object can't be written
            sys.exit(9)

        content = response.json()
        return content['QueryResult']['Results']

    def __retrievePages(self):
        """
        Use self._served, self._servable, self.resource, self.max_threads, self.pageSize
        and self.startIndex to come up with suitable thread count to grab the
        next group of pages from Rally.
        There will be a thread per page still to be retrieved (including the ending partial page)
        up to self.max_threads.
        For the number of threads that will actually be used, populate a page_urls list
        with the url that thread will use to obtain a page worth of data (adjusts the
        start=<number> in the url).
        Once the page_urls list is constructed, delegate off to an instance of CargoTruck
        that runs the threads obtaining the raw responses; their results are concatenated
        in page order. Attempts are made after delays of 0, 2 and 5 seconds.

        :raises RallyResponseError: if every attempt fails
        """
        items_remaining = self._servable - self._served
        num_threads = self.max_threads
        if items_remaining < self.pageSize * num_threads:
            # less than a full chapter remains: one thread per remaining (possibly partial) page
            full, leftovers = divmod(items_remaining, self.pageSize)
            num_threads = full + (1 if leftovers else 0)
        # self.startIndex is the start of the last page already held, so begin one page later
        page_urls = [re.sub(r'&start=\d+', '&start=%s' % (self.startIndex + (ix * self.pageSize)), self.resource)
                     for ix in range(1, num_threads + 1)]

        for delay in [0, 2, 5]:
            time.sleep(delay)
            cgt = CargoTruck(page_urls, num_threads)
            try:
                cgt.load(self.session, 'get', 15)
                payload = cgt.dump()
                break
            except Exception:
                exc_name, exc_desc = sys.exc_info()[:2]
                anomaly = "RallyResponse.next.__retrieveNextPage.__retrievePages caught exception " +\
                          "in threaded request/response activity: %s, %s" % (exc_name, exc_desc)
                errout("%s\n" % anomaly)
                pg1, pg2 = (self.startIndex + self.pageSize), (self.startIndex + (self.pageSize*num_threads))
                notice = "Retrying the page_urls for the page group startIndexes: %d -> %d" % (pg1, pg2)
                print(notice)
        else:
            raise RallyResponseError("Unable to retrieve %d chunks of data" % num_threads)

        chapter = []
        for chunk in payload:
            chapter.extend(chunk.json()['QueryResult']['Results'])

        self.startIndex += len(chapter)
        return chapter

    def __repr__(self):
        # ErrorResponse-backed instances return from __init__ early, so fall back on defaults
        page = getattr(self, '_page', [])
        if self.status_code == 200 and page:
            try:
                entity_type = page[0]['_type']
                return "%s result set, totalResultSetSize: %d, startIndex: %s pageSize: %s current Index: %s" % \
                       (entity_type, self.resultCount, self.startIndex, self.pageSize, self._curIndex)
            except (KeyError, IndexError, TypeError):
                info = (self.status_code, self.errors, self.warnings, page)
                return "%s\nErrors: %s\nWarnings: %s\nData: %s\n" % info
        if self.errors:
            blurb = self.errors[0]
        elif self.warnings:
            blurb = self.warnings[0]
        else:
            # for Query responses page is the same list object as content['QueryResult']['Results']
            blurb = "%sResult TotalResultCount: %d Results: %s" % \
                    (getattr(self, 'request_type', ''), getattr(self, 'resultCount', 0), page)
        return "%s %s" % (self.status_code, blurb)

##################################################################################################