"""
Manage Kinesis Streams
======================

.. versionadded:: 2017.7.0

Create and destroy Kinesis streams. Be aware that this interacts with Amazon's
services, and so may incur charges.

This module uses ``boto3``, which can be installed via package, or pip.

This module accepts explicit Kinesis credentials but can also utilize
IAM roles assigned to the instance through Instance Profiles. Dynamic
credentials are then automatically obtained from AWS API and no further
configuration is necessary. More information available `here
<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html>`_.

If IAM roles are not used you need to specify them either in a pillar file or
in the minion's config file:

.. code-block:: yaml

    keyid: GKTADJGHEIQSXMKKRBJ08H
    key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
    region: us-east-1

It's also possible to specify ``key``, ``keyid`` and ``region`` via a
profile, either passed in as a dict, or as a string to pull from
pillars or minion config:

.. code-block:: yaml

    myprofile:
        keyid: GKTADJGHEIQSXMKKRBJ08H
        key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        region: us-east-1

.. code-block:: yaml

    Ensure Kinesis stream does not exist:
      boto_kinesis.absent:
        - name: new_stream
        - keyid: GKTADJGHEIQSXMKKRBJ08H
        - key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        - region: us-east-1

    Ensure Kinesis stream exists:
      boto_kinesis.present:
        - name: new_stream
        - retention_hours: 168
        - enhanced_monitoring: ['ALL']
        - num_shards: 2
        - keyid: GKTADJGHEIQSXMKKRBJ08H
        - key: askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs
        - region: us-east-1
"""

# pylint: disable=undefined-variable

# Keep pylint from choking on names that are used below but never defined in
# this file (``__salt__`` and ``__opts__``; they are expected to be injected
# at load time).
import logging

log = logging.getLogger(__name__)

__virtualname__ = "boto_kinesis"

# Explicit shard-level metric names that "ALL" is expanded to before comparing
# against the stream description (which lists names, never 'ALL').
_ALL_SHARD_LEVEL_METRICS = (
    "IncomingBytes",
    "OutgoingRecords",
    "IteratorAgeMilliseconds",
    "IncomingRecords",
    "ReadProvisionedThroughputExceeded",
    "WriteProvisionedThroughputExceeded",
    "OutgoingBytes",
)


def __virtual__():
    """
    Only load if boto_kinesis is available.

    Returns ``__virtualname__`` when the ``boto_kinesis.exists`` execution
    function is present in ``__salt__``; otherwise a ``(False, reason)`` tuple.
    """
    if "boto_kinesis.exists" in __salt__:
        return __virtualname__
    return (
        False,
        "The boto_kinesis module could not be loaded: boto libraries not found.",
    )


def present(
    name,
    retention_hours=None,
    enhanced_monitoring=None,
    num_shards=None,
    do_reshard=True,
    region=None,
    key=None,
    keyid=None,
    profile=None,
):
    """
    Ensure the kinesis stream is properly configured and scaled.

    name (string)
        Stream name

    retention_hours (int)
        Retain data for this many hours.
        AWS allows minimum 24 hours, maximum 168 hours.

    enhanced_monitoring (list of string)
        Turn on enhanced monitoring for the specified shard-level metrics.
        Pass in ['ALL'] or True for all metrics, [] or False for no metrics.
        Turn on individual metrics by passing in a list: ['IncomingBytes', 'OutgoingBytes']
        Note that if only some metrics are supplied, the remaining metrics will be turned off.
        A bare string is also accepted: 'ALL' means all metrics, 'None' means
        no metrics, and any other string is treated as a single metric name.

    num_shards (int)
        Reshard stream (if necessary) to this number of shards
        !!!!! Resharding is expensive! Each split or merge can take up to 30 seconds,
        and the reshard method balances the partition space evenly.
        Resharding from N to N+1 can require 2N operations.
        Resharding is much faster with powers of 2 (e.g. 2^N to 2^N+1) !!!!!

    do_reshard (boolean)
        If set to False, this script will NEVER reshard the stream,
        regardless of other input. Useful for testing.

    region (string)
        Region to connect to.

    key (string)
        Secret key to be used.

    keyid (string)
        Access key to be used.

    profile (dict)
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.

    Returns a state dict with ``name``, ``result`` (True on success, False on
    the first failing step, None when running with ``test`` and something
    would change), ``comment`` and ``changes``.
    """

    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    comments = []
    changes_old = {}
    changes_new = {}
    # Connection arguments are always forwarded positionally, in this order.
    conn_args = (region, key, keyid, profile)

    _converge_stream(
        name,
        retention_hours,
        enhanced_monitoring,
        num_shards,
        do_reshard,
        conn_args,
        ret,
        comments,
        changes_old,
        changes_new,
    )

    # Whatever step stopped the run, report everything gathered up to there.
    _add_changes(ret, changes_old, changes_new, comments)
    return ret


def absent(name, region=None, key=None, keyid=None, profile=None):
    """
    Delete the kinesis stream, if it exists.

    name (string)
        Stream name

    region (string)
        Region to connect to.

    key (string)
        Secret key to be used.

    keyid (string)
        Access key to be used.

    profile (dict)
        A dict with region, key and keyid, or a pillar key (string)
        that contains a dict with region, key and keyid.

    Returns a state dict with ``name``, ``result`` (None when running with
    ``test`` and the stream would be deleted), ``comment`` and ``changes``.
    """
    ret = {"name": name, "result": True, "comment": "", "changes": {}}

    exists = __salt__["boto_kinesis.exists"](name, region, key, keyid, profile)
    if exists["result"] is False:
        ret["comment"] = "Kinesis stream {} does not exist".format(name)
        return ret

    if __opts__["test"]:
        ret["comment"] = "Kinesis stream {} would be deleted".format(name)
        ret["result"] = None
        return ret

    is_deleted = __salt__["boto_kinesis.delete_stream"](
        name, region, key, keyid, profile
    )
    if "error" in is_deleted:
        ret["comment"] = "Failed to delete stream {}: {}".format(
            name, is_deleted["error"]
        )
        ret["result"] = False
    else:
        ret["comment"] = "Deleted stream {}".format(name)
        ret["changes"]["old"] = "Stream {} exists".format(name)
        ret["changes"]["new"] = "Stream {} deleted".format(name)

    return ret


def _add_changes(ret, changes_old, changes_new, comments):
    """
    Fold the collected comments and change dicts into ``ret`` (in place).

    Comments are joined with ``",\\n"``; the ``old``/``new`` keys are only set
    when the corresponding dict is non-empty.
    """
    ret["comment"] = ",\n".join(comments)
    if changes_old:
        ret["changes"]["old"] = changes_old
    if changes_new:
        ret["changes"]["new"] = changes_new


def _converge_stream(
    name,
    retention_hours,
    enhanced_monitoring,
    num_shards,
    do_reshard,
    conn_args,
    ret,
    comments,
    changes_old,
    changes_new,
):
    """
    Run the ``present`` steps in order, stopping at the first one that fails.

    Mutates ``ret``, ``comments``, ``changes_old`` and ``changes_new``.
    """
    if not _ensure_stream_exists(
        name, num_shards, conn_args, ret, comments, changes_new
    ):
        return

    stream_details = _describe_active_stream(name, conn_args, ret, comments)
    if stream_details is None:
        return

    stream_details = _configure_retention(
        name,
        retention_hours,
        stream_details,
        conn_args,
        ret,
        comments,
        changes_old,
        changes_new,
    )
    if stream_details is None:
        return

    if not _configure_enhanced_monitoring(
        name,
        enhanced_monitoring,
        stream_details,
        conn_args,
        ret,
        comments,
        changes_old,
        changes_new,
    ):
        return

    _reshard_stream(
        name,
        num_shards,
        do_reshard,
        stream_details,
        conn_args,
        ret,
        comments,
        changes_old,
        changes_new,
    )


def _ensure_stream_exists(name, num_shards, conn_args, ret, comments, changes_new):
    """
    Create the stream when it is missing.

    Returns False when ``present`` must stop: either creation failed, or this
    is a test run and the stream would have to be created first.
    """
    exists = __salt__["boto_kinesis.exists"](name, *conn_args)
    if exists["result"] is not False:
        comments.append("Kinesis stream {} already exists".format(name))
        return True

    if __opts__["test"]:
        ret["result"] = None
        comments.append("Kinesis stream {} would be created".format(name))
        return False

    is_created = __salt__["boto_kinesis.create_stream"](name, num_shards, *conn_args)
    if "error" in is_created:
        ret["result"] = False
        comments.append(
            "Failed to create stream {}: {}".format(name, is_created["error"])
        )
        return False

    comments.append("Kinesis stream {} successfully created".format(name))
    changes_new["name"] = name
    changes_new["num_shards"] = num_shards
    return True


def _describe_active_stream(name, conn_args, ret, comments):
    """
    Return the ``StreamDescription`` from ``boto_kinesis.get_stream_when_active``.

    On an error response, marks ``ret`` as failed, records a comment and
    returns None.
    """
    stream_response = __salt__["boto_kinesis.get_stream_when_active"](
        name, *conn_args
    )
    if "error" in stream_response:
        ret["result"] = False
        comments.append(
            "Kinesis stream {}: error getting description: {}".format(
                name, stream_response["error"]
            )
        )
        return None
    return stream_response["result"]["StreamDescription"]


def _configure_retention(
    name,
    retention_hours,
    stream_details,
    conn_args,
    ret,
    comments,
    changes_old,
    changes_new,
):
    """
    Bring the stream's retention period to ``retention_hours`` if requested.

    Returns the stream description to use for the following steps (re-fetched
    after a successful update), or None when a step failed.
    """
    if retention_hours is None:
        comments.append(
            "Kinesis stream {}: did not configure retention hours".format(name)
        )
        return stream_details

    old_retention_hours = stream_details["RetentionPeriodHours"]
    if old_retention_hours == retention_hours:
        comments.append(
            "Kinesis stream {}: retention hours did not require change, already set"
            " at {}".format(name, old_retention_hours)
        )
        return stream_details

    if __opts__["test"]:
        ret["result"] = None
        comments.append(
            "Kinesis stream {}: retention hours would be updated to {}".format(
                name, retention_hours
            )
        )
        return stream_details

    # Raising and lowering retention are separate execution functions.
    if old_retention_hours > retention_hours:
        update_function = "boto_kinesis.decrease_stream_retention_period"
    else:
        update_function = "boto_kinesis.increase_stream_retention_period"
    retention_updated = __salt__[update_function](name, retention_hours, *conn_args)

    if "error" in retention_updated:
        ret["result"] = False
        comments.append(
            "Kinesis stream {}: failed to update retention hours: {}".format(
                name, retention_updated["error"]
            )
        )
        return None

    comments.append(
        "Kinesis stream {}: retention hours was successfully updated".format(name)
    )
    changes_old["retention_hours"] = old_retention_hours
    changes_new["retention_hours"] = retention_hours

    # wait until active again, otherwise it will log a lot of ResourceInUseExceptions
    # note that this isn't required for resharding; reshard() will itself handle waiting
    return _describe_active_stream(name, conn_args, ret, comments)


def _normalize_enhanced_monitoring(enhanced_monitoring):
    """
    Turn the user-supplied ``enhanced_monitoring`` value into a list of metrics.

    True, ``['ALL']`` and ``'ALL'`` expand to every known metric name; False and
    ``'None'`` mean no metrics; any other string is a single metric name (so it
    is not later split into characters by ``set()``). Other values are
    returned unchanged.
    """
    if (
        enhanced_monitoring is True
        or enhanced_monitoring == ["ALL"]
        or enhanced_monitoring == "ALL"
    ):
        # Fresh list each time so callers never share the module constant.
        return list(_ALL_SHARD_LEVEL_METRICS)
    if enhanced_monitoring is False or enhanced_monitoring == "None":
        return []
    if isinstance(enhanced_monitoring, str):
        return [enhanced_monitoring]
    return enhanced_monitoring


def _toggle_enhanced_monitoring(name, verb, metrics, conn_args, ret, comments):
    """
    Enable or disable (``verb``) the given shard-level ``metrics``.

    Does nothing for an empty list; in test mode only reports what would be
    done. Returns False when the execution function reported an error.
    """
    if not metrics:
        return True

    if __opts__["test"]:
        ret["result"] = None
        comments.append(
            "Kinesis stream {}: would {} enhanced monitoring for {}".format(
                name, verb, metrics
            )
        )
        return True

    response = __salt__["boto_kinesis.{}_enhanced_monitoring".format(verb)](
        name, metrics, *conn_args
    )
    if "error" in response:
        ret["result"] = False
        comments.append(
            "Kinesis stream {}: failed to {} enhanced monitoring: {}".format(
                name, verb, response["error"]
            )
        )
        return False

    comments.append(
        "Kinesis stream {}: enhanced monitoring was {}d for shard-level"
        " metrics {}".format(name, verb, metrics)
    )
    return True


def _configure_enhanced_monitoring(
    name,
    enhanced_monitoring,
    stream_details,
    conn_args,
    ret,
    comments,
    changes_old,
    changes_new,
):
    """
    Enable/disable shard-level metrics so they match ``enhanced_monitoring``.

    Returns False when enabling or disabling failed, True otherwise.
    """
    if enhanced_monitoring is None:
        comments.append(
            "Kinesis stream {}: did not configure enhanced monitoring".format(name)
        )
        return True

    enhanced_monitoring = _normalize_enhanced_monitoring(enhanced_monitoring)

    # A description without monitoring info is treated as "no metrics enabled"
    # instead of failing on a None subscript.
    monitoring_info = stream_details.get("EnhancedMonitoring") or [{}]
    old_enhanced_monitoring = monitoring_info[0].get("ShardLevelMetrics", [])

    new_monitoring_set = set(enhanced_monitoring)
    old_monitoring_set = set(old_enhanced_monitoring)
    # Sorted so API calls and comments are stable from run to run.
    enable_metrics = sorted(new_monitoring_set - old_monitoring_set)
    disable_metrics = sorted(old_monitoring_set - new_monitoring_set)

    for verb, metrics in (("enable", enable_metrics), ("disable", disable_metrics)):
        if not _toggle_enhanced_monitoring(
            name, verb, metrics, conn_args, ret, comments
        ):
            return False

    if not enable_metrics and not disable_metrics:
        comments.append(
            "Kinesis stream {}: enhanced monitoring did not require change, already"
            " set at {}".format(
                name,
                old_enhanced_monitoring if old_enhanced_monitoring else "None",
            )
        )
    elif not __opts__["test"]:
        changes_old["enhanced_monitoring"] = (
            old_enhanced_monitoring if old_enhanced_monitoring else "None"
        )
        changes_new["enhanced_monitoring"] = (
            enhanced_monitoring if enhanced_monitoring else "None"
        )
    return True


def _reshard_stream(
    name,
    num_shards,
    do_reshard,
    stream_details,
    conn_args,
    ret,
    comments,
    changes_old,
    changes_new,
):
    """
    Reshard the stream to ``num_shards`` open shards when requested and needed.

    Failures are recorded on ``ret``/``comments``; nothing is returned because
    this is the last step of ``present``.
    """
    # Only the open-shard list is needed here; the hash-key bounds are unused.
    _min_hash_key, _max_hash_key, full_stream_details = __salt__[
        "boto_kinesis.get_info_for_reshard"
    ](stream_details)
    old_num_shards = len(full_stream_details["OpenShards"])

    if num_shards is None or not do_reshard:
        comments.append(
            "Kinesis stream {}: did not reshard, remains at {} shards".format(
                name, old_num_shards
            )
        )
        return

    if old_num_shards == num_shards:
        comments.append(
            "Kinesis stream {}: did not require resharding, remains at {} shards".format(
                name, old_num_shards
            )
        )
        return

    if __opts__["test"]:
        ret["result"] = None
        comments.append(
            "Kinesis stream {}: would be resharded from {} to {} shards".format(
                name, old_num_shards, num_shards
            )
        )
        return

    log.info(
        "Resharding stream from %s to %s shards, this could take a while",
        old_num_shards,
        num_shards,
    )
    # reshard returns True when a split/merge action is taken,
    # or False when no more actions are required
    while True:
        reshard_response = __salt__["boto_kinesis.reshard"](
            name, num_shards, do_reshard, *conn_args
        )

        if "error" in reshard_response:
            ret["result"] = False
            comments.append(
                "Encountered error while resharding {}: {}".format(
                    name, reshard_response["error"]
                )
            )
            return

        if not reshard_response["result"]:
            break

    comments.append(
        "Kinesis stream {}: successfully resharded to {} shards".format(
            name, num_shards
        )
    )
    changes_old["num_shards"] = old_num_shards
    changes_new["num_shards"] = num_shards