1"""
2Manage Data Pipelines
3
4.. versionadded:: 2016.3.0
5
6Be aware that this interacts with Amazon's services, and so may incur charges.
7
8This module uses ``boto3``, which can be installed via package, or pip.
9
10This module accepts explicit AWS credentials but can also utilize
11IAM roles assigned to the instance through Instance Profiles. Dynamic
12credentials are then automatically obtained from AWS API and no further
13configuration is necessary. More information available `here
14<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html>`_.
15
16If IAM roles are not used you need to specify them either in a pillar file or
17in the minion's config file:
18
19.. code-block:: yaml
20
21    datapipeline.keyid: GKTADJGHEIQSXMKKRBJ08H
22    datapipeline.key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
23
24It's also possible to specify ``key``, ``keyid`` and ``region`` via a profile,
25either passed in as a dict, or as a string to pull from pillars or minion
26config:
27
28.. code-block:: yaml
29
30  myprofile:
31    keyid: GKTADJGHEIQSXMKKRBJ08H
32    key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
33    region: us-east-1
34
35.. code-block:: yaml
36
37  Ensure daily data pipeline exists:
38    boto_datapipeline.present:
39      - name: my-datapipeline
40      - pipeline_objects:
41          DefaultSchedule:
42            name: Every 1 day
43            fields:
44              period: 1 Day
45              type: Schedule
46              startAt: FIRST_ACTIVATION_DATE_TIME
47      - parameter_values:
48          myDDBTableName: my-dynamo-table
49"""
50
51
52import copy
53import datetime
54import difflib
55
56import salt.utils.data
57import salt.utils.json
58
59
def __virtual__():
    """
    Only load this state module when the boto_datapipeline execution module
    has been loaded (i.e. boto is available on the minion).
    """
    if "boto_datapipeline.create_pipeline" not in __salt__:
        return (False, "boto_datapipeline module could not be loaded")
    return "boto_datapipeline"
67
68
def present(
    name,
    pipeline_objects=None,
    pipeline_objects_from_pillars="boto_datapipeline_pipeline_objects",
    parameter_objects=None,
    parameter_objects_from_pillars="boto_datapipeline_parameter_objects",
    parameter_values=None,
    parameter_values_from_pillars="boto_datapipeline_parameter_values",
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Ensure the data pipeline exists with matching definition.

    name
        Name of the service to ensure a data pipeline exists for.

    pipeline_objects
        Pipeline objects to use. Will override objects read from pillars.

    pipeline_objects_from_pillars
        The pillar key to use for lookup.

    parameter_objects
        Parameter objects to use. Will override objects read from pillars.

    parameter_objects_from_pillars
        The pillar key to use for lookup.

    parameter_values
        Parameter values to use. Will override values read from pillars.

    parameter_values_from_pillars
        The pillar key to use for lookup.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    pipeline_objects = pipeline_objects or {}
    parameter_objects = parameter_objects or {}
    parameter_values = parameter_values or {}

    # Merge pillar data with the explicit overrides once, rather than
    # repeating the pillar lookups for every subsequent API call.
    full_pipeline_objects = _pipeline_objects(
        pipeline_objects_from_pillars, pipeline_objects
    )
    full_parameter_objects = _parameter_objects(
        parameter_objects_from_pillars, parameter_objects
    )
    full_parameter_values = _parameter_values(
        parameter_values_from_pillars, parameter_values
    )

    present, old_pipeline_definition = _pipeline_present_with_definition(
        name,
        full_pipeline_objects,
        full_parameter_objects,
        full_parameter_values,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if present:
        ret["comment"] = "AWS data pipeline {} present".format(name)
        return ret

    if __opts__["test"]:
        ret["comment"] = "Data pipeline {} is set to be created or updated".format(name)
        ret["result"] = None
        return ret

    pipeline_id, result_pipeline_definition, error_comment = _create_with_definition(
        name,
        full_pipeline_objects,
        full_parameter_objects,
        full_parameter_values,
        region,
        key,
        keyid,
        profile,
    )
    if error_comment:
        ret["result"] = False
        ret["comment"] = error_comment
        return ret

    if "error" in result_pipeline_definition and _immutable_fields_error(
        result_pipeline_definition
    ):
        # Some fields cannot be updated on an activated pipeline; delete the
        # pipeline and recreate it from scratch, then retry the definition.
        result_delete_pipeline = __salt__["boto_datapipeline.delete_pipeline"](
            pipeline_id,
            region=region,
            key=key,
            keyid=keyid,
            profile=profile,
        )
        if "error" in result_delete_pipeline:
            ret["result"] = False
            ret["comment"] = "Failed to delete data pipeline {}: {}".format(
                pipeline_id, result_delete_pipeline["error"]
            )
            return ret

        pipeline_id, result_pipeline_definition, error_comment = _create_with_definition(
            name,
            full_pipeline_objects,
            full_parameter_objects,
            full_parameter_values,
            region,
            key,
            keyid,
            profile,
        )
        if error_comment:
            ret["result"] = False
            ret["comment"] = error_comment
            return ret

    if "error" in result_pipeline_definition:
        # Still erroring after possible retry
        ret["result"] = False
        ret["comment"] = "Failed to create data pipeline {}: {}".format(
            name, result_pipeline_definition["error"]
        )
        return ret

    result_activate_pipeline = __salt__["boto_datapipeline.activate_pipeline"](
        pipeline_id,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_activate_pipeline:
        ret["result"] = False
        # Report the activation error; previously this branch mistakenly
        # reported the (successful) put_pipeline_definition result instead.
        ret["comment"] = "Failed to activate data pipeline {}: {}".format(
            name, result_activate_pipeline["error"]
        )
        return ret

    pipeline_definition_result = __salt__["boto_datapipeline.get_pipeline_definition"](
        pipeline_id,
        version="active",
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in pipeline_definition_result:
        new_pipeline_definition = {}
    else:
        new_pipeline_definition = _standardize(pipeline_definition_result["result"])

    if not old_pipeline_definition:
        ret["changes"]["new"] = "Pipeline created."
        ret["comment"] = "Data pipeline {} created".format(name)
    else:
        ret["changes"]["diff"] = _diff(old_pipeline_definition, new_pipeline_definition)
        ret["comment"] = "Data pipeline {} updated".format(name)

    return ret


def _create_with_definition(
    name,
    pipeline_objects,
    parameter_objects,
    parameter_values,
    region,
    key,
    keyid,
    profile,
):
    """
    Create (or look up) the named pipeline and upload its definition.

    Returns a 3-tuple ``(pipeline_id, put_definition_result, error_comment)``.
    ``error_comment`` is None unless creating the pipeline itself failed, in
    which case the first two elements are None.
    """
    result_create_pipeline = __salt__["boto_datapipeline.create_pipeline"](
        name,
        name,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_create_pipeline:
        return (
            None,
            None,
            "Failed to create data pipeline {}: {}".format(
                name, result_create_pipeline["error"]
            ),
        )

    pipeline_id = result_create_pipeline["result"]
    result_pipeline_definition = __salt__["boto_datapipeline.put_pipeline_definition"](
        pipeline_id,
        pipeline_objects,
        parameter_objects=parameter_objects,
        parameter_values=parameter_values,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    return pipeline_id, result_pipeline_definition, None
269
270
271def _immutable_fields_error(result_pipeline_definition):
272    """Return true if update pipeline failed due to immutable fields
273
274    Some fields cannot be changed after a pipeline has been activated.
275
276    http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-manage-pipeline-modify-console.html#dp-edit-pipeline-limits
277    """
278    for e in result_pipeline_definition["error"]:
279        for e2 in e["errors"]:
280            if "can not be changed" in e2:
281                return True
282    return False
283
284
def _pipeline_present_with_definition(
    name,
    expected_pipeline_objects,
    expected_parameter_objects,
    expected_parameter_values,
    region,
    key,
    keyid,
    profile,
):
    """
    Return a 2-tuple ``(present, definition)``.

    ``present`` is True when a pipeline named *name* exists and its active
    definition matches all of the expected objects/values. ``definition`` is
    the standardized active definition, or ``{}`` when the pipeline (or its
    active definition) could not be fetched.

    name
        The name of the pipeline.

    expected_pipeline_objects
        Pipeline objects that must match the definition.

    expected_parameter_objects
        Parameter objects that must match the definition.

    expected_parameter_values
        Parameter values that must match the definition.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    id_lookup = __salt__["boto_datapipeline.pipeline_id_from_name"](
        name,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in id_lookup:
        # No pipeline with that name.
        return False, {}

    definition_lookup = __salt__["boto_datapipeline.get_pipeline_definition"](
        id_lookup["result"],
        version="active",
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in definition_lookup:
        # Pipeline exists but has no retrievable active definition.
        return False, {}

    definition = _standardize(definition_lookup["result"])

    # Pipeline objects are normalized via _cleaned so that start dates on
    # different days (same time of day) still compare equal.
    matches = (
        _recursive_compare(
            _cleaned(definition.get("pipelineObjects")),
            _cleaned(expected_pipeline_objects),
        )
        and _recursive_compare(
            definition.get("parameterObjects"), expected_parameter_objects
        )
        and _recursive_compare(
            definition.get("parameterValues"), expected_parameter_values
        )
    )
    return matches, definition
359
360
361def _cleaned(_pipeline_objects):
362    """Return standardized pipeline objects to be used for comparing
363
364    Remove year, month, and day components of the startDateTime so that data
365    pipelines with the same time of day but different days are considered
366    equal.
367    """
368    pipeline_objects = copy.deepcopy(_pipeline_objects)
369    for pipeline_object in pipeline_objects:
370        if pipeline_object["id"] == "DefaultSchedule":
371            for field_object in pipeline_object["fields"]:
372                if field_object["key"] == "startDateTime":
373                    start_date_time_string = field_object["stringValue"]
374                    start_date_time = datetime.datetime.strptime(
375                        start_date_time_string, "%Y-%m-%dT%H:%M:%S"
376                    )
377                    field_object["stringValue"] = start_date_time.strftime("%H:%M:%S")
378    return pipeline_objects
379
380
381def _recursive_compare(v1, v2):
382    """
383    Return v1 == v2. Compares list, dict, recursively.
384    """
385    if isinstance(v1, list):
386        if v2 is None:
387            v2 = []
388        if len(v1) != len(v2):
389            return False
390        v1.sort(key=_id_or_key)
391        v2.sort(key=_id_or_key)
392        for x, y in zip(v1, v2):
393            if not _recursive_compare(x, y):
394                return False
395        return True
396    elif isinstance(v1, dict):
397        if v2 is None:
398            v2 = {}
399        v1 = dict(v1)
400        v2 = dict(v2)
401        if sorted(v1) != sorted(v2):
402            return False
403        for k in v1:
404            if not _recursive_compare(v1[k], v2[k]):
405                return False
406        return True
407    else:
408        return v1 == v2
409
410
411def _id_or_key(list_item):
412    """
413    Return the value at key 'id' or 'key'.
414    """
415    if isinstance(list_item, dict):
416        if "id" in list_item:
417            return list_item["id"]
418        if "key" in list_item:
419            return list_item["key"]
420    return list_item
421
422
def _diff(old_pipeline_definition, new_pipeline_definition):
    """
    Return a unified-diff string of the two pipeline definitions.

    The AWS ``ResponseMetadata`` entry is excluded from the comparison since
    it varies per API call. Unlike the previous implementation, the input
    dictionaries are not mutated — the key is filtered out of copies instead
    of popped from the caller's data.
    """
    old = {
        k: v for k, v in old_pipeline_definition.items() if k != "ResponseMetadata"
    }
    new = {
        k: v for k, v in new_pipeline_definition.items() if k != "ResponseMetadata"
    }

    diff = salt.utils.data.decode(
        difflib.unified_diff(
            salt.utils.json.dumps(old, indent=4).splitlines(True),
            salt.utils.json.dumps(new, indent=4).splitlines(True),
        )
    )
    return "".join(diff)
437
438
439def _standardize(structure):
440    """
441    Return standardized format for lists/dictionaries.
442
443    Lists of dictionaries are sorted by the value of the dictionary at
444    its primary key ('id' or 'key'). OrderedDict's are converted to
445    basic dictionaries.
446    """
447
448    def mutating_helper(structure):
449        if isinstance(structure, list):
450            structure.sort(key=_id_or_key)
451            for each in structure:
452                mutating_helper(each)
453        elif isinstance(structure, dict):
454            structure = dict(structure)
455            for k, v in structure.items():
456                mutating_helper(k)
457                mutating_helper(v)
458
459    new_structure = copy.deepcopy(structure)
460    mutating_helper(new_structure)
461    return new_structure
462
463
def _pipeline_objects(pipeline_objects_from_pillars, pipeline_object_overrides):
    """
    Build the list of pipeline objects that compose the pipeline.

    pipeline_objects_from_pillars
        The pillar key to use for lookup

    pipeline_object_overrides
        Pipeline objects to use. Will override objects read from pillars.
    """
    # Pillar data is deep-copied so the overrides never leak back into it.
    merged = copy.deepcopy(__salt__["pillar.get"](pipeline_objects_from_pillars))
    merged.update(pipeline_object_overrides)
    standardized = _standardize(_dict_to_list_ids(merged))
    # Convert each object's field mapping into boto's key/value list format.
    for pipeline_object in standardized:
        pipeline_object["fields"] = _properties_from_dict(pipeline_object["fields"])
    return standardized
480
481
def _parameter_objects(parameter_objects_from_pillars, parameter_object_overrides):
    """
    Build the list of parameter objects that configure the pipeline.

    parameter_objects_from_pillars
        The pillar key to use for lookup

    parameter_object_overrides
        Parameter objects to use. Will override objects read from pillars.
    """
    # Pillar data is deep-copied so the overrides never leak back into it.
    merged = copy.deepcopy(__salt__["pillar.get"](parameter_objects_from_pillars))
    merged.update(parameter_object_overrides)
    standardized = _standardize(_dict_to_list_ids(merged))
    # Convert each object's attribute mapping into boto's key/value list format.
    for parameter_object in standardized:
        parameter_object["attributes"] = _properties_from_dict(
            parameter_object["attributes"]
        )
    return standardized
500
501
def _parameter_values(parameter_values_from_pillars, parameter_value_overrides):
    """
    Build the list of parameter values that configure the pipeline.

    parameter_values_from_pillars
        The pillar key to use for lookup

    parameter_value_overrides
        Parameter values to use. Will override values read from pillars.
    """
    # Pillar data is deep-copied so the overrides never leak back into it.
    merged = copy.deepcopy(__salt__["pillar.get"](parameter_values_from_pillars))
    merged.update(parameter_value_overrides)
    return _properties_from_dict(_standardize(merged), key_name="id")
516
517
518def _dict_to_list_ids(objects):
519    """
520    Convert a dictionary to a list of dictionaries, where each element has
521    a key value pair {'id': key}. This makes it easy to override pillar values
522    while still satisfying the boto api.
523    """
524    list_with_ids = []
525    for key, value in objects.items():
526        element = {"id": key}
527        element.update(value)
528        list_with_ids.append(element)
529    return list_with_ids
530
531
532def _properties_from_dict(d, key_name="key"):
533    """
534    Transforms dictionary into pipeline object properties.
535
536    The output format conforms to boto's specification.
537
538    Example input:
539        {
540            'a': '1',
541            'b': {
542                'ref': '2'
543            },
544        }
545
546    Example output:
547        [
548            {
549                'key': 'a',
550                'stringValue': '1',
551            },
552            {
553                'key': 'b',
554                'refValue': '2',
555            },
556        ]
557    """
558    fields = []
559    for key, value in d.items():
560        if isinstance(value, dict):
561            fields.append({key_name: key, "refValue": value["ref"]})
562        else:
563            fields.append({key_name: key, "stringValue": value})
564    return fields
565
566
def absent(name, region=None, key=None, keyid=None, profile=None):
    """
    Ensure a pipeline with the service_name does not exist

    name
        Name of the service to ensure a data pipeline does not exist for.

    region
        Region to connect to.

    key
        Secret key to be used.

    keyid
        Access key to be used.

    profile
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.
    """
    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    result_pipeline_id = __salt__["boto_datapipeline.pipeline_id_from_name"](
        name,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    if "error" in result_pipeline_id:
        # No pipeline with that name; nothing to do.
        ret["comment"] = "AWS data pipeline {} absent.".format(name)
        return ret

    pipeline_id = result_pipeline_id["result"]
    if __opts__["test"]:
        ret["comment"] = "Data pipeline {} set to be deleted.".format(name)
        ret["result"] = None
        return ret

    result_delete_pipeline = __salt__["boto_datapipeline.delete_pipeline"](
        pipeline_id,
        region=region,
        key=key,
        keyid=keyid,
        profile=profile,
    )
    # Previously the delete result was ignored, so a failed API call still
    # reported success and recorded changes. Surface the failure instead.
    if "error" in result_delete_pipeline:
        ret["result"] = False
        ret["comment"] = "Failed to delete data pipeline {}: {}".format(
            pipeline_id, result_delete_pipeline["error"]
        )
        return ret

    ret["changes"]["old"] = {"pipeline_id": pipeline_id}
    ret["changes"]["new"] = None
    return ret
616