1#!/usr/bin/python
2#
3# Copyright (c) 2017 Ansible Project
4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import absolute_import, division, print_function
7__metaclass__ = type
8
9
# Standard Ansible metadata block consumed by the doc/build tooling.
ANSIBLE_METADATA = {
    'metadata_version': '1.1',
    'status': ['preview'],
    'supported_by': 'community',
}
13
# Module documentation in YAML, rendered by ansible-doc.
# NOTE(review): keep the options here in sync with the argument_spec in main()
# (main() also accepts an undocumented 'version' parameter — confirm intent).
DOCUMENTATION = '''
---
module: data_pipeline
version_added: "2.4"
author:
  - Raghu Udiyar (@raags) <raghusiddarth@gmail.com>
  - Sloane Hertel (@s-hertel) <shertel@redhat.com>
requirements: [ "boto3" ]
short_description: Create and manage AWS Datapipelines
extends_documentation_fragment:
    - aws
    - ec2
description:
    - Create and manage AWS Datapipelines. Creation is not idempotent in AWS, so the I(uniqueId) is created by hashing the options (minus objects)
      given to the datapipeline.
    - The pipeline definition must be in the format given here
      U(https://docs.aws.amazon.com/datapipeline/latest/APIReference/API_PutPipelineDefinition.html#API_PutPipelineDefinition_RequestSyntax).
    - Also operations will wait for a configurable amount of time to ensure the pipeline is in the requested state.
options:
  name:
    description:
      - The name of the Datapipeline to create/modify/delete.
    required: true
  description:
    description:
      - An optional description for the pipeline being created.
    default: ''
  objects:
    description:
      - A list of pipeline object definitions, each of which is a dict that takes the keys C(id), C(name) and C(fields).
    suboptions:
      id:
        description:
          - The ID of the object.
      name:
        description:
          - The name of the object.
      fields:
        description:
          - A list of dicts that take the keys C(key) and C(stringValue)/C(refValue).
            The value is specified as a reference to another object C(refValue) or as a string value C(stringValue)
            but not as both.
  parameters:
    description:
      - A list of parameter objects (dicts) in the pipeline definition.
    suboptions:
      id:
        description:
          - The ID of the parameter object.
      attributes:
        description:
          - A list of attributes (dicts) of the parameter object. Each attribute takes the keys C(key) and C(stringValue) both
            of which are strings.
  values:
    description:
      - A list of parameter values (dicts) in the pipeline definition. Each dict takes the keys C(id) and C(stringValue) both
        of which are strings.
  timeout:
    description:
      - Time in seconds to wait for the pipeline to transition to the requested state, fail otherwise.
    default: 300
  state:
    description:
      - The requested state of the pipeline.
    choices: ['present', 'absent', 'active', 'inactive']
    default: present
  tags:
    description:
      - A dict of key:value pair(s) to add to the pipeline.
'''
84
# Usage examples rendered by ansible-doc.  The second example previously
# placed the module arguments at task level (2-space indent), which would make
# 'objects'/'state' invalid task keywords; they now nest under the module key
# like the other examples.
EXAMPLES = '''
# Note: These examples do not set authentication details, see the AWS Guide for details.

# Create pipeline
- data_pipeline:
    name: test-dp
    region: us-west-2
    objects: "{{pipelineObjects}}"
    parameters: "{{pipelineParameters}}"
    values: "{{pipelineValues}}"
    tags:
      key1: val1
      key2: val2
    state: present

# Example populating and activating a pipeline that demonstrates two ways of providing pipeline objects
- data_pipeline:
    name: test-dp
    objects:
      - "id": "DefaultSchedule"
        "name": "Every 1 day"
        "fields":
          - "key": "period"
            "stringValue": "1 days"
          - "key": "type"
            "stringValue": "Schedule"
          - "key": "startAt"
            "stringValue": "FIRST_ACTIVATION_DATE_TIME"
      - "id": "Default"
        "name": "Default"
        "fields": [ { "key": "resourceRole", "stringValue": "my_resource_role" },
                    { "key": "role", "stringValue": "DataPipelineDefaultRole" },
                    { "key": "pipelineLogUri", "stringValue": "s3://my_s3_log.txt" },
                    { "key": "scheduleType", "stringValue": "cron" },
                    { "key": "schedule", "refValue": "DefaultSchedule" },
                    { "key": "failureAndRerunMode", "stringValue": "CASCADE" } ]
    state: active

# Activate pipeline
- data_pipeline:
    name: test-dp
    region: us-west-2
    state: active

# Delete pipeline
- data_pipeline:
    name: test-dp
    region: us-west-2
    state: absent

'''
136
# Return value documentation in YAML, rendered by ansible-doc.
RETURN = '''
changed:
  description: whether the data pipeline has been modified
  type: bool
  returned: always
  sample:
    changed: true
result:
  description:
    - Contains the data pipeline data (data_pipeline) and a return message (msg).
      If the data pipeline exists data_pipeline will contain the keys description, name,
      pipeline_id, state, tags, and unique_id. If the data pipeline does not exist then
      data_pipeline will be an empty dict. The msg describes the status of the operation.
  returned: always
  type: dict
'''
153
154import hashlib
155import json
156import time
157import traceback
158
159try:
160    import boto3
161    from botocore.exceptions import ClientError
162    HAS_BOTO3 = True
163except ImportError:
164    HAS_BOTO3 = False
165
166from ansible.module_utils.basic import AnsibleModule
167from ansible.module_utils.ec2 import ec2_argument_spec, get_aws_connection_info, boto3_conn, camel_dict_to_snake_dict
168from ansible.module_utils._text import to_text
169
170
# Pipeline states considered "active" / "inactive" when polling for a
# requested state (compared against the @pipelineState field).
DP_ACTIVE_STATES = ['ACTIVE', 'SCHEDULED']
DP_INACTIVE_STATES = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING']
# Transitional states; defined for reference, not consulted by the code below.
DP_ACTIVATING_STATE = 'ACTIVATING'
DP_DEACTIVATING_STATE = 'DEACTIVATING'
# NOTE(review): passed as a "status" entry by _delete_dp_with_check(); it never
# matches a real pipeline state — deletion is actually detected when
# pipeline_description() raises DataPipelineNotFound.  Confirm before reuse.
PIPELINE_DOESNT_EXIST = '^.*Pipeline with id: {0} does not exist$'
176
177
class DataPipelineNotFound(Exception):
    """Raised when the requested pipeline cannot be found (or describing it fails)."""
    pass
180
181
class TimeOutException(Exception):
    """Raised by run_with_timeout when the condition is not met within the timeout."""
    pass
184
185
def pipeline_id(client, name):
    """Look up the pipeline id for the given pipeline name.

    :param object client: boto3 datapipeline client
    :param string name: pipeline name
    :returns: pipeline id
    :raises: DataPipelineNotFound when no pipeline carries the name

    """
    listing = client.list_pipelines()
    for entry in listing['pipelineIdList']:
        if entry['name'] != name:
            continue
        return entry['id']
    raise DataPipelineNotFound
200
201
def pipeline_description(client, dp_id):
    """Fetch the description for a single pipeline id.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: describe_pipelines response dictionary
    :raises: DataPipelineNotFound when the describe call fails

    """
    try:
        return client.describe_pipelines(pipelineIds=[dp_id])
    except ClientError:
        # Any client error (typically "does not exist") maps to not-found.
        raise DataPipelineNotFound
214
215
def pipeline_field(client, dp_id, field):
    """Return one field's value from the pipeline description.

    The available fields are listed in describe_pipelines output.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :param string field: pipeline description field
    :returns: pipeline field information
    :raises: KeyError when the field is absent

    """
    description = pipeline_description(client, dp_id)
    fields = description['pipelineDescriptionList'][0]['fields']
    for entry in fields:
        if entry['key'] == field:
            return entry['stringValue']
    raise KeyError("Field key {0} not found!".format(field))
232
233
def run_with_timeout(timeout, func, *func_args, **func_kwargs):
    """Run func with the provided args and kwargs, and wait until
    timeout for a truthy return value.

    :param int timeout: time in seconds to wait for status
    :param function func: function to run, should return True or False
    :param args func_args: positional args to pass to func
    :param kwargs func_kwargs: keyword args to pass to func
    :returns: True if func returns truthy within timeout
    :raises: TimeOutException

    """
    # Poll every 10s.  Guarantee at least one attempt: the original
    # range(timeout // 10) made ZERO attempts (and raised immediately)
    # whenever timeout < 10.
    attempts = max(timeout // 10, 1)
    for attempt in range(attempts):
        if func(*func_args, **func_kwargs):
            return True
        if attempt < attempts - 1:
            # check again in 10s; no pointless sleep after the final check
            time.sleep(10)

    raise TimeOutException
255
256
def check_dp_exists(client, dp_id):
    """Check if datapipeline exists

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: True or False

    """
    try:
        # pipeline_description raises DataPipelineNotFound when missing
        return bool(pipeline_description(client, dp_id))
    except DataPipelineNotFound:
        return False
273
274
def check_dp_status(client, dp_id, status):
    """Report whether the pipeline's current state is in the status list.

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :param list status: list of states to check against
    :returns: True or False

    """
    if not isinstance(status, list):
        raise AssertionError()
    current_state = pipeline_field(client, dp_id, field="@pipelineState")
    return current_state in status
290
291
def pipeline_status_timeout(client, dp_id, status, timeout):
    """Poll until the pipeline state is in *status*; raise TimeOutException otherwise."""
    return run_with_timeout(timeout, check_dp_status, client, dp_id, status)
295
296
def pipeline_exists_timeout(client, dp_id, timeout):
    """Poll until the pipeline exists; raise TimeOutException otherwise."""
    return run_with_timeout(timeout, check_dp_exists, client, dp_id)
300
301
def activate_pipeline(client, module):
    """Activate the named pipeline and wait for it to reach an active state.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :returns: tuple of (changed, result dict)

    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
    except DataPipelineNotFound:
        module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name))

    if pipeline_field(client, dp_id, field="@pipelineState") in DP_ACTIVE_STATES:
        changed = False
    else:
        try:
            client.activate_pipeline(pipelineId=dp_id)
        except ClientError as e:
            if e.response["Error"]["Code"] == "InvalidRequestException":
                module.fail_json(msg="You need to populate your pipeline before activation.")
            else:
                # Previously any other ClientError was silently swallowed and the
                # module went on to wait for an activation that never started.
                module.fail_json(msg="Failed to activate data pipeline {0}.".format(dp_name),
                                 exception=traceback.format_exc())
        try:
            pipeline_status_timeout(client, dp_id, status=DP_ACTIVE_STATES,
                                    timeout=timeout)
        except TimeOutException:
            if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED":
                # activated but completed more rapidly than it was checked
                pass
            else:
                module.fail_json(msg=('Data Pipeline {0} failed to activate '
                                      'within timeout {1} seconds').format(dp_name, timeout))
        changed = True

    data_pipeline = get_result(client, dp_id)
    result = {'data_pipeline': data_pipeline,
              'msg': 'Data Pipeline {0} activated.'.format(dp_name)}

    return (changed, result)
339
340
def deactivate_pipeline(client, module):
    """Deactivate the named pipeline and wait for it to become inactive.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :returns: tuple of (changed, result dict)

    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
    except DataPipelineNotFound:
        module.fail_json(msg='Data Pipeline {0} not found'.format(dp_name))

    if pipeline_field(client, dp_id, field="@pipelineState") in DP_INACTIVE_STATES:
        changed = False
    else:
        client.deactivate_pipeline(pipelineId=dp_id)
        try:
            pipeline_status_timeout(client, dp_id, status=DP_INACTIVE_STATES,
                                    timeout=timeout)
        except TimeOutException:
            # fixed: fragments previously joined without a space ("deactivatewithin")
            module.fail_json(msg=('Data Pipeline {0} failed to deactivate '
                                  'within timeout {1} seconds').format(dp_name, timeout))
        changed = True

    data_pipeline = get_result(client, dp_id)
    result = {'data_pipeline': data_pipeline,
              'msg': 'Data Pipeline {0} deactivated.'.format(dp_name)}

    return (changed, result)
370
371
def _delete_dp_with_check(dp_id, client, timeout):
    """Issue the delete and wait for the pipeline to disappear.

    NOTE(review): the status list below is the PIPELINE_DOESNT_EXIST pattern,
    which never matches an actual pipeline state.  Deletion is detected when
    pipeline_description() raises DataPipelineNotFound inside the polling loop
    and that exception is caught here.  A TimeOutException from the poller is
    deliberately NOT caught — it propagates to delete_pipeline().

    :param string dp_id: pipeline id
    :param object client: boto3 datapipeline client
    :param int timeout: seconds to wait for the pipeline to disappear
    :returns: True once the pipeline no longer exists
    """
    client.delete_pipeline(pipelineId=dp_id)
    try:
        pipeline_status_timeout(client=client, dp_id=dp_id, status=[PIPELINE_DOESNT_EXIST], timeout=timeout)
    except DataPipelineNotFound:
        return True
378
379
def delete_pipeline(client, module):
    """Delete the named pipeline if it exists.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :returns: tuple of (changed, result dict)

    """
    dp_name = module.params.get('name')
    timeout = module.params.get('timeout')

    try:
        dp_id = pipeline_id(client, dp_name)
        _delete_dp_with_check(dp_id, client, timeout)
        changed = True
    except DataPipelineNotFound:
        # Already absent — nothing to do.
        changed = False
    except TimeOutException:
        # fixed: fragments previously joined without a space ("deletewithin")
        module.fail_json(msg=('Data Pipeline {0} failed to delete '
                              'within timeout {1} seconds').format(dp_name, timeout))
    result = {'data_pipeline': {},
              'msg': 'Data Pipeline {0} deleted'.format(dp_name)}

    return (changed, result)
400
401
def build_unique_id(module):
    """Return a stable hash of the module params to use as the pipeline uniqueId.

    :param object module: AnsibleModule instance
    :returns: md5 hex digest of the sorted, JSON-encoded params

    """
    data = dict(module.params)
    # Exclude objects from the unique id so we can update objects or populate
    # the pipeline after creation without needing to make a new pipeline, and
    # timeout so a different wait period doesn't force a new pipeline either.
    # (Was a list comprehension used purely for its side effects.)
    for key in ('objects', 'timeout'):
        data.pop(key, None)
    json_data = json.dumps(data, sort_keys=True).encode("utf-8")
    # md5 is used only as a fingerprint here, not for security.
    return hashlib.md5(json_data).hexdigest()
409
410
def format_tags(tags):
    """Reformat a tag mapping into the list-of-dicts shape the API expects.

    :param dict tags: dict of data pipeline tags (e.g. {key1: val1, key2: val2, key3: val3})
    :returns: list of dicts (e.g. [{key: key1, value: val1}, {key: key2, value: val2}, {key: key3, value: val3}])

    """
    return [{'key': tag_key, 'value': tag_value} for tag_key, tag_value in tags.items()]
419
420
def get_result(client, dp_id):
    """ Get the current state of the data pipeline and reformat it to snake_case for exit_json

    :param object client: boto3 datapipeline client
    :param string dp_id: pipeline id
    :returns: reformatted dict of pipeline description

     """
    # pipeline_description returns a pipelineDescriptionList of length 1
    # dp is a dict with keys "description" (str), "fields" (list), "name" (str), "pipelineId" (str), "tags" (dict)
    dp = pipeline_description(client, dp_id)['pipelineDescriptionList'][0]

    def lookup(field):
        # Read from the already-fetched description instead of calling
        # pipeline_field(), which issues a fresh describe_pipelines API
        # round-trip per field (was 3 API calls, now 1).
        for entry in dp["fields"]:
            if entry["key"] == field:
                return entry["stringValue"]
        raise KeyError("Field key {0} not found!".format(field))

    # Get uniqueId and pipelineState in fields to add to the exit_json result
    dp["unique_id"] = lookup("uniqueId")
    dp["pipeline_state"] = lookup("@pipelineState")

    # Remove fields; can't make a list snake_case and most of the data is redundant
    del dp["fields"]

    # Note: tags is already formatted fine so we don't need to do anything with it

    # Reformat data pipeline and add reformatted fields back
    dp = camel_dict_to_snake_dict(dp)
    return dp
445
446
def diff_pipeline(client, module, objects, unique_id, dp_name):
    """Check if there's another pipeline with the same unique_id and if so, check
    whether the object definition needs to be updated.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :param list objects: requested pipeline object definitions
    :param string unique_id: params hash from build_unique_id() (passed by the caller;
        previously recomputed here, shadowing this parameter)
    :param string dp_name: pipeline name
    :returns: tuple of (create_dp, changed, result)
    """
    result = {}
    changed = False
    create_dp = False

    # See if there is already a pipeline with the same unique_id
    try:
        dp_id = pipeline_id(client, dp_name)
        dp_unique_id = to_text(pipeline_field(client, dp_id, field="uniqueId"))
        if dp_unique_id != unique_id:
            # A change is expected but not determined. Updated to a bool in create_pipeline().
            changed = "NEW_VERSION"
            create_dp = True
        # Unique ids are the same - check if pipeline needs modification
        else:
            dp_objects = client.get_pipeline_definition(pipelineId=dp_id)['pipelineObjects']
            # Definition needs to be updated
            if dp_objects != objects:
                changed, msg = define_pipeline(client, module, objects, dp_id)
            # No changes
            else:
                msg = 'Data Pipeline {0} is present'.format(dp_name)
            data_pipeline = get_result(client, dp_id)
            result = {'data_pipeline': data_pipeline,
                      'msg': msg}
    except DataPipelineNotFound:
        create_dp = True

    return create_dp, changed, result
479
480
def define_pipeline(client, module, objects, dp_id):
    """Put the pipeline definition (objects/parameters/values) on an existing pipeline.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :param list objects: pipeline object definitions
    :param string dp_id: pipeline id
    :returns: tuple of (changed, msg)

    """
    dp_name = module.params.get('name')

    if pipeline_field(client, dp_id, field="@pipelineState") == "FINISHED":
        msg = 'Data Pipeline {0} is unable to be updated while in state FINISHED.'.format(dp_name)
        changed = False

    elif objects:
        parameters = module.params.get('parameters')
        values = module.params.get('values')

        try:
            client.put_pipeline_definition(pipelineId=dp_id,
                                           pipelineObjects=objects,
                                           parameterObjects=parameters,
                                           parameterValues=values)
            msg = 'Data Pipeline {0} has been updated.'.format(dp_name)
            changed = True
        except ClientError:
            # fixed: fragments previously joined without spaces
            # ("fieldsare", "allowedobjects")
            module.fail_json(msg="Failed to put the definition for pipeline {0}. Check that string/reference fields "
                             "are not empty and that the number of objects in the pipeline does not exceed maximum allowed "
                             "objects".format(dp_name), exception=traceback.format_exc())
    else:
        changed = False
        msg = ""

    return changed, msg
511
512
def create_pipeline(client, module):
    """Create the datapipeline if needed. Uses uniqueId to achieve idempotency.

    :param object client: boto3 datapipeline client
    :param object module: AnsibleModule instance
    :returns: tuple of (changed, result dict)

    """
    dp_name = module.params.get('name')
    objects = module.params.get('objects', None)
    description = module.params.get('description', '')
    tags = module.params.get('tags')
    timeout = module.params.get('timeout')

    unique_id = build_unique_id(module)
    create_dp, changed, result = diff_pipeline(client, module, objects, unique_id, dp_name)

    if changed == "NEW_VERSION":
        # Params differ from the existing pipeline: delete the old version first.
        changed, _ = delete_pipeline(client, module)

    # There isn't a pipeline or it has different parameters than the pipeline in existence.
    if create_dp:
        # Make pipeline
        try:
            tags = format_tags(tags)
            dp = client.create_pipeline(name=dp_name,
                                        uniqueId=unique_id,
                                        description=description,
                                        tags=tags)
            dp_id = dp['pipelineId']
            pipeline_exists_timeout(client, dp_id, timeout)
        except ClientError:
            module.fail_json(msg="Failed to create the data pipeline {0}.".format(dp_name), exception=traceback.format_exc())
        except TimeOutException:
            # fixed: fragments previously joined without a space ("createwithin")
            module.fail_json(msg=('Data Pipeline {0} failed to create '
                                  'within timeout {1} seconds').format(dp_name, timeout))
        # Put pipeline definition
        _, msg = define_pipeline(client, module, objects, dp_id)

        changed = True
        data_pipeline = get_result(client, dp_id)
        result = {'data_pipeline': data_pipeline,
                  'msg': 'Data Pipeline {0} created.'.format(dp_name) + msg}

    return (changed, result)
555
556
def main():
    """Module entry point: build the argument spec, connect to the
    datapipeline service, and dispatch on the requested state."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            name=dict(required=True),
            version=dict(required=False),
            description=dict(required=False, default=''),
            objects=dict(required=False, type='list', default=[]),
            parameters=dict(required=False, type='list', default=[]),
            timeout=dict(required=False, type='int', default=300),
            state=dict(default='present', choices=['present', 'absent',
                                                   'active', 'inactive']),
            tags=dict(required=False, type='dict', default={}),
            values=dict(required=False, type='list', default=[])
        )
    )
    module = AnsibleModule(argument_spec, supports_check_mode=False)

    if not HAS_BOTO3:
        module.fail_json(msg='boto3 is required for the datapipeline module!')

    try:
        region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
        if not region:
            module.fail_json(msg="Region must be specified as a parameter, in EC2_REGION or AWS_REGION environment variables or in boto configuration file")
        client = boto3_conn(module, conn_type='client',
                            resource='datapipeline', region=region,
                            endpoint=ec2_url, **aws_connect_kwargs)
    except ClientError as e:
        module.fail_json(msg="Can't authorize connection - " + str(e))

    # Dispatch table: state choices are constrained by the argument_spec,
    # so the lookup always succeeds.
    handlers = {'present': create_pipeline,
                'absent': delete_pipeline,
                'active': activate_pipeline,
                'inactive': deactivate_pipeline}
    changed, result = handlers[module.params.get('state')](client, module)

    module.exit_json(result=result, changed=changed)


if __name__ == '__main__':
    main()
603