1#!/usr/bin/python 2# 3# Copyright (c) 2017 Ansible Project 4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5 6from __future__ import absolute_import, division, print_function 7__metaclass__ = type 8 9 10ANSIBLE_METADATA = {'metadata_version': '1.1', 11 'status': ['preview'], 12 'supported_by': 'community'} 13 14DOCUMENTATION = ''' 15--- 16module: data_pipeline 17version_added: "2.4" 18author: 19 - Raghu Udiyar (@raags) <raghusiddarth@gmail.com> 20 - Sloane Hertel (@s-hertel) <shertel@redhat.com> 21requirements: [ "boto3" ] 22short_description: Create and manage AWS Datapipelines 23extends_documentation_fragment: 24 - aws 25 - ec2 26description: 27 - Create and manage AWS Datapipelines. Creation is not idempotent in AWS, so the I(uniqueId) is created by hashing the options (minus objects) 28 given to the datapipeline. 29 - The pipeline definition must be in the format given here 30 U(https://docs.aws.amazon.com/datapipeline/latest/APIReference/API_PutPipelineDefinition.html#API_PutPipelineDefinition_RequestSyntax). 31 - Also operations will wait for a configurable amount of time to ensure the pipeline is in the requested state. 32options: 33 name: 34 description: 35 - The name of the Datapipeline to create/modify/delete. 36 required: true 37 description: 38 description: 39 - An optional description for the pipeline being created. 40 default: '' 41 objects: 42 description: 43 - A list of pipeline object definitions, each of which is a dict that takes the keys C(id), C(name) and C(fields). 44 suboptions: 45 id: 46 description: 47 - The ID of the object. 48 name: 49 description: 50 - The name of the object. 51 fields: 52 description: 53 - A list of dicts that take the keys C(key) and C(stringValue)/C(refValue). 54 The value is specified as a reference to another object C(refValue) or as a string value C(stringValue) 55 but not as both. 56 parameters: 57 description: 58 - A list of parameter objects (dicts) in the pipeline definition. 59 suboptions: 60 id: 61 description: 62 - The ID of the parameter object. 63 attributes: 64 description: 65 - A list of attributes (dicts) of the parameter object. Each attribute takes the keys C(key) and C(stringValue) both 66 of which are strings. 67 values: 68 description: 69 - A list of parameter values (dicts) in the pipeline definition. Each dict takes the keys C(id) and C(stringValue) both 70 of which are strings. 71 timeout: 72 description: 73 - Time in seconds to wait for the pipeline to transition to the requested state, fail otherwise. 74 default: 300 75 state: 76 description: 77 - The requested state of the pipeline. 78 choices: ['present', 'absent', 'active', 'inactive'] 79 default: present 80 tags: 81 description: 82 - A dict of key:value pair(s) to add to the pipeline. 83''' 84 85EXAMPLES = ''' 86# Note: These examples do not set authentication details, see the AWS Guide for details. 87 88# Create pipeline 89- data_pipeline: 90 name: test-dp 91 region: us-west-2 92 objects: "{{pipelineObjects}}" 93 parameters: "{{pipelineParameters}}" 94 values: "{{pipelineValues}}" 95 tags: 96 key1: val1 97 key2: val2 98 state: present 99 100# Example populating and activating a pipeline that demonstrates two ways of providing pipeline objects 101- data_pipeline: 102 name: test-dp 103 objects: 104 - "id": "DefaultSchedule" 105 "name": "Every 1 day" 106 "fields": 107 - "key": "period" 108 "stringValue": "1 days" 109 - "key": "type" 110 "stringValue": "Schedule" 111 - "key": "startAt" 112 "stringValue": "FIRST_ACTIVATION_DATE_TIME" 113 - "id": "Default" 114 "name": "Default" 115 "fields": [ { "key": "resourceRole", "stringValue": "my_resource_role" }, 116 { "key": "role", "stringValue": "DataPipelineDefaultRole" }, 117 { "key": "pipelineLogUri", "stringValue": "s3://my_s3_log.txt" }, 118 { "key": "scheduleType", "stringValue": "cron" }, 119 { "key": "schedule", "refValue": "DefaultSchedule" }, 120 { "key": "failureAndRerunMode", "stringValue": "CASCADE" } ] 121 state: active 122 123# Activate pipeline 124- data_pipeline: 125 name: test-dp 126 region: us-west-2 127 state: active 128 129# Delete pipeline 130- data_pipeline: 131 name: test-dp 132 region: us-west-2 133 state: absent 134 135''' 136 137RETURN = ''' 138changed: 139 description: whether the data pipeline has been modified 140 type: bool 141 returned: always 142 sample: 143 changed: true 144result: 145 description: 146 - Contains the data pipeline data (data_pipeline) and a return message (msg). 147 If the data pipeline exists data_pipeline will contain the keys description, name, 148 pipeline_id, state, tags, and unique_id. If the data pipeline does not exist then 149 data_pipeline will be an empty dict. The msg describes the status of the operation. 150 returned: always 151 type: dict 152''' 153 154import hashlib 155import json 156import time 157import traceback 158 159try: 160 import boto3 161 from botocore.exceptions import ClientError 162 HAS_BOTO3 = True 163except ImportError: 164 HAS_BOTO3 = False 165 166from ansible.module_utils.basic import AnsibleModule 167from ansible.module_utils.ec2 import ec2_argument_spec, get_aws_connection_info, boto3_conn, camel_dict_to_snake_dict 168from ansible.module_utils._text import to_text 169 170 171DP_ACTIVE_STATES = ['ACTIVE', 'SCHEDULED'] 172DP_INACTIVE_STATES = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING'] 173DP_ACTIVATING_STATE = 'ACTIVATING' 174DP_DEACTIVATING_STATE = 'DEACTIVATING' 175PIPELINE_DOESNT_EXIST = '^.*Pipeline with id: {0} does not exist$' 176 177 178class DataPipelineNotFound(Exception): 179 pass 180 181 182class TimeOutException(Exception): 183 pass 184 185 186def pipeline_id(client, name): 187 """Return pipeline id for the given pipeline name 188 189 :param object client: boto3 datapipeline client 190 :param string name: pipeline name 191 :returns: pipeline id 192 :raises: DataPipelineNotFound 193 194 """ 195 pipelines = client.list_pipelines() 196 for dp in pipelines['pipelineIdList']: 197 if dp['name'] == name: 198 return dp['id'] 199 raise DataPipelineNotFound 200 201 202def pipeline_description(client, dp_id): 203 """Return pipeline description list 204 205 :param object client: boto3 datapipeline client 206 :returns: pipeline description dictionary 207 :raises: DataPipelineNotFound 208 209 """ 210 try: 211 return client.describe_pipelines(pipelineIds=[dp_id]) 212 except ClientError as e: 213 raise DataPipelineNotFound 214 215 216def pipeline_field(client, dp_id, field): 217 """Return a pipeline field from the pipeline description. 218 219 The available fields are listed in describe_pipelines output. 220 221 :param object client: boto3 datapipeline client 222 :param string dp_id: pipeline id 223 :param string field: pipeline description field 224 :returns: pipeline field information 225 226 """ 227 dp_description = pipeline_description(client, dp_id) 228 for field_key in dp_description['pipelineDescriptionList'][0]['fields']: 229 if field_key['key'] == field: 230 return field_key['stringValue'] 231 raise KeyError("Field key {0} not found!".format(field)) 232 233 234def run_with_timeout(timeout, func, *func_args, **func_kwargs): 235 """Run func with the provided args and kwargs, and wait utill 236 timeout for truthy return value 237 238 :param int timeout: time to wait for status 239 :param function func: function to run, should return True or False 240 :param args func_args: function args to pass to func 241 :param kwargs func_kwargs: function key word args 242 :returns: True if func returns truthy within timeout 243 :raises: TimeOutException 244 245 """ 246 247 for _ in range(timeout // 10): 248 if func(*func_args, **func_kwargs): 249 return True 250 else: 251 # check every 10s 252 time.sleep(10) 253 254 raise TimeOutException 255 256 257def check_dp_exists(client, dp_id): 258 """Check if datapipeline exists 259 260 :param object client: boto3 datapipeline client 261 :param string dp_id: pipeline id 262 :returns: True or False 263 264 """ 265 try: 266 # pipeline_description raises DataPipelineNotFound 267 if pipeline_description(client, dp_id): 268 return True 269 else: 270 return False 271 except DataPipelineNotFound: 272 return False 273 274 275def check_dp_status(client, dp_id, status): 276 """Checks if datapipeline matches states in status list 277 278 :param object client: boto3 datapipeline client 279 :param string dp_id: pipeline id 280 :param list status: list of states to check against 281 :returns: True or False 282 283 """ 284 if not isinstance(status, list): 285 raise AssertionError() 286 if pipeline_field(client, dp_id, field="@pipelineState") in status: 287 return True 288 else: 289 return False 290 291 292def pipeline_status_timeout(client, dp_id, status, timeout): 293 args = (client, dp_id, status) 294 return run_with_timeout(timeout, check_dp_status, *args) 295 296 297def pipeline_exists_timeout(client, dp_id, timeout): 298 args = (client, dp_id) 299 return run_with_timeout(timeout, check_dp_exists, *args) 300 301 302def activate_pipeline(client, module): 303 """Activates pipeline 304 305 """ 306 dp_name = module.params.get('name') 307 timeout = module.params.get('timeout') 308 309 try: 310 dp_id = pipeline_id(client, dp_name) 311 except DataPipelineNotFound: 312 module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name)) 313 314 if pipeline_field(client, dp_id, field="@pipelineState") in DP_ACTIVE_STATES: 315 changed = False 316 else: 317 try: 318 client.activate_pipeline(pipelineId=dp_id) 319 except ClientError as e: 320 if e.response["Error"]["Code"] == "InvalidRequestException": 321 module.fail_json(msg="You need to populate your pipeline before activation.") 322 try: 323 pipeline_status_timeout(client, dp_id, status=DP_ACTIVE_STATES, 324 timeout=timeout) 325 except TimeOutException: 326 if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED": 327 # activated but completed more rapidly than it was checked 328 pass 329 else: 330 module.fail_json(msg=('Data Pipeline {0} failed to activate ' 331 'within timeout {1} seconds').format(dp_name, timeout)) 332 changed = True 333 334 data_pipeline = get_result(client, dp_id) 335 result = {'data_pipeline': data_pipeline, 336 'msg': 'Data Pipeline {0} activated.'.format(dp_name)} 337 338 return (changed, result) 339 340 341def deactivate_pipeline(client, module): 342 """Deactivates pipeline 343 344 """ 345 dp_name = module.params.get('name') 346 timeout = module.params.get('timeout') 347 348 try: 349 dp_id = pipeline_id(client, dp_name) 350 except DataPipelineNotFound: 351 module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name)) 352 353 if pipeline_field(client, dp_id, field="@pipelineState") in DP_INACTIVE_STATES: 354 changed = False 355 else: 356 client.deactivate_pipeline(pipelineId=dp_id) 357 try: 358 pipeline_status_timeout(client, dp_id, status=DP_INACTIVE_STATES, 359 timeout=timeout) 360 except TimeOutException: 361 module.fail_json(msg=('Data Pipeline {0} failed to deactivate' 362 'within timeout {1} seconds').format(dp_name, timeout)) 363 changed = True 364 365 data_pipeline = get_result(client, dp_id) 366 result = {'data_pipeline': data_pipeline, 367 'msg': 'Data Pipeline {0} deactivated.'.format(dp_name)} 368 369 return (changed, result) 370 371 372def _delete_dp_with_check(dp_id, client, timeout): 373 client.delete_pipeline(pipelineId=dp_id) 374 try: 375 pipeline_status_timeout(client=client, dp_id=dp_id, status=[PIPELINE_DOESNT_EXIST], timeout=timeout) 376 except DataPipelineNotFound: 377 return True 378 379 380def delete_pipeline(client, module): 381 """Deletes pipeline 382 383 """ 384 dp_name = module.params.get('name') 385 timeout = module.params.get('timeout') 386 387 try: 388 dp_id = pipeline_id(client, dp_name) 389 _delete_dp_with_check(dp_id, client, timeout) 390 changed = True 391 except DataPipelineNotFound: 392 changed = False 393 except TimeOutException: 394 module.fail_json(msg=('Data Pipeline {0} failed to delete' 395 'within timeout {1} seconds').format(dp_name, timeout)) 396 result = {'data_pipeline': {}, 397 'msg': 'Data Pipeline {0} deleted'.format(dp_name)} 398 399 return (changed, result) 400 401 402def build_unique_id(module): 403 data = dict(module.params) 404 # removing objects from the unique id so we can update objects or populate the pipeline after creation without needing to make a new pipeline 405 [data.pop(each, None) for each in ('objects', 'timeout')] 406 json_data = json.dumps(data, sort_keys=True).encode("utf-8") 407 hashed_data = hashlib.md5(json_data).hexdigest() 408 return hashed_data 409 410 411def format_tags(tags): 412 """ Reformats tags 413 414 :param dict tags: dict of data pipeline tags (e.g. {key1: val1, key2: val2, key3: val3}) 415 :returns: list of dicts (e.g. [{key: key1, value: val1}, {key: key2, value: val2}, {key: key3, value: val3}]) 416 417 """ 418 return [dict(key=k, value=v) for k, v in tags.items()] 419 420 421def get_result(client, dp_id): 422 """ Get the current state of the data pipeline and reformat it to snake_case for exit_json 423 424 :param object client: boto3 datapipeline client 425 :param string dp_id: pipeline id 426 :returns: reformatted dict of pipeline description 427 428 """ 429 # pipeline_description returns a pipelineDescriptionList of length 1 430 # dp is a dict with keys "description" (str), "fields" (list), "name" (str), "pipelineId" (str), "tags" (dict) 431 dp = pipeline_description(client, dp_id)['pipelineDescriptionList'][0] 432 433 # Get uniqueId and pipelineState in fields to add to the exit_json result 434 dp["unique_id"] = pipeline_field(client, dp_id, field="uniqueId") 435 dp["pipeline_state"] = pipeline_field(client, dp_id, field="@pipelineState") 436 437 # Remove fields; can't make a list snake_case and most of the data is redundant 438 del dp["fields"] 439 440 # Note: tags is already formatted fine so we don't need to do anything with it 441 442 # Reformat data pipeline and add reformatted fields back 443 dp = camel_dict_to_snake_dict(dp) 444 return dp 445 446 447def diff_pipeline(client, module, objects, unique_id, dp_name): 448 """Check if there's another pipeline with the same unique_id and if so, checks if the object needs to be updated 449 """ 450 result = {} 451 changed = False 452 create_dp = False 453 454 # See if there is already a pipeline with the same unique_id 455 unique_id = build_unique_id(module) 456 try: 457 dp_id = pipeline_id(client, dp_name) 458 dp_unique_id = to_text(pipeline_field(client, dp_id, field="uniqueId")) 459 if dp_unique_id != unique_id: 460 # A change is expected but not determined. Updated to a bool in create_pipeline(). 461 changed = "NEW_VERSION" 462 create_dp = True 463 # Unique ids are the same - check if pipeline needs modification 464 else: 465 dp_objects = client.get_pipeline_definition(pipelineId=dp_id)['pipelineObjects'] 466 # Definition needs to be updated 467 if dp_objects != objects: 468 changed, msg = define_pipeline(client, module, objects, dp_id) 469 # No changes 470 else: 471 msg = 'Data Pipeline {0} is present'.format(dp_name) 472 data_pipeline = get_result(client, dp_id) 473 result = {'data_pipeline': data_pipeline, 474 'msg': msg} 475 except DataPipelineNotFound: 476 create_dp = True 477 478 return create_dp, changed, result 479 480 481def define_pipeline(client, module, objects, dp_id): 482 """Puts pipeline definition 483 484 """ 485 dp_name = module.params.get('name') 486 487 if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED": 488 msg = 'Data Pipeline {0} is unable to be updated while in state FINISHED.'.format(dp_name) 489 changed = False 490 491 elif objects: 492 parameters = module.params.get('parameters') 493 values = module.params.get('values') 494 495 try: 496 client.put_pipeline_definition(pipelineId=dp_id, 497 pipelineObjects=objects, 498 parameterObjects=parameters, 499 parameterValues=values) 500 msg = 'Data Pipeline {0} has been updated.'.format(dp_name) 501 changed = True 502 except ClientError as e: 503 module.fail_json(msg="Failed to put the definition for pipeline {0}. Check that string/reference fields" 504 "are not empty and that the number of objects in the pipeline does not exceed maximum allowed" 505 "objects".format(dp_name), exception=traceback.format_exc()) 506 else: 507 changed = False 508 msg = "" 509 510 return changed, msg 511 512 513def create_pipeline(client, module): 514 """Creates datapipeline. Uses uniqueId to achieve idempotency. 515 516 """ 517 dp_name = module.params.get('name') 518 objects = module.params.get('objects', None) 519 description = module.params.get('description', '') 520 tags = module.params.get('tags') 521 timeout = module.params.get('timeout') 522 523 unique_id = build_unique_id(module) 524 create_dp, changed, result = diff_pipeline(client, module, objects, unique_id, dp_name) 525 526 if changed == "NEW_VERSION": 527 # delete old version 528 changed, _ = delete_pipeline(client, module) 529 530 # There isn't a pipeline or it has different parameters than the pipeline in existence. 531 if create_dp: 532 # Make pipeline 533 try: 534 tags = format_tags(tags) 535 dp = client.create_pipeline(name=dp_name, 536 uniqueId=unique_id, 537 description=description, 538 tags=tags) 539 dp_id = dp['pipelineId'] 540 pipeline_exists_timeout(client, dp_id, timeout) 541 except ClientError as e: 542 module.fail_json(msg="Failed to create the data pipeline {0}.".format(dp_name), exception=traceback.format_exc()) 543 except TimeOutException: 544 module.fail_json(msg=('Data Pipeline {0} failed to create' 545 'within timeout {1} seconds').format(dp_name, timeout)) 546 # Put pipeline definition 547 _, msg = define_pipeline(client, module, objects, dp_id) 548 549 changed = True 550 data_pipeline = get_result(client, dp_id) 551 result = {'data_pipeline': data_pipeline, 552 'msg': 'Data Pipeline {0} created.'.format(dp_name) + msg} 553 554 return (changed, result) 555 556 557def main(): 558 argument_spec = ec2_argument_spec() 559 argument_spec.update( 560 dict( 561 name=dict(required=True), 562 version=dict(required=False), 563 description=dict(required=False, default=''), 564 objects=dict(required=False, type='list', default=[]), 565 parameters=dict(required=False, type='list', default=[]), 566 timeout=dict(required=False, type='int', default=300), 567 state=dict(default='present', choices=['present', 'absent', 568 'active', 'inactive']), 569 tags=dict(required=False, type='dict', default={}), 570 values=dict(required=False, type='list', default=[]) 571 ) 572 ) 573 module = AnsibleModule(argument_spec, supports_check_mode=False) 574 575 if not HAS_BOTO3: 576 module.fail_json(msg='boto3 is required for the datapipeline module!') 577 578 try: 579 region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True) 580 if not region: 581 module.fail_json(msg="Region must be specified as a parameter, in EC2_REGION or AWS_REGION environment variables or in boto configuration file") 582 client = boto3_conn(module, conn_type='client', 583 resource='datapipeline', region=region, 584 endpoint=ec2_url, **aws_connect_kwargs) 585 except ClientError as e: 586 module.fail_json(msg="Can't authorize connection - " + str(e)) 587 588 state = module.params.get('state') 589 if state == 'present': 590 changed, result = create_pipeline(client, module) 591 elif state == 'absent': 592 changed, result = delete_pipeline(client, module) 593 elif state == 'active': 594 changed, result = activate_pipeline(client, module) 595 elif state == 'inactive': 596 changed, result = deactivate_pipeline(client, module) 597 598 module.exit_json(result=result, changed=changed) 599 600 601if __name__ == '__main__': 602 main() 603