"""
Manage Data Pipelines

.. versionadded:: 2016.3.0

Be aware that this interacts with Amazon's services, and so may incur charges.

This module uses ``boto3``, which can be installed via package, or pip.

This module accepts explicit AWS credentials but can also utilize
IAM roles assigned to the instance through Instance Profiles. Dynamic
credentials are then automatically obtained from AWS API and no further
configuration is necessary. More information available `here
<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html>`_.

If IAM roles are not used, you need to specify the credentials either in a
pillar file or in the minion's config file:

.. code-block:: yaml

    datapipeline.keyid: GKTADJGHEIQSXMKKRBJ08H
    datapipeline.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs

It's also possible to specify ``key``, ``keyid`` and ``region`` via a profile,
either passed in as a dict, or as a string to pull from pillars or minion
config:

.. code-block:: yaml

    myprofile:
        keyid: GKTADJGHEIQSXMKKRBJ08H
        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1

.. code-block:: yaml

    Ensure daily data pipeline exists:
        boto_datapipeline.present:
            - name: my-datapipeline
            - pipeline_objects:
                DefaultSchedule:
                    name: Every 1 day
                    fields:
                        period: 1 Day
                        type: Schedule
                        startAt: FIRST_ACTIVATION_DATE_TIME
            - parameter_values:
                myDDBTableName: my-dynamo-table
"""


import copy
import datetime
import difflib

import salt.utils.data
import salt.utils.json


def __virtual__():
    """
    Only load if the ``boto_datapipeline`` execution module is available.
    """
    if "boto_datapipeline.create_pipeline" in __salt__:
        return "boto_datapipeline"
    return (False, "boto_datapipeline module could not be loaded")


def present(
    name,
    pipeline_objects=None,
    pipeline_objects_from_pillars="boto_datapipeline_pipeline_objects",
    parameter_objects=None,
    parameter_objects_from_pillars="boto_datapipeline_parameter_objects",
    parameter_values=None,
    parameter_values_from_pillars="boto_datapipeline_parameter_values",
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Ensure the data pipeline exists with matching definition.

    If the active definition of the pipeline named ``name`` does not match
    the requested one, the pipeline is created (``name`` is passed as both
    of ``create_pipeline``'s first two arguments), the definition is put and
    the pipeline is activated. If putting the definition fails because some
    fields cannot be changed, the pipeline is deleted, recreated and the
    definition is put again.

    name
        Name of the service to ensure a data pipeline exists for.

    pipeline_objects
        Pipeline objects to use. Will override objects read from pillars.

    pipeline_objects_from_pillars
        The pillar key to use for lookup.

    parameter_objects
        Parameter objects to use. Will override objects read from pillars.

    parameter_objects_from_pillars
        The pillar key to use for lookup.

    parameter_values
        Parameter values to use. Will override values read from pillars.

    parameter_values_from_pillars
        The pillar key to use for lookup.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    ret = {"name": name, "result": True, "comment": "", "changes": {}}
    conn = {"region": region, "key": key, "keyid": keyid, "profile": profile}

    # Build the expected definition once (pillar data merged with explicit
    # overrides) and reuse it for the comparison and for every put below.
    expected_pipeline_objects = _pipeline_objects(
        pipeline_objects_from_pillars, pipeline_objects or {}
    )
    expected_parameter_objects = _parameter_objects(
        parameter_objects_from_pillars, parameter_objects or {}
    )
    expected_parameter_values = _parameter_values(
        parameter_values_from_pillars, parameter_values or {}
    )

    is_present, old_pipeline_definition = _pipeline_present_with_definition(
        name,
        expected_pipeline_objects,
        expected_parameter_objects,
        expected_parameter_values,
        **conn
    )
    if is_present:
        ret["comment"] = "AWS data pipeline {} present".format(name)
        return ret

    if __opts__["test"]:
        ret["comment"] = "Data pipeline {} is set to be created or updated".format(name)
        ret["result"] = None
        return ret

    result_create_pipeline = __salt__["boto_datapipeline.create_pipeline"](
        name, name, **conn
    )
    if "error" in result_create_pipeline:
        ret["result"] = False
        ret["comment"] = "Failed to create data pipeline {}: {}".format(
            name, result_create_pipeline["error"]
        )
        return ret

    pipeline_id = result_create_pipeline["result"]

    result_pipeline_definition = __salt__["boto_datapipeline.put_pipeline_definition"](
        pipeline_id,
        expected_pipeline_objects,
        parameter_objects=expected_parameter_objects,
        parameter_values=expected_parameter_values,
        **conn
    )
    if "error" in result_pipeline_definition and _immutable_fields_error(
        result_pipeline_definition
    ):
        # The update is not possible in place: delete the pipeline and retry
        # with a freshly created one.
        result_delete_pipeline = __salt__["boto_datapipeline.delete_pipeline"](
            pipeline_id, **conn
        )
        if "error" in result_delete_pipeline:
            ret["result"] = False
            ret["comment"] = "Failed to delete data pipeline {}: {}".format(
                pipeline_id, result_delete_pipeline["error"]
            )
            return ret

        result_create_pipeline = __salt__["boto_datapipeline.create_pipeline"](
            name, name, **conn
        )
        if "error" in result_create_pipeline:
            ret["result"] = False
            ret["comment"] = "Failed to create data pipeline {}: {}".format(
                name, result_create_pipeline["error"]
            )
            return ret

        pipeline_id = result_create_pipeline["result"]

        result_pipeline_definition = __salt__[
            "boto_datapipeline.put_pipeline_definition"
        ](
            pipeline_id,
            expected_pipeline_objects,
            parameter_objects=expected_parameter_objects,
            parameter_values=expected_parameter_values,
            **conn
        )

    if "error" in result_pipeline_definition:
        # Still erroring, either on the first attempt or after the retry.
        ret["result"] = False
        ret["comment"] = "Failed to create data pipeline {}: {}".format(
            name, result_pipeline_definition["error"]
        )
        return ret

    result_activate_pipeline = __salt__["boto_datapipeline.activate_pipeline"](
        pipeline_id, **conn
    )
    if "error" in result_activate_pipeline:
        ret["result"] = False
        # Report the activation error itself; the definition result has no
        # "error" key at this point.
        ret["comment"] = "Failed to activate data pipeline {}: {}".format(
            name, result_activate_pipeline["error"]
        )
        return ret

    pipeline_definition_result = __salt__["boto_datapipeline.get_pipeline_definition"](
        pipeline_id, version="active", **conn
    )
    if "error" in pipeline_definition_result:
        new_pipeline_definition = {}
    else:
        new_pipeline_definition = _standardize(pipeline_definition_result["result"])

    if not old_pipeline_definition:
        ret["changes"]["new"] = "Pipeline created."
        ret["comment"] = "Data pipeline {} created".format(name)
    else:
        ret["changes"]["diff"] = _diff(old_pipeline_definition, new_pipeline_definition)
        ret["comment"] = "Data pipeline {} updated".format(name)

    return ret


def _immutable_fields_error(result_pipeline_definition):
    """Return true if update pipeline failed due to immutable fields

    Some fields cannot be changed after a pipeline has been activated.

    http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-manage-pipeline-modify-console.html#dp-edit-pipeline-limits

    The error is expected to be a list of dicts, each carrying an ``errors``
    list of message strings. Any other shape (e.g. a plain error string,
    presumably a stringified exception -- verify against the execution
    module) cannot describe immutable fields, so it yields False instead of
    raising.
    """
    errors = result_pipeline_definition.get("error")
    if not isinstance(errors, list):
        return False
    for error in errors:
        if not isinstance(error, dict):
            continue
        for message in error.get("errors") or []:
            if isinstance(message, str) and "can not be changed" in message:
                return True
    return False


def _pipeline_present_with_definition(
    name,
    expected_pipeline_objects,
    expected_parameter_objects,
    expected_parameter_values,
    region,
    key,
    keyid,
    profile,
):
    """
    Return true if the pipeline exists and the definition matches.

    Returns a ``(present, pipeline_definition)`` tuple; the definition is the
    standardized active definition, or ``{}`` when it could not be fetched.

    name
        The name of the pipeline.

    expected_pipeline_objects
        Pipeline objects that must match the definition.

    expected_parameter_objects
        Parameter objects that must match the definition.

    expected_parameter_values
        Parameter values that must match the definition.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    result_pipeline_id = __salt__["boto_datapipeline.pipeline_id_from_name"](
        name,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_pipeline_id:
        return False, {}

    pipeline_id = result_pipeline_id["result"]
    pipeline_definition_result = __salt__["boto_datapipeline.get_pipeline_definition"](
        pipeline_id,
        version="active",
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in pipeline_definition_result:
        return False, {}

    pipeline_definition = _standardize(pipeline_definition_result["result"])

    pipeline_objects = pipeline_definition.get("pipelineObjects")
    parameter_objects = pipeline_definition.get("parameterObjects")
    parameter_values = pipeline_definition.get("parameterValues")

    present = (
        _recursive_compare(
            _cleaned(pipeline_objects), _cleaned(expected_pipeline_objects)
        )
        and _recursive_compare(parameter_objects, expected_parameter_objects)
        and _recursive_compare(parameter_values, expected_parameter_values)
    )
    return present, pipeline_definition


def _cleaned(_pipeline_objects):
    """Return standardized pipeline objects to be used for comparing

    Remove year, month, and day components of the startDateTime so that data
    pipelines with the same time of day but different days are considered
    equal. ``None`` (e.g. a definition without ``pipelineObjects``) is
    treated as an empty list. The input is not modified.
    """
    pipeline_objects = copy.deepcopy(_pipeline_objects) if _pipeline_objects else []
    for pipeline_object in pipeline_objects:
        if pipeline_object["id"] != "DefaultSchedule":
            continue
        for field_object in pipeline_object["fields"]:
            if field_object["key"] == "startDateTime":
                start_date_time = datetime.datetime.strptime(
                    field_object["stringValue"], "%Y-%m-%dT%H:%M:%S"
                )
                field_object["stringValue"] = start_date_time.strftime("%H:%M:%S")
    return pipeline_objects


def _recursive_compare(v1, v2):
    """
    Return v1 == v2. Compares list, dict, recursively.

    Lists are compared irrespective of order (both sides are sorted by
    ``_id_or_key``). ``None`` on either side is treated as an empty list/dict
    when the other side is a list/dict. Neither argument is modified.
    """
    # Equality is symmetric, so move a None to the right-hand side where the
    # list/dict branches below treat it as empty.
    if v1 is None and isinstance(v2, (list, dict)):
        v1, v2 = v2, v1

    if isinstance(v1, list):
        if v2 is None:
            v2 = []
        if len(v1) != len(v2):
            return False
        return all(
            _recursive_compare(x, y)
            for x, y in zip(sorted(v1, key=_id_or_key), sorted(v2, key=_id_or_key))
        )
    if isinstance(v1, dict):
        if v2 is None:
            v2 = {}
        v1 = dict(v1)
        v2 = dict(v2)
        if v1.keys() != v2.keys():
            return False
        return all(_recursive_compare(v1[k], v2[k]) for k in v1)
    return v1 == v2


def _id_or_key(list_item):
    """
    Return the value at key 'id' or 'key'.

    Non-dict items, and dicts with neither key, are returned unchanged.
    """
    if isinstance(list_item, dict):
        if "id" in list_item:
            return list_item["id"]
        if "key" in list_item:
            return list_item["key"]
    return list_item


def _diff(old_pipeline_definition, new_pipeline_definition):
    """
    Return string diff of pipeline definitions.

    ``ResponseMetadata`` is excluded from both sides; the arguments themselves
    are not modified.
    """
    old_definition = dict(old_pipeline_definition)
    new_definition = dict(new_pipeline_definition)
    old_definition.pop("ResponseMetadata", None)
    new_definition.pop("ResponseMetadata", None)

    diff = salt.utils.data.decode(
        difflib.unified_diff(
            salt.utils.json.dumps(old_definition, indent=4).splitlines(True),
            salt.utils.json.dumps(new_definition, indent=4).splitlines(True),
        )
    )
    return "".join(diff)


def _standardize(structure):
    """
    Return standardized format for lists/dictionaries.

    Lists of dictionaries are sorted by the value of the dictionary at
    its primary key ('id' or 'key'). OrderedDict's are converted to
    basic dictionaries. The input is not modified; a new structure is
    returned.
    """
    if isinstance(structure, list):
        # Sort on the original elements first, then standardize each one.
        return [_standardize(item) for item in sorted(structure, key=_id_or_key)]
    if isinstance(structure, dict):
        return {k: _standardize(v) for k, v in structure.items()}
    return copy.deepcopy(structure)


def _merged_from_pillars(pillar_key, overrides):
    """
    Return a deep copy of the dict stored in pillar at ``pillar_key``,
    updated with ``overrides``.

    A missing or empty pillar entry is treated as an empty dict (pillar.get
    would otherwise return its non-dict default).
    """
    merged = copy.deepcopy(__salt__["pillar.get"](pillar_key, {}) or {})
    merged.update(overrides or {})
    return merged


def _pipeline_objects(pipeline_objects_from_pillars, pipeline_object_overrides):
    """
    Return a list of pipeline objects that compose the pipeline

    pipeline_objects_from_pillars
        The pillar key to use for lookup

    pipeline_object_overrides
        Pipeline objects to use. Will override objects read from pillars.
    """
    from_pillars = _merged_from_pillars(
        pipeline_objects_from_pillars, pipeline_object_overrides
    )
    pipeline_objects = _standardize(_dict_to_list_ids(from_pillars))
    for pipeline_object in pipeline_objects:
        pipeline_object["fields"] = _properties_from_dict(pipeline_object["fields"])
    return pipeline_objects


def _parameter_objects(parameter_objects_from_pillars, parameter_object_overrides):
    """
    Return a list of parameter objects that configure the pipeline

    parameter_objects_from_pillars
        The pillar key to use for lookup

    parameter_object_overrides
        Parameter objects to use. Will override objects read from pillars.
    """
    from_pillars = _merged_from_pillars(
        parameter_objects_from_pillars, parameter_object_overrides
    )
    parameter_objects = _standardize(_dict_to_list_ids(from_pillars))
    for parameter_object in parameter_objects:
        parameter_object["attributes"] = _properties_from_dict(
            parameter_object["attributes"]
        )
    return parameter_objects


def _parameter_values(parameter_values_from_pillars, parameter_value_overrides):
    """
    Return a list of parameter values that configure the pipeline

    parameter_values_from_pillars
        The pillar key to use for lookup

    parameter_value_overrides
        Parameter values to use. Will override values read from pillars.
    """
    from_pillars = _merged_from_pillars(
        parameter_values_from_pillars, parameter_value_overrides
    )
    parameter_values = _standardize(from_pillars)
    return _properties_from_dict(parameter_values, key_name="id")


def _dict_to_list_ids(objects):
    """
    Convert a dictionary to a list of dictionaries, where each element has
    a key value pair {'id': key}. This makes it easy to override pillar values
    while still satisfying the boto api. An 'id' inside a value takes
    precedence over the dictionary key.
    """
    return [{"id": key, **value} for key, value in objects.items()]


def _properties_from_dict(d, key_name="key"):
    """
    Transforms dictionary into pipeline object properties.

    The output format conforms to boto's specification.

    Example input:
        {
            'a': '1',
            'b': {
                'ref': '2'
            },
        }

    Example output:
        [
            {
                'key': 'a',
                'stringValue': '1',
            },
            {
                'key': 'b',
                'refValue': '2',
            },
        ]
    """
    fields = []
    for key, value in d.items():
        if isinstance(value, dict):
            fields.append({key_name: key, "refValue": value["ref"]})
        else:
            fields.append({key_name: key, "stringValue": value})
    return fields


def absent(name, region=None, key=None, keyid=None, profile=None):
    """
    Ensure a pipeline with the service_name does not exist

    name
        Name of the service to ensure a data pipeline does not exist for.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    result_pipeline_id = __salt__["boto_datapipeline.pipeline_id_from_name"](
        name,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_pipeline_id:
        # NOTE(review): any lookup error is treated as "pipeline absent".
        ret["comment"] = "AWS data pipeline {} absent.".format(name)
        return ret

    pipeline_id = result_pipeline_id["result"]
    if __opts__["test"]:
        ret["comment"] = "Data pipeline {} set to be deleted.".format(name)
        ret["result"] = None
        return ret

    result_delete_pipeline = __salt__["boto_datapipeline.delete_pipeline"](
        pipeline_id,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_delete_pipeline:
        ret["result"] = False
        ret["comment"] = "Failed to delete data pipeline {}: {}".format(
            pipeline_id, result_delete_pipeline["error"]
        )
        return ret

    ret["comment"] = "Data pipeline {} deleted.".format(name)
    ret["changes"]["old"] = {"pipeline_id": pipeline_id}
    ret["changes"]["new"] = None
    return ret