1"""
2Manage Kinesis Streams
3======================
4
5.. versionadded:: 2017.7.0
6
7Create and destroy Kinesis streams. Be aware that this interacts with Amazon's
8services, and so may incur charges.
9
10This module uses ``boto3``, which can be installed via package, or pip.
11
12This module accepts explicit Kinesis credentials but can also utilize
13IAM roles assigned to the instance through Instance Profiles. Dynamic
14credentials are then automatically obtained from AWS API and no further
15configuration is necessary. More information available `here
16<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html>`_.
17
If IAM roles are not used you need to specify the credentials either in a
pillar file or in the minion's config file:
20
21.. code-block:: yaml
22
23    keyid: GKTADJGHEIQSXMKKRBJ08H
24    key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
25    region: us-east-1
26
27It's also possible to specify ``key``, ``keyid`` and ``region`` via a
28profile, either passed in as a dict, or as a string to pull from
29pillars or minion config:
30
31.. code-block:: yaml
32
33    myprofile:
34        keyid: GKTADJGHEIQSXMKKRBJ08H
35        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1
37
38.. code-block:: yaml
39
40    Ensure Kinesis stream does not exist:
41      boto_kinesis.absent:
42        - name: new_stream
43        - keyid: GKTADJGHEIQSXMKKRBJ08H
44        - key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
45        - region: us-east-1
46
47    Ensure Kinesis stream exists:
48      boto_kinesis.present:
49        - name: new_stream
50        - retention_hours: 168
51        - enhanced_monitoring: ['ALL']
52        - num_shards: 2
53        - keyid: GKTADJGHEIQSXMKKRBJ08H
54        - key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
55        - region: us-east-1
56"""
57
58# pylint: disable=undefined-variable
59
# Keep pylint from choking on ret
61import logging
62
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Name this state module is exposed under; returned by __virtual__ on success.
__virtualname__ = "boto_kinesis"
66
67
def __virtual__():
    """
    Decide whether this state module can be loaded.

    Returns ``__virtualname__`` when the ``boto_kinesis.exists`` execution
    function is present in ``__salt__``; otherwise returns a
    ``(False, reason)`` tuple explaining why loading was refused.
    """
    # Guard clause: without the execution module there is nothing to drive.
    if "boto_kinesis.exists" not in __salt__:
        reason = (
            "The boto_kinesis module could not be loaded: boto libraries not found."
        )
        return (False, reason)
    return __virtualname__
78
79
def present(
    name,
    retention_hours=None,
    enhanced_monitoring=None,
    num_shards=None,
    do_reshard=True,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Ensure the kinesis stream is properly configured and scaled.

    The stream is created when it does not exist; afterwards retention,
    enhanced monitoring and the shard count are reconciled, in that order.
    Processing stops at the first step that reports an error; the comments
    and changes gathered up to that point are still returned.

    name (string)
        Stream name

    retention_hours (int)
        Retain data for this many hours. ``None`` leaves retention untouched.
        AWS requires at least 24 hours; the upper limit is enforced by the
        Kinesis API (168 hours when this state was written).

    enhanced_monitoring (list of string)
        Turn on enhanced monitoring for the specified shard-level metrics.
        Pass in ['ALL'], 'ALL' or True for all metrics, [], 'None' or False
        for no metrics.
        Turn on individual metrics by passing in a list: ['IncomingBytes', 'OutgoingBytes']
        (a single metric name may also be given as a plain string).
        Note that if only some metrics are supplied, the remaining metrics will be turned off.
        ``None`` leaves enhanced monitoring untouched.

    num_shards (int)
        Reshard stream (if necessary) to this number of shards
        !!!!! Resharding is expensive! Each split or merge can take up to 30 seconds,
        and the reshard method balances the partition space evenly.
        Resharding from N to N+1 can require 2N operations.
        Resharding is much faster with powers of 2 (e.g. 2^N to 2^(N+1)) !!!!!

    do_reshard (boolean)
        If set to False, this script will NEVER reshard the stream,
        regardless of other input. Useful for testing.

    region (string)
        Region to connect to.

    key (string)
        Secret key to be used.

    keyid (string)
        Access key to be used.

    profile (dict)
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.

    Returns the usual state dict (``name``, ``result``, ``comment``,
    ``changes``). ``result`` is ``None`` when running with ``test=True`` and
    something would change, and ``False`` when any step reported an error.
    """

    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    comments = []
    changes_old = {}
    changes_new = {}
    # Connection arguments are always passed positionally, in this order.
    conn_args = (region, key, keyid, profile)

    def _finish():
        # Single exit point: fold the collected comments/changes into ``ret``.
        _add_changes(ret, changes_old, changes_new, comments)
        return ret

    # Ensure stream exists
    exists = __salt__["boto_kinesis.exists"](name, *conn_args)
    if exists["result"] is False:
        if __opts__["test"]:
            # Nothing else can be inspected for a stream that is not there yet.
            ret["result"] = None
            comments.append("Kinesis stream {} would be created".format(name))
            return _finish()

        is_created = __salt__["boto_kinesis.create_stream"](
            name, num_shards, *conn_args
        )
        if "error" in is_created:
            ret["result"] = False
            comments.append(
                "Failed to create stream {}: {}".format(name, is_created["error"])
            )
            return _finish()

        comments.append("Kinesis stream {} successfully created".format(name))
        changes_new["name"] = name
        changes_new["num_shards"] = num_shards
    else:
        comments.append("Kinesis stream {} already exists".format(name))

    stream_response = __salt__["boto_kinesis.get_stream_when_active"](
        name, *conn_args
    )
    if "error" in stream_response:
        ret["result"] = False
        comments.append(
            "Kinesis stream {}: error getting description: {}".format(
                name, stream_response["error"]
            )
        )
        return _finish()

    stream_details = stream_response["result"]["StreamDescription"]

    # Configure retention hours
    if retention_hours is not None:
        old_retention_hours = stream_details["RetentionPeriodHours"]
        if old_retention_hours == retention_hours:
            comments.append(
                "Kinesis stream {}: retention hours did not require change, already set"
                " at {}".format(name, old_retention_hours)
            )
        elif __opts__["test"]:
            ret["result"] = None
            comments.append(
                "Kinesis stream {}: retention hours would be updated to {}".format(
                    name, retention_hours
                )
            )
        else:
            # Increasing and decreasing are separate execution functions.
            if old_retention_hours > retention_hours:
                retention_func = "boto_kinesis.decrease_stream_retention_period"
            else:
                retention_func = "boto_kinesis.increase_stream_retention_period"
            retention_updated = __salt__[retention_func](
                name, retention_hours, *conn_args
            )

            if "error" in retention_updated:
                ret["result"] = False
                comments.append(
                    "Kinesis stream {}: failed to update retention hours: {}".format(
                        name, retention_updated["error"]
                    )
                )
                return _finish()

            comments.append(
                "Kinesis stream {}: retention hours was successfully updated".format(
                    name
                )
            )
            changes_old["retention_hours"] = old_retention_hours
            changes_new["retention_hours"] = retention_hours

            # wait until active again, otherwise it will log a lot of ResourceInUseExceptions
            # note that this isn't required below; reshard() will itself handle waiting
            stream_response = __salt__["boto_kinesis.get_stream_when_active"](
                name, *conn_args
            )
            if "error" in stream_response:
                ret["result"] = False
                comments.append(
                    "Kinesis stream {}: error getting description: {}".format(
                        name, stream_response["error"]
                    )
                )
                return _finish()

            stream_details = stream_response["result"]["StreamDescription"]
    else:
        comments.append(
            "Kinesis stream {}: did not configure retention hours".format(name)
        )

    # Configure enhanced monitoring
    if enhanced_monitoring is not None:
        # Expand True/'ALL'/False/'None'/bare strings into an explicit list so
        # it can be compared against what the stream description reports.
        enhanced_monitoring = _normalize_enhanced_monitoring(enhanced_monitoring)

        # Tolerate a description without monitoring details: treat it as
        # "no shard-level metrics enabled" rather than raising.
        monitoring_info = stream_details.get("EnhancedMonitoring") or [{}]
        old_enhanced_monitoring = monitoring_info[0].get("ShardLevelMetrics") or []

        new_monitoring_set = set(enhanced_monitoring)
        old_monitoring_set = set(old_enhanced_monitoring)

        # Sorted so comments and API calls are deterministic between runs.
        enable_metrics = sorted(new_monitoring_set - old_monitoring_set)
        disable_metrics = sorted(old_monitoring_set - new_monitoring_set)

        if enable_metrics:
            if __opts__["test"]:
                ret["result"] = None
                comments.append(
                    "Kinesis stream {}: would enable enhanced monitoring for {}".format(
                        name, enable_metrics
                    )
                )
            else:
                metrics_enabled = __salt__["boto_kinesis.enable_enhanced_monitoring"](
                    name, enable_metrics, *conn_args
                )
                if "error" in metrics_enabled:
                    ret["result"] = False
                    comments.append(
                        "Kinesis stream {}: failed to enable enhanced monitoring: {}".format(
                            name, metrics_enabled["error"]
                        )
                    )
                    return _finish()

                comments.append(
                    "Kinesis stream {}: enhanced monitoring was enabled for shard-level"
                    " metrics {}".format(name, enable_metrics)
                )

        if disable_metrics:
            if __opts__["test"]:
                ret["result"] = None
                comments.append(
                    "Kinesis stream {}: would disable enhanced monitoring for {}".format(
                        name, disable_metrics
                    )
                )
            else:
                metrics_disabled = __salt__["boto_kinesis.disable_enhanced_monitoring"](
                    name, disable_metrics, *conn_args
                )
                if "error" in metrics_disabled:
                    ret["result"] = False
                    comments.append(
                        "Kinesis stream {}: failed to disable enhanced monitoring: {}".format(
                            name, metrics_disabled["error"]
                        )
                    )
                    return _finish()

                comments.append(
                    "Kinesis stream {}: enhanced monitoring was disabled for"
                    " shard-level metrics {}".format(name, disable_metrics)
                )

        if not disable_metrics and not enable_metrics:
            comments.append(
                "Kinesis stream {}: enhanced monitoring did not require change, already"
                " set at {}".format(name, old_enhanced_monitoring or "None")
            )
        elif not __opts__["test"]:
            # An empty metric list is reported as the string "None".
            changes_old["enhanced_monitoring"] = old_enhanced_monitoring or "None"
            changes_new["enhanced_monitoring"] = enhanced_monitoring or "None"
    else:
        comments.append(
            "Kinesis stream {}: did not configure enhanced monitoring".format(name)
        )

    # Reshard stream if necessary
    # Only the open-shard count is needed here; the hash-key bounds are unused.
    _min_hash_key, _max_hash_key, full_stream_details = __salt__[
        "boto_kinesis.get_info_for_reshard"
    ](stream_details)
    old_num_shards = len(full_stream_details["OpenShards"])

    if num_shards is not None and do_reshard:
        if old_num_shards == num_shards:
            comments.append(
                "Kinesis stream {}: did not require resharding, remains at {} shards".format(
                    name, old_num_shards
                )
            )
        elif __opts__["test"]:
            ret["result"] = None
            comments.append(
                "Kinesis stream {}: would be resharded from {} to {} shards".format(
                    name, old_num_shards, num_shards
                )
            )
        else:
            log.info(
                "Resharding stream from %s to %s shards, this could take a while",
                old_num_shards,
                num_shards,
            )
            # reshard returns True when a split/merge action is taken,
            # or False when no more actions are required
            continue_reshard = True
            while continue_reshard:
                reshard_response = __salt__["boto_kinesis.reshard"](
                    name, num_shards, do_reshard, *conn_args
                )

                if "error" in reshard_response:
                    ret["result"] = False
                    comments.append(
                        "Encountered error while resharding {}: {}".format(
                            name, reshard_response["error"]
                        )
                    )
                    return _finish()

                continue_reshard = reshard_response["result"]

            comments.append(
                "Kinesis stream {}: successfully resharded to {} shards".format(
                    name, num_shards
                )
            )
            changes_old["num_shards"] = old_num_shards
            changes_new["num_shards"] = num_shards
    else:
        comments.append(
            "Kinesis stream {}: did not reshard, remains at {} shards".format(
                name, old_num_shards
            )
        )

    return _finish()


def _normalize_enhanced_monitoring(enhanced_monitoring):
    """
    Translate the ``enhanced_monitoring`` state argument into an explicit list
    of shard-level metric names.

    ``True``, ``'ALL'`` or any collection containing ``'ALL'`` expands to every
    known metric; ``False`` or ``'None'`` yields an empty list; any other
    string is treated as a single metric name; any other value is returned
    unchanged.
    """
    # for ease of comparison; describe_stream will always return the full list of metrics, never 'ALL'
    all_metrics = [
        "IncomingBytes",
        "OutgoingRecords",
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes",
    ]
    if enhanced_monitoring is True:
        return all_metrics
    if enhanced_monitoring is False:
        return []
    if isinstance(enhanced_monitoring, str):
        # Strings must be handled before any set()/``in`` logic, which would
        # otherwise operate on their individual characters.
        if enhanced_monitoring == "ALL":
            return all_metrics
        if enhanced_monitoring == "None":
            return []
        return [enhanced_monitoring]
    if "ALL" in enhanced_monitoring:
        return all_metrics
    return enhanced_monitoring
416
417
def absent(name, region=None, key=None, keyid=None, profile=None):
    """
    Ensure the named kinesis stream is gone, deleting it when present.

    name (string)
        Stream name

    region (string)
        Region to connect to.

    key (string)
        Secret key to be used.

    keyid (string)
        Access key to be used.

    profile (dict)
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.

    Returns the usual state dict; ``result`` is ``None`` in test mode when the
    stream would be deleted and ``False`` when the deletion reports an error.
    """
    conn_args = (region, key, keyid, profile)
    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    stream_status = __salt__["boto_kinesis.exists"](name, *conn_args)

    if stream_status["result"] is False:
        # Already in the desired state.
        ret["comment"] = "Kinesis stream {} does not exist".format(name)
    elif __opts__["test"]:
        # Dry run: report the pending deletion without performing it.
        ret["result"] = None
        ret["comment"] = "Kinesis stream {} would be deleted".format(name)
    else:
        outcome = __salt__["boto_kinesis.delete_stream"](name, *conn_args)
        if "error" not in outcome:
            ret["comment"] = "Deleted stream {}".format(name)
            ret["changes"].setdefault("old", "Stream {} exists".format(name))
            ret["changes"].setdefault("new", "Stream {} deleted".format(name))
        else:
            ret["result"] = False
            ret["comment"] = "Failed to delete stream {}: {}".format(
                name, outcome["error"]
            )

    return ret
464
465
def _add_changes(ret, changes_old, changes_new, comments):
    """
    Finalise a state return dict in place.

    Joins ``comments`` with ``",\\n"`` into ``ret["comment"]`` and stores each
    non-empty changes mapping under ``ret["changes"]["old"]`` /
    ``ret["changes"]["new"]``. Empty mappings are left out entirely.
    """
    ret["comment"] = ",\n".join(comments)
    for label, delta in (("old", changes_old), ("new", changes_new)):
        if delta:
            ret["changes"][label] = delta
472