1# Copyright Cloudinary 2import base64 3import copy 4import hashlib 5import json 6import os 7import random 8import re 9import string 10import struct 11import time 12import urllib 13import zlib 14from collections import OrderedDict 15from datetime import datetime, date 16from fractions import Fraction 17from numbers import Number 18 19import six.moves.urllib.parse 20from six import iteritems, string_types 21from urllib3 import ProxyManager, PoolManager 22 23import cloudinary 24from cloudinary import auth_token 25from cloudinary.compat import PY3, to_bytes, to_bytearray, to_string, string_types, urlparse 26 27VAR_NAME_RE = r'(\$\([a-zA-Z]\w+\))' 28 29urlencode = six.moves.urllib.parse.urlencode 30unquote = six.moves.urllib.parse.unquote 31 32""" @deprecated: use cloudinary.SHARED_CDN """ 33SHARED_CDN = "res.cloudinary.com" 34 35DEFAULT_RESPONSIVE_WIDTH_TRANSFORMATION = {"width": "auto", "crop": "limit"} 36 37RANGE_VALUE_RE = r'^(?P<value>(\d+\.)?\d+)(?P<modifier>[%pP])?$' 38RANGE_RE = r'^(\d+\.)?\d+[%pP]?\.\.(\d+\.)?\d+[%pP]?$' 39FLOAT_RE = r'^(\d+)\.(\d+)?$' 40REMOTE_URL_RE = r'ftp:|https?:|s3:|gs:|data:([\w-]+\/[\w-]+(\+[\w-]+)?)?(;[\w-]+=[\w-]+)*;base64,([a-zA-Z0-9\/+\n=]+)$' 41__LAYER_KEYWORD_PARAMS = [("font_weight", "normal"), 42 ("font_style", "normal"), 43 ("text_decoration", "none"), 44 ("text_align", None), 45 ("stroke", "none")] 46 47# a list of keys used by the cloudinary_url function 48__URL_KEYS = [ 49 'api_secret', 50 'auth_token', 51 'cdn_subdomain', 52 'cloud_name', 53 'cname', 54 'format', 55 'private_cdn', 56 'resource_type', 57 'secure', 58 'secure_cdn_subdomain', 59 'secure_distribution', 60 'shorten', 61 'sign_url', 62 'ssl_detected', 63 'type', 64 'url_suffix', 65 'use_root_path', 66 'version', 67 'long_url_signature', 68 'signature_algorithm', 69] 70 71__SIMPLE_UPLOAD_PARAMS = [ 72 "public_id", 73 "public_id_prefix", 74 "callback", 75 "format", 76 "type", 77 "backup", 78 "faces", 79 "image_metadata", 80 "exif", 81 "colors", 82 "use_filename", 83 "unique_filename", 84 "display_name", 85 "use_filename_as_display_name", 86 "discard_original_filename", 87 "filename_override", 88 "invalidate", 89 "notification_url", 90 "eager_notification_url", 91 "eager_async", 92 "eval", 93 "proxy", 94 "folder", 95 "asset_folder", 96 "overwrite", 97 "moderation", 98 "raw_convert", 99 "quality_override", 100 "quality_analysis", 101 "ocr", 102 "categorization", 103 "detection", 104 "similarity_search", 105 "background_removal", 106 "upload_preset", 107 "phash", 108 "return_delete_token", 109 "auto_tagging", 110 "async", 111 "cinemagraph_analysis", 112 "accessibility_analysis", 113] 114 115__SERIALIZED_UPLOAD_PARAMS = [ 116 "timestamp", 117 "transformation", 118 "headers", 119 "eager", 120 "tags", 121 "allowed_formats", 122 "face_coordinates", 123 "custom_coordinates", 124 "context", 125 "auto_tagging", 126 "responsive_breakpoints", 127 "access_control", 128 "metadata", 129] 130 131upload_params = __SIMPLE_UPLOAD_PARAMS + __SERIALIZED_UPLOAD_PARAMS 132 133SHORT_URL_SIGNATURE_LENGTH = 8 134LONG_URL_SIGNATURE_LENGTH = 32 135 136SIGNATURE_SHA1 = "sha1" 137SIGNATURE_SHA256 = "sha256" 138 139signature_algorithms = { 140 SIGNATURE_SHA1: hashlib.sha1, 141 SIGNATURE_SHA256: hashlib.sha256, 142} 143 144 145def compute_hex_hash(s, algorithm=SIGNATURE_SHA1): 146 """ 147 Computes string hash using specified algorithm and return HEX string representation of hash. 148 149 :param s: String to compute hash for 150 :param algorithm: The name of algorithm to use for computing hash 151 152 :return: HEX string of computed hash value 153 """ 154 try: 155 hash_fn = signature_algorithms[algorithm] 156 except KeyError: 157 raise ValueError('Unsupported hash algorithm: {}'.format(algorithm)) 158 return hash_fn(to_bytes(s)).hexdigest() 159 160 161def build_array(arg): 162 if isinstance(arg, list): 163 return arg 164 elif arg is None: 165 return [] 166 else: 167 return [arg] 168 169 170def build_list_of_dicts(val): 171 """ 172 Converts a value that can be presented as a list of dict. 173 174 In case top level item is not a list, it is wrapped with a list 175 176 Valid values examples: 177 - Valid dict: {"k": "v", "k2","v2"} 178 - List of dict: [{"k": "v"}, {"k2","v2"}] 179 - JSON decodable string: '{"k": "v"}', or '[{"k": "v"}]' 180 - List of JSON decodable strings: ['{"k": "v"}', '{"k2","v2"}'] 181 182 Invalid values examples: 183 - ["not", "a", "dict"] 184 - [123, None], 185 - [["another", "list"]] 186 187 :param val: Input value 188 :type val: Union[list, dict, str] 189 190 :return: Converted(or original) list of dict 191 :raises: ValueError in case value cannot be converted to a list of dict 192 """ 193 if val is None: 194 return [] 195 196 if isinstance(val, str): 197 # use OrderedDict to preserve order 198 val = json.loads(val, object_pairs_hook=OrderedDict) 199 200 if isinstance(val, dict): 201 val = [val] 202 203 for index, item in enumerate(val): 204 if isinstance(item, str): 205 # use OrderedDict to preserve order 206 val[index] = json.loads(item, object_pairs_hook=OrderedDict) 207 if not isinstance(val[index], dict): 208 raise ValueError("Expected a list of dicts") 209 return val 210 211 212def encode_double_array(array): 213 array = build_array(array) 214 if len(array) > 0 and isinstance(array[0], list): 215 return "|".join([",".join([str(i) for i in build_array(inner)]) for inner in array]) 216 else: 217 return encode_list([str(i) for i in array]) 218 219 220def encode_dict(arg): 221 if isinstance(arg, dict): 222 if PY3: 223 items = arg.items() 224 else: 225 items = arg.iteritems() 226 return "|".join((k + "=" + v) for k, v in items) 227 else: 228 return arg 229 230 231def normalize_context_value(value): 232 """ 233 Escape "=" and "|" delimiter characters and json encode lists 234 235 :param value: Value to escape 236 :type value: int or str or list or tuple 237 238 :return: The normalized value 239 :rtype: str 240 """ 241 242 if isinstance(value, (list, tuple)): 243 value = json_encode(value) 244 245 return str(value).replace("=", "\\=").replace("|", "\\|") 246 247 248def encode_context(context): 249 """ 250 Encode metadata fields based on incoming value. 251 252 List and tuple values are encoded to json strings. 253 254 :param context: dict of context to be encoded 255 256 :return: a joined string of all keys and values properly escaped and separated by a pipe character 257 """ 258 if not isinstance(context, dict): 259 return context 260 261 return "|".join(("{}={}".format(k, normalize_context_value(v))) for k, v in iteritems(context)) 262 263 264def json_encode(value): 265 """ 266 Converts value to a json encoded string 267 268 :param value: value to be encoded 269 270 :return: JSON encoded string 271 """ 272 return json.dumps(value, default=__json_serializer, separators=(',', ':')) 273 274 275def encode_date_to_usage_api_format(date_obj): 276 """ 277 Encodes date object to `dd-mm-yyyy` format string 278 279 :param date_obj: datetime.date object to encode 280 281 :return: Encoded date as a string 282 """ 283 return date_obj.strftime('%d-%m-%Y') 284 285 286def patch_fetch_format(options): 287 """ 288 When upload type is fetch, remove the format options. 289 In addition, set the fetch_format options to the format value unless it was already set. 290 Mutates the options parameter! 291 292 :param options: URL and transformation options 293 """ 294 if options.get("type", "upload") != "fetch": 295 return 296 297 resource_format = options.pop("format", None) 298 if "fetch_format" not in options: 299 options["fetch_format"] = resource_format 300 301 302def generate_transformation_string(**options): 303 responsive_width = options.pop("responsive_width", cloudinary.config().responsive_width) 304 size = options.pop("size", None) 305 if size: 306 options["width"], options["height"] = size.split("x") 307 width = options.get("width") 308 height = options.get("height") 309 has_layer = ("underlay" in options) or ("overlay" in options) 310 311 crop = options.pop("crop", None) 312 angle = ".".join([str(value) for value in build_array(options.pop("angle", None))]) 313 no_html_sizes = has_layer or angle or crop == "fit" or crop == "limit" or responsive_width 314 315 if width and (str(width).startswith("auto") or str(width) == "ow" or is_fraction(width) or no_html_sizes): 316 del options["width"] 317 if height and (str(height) == "oh" or is_fraction(height) or no_html_sizes): 318 del options["height"] 319 320 background = options.pop("background", None) 321 if background: 322 background = background.replace("#", "rgb:") 323 color = options.pop("color", None) 324 if color: 325 color = color.replace("#", "rgb:") 326 327 base_transformations = build_array(options.pop("transformation", None)) 328 if any(isinstance(bs, dict) for bs in base_transformations): 329 def recurse(bs): 330 if isinstance(bs, dict): 331 return generate_transformation_string(**bs)[0] 332 else: 333 return generate_transformation_string(transformation=bs)[0] 334 335 base_transformations = list(map(recurse, base_transformations)) 336 named_transformation = None 337 else: 338 named_transformation = ".".join(base_transformations) 339 base_transformations = [] 340 341 effect = options.pop("effect", None) 342 if isinstance(effect, list): 343 effect = ":".join([str(x) for x in effect]) 344 elif isinstance(effect, dict): 345 effect = ":".join([str(x) for x in list(effect.items())[0]]) 346 347 border = options.pop("border", None) 348 if isinstance(border, dict): 349 border_color = border.get("color", "black").replace("#", "rgb:") 350 border = "%(width)spx_solid_%(color)s" % {"color": border_color, 351 "width": str(border.get("width", 2))} 352 353 flags = ".".join(build_array(options.pop("flags", None))) 354 dpr = options.pop("dpr", cloudinary.config().dpr) 355 duration = norm_range_value(options.pop("duration", None)) 356 start_offset = norm_auto_range_value(options.pop("start_offset", None)) 357 end_offset = norm_range_value(options.pop("end_offset", None)) 358 offset = split_range(options.pop("offset", None)) 359 if offset: 360 start_offset = norm_auto_range_value(offset[0]) 361 end_offset = norm_range_value(offset[1]) 362 363 video_codec = process_video_codec_param(options.pop("video_codec", None)) 364 365 aspect_ratio = options.pop("aspect_ratio", None) 366 if isinstance(aspect_ratio, Fraction): 367 aspect_ratio = str(aspect_ratio.numerator) + ":" + str(aspect_ratio.denominator) 368 369 overlay = process_layer(options.pop("overlay", None), "overlay") 370 underlay = process_layer(options.pop("underlay", None), "underlay") 371 if_value = process_conditional(options.pop("if", None)) 372 custom_function = process_custom_function(options.pop("custom_function", None)) 373 custom_pre_function = process_custom_pre_function(options.pop("custom_pre_function", None)) 374 fps = process_fps(options.pop("fps", None)) 375 376 params = { 377 "a": normalize_expression(angle), 378 "ar": normalize_expression(aspect_ratio), 379 "b": background, 380 "bo": border, 381 "c": crop, 382 "co": color, 383 "dpr": normalize_expression(dpr), 384 "du": normalize_expression(duration), 385 "e": normalize_expression(effect), 386 "eo": normalize_expression(end_offset), 387 "fl": flags, 388 "fn": custom_function or custom_pre_function, 389 "fps": fps, 390 "h": normalize_expression(height), 391 "ki": process_ki(options.pop("keyframe_interval", None)), 392 "l": overlay, 393 "o": normalize_expression(options.pop('opacity', None)), 394 "q": normalize_expression(options.pop('quality', None)), 395 "r": process_radius(options.pop('radius', None)), 396 "so": normalize_expression(start_offset), 397 "t": named_transformation, 398 "u": underlay, 399 "w": normalize_expression(width), 400 "x": normalize_expression(options.pop('x', None)), 401 "y": normalize_expression(options.pop('y', None)), 402 "vc": video_codec, 403 "z": normalize_expression(options.pop('zoom', None)) 404 } 405 simple_params = { 406 "ac": "audio_codec", 407 "af": "audio_frequency", 408 "br": "bit_rate", 409 "cs": "color_space", 410 "d": "default_image", 411 "dl": "delay", 412 "dn": "density", 413 "f": "fetch_format", 414 "g": "gravity", 415 "p": "prefix", 416 "pg": "page", 417 "sp": "streaming_profile", 418 "vs": "video_sampling", 419 } 420 421 for param, option in simple_params.items(): 422 params[param] = options.pop(option, None) 423 424 variables = options.pop('variables', {}) 425 var_params = [] 426 for key, value in options.items(): 427 if re.match(r'^\$', key): 428 var_params.append(u"{0}_{1}".format(key, normalize_expression(str(value)))) 429 430 var_params.sort() 431 432 if variables: 433 for var in variables: 434 var_params.append(u"{0}_{1}".format(var[0], normalize_expression(str(var[1])))) 435 436 variables = ','.join(var_params) 437 438 sorted_params = sorted([param + "_" + str(value) for param, value in params.items() if (value or value == 0)]) 439 if variables: 440 sorted_params.insert(0, str(variables)) 441 442 if if_value is not None: 443 sorted_params.insert(0, "if_" + str(if_value)) 444 445 if "raw_transformation" in options and (options["raw_transformation"] or options["raw_transformation"] == 0): 446 sorted_params.append(options.pop("raw_transformation")) 447 448 transformation = ",".join(sorted_params) 449 450 transformations = base_transformations + [transformation] 451 452 if responsive_width: 453 responsive_width_transformation = cloudinary.config().responsive_width_transformation \ 454 or DEFAULT_RESPONSIVE_WIDTH_TRANSFORMATION 455 transformations += [generate_transformation_string(**responsive_width_transformation)[0]] 456 url = "/".join([trans for trans in transformations if trans]) 457 458 if str(width).startswith("auto") or responsive_width: 459 options["responsive"] = True 460 if dpr == "auto": 461 options["hidpi"] = True 462 return url, options 463 464 465def chain_transformations(options, transformations): 466 """ 467 Helper function, allows chaining transformations to the end of transformations list 468 469 The result of this function is an updated options parameter 470 471 :param options: Original options 472 :param transformations: Transformations to chain at the end 473 474 :return: Resulting options 475 """ 476 477 transformations = copy.deepcopy(transformations) 478 479 transformations = build_array(transformations) 480 # preserve url options 481 url_options = dict((o, options[o]) for o in __URL_KEYS if o in options) 482 483 transformations.insert(0, options) 484 485 url_options["transformation"] = transformations 486 487 return url_options 488 489 490def is_fraction(width): 491 width = str(width) 492 return re.match(FLOAT_RE, width) and float(width) < 1 493 494 495def split_range(range): 496 if (isinstance(range, list) or isinstance(range, tuple)) and len(range) >= 2: 497 return [range[0], range[-1]] 498 elif isinstance(range, string_types) and re.match(RANGE_RE, range): 499 return range.split("..", 1) 500 else: 501 return None 502 503 504def norm_range_value(value): 505 if value is None: 506 return None 507 508 match = re.match(RANGE_VALUE_RE, str(value)) 509 510 if match is None: 511 return None 512 513 modifier = '' 514 if match.group('modifier') is not None: 515 modifier = 'p' 516 return match.group('value') + modifier 517 518 519def norm_auto_range_value(value): 520 if value == "auto": 521 return value 522 return norm_range_value(value) 523 524 525def process_video_codec_param(param): 526 out_param = param 527 if isinstance(out_param, dict): 528 out_param = param['codec'] 529 if 'profile' in param: 530 out_param = out_param + ':' + param['profile'] 531 if 'level' in param: 532 out_param = out_param + ':' + param['level'] 533 if param.get('b_frames') is False: 534 out_param = out_param + ':' + 'bframes_no' 535 536 return out_param 537 538 539def process_radius(param): 540 if param is None: 541 return 542 543 if isinstance(param, (list, tuple)): 544 if not 1 <= len(param) <= 4: 545 raise ValueError("Invalid radius param") 546 return ':'.join(normalize_expression(t) for t in param) 547 548 return str(param) 549 550 551def process_params(params): 552 processed_params = None 553 if isinstance(params, dict): 554 processed_params = {} 555 for key, value in params.items(): 556 if isinstance(value, list) or isinstance(value, tuple): 557 value_list = {"{}[{}]".format(key, i): i_value for i, i_value in enumerate(value)} 558 processed_params.update(value_list) 559 elif value is not None: 560 processed_params[key] = value 561 return processed_params 562 563 564def cleanup_params(params): 565 return dict([(k, __safe_value(v)) for (k, v) in params.items() if v is not None and not v == ""]) 566 567 568def sign_request(params, options): 569 api_key = options.get("api_key", cloudinary.config().api_key) 570 if not api_key: 571 raise ValueError("Must supply api_key") 572 api_secret = options.get("api_secret", cloudinary.config().api_secret) 573 if not api_secret: 574 raise ValueError("Must supply api_secret") 575 signature_algorithm = options.get("signature_algorithm", cloudinary.config().signature_algorithm) 576 577 params = cleanup_params(params) 578 params["signature"] = api_sign_request(params, api_secret, signature_algorithm) 579 params["api_key"] = api_key 580 581 return params 582 583 584def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1): 585 params = [(k + "=" + (",".join(v) if isinstance(v, list) else str(v))) for k, v in params_to_sign.items() if v] 586 to_sign = "&".join(sorted(params)) 587 return compute_hex_hash(to_sign + api_secret, algorithm) 588 589 590def breakpoint_settings_mapper(breakpoint_settings): 591 breakpoint_settings = copy.deepcopy(breakpoint_settings) 592 transformation = breakpoint_settings.get("transformation") 593 if transformation is not None: 594 breakpoint_settings["transformation"], _ = generate_transformation_string(**transformation) 595 return breakpoint_settings 596 597 598def generate_responsive_breakpoints_string(breakpoints): 599 if breakpoints is None: 600 return None 601 breakpoints = build_array(breakpoints) 602 return json.dumps(list(map(breakpoint_settings_mapper, breakpoints))) 603 604 605def finalize_source(source, format, url_suffix): 606 source = re.sub(r'([^:])/+', r'\1/', source) 607 if re.match(r'^https?:/', source): 608 source = smart_escape(source) 609 source_to_sign = source 610 else: 611 source = unquote(source) 612 if not PY3: 613 source = source.encode('utf8') 614 source = smart_escape(source) 615 source_to_sign = source 616 if url_suffix is not None: 617 if re.search(r'[\./]', url_suffix): 618 raise ValueError("url_suffix should not include . or /") 619 source = source + "/" + url_suffix 620 if format is not None: 621 source = source + "." + format 622 source_to_sign = source_to_sign + "." + format 623 624 return source, source_to_sign 625 626 627def finalize_resource_type(resource_type, type, url_suffix, use_root_path, shorten): 628 upload_type = type or "upload" 629 if url_suffix is not None: 630 if resource_type == "image" and upload_type == "upload": 631 resource_type = "images" 632 upload_type = None 633 elif resource_type == "raw" and upload_type == "upload": 634 resource_type = "files" 635 upload_type = None 636 else: 637 raise ValueError("URL Suffix only supported for image/upload and raw/upload") 638 639 if use_root_path: 640 if (resource_type == "image" and upload_type == "upload") or ( 641 resource_type == "images" and upload_type is None): 642 resource_type = None 643 upload_type = None 644 else: 645 raise ValueError("Root path only supported for image/upload") 646 647 if shorten and resource_type == "image" and upload_type == "upload": 648 resource_type = "iu" 649 upload_type = None 650 651 return resource_type, upload_type 652 653 654def unsigned_download_url_prefix(source, cloud_name, private_cdn, cdn_subdomain, 655 secure_cdn_subdomain, cname, secure, secure_distribution): 656 """cdn_subdomain and secure_cdn_subdomain 657 1) Customers in shared distribution (e.g. res.cloudinary.com) 658 if cdn_domain is true uses res-[1-5].cloudinary.com for both http and https. 659 Setting secure_cdn_subdomain to false disables this for https. 660 2) Customers with private cdn 661 if cdn_domain is true uses cloudname-res-[1-5].cloudinary.com for http 662 if secure_cdn_domain is true uses cloudname-res-[1-5].cloudinary.com for https 663 (please contact support if you require this) 664 3) Customers with cname 665 if cdn_domain is true uses a[1-5].cname for http. For https, uses the same naming scheme 666 as 1 for shared distribution and as 2 for private distribution.""" 667 shared_domain = not private_cdn 668 shard = __crc(source) 669 if secure: 670 if secure_distribution is None or secure_distribution == cloudinary.OLD_AKAMAI_SHARED_CDN: 671 secure_distribution = cloud_name + "-res.cloudinary.com" \ 672 if private_cdn else cloudinary.SHARED_CDN 673 674 shared_domain = shared_domain or secure_distribution == cloudinary.SHARED_CDN 675 if secure_cdn_subdomain is None and shared_domain: 676 secure_cdn_subdomain = cdn_subdomain 677 678 if secure_cdn_subdomain: 679 secure_distribution = re.sub('res.cloudinary.com', "res-" + shard + ".cloudinary.com", 680 secure_distribution) 681 682 prefix = "https://" + secure_distribution 683 elif cname: 684 subdomain = "a" + shard + "." if cdn_subdomain else "" 685 prefix = "http://" + subdomain + cname 686 else: 687 subdomain = cloud_name + "-res" if private_cdn else "res" 688 if cdn_subdomain: 689 subdomain = subdomain + "-" + shard 690 prefix = "http://" + subdomain + ".cloudinary.com" 691 692 if shared_domain: 693 prefix += "/" + cloud_name 694 695 return prefix 696 697 698def merge(*dict_args): 699 result = None 700 for dictionary in dict_args: 701 if dictionary is not None: 702 if result is None: 703 result = dictionary.copy() 704 else: 705 result.update(dictionary) 706 return result 707 708 709def cloudinary_url(source, **options): 710 original_source = source 711 712 patch_fetch_format(options) 713 type = options.pop("type", "upload") 714 715 transformation, options = generate_transformation_string(**options) 716 717 resource_type = options.pop("resource_type", "image") 718 719 force_version = options.pop("force_version", cloudinary.config().force_version) 720 if force_version is None: 721 force_version = True 722 723 version = options.pop("version", None) 724 725 format = options.pop("format", None) 726 cdn_subdomain = options.pop("cdn_subdomain", cloudinary.config().cdn_subdomain) 727 secure_cdn_subdomain = options.pop("secure_cdn_subdomain", 728 cloudinary.config().secure_cdn_subdomain) 729 cname = options.pop("cname", cloudinary.config().cname) 730 shorten = options.pop("shorten", cloudinary.config().shorten) 731 732 cloud_name = options.pop("cloud_name", cloudinary.config().cloud_name or None) 733 if cloud_name is None: 734 raise ValueError("Must supply cloud_name in tag or in configuration") 735 secure = options.pop("secure", cloudinary.config().secure) 736 private_cdn = options.pop("private_cdn", cloudinary.config().private_cdn) 737 secure_distribution = options.pop("secure_distribution", 738 cloudinary.config().secure_distribution) 739 sign_url = options.pop("sign_url", cloudinary.config().sign_url) 740 api_secret = options.pop("api_secret", cloudinary.config().api_secret) 741 url_suffix = options.pop("url_suffix", None) 742 use_root_path = options.pop("use_root_path", cloudinary.config().use_root_path) 743 auth_token = options.pop("auth_token", None) 744 long_url_signature = options.pop("long_url_signature", cloudinary.config().long_url_signature) 745 signature_algorithm = options.pop("signature_algorithm", cloudinary.config().signature_algorithm) 746 if auth_token is not False: 747 auth_token = merge(cloudinary.config().auth_token, auth_token) 748 749 if (not source) or type == "upload" and re.match(r'^https?:', source): 750 return original_source, options 751 752 resource_type, type = finalize_resource_type( 753 resource_type, type, url_suffix, use_root_path, shorten) 754 source, source_to_sign = finalize_source(source, format, url_suffix) 755 756 if not version and force_version \ 757 and source_to_sign.find("/") >= 0 \ 758 and not re.match(r'^https?:/', source_to_sign) \ 759 and not re.match(r'^v[0-9]+', source_to_sign): 760 version = "1" 761 if version: 762 version = "v" + str(version) 763 else: 764 version = None 765 766 transformation = re.sub(r'([^:])/+', r'\1/', transformation) 767 768 signature = None 769 if sign_url and not auth_token: 770 to_sign = "/".join(__compact([transformation, source_to_sign])) 771 if long_url_signature: 772 # Long signature forces SHA256 773 signature_algorithm = SIGNATURE_SHA256 774 chars_length = LONG_URL_SIGNATURE_LENGTH 775 else: 776 chars_length = SHORT_URL_SIGNATURE_LENGTH 777 if signature_algorithm not in signature_algorithms: 778 raise ValueError("Unsupported signature algorithm '{}'".format(signature_algorithm)) 779 hash_fn = signature_algorithms[signature_algorithm] 780 signature = "s--" + to_string( 781 base64.urlsafe_b64encode( 782 hash_fn(to_bytes(to_sign + api_secret)).digest())[0:chars_length]) + "--" 783 784 prefix = unsigned_download_url_prefix( 785 source, cloud_name, private_cdn, cdn_subdomain, secure_cdn_subdomain, 786 cname, secure, secure_distribution) 787 source = "/".join(__compact( 788 [prefix, resource_type, type, signature, transformation, version, source])) 789 if sign_url and auth_token: 790 path = urlparse(source).path 791 token = cloudinary.auth_token.generate(**merge(auth_token, {"url": path})) 792 source = "%s?%s" % (source, token) 793 return source, options 794 795 796def base_api_url(path, **options): 797 cloudinary_prefix = options.get("upload_prefix", cloudinary.config().upload_prefix) \ 798 or "https://api.cloudinary.com" 799 cloud_name = options.get("cloud_name", cloudinary.config().cloud_name) 800 801 if not cloud_name: 802 raise ValueError("Must supply cloud_name") 803 804 path = build_array(path) 805 806 return encode_unicode_url("/".join([cloudinary_prefix, cloudinary.API_VERSION, cloud_name] + path)) 807 808 809def cloudinary_api_url(action='upload', **options): 810 resource_type = options.get("resource_type", "image") 811 812 return base_api_url([resource_type, action], **options) 813 814 815def cloudinary_api_download_url(action, params, **options): 816 params = params.copy() 817 params["mode"] = "download" 818 cloudinary_params = sign_request(params, options) 819 return cloudinary_api_url(action, **options) + "?" + urlencode(bracketize_seq(cloudinary_params), True) 820 821 822def cloudinary_scaled_url(source, width, transformation, options): 823 """ 824 Generates a cloudinary url scaled to specified width. 825 826 :param source: The resource 827 :param width: Width in pixels of the srcset item 828 :param transformation: Custom transformation that overrides transformations provided in options 829 :param options: A dict with additional options 830 831 :return: Resulting URL of the item 832 """ 833 834 # preserve options from being destructed 835 options = copy.deepcopy(options) 836 837 if transformation: 838 if isinstance(transformation, string_types): 839 transformation = {"raw_transformation": transformation} 840 841 # Remove all transformation related options 842 options = dict((o, options[o]) for o in __URL_KEYS if o in options) 843 options.update(transformation) 844 845 scale_transformation = {"crop": "scale", "width": width} 846 847 url_options = options 848 patch_fetch_format(url_options) 849 url_options = chain_transformations(url_options, scale_transformation) 850 851 return cloudinary_url(source, **url_options)[0] 852 853 854def smart_escape(source, unsafe=r"([^a-zA-Z0-9_.\-\/:]+)"): 855 """ 856 Based on ruby's CGI::unescape. In addition does not escape / : 857 858 :param source: Source string to escape 859 :param unsafe: Unsafe characters 860 861 :return: Escaped string 862 """ 863 def pack(m): 864 return to_bytes('%' + "%".join( 865 ["%02X" % x for x in struct.unpack('B' * len(m.group(1)), m.group(1))] 866 ).upper()) 867 868 return to_string(re.sub(to_bytes(unsafe), pack, to_bytes(source))) 869 870 871def random_public_id(): 872 return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) 873 for _ in range(16)) 874 875 876def signed_preloaded_image(result): 877 filename = ".".join([x for x in [result["public_id"], result["format"]] if x]) 878 path = "/".join([result["resource_type"], "upload", "v" + str(result["version"]), filename]) 879 return path + "#" + result["signature"] 880 881 882def now(): 883 return str(int(time.time())) 884 885 886def private_download_url(public_id, format, **options): 887 cloudinary_params = sign_request({ 888 "timestamp": now(), 889 "public_id": public_id, 890 "format": format, 891 "type": options.get("type"), 892 "attachment": options.get("attachment"), 893 "expires_at": options.get("expires_at") 894 }, options) 895 896 return cloudinary_api_url("download", **options) + "?" + urlencode(cloudinary_params) 897 898 899def zip_download_url(tag, **options): 900 cloudinary_params = sign_request({ 901 "timestamp": now(), 902 "tag": tag, 903 "transformation": generate_transformation_string(**options)[0] 904 }, options) 905 906 return cloudinary_api_url("download_tag.zip", **options) + "?" + urlencode(cloudinary_params) 907 908 909def bracketize_seq(params): 910 url_params = dict() 911 for param_name in params: 912 param_value = params[param_name] 913 if isinstance(param_value, list): 914 param_name += "[]" 915 url_params[param_name] = param_value 916 return url_params 917 918 919def download_archive_url(**options): 920 return cloudinary_api_download_url(action="generate_archive", params=archive_params(**options), **options) 921 922 923def download_zip_url(**options): 924 new_options = options.copy() 925 new_options.update(target_format="zip") 926 return download_archive_url(**new_options) 927 928 929def download_folder(folder_path, **options): 930 """ 931 Creates and returns a URL that when invoked creates an archive of a folder. 932 :param folder_path: The full path from the root that is used to generate download url. 933 :type folder_path: str 934 :param options: Additional options. 935 :type options: dict, optional 936 :return: Signed URL to download the folder. 937 :rtype: str 938 """ 939 options["prefixes"] = folder_path 940 options.setdefault("resource_type", "all") 941 942 return download_archive_url(**options) 943 944 945def download_backedup_asset(asset_id, version_id, **options): 946 """ 947 The returned url allows downloading the backedup asset based on the the asset ID and the version ID. 948 949 Parameters asset_id and version_id are returned with api.resource(<PUBLIC_ID1>, versions=True) API call. 950 951 :param asset_id: The asset ID of the asset. 952 :type asset_id: str 953 :param version_id: The version ID of the asset. 954 :type version_id: str 955 :param options: Additional options. 956 :type options: dict, optional 957 :return:The signed URL for downloading backup version of the asset. 958 :rtype: str 959 """ 960 params = { 961 "timestamp": options.get("timestamp", now()), 962 "asset_id": asset_id, 963 "version_id": version_id 964 } 965 cloudinary_params = sign_request(params, options) 966 967 return base_api_url("download_backup", **options) + "?" + urlencode(bracketize_seq(cloudinary_params), True) 968 969 970def generate_auth_token(**options): 971 token_options = merge(cloudinary.config().auth_token, options) 972 return auth_token.generate(**token_options) 973 974 975def archive_params(**options): 976 if options.get("timestamp") is None: 977 timestamp = now() 978 else: 979 timestamp = options.get("timestamp") 980 params = { 981 "allow_missing": options.get("allow_missing"), 982 "async": options.get("async"), 983 "expires_at": options.get("expires_at"), 984 "flatten_folders": options.get("flatten_folders"), 985 "flatten_transformations": options.get("flatten_transformations"), 986 "keep_derived": options.get("keep_derived"), 987 "mode": options.get("mode"), 988 "notification_url": options.get("notification_url"), 989 "phash": options.get("phash"), 990 "prefixes": options.get("prefixes") and build_array(options.get("prefixes")), 991 "public_ids": options.get("public_ids") and build_array(options.get("public_ids")), 992 "fully_qualified_public_ids": options.get("fully_qualified_public_ids") and build_array( 993 options.get("fully_qualified_public_ids")), 994 "skip_transformation_name": options.get("skip_transformation_name"), 995 "tags": options.get("tags") and build_array(options.get("tags")), 996 "target_format": options.get("target_format"), 997 "target_public_id": options.get("target_public_id"), 998 "target_tags": options.get("target_tags") and build_array(options.get("target_tags")), 999 "timestamp": timestamp, 1000 "transformations": build_eager(options.get("transformations")), 1001 "type": options.get("type"), 1002 "use_original_filename": options.get("use_original_filename"), 1003 } 1004 return params 1005 1006 1007def build_eager(transformations): 1008 if transformations is None: 1009 return None 1010 1011 return "|".join([build_single_eager(et) for et in build_array(transformations)]) 1012 1013 1014def build_single_eager(options): 1015 """ 1016 Builds a single eager transformation which consists of transformation and (optionally) format joined by "/" 1017 1018 :param options: Options containing transformation parameters and (optionally) a "format" key 1019 format can be a string value (jpg, gif, etc) or can be set to "" (empty string). 1020 The latter leads to transformation ending with "/", which means "No extension, use original format" 1021 If format is not provided or set to None, only transformation is used (without the trailing "/") 1022 1023 :return: Resulting eager transformation string 1024 """ 1025 if isinstance(options, string_types): 1026 return options 1027 1028 trans_str = generate_transformation_string(**options)[0] 1029 1030 if not trans_str: 1031 return "" 1032 1033 file_format = options.get("format") 1034 1035 return trans_str + ("/" + file_format if file_format is not None else "") 1036 1037 1038def build_custom_headers(headers): 1039 if headers is None: 1040 return None 1041 elif isinstance(headers, list): 1042 pass 1043 elif isinstance(headers, dict): 1044 headers = [k + ": " + v for k, v in headers.items()] 1045 else: 1046 return headers 1047 return "\n".join(headers) 1048 1049 1050def build_upload_params(**options): 1051 params = {param_name: options.get(param_name) for param_name in __SIMPLE_UPLOAD_PARAMS} 1052 1053 serialized_params = { 1054 "timestamp": now(), 1055 "metadata": encode_context(options.get("metadata")), 1056 "transformation": generate_transformation_string(**options)[0], 1057 "headers": build_custom_headers(options.get("headers")), 1058 "eager": build_eager(options.get("eager")), 1059 "tags": options.get("tags") and encode_list(build_array(options["tags"])), 1060 "allowed_formats": options.get("allowed_formats") and encode_list(build_array(options["allowed_formats"])), 1061 "face_coordinates": encode_double_array(options.get("face_coordinates")), 1062 "custom_coordinates": encode_double_array(options.get("custom_coordinates")), 1063 "context": encode_context(options.get("context")), 1064 "auto_tagging": options.get("auto_tagging") and str(options.get("auto_tagging")), 1065 "responsive_breakpoints": generate_responsive_breakpoints_string(options.get("responsive_breakpoints")), 1066 "access_control": options.get("access_control") and json_encode( 1067 build_list_of_dicts(options.get("access_control"))) 1068 } 1069 1070 # make sure that we are in-sync with __SERIALIZED_UPLOAD_PARAMS which are in use by other methods 1071 serialized_params = {param_name: serialized_params[param_name] for param_name in __SERIALIZED_UPLOAD_PARAMS} 1072 1073 params.update(serialized_params) 1074 1075 return params 1076 1077 1078def build_multi_and_sprite_params(**options): 1079 """ 1080 Build params for multi, download_multi, generate_sprite, and download_generated_sprite methods 1081 """ 1082 tag = options.get("tag") 1083 urls = options.get("urls") 1084 if bool(tag) == bool(urls): 1085 raise ValueError("Either 'tag' or 'urls' parameter has to be set but not both") 1086 params = { 1087 "mode": options.get("mode"), 1088 "timestamp": now(), 1089 "async": options.get("async"), 1090 "notification_url": options.get("notification_url"), 1091 "tag": tag, 1092 "urls": urls, 1093 "transformation": generate_transformation_string(fetch_format=options.get("format"), **options)[0] 1094 } 1095 return params 1096 1097 1098def __process_text_options(layer, layer_parameter): 1099 text_style = str(layer.get("text_style", "")) 1100 if text_style and not text_style.isspace(): 1101 return text_style 1102 1103 font_family = layer.get("font_family") 1104 font_size = layer.get("font_size") 1105 keywords = [] 1106 for attr, default_value in __LAYER_KEYWORD_PARAMS: 1107 attr_value = layer.get(attr) 1108 if attr_value != default_value and attr_value is not None: 1109 keywords.append(attr_value) 1110 1111 letter_spacing = layer.get("letter_spacing") 1112 if letter_spacing is not None: 1113 keywords.append("letter_spacing_" + str(letter_spacing)) 1114 1115 line_spacing = layer.get("line_spacing") 1116 if line_spacing is not None: 1117 keywords.append("line_spacing_" + str(line_spacing)) 1118 1119 font_antialiasing = layer.get("font_antialiasing") 1120 if font_antialiasing is not None: 1121 keywords.append("antialias_" + str(font_antialiasing)) 1122 1123 font_hinting = layer.get("font_hinting") 1124 if font_hinting is not None: 1125 keywords.append("hinting_" + str(font_hinting)) 1126 1127 if font_size is None and font_family is None and len(keywords) == 0: 1128 return None 1129 1130 if font_family is None: 1131 raise ValueError("Must supply font_family for text in " + layer_parameter) 1132 1133 if font_size is None: 1134 raise ValueError("Must supply font_size for text in " + layer_parameter) 1135 1136 keywords.insert(0, font_size) 1137 keywords.insert(0, font_family) 1138 1139 return '_'.join([str(k) for k in keywords]) 1140 1141 1142def process_layer(layer, layer_parameter): 1143 if isinstance(layer, string_types) and layer.startswith("fetch:"): 1144 layer = {"url": layer[len('fetch:'):]} 1145 if not isinstance(layer, dict): 1146 return layer 1147 1148 resource_type = layer.get("resource_type") 1149 text = layer.get("text") 1150 type = layer.get("type") 1151 public_id = layer.get("public_id") 1152 format = layer.get("format") 1153 fetch = layer.get("url") 1154 components = list() 1155 1156 if text is not None and resource_type is None: 1157 resource_type = "text" 1158 1159 if fetch and resource_type is None: 1160 resource_type = "fetch" 1161 1162 if public_id is not None and format is not None: 1163 public_id = public_id + "." + format 1164 1165 if public_id is None and resource_type != "text" and resource_type != "fetch": 1166 raise ValueError("Must supply public_id for for non-text " + layer_parameter) 1167 1168 if resource_type is not None and resource_type != "image": 1169 components.append(resource_type) 1170 1171 if type is not None and type != "upload": 1172 components.append(type) 1173 1174 if resource_type == "text" or resource_type == "subtitles": 1175 if public_id is None and text is None: 1176 raise ValueError("Must supply either text or public_id in " + layer_parameter) 1177 1178 text_options = __process_text_options(layer, layer_parameter) 1179 1180 if text_options is not None: 1181 components.append(text_options) 1182 1183 if public_id is not None: 1184 public_id = public_id.replace("/", ':') 1185 components.append(public_id) 1186 1187 if text is not None: 1188 var_pattern = VAR_NAME_RE 1189 match = re.findall(var_pattern, text) 1190 1191 parts = filter(lambda p: p is not None, re.split(var_pattern, text)) 1192 encoded_text = [] 1193 for part in parts: 1194 if re.match(var_pattern, part): 1195 encoded_text.append(part) 1196 else: 1197 encoded_text.append(smart_escape(smart_escape(part, r"([,/])"))) 1198 1199 text = ''.join(encoded_text) 1200 # text = text.replace("%2C", "%252C") 1201 # text = text.replace("/", "%252F") 1202 components.append(text) 1203 elif resource_type == "fetch": 1204 b64 = base64_encode_url(fetch) 1205 components.append(b64) 1206 else: 1207 public_id = public_id.replace("/", ':') 1208 components.append(public_id) 1209 1210 return ':'.join(components) 1211 1212 1213IF_OPERATORS = { 1214 "=": 'eq', 1215 "!=": 'ne', 1216 "<": 'lt', 1217 ">": 'gt', 1218 "<=": 'lte', 1219 ">=": 'gte', 1220 "&&": 'and', 1221 "||": 'or', 1222 "*": 'mul', 1223 "/": 'div', 1224 "+": 'add', 1225 "-": 'sub', 1226 "^": 'pow' 1227} 1228 1229PREDEFINED_VARS = { 1230 "aspect_ratio": "ar", 1231 "aspectRatio": "ar", 1232 "current_page": "cp", 1233 "currentPage": "cp", 1234 "face_count": "fc", 1235 "faceCount": "fc", 1236 "height": "h", 1237 "initial_aspect_ratio": "iar", 1238 "initialAspectRatio": "iar", 1239 "trimmed_aspect_ratio": "tar", 1240 "trimmedAspectRatio": "tar", 1241 "initial_height": "ih", 1242 "initialHeight": "ih", 1243 "initial_width": "iw", 1244 "initialWidth": "iw", 1245 "page_count": "pc", 1246 "pageCount": "pc", 1247 "page_x": "px", 1248 "pageX": "px", 1249 "page_y": "py", 1250 "pageY": "py", 1251 "tags": "tags", 1252 "width": "w", 1253 "duration": "du", 1254 "initial_duration": "idu", 1255 "initialDuration": "idu", 1256 "illustration_score": "ils", 1257 "illustrationScore": "ils", 1258 "context": "ctx" 1259} 1260 1261replaceRE = "((\\|\\||>=|<=|&&|!=|>|=|<|/|-|\\+|\\*|\\^)(?=[ _])|(\\$_*[^_ ]+)|(?<![\\$:])(" + \ 1262 '|'.join(PREDEFINED_VARS.keys()) + "))" 1263 1264 1265def translate_if(match): 1266 name = match.group(0) 1267 return IF_OPERATORS.get(name, 1268 PREDEFINED_VARS.get(name, 1269 name)) 1270 1271 1272def process_custom_function(custom_function): 1273 if not isinstance(custom_function, dict): 1274 return custom_function 1275 1276 function_type = custom_function.get("function_type") 1277 source = custom_function.get("source") 1278 if function_type == "remote": 1279 source = base64url_encode(source) 1280 1281 return ":".join([function_type, source]) 1282 1283 1284def process_custom_pre_function(custom_function): 1285 value = process_custom_function(custom_function) 1286 return "pre:{0}".format(value) if value else None 1287 1288 1289def process_fps(fps): 1290 """ 1291 Serializes fps transformation parameter 1292 1293 :param fps: A single number, a list of mixed type, a string, including open-ended and closed range values 1294 Examples: '24-29.97', 24, 24.973, '-24', [24, 29.97] 1295 1296 :return: string 1297 """ 1298 if not isinstance(fps, (list, tuple)): 1299 return fps 1300 1301 return "-".join(normalize_expression(f) for f in fps) 1302 1303 1304def process_ki(ki): 1305 """ 1306 Serializes keyframe_interval parameter 1307 :param ki: Keyframe interval. Should be either a string or a positive real number. 1308 :return: string 1309 """ 1310 if ki is None: 1311 return None 1312 if isinstance(ki, string_types): 1313 return ki 1314 if not isinstance(ki, Number): 1315 raise ValueError("Keyframe interval should be a number or a string") 1316 if ki <= 0: 1317 raise ValueError("Keyframe interval should be greater than zero") 1318 return str(float(ki)) 1319 1320 1321def process_conditional(conditional): 1322 if conditional is None: 1323 return conditional 1324 result = normalize_expression(conditional) 1325 return result 1326 1327 1328def normalize_expression(expression): 1329 if re.match(r'^!.+!$', str(expression)): # quoted string 1330 return expression 1331 elif expression: 1332 result = str(expression) 1333 result = re.sub(replaceRE, translate_if, result) 1334 result = re.sub('[ _]+', '_', result) 1335 return result 1336 else: 1337 return expression 1338 1339 1340def __join_pair(key, value): 1341 if value is None or value == "": 1342 return None 1343 elif value is True: 1344 return key 1345 else: 1346 return u"{0}=\"{1}\"".format(key, value) 1347 1348 1349def html_attrs(attrs, only=None): 1350 return ' '.join(sorted([__join_pair(key, value) for key, value in attrs.items() if only is None or key in only])) 1351 1352 1353def __safe_value(v): 1354 if isinstance(v, bool): 1355 return "1" if v else "0" 1356 else: 1357 return v 1358 1359 1360def __crc(source): 1361 return str((zlib.crc32(to_bytearray(source)) & 0xffffffff) % 5 + 1) 1362 1363 1364def __compact(array): 1365 return filter(lambda x: x, array) 1366 1367 1368def base64_encode_url(url): 1369 """ 1370 Returns the Base64-decoded version of url. 1371 The method tries to unquote the url because quoting it 1372 1373 :param str url: 1374 the url to encode. the value is URIdecoded and then 1375 re-encoded before converting to base64 representation 1376 1377 """ 1378 1379 try: 1380 url = unquote(url) 1381 except Exception: 1382 pass 1383 url = smart_escape(url) 1384 b64 = base64.b64encode(url.encode('utf-8')) 1385 return b64.decode('ascii') 1386 1387 1388def base64url_encode(data): 1389 """ 1390 Url safe version of urlsafe_b64encode with stripped `=` sign at the end. 1391 1392 :param data: input data 1393 1394 :return: Base64 URL safe encoded string 1395 """ 1396 return to_string(base64.urlsafe_b64encode(to_bytes(data))) 1397 1398 1399def encode_unicode_url(url_str): 1400 """ 1401 Quote and encode possible unicode url string (applicable for python2) 1402 1403 :param url_str: Url string to encode 1404 1405 :return: Encoded string 1406 """ 1407 if six.PY2: 1408 url_str = urllib.quote(url_str.encode('utf-8'), ":/?#[]@!$&'()*+,;=") 1409 1410 return url_str 1411 1412 1413def __json_serializer(obj): 1414 """JSON serializer for objects not serializable by default json code""" 1415 if isinstance(obj, (datetime, date)): 1416 return obj.isoformat() 1417 raise TypeError("Object of type %s is not JSON serializable" % type(obj)) 1418 1419 1420def is_remote_url(file): 1421 """Basic URL scheme check to define if it's remote URL""" 1422 return isinstance(file, string_types) and re.match(REMOTE_URL_RE, file) 1423 1424 1425def file_io_size(file_io): 1426 """ 1427 Helper function for getting file-like object size(suitable for both files and streams) 1428 1429 :param file_io: io.IOBase 1430 1431 :return: size 1432 """ 1433 initial_position = file_io.tell() 1434 file_io.seek(0, os.SEEK_END) 1435 size = file_io.tell() 1436 file_io.seek(initial_position, os.SEEK_SET) 1437 1438 return size 1439 1440 1441def check_property_enabled(f): 1442 """ 1443 Used as a class method decorator to check whether class is enabled(self.enabled is True) 1444 1445 :param f: function to call 1446 1447 :return: None if not enabled, otherwise calls function f 1448 """ 1449 def wrapper(*args, **kwargs): 1450 if not args[0].enabled: 1451 return None 1452 return f(*args, **kwargs) 1453 1454 return wrapper 1455 1456 1457def verify_api_response_signature(public_id, version, signature, algorithm=None): 1458 """ 1459 Verifies the authenticity of an API response signature 1460 1461 :param public_id: The public id of the asset as returned in the API response 1462 :param version: The version of the asset as returned in the API response 1463 :param signature: Actual signature. Can be retrieved from the X-Cld-Signature header 1464 :param algorithm: Name of hashing algorithm to use for calculation of HMACs. 1465 By default uses `cloudinary.config().signature_algorithm` 1466 1467 :return: Boolean result of the validation 1468 """ 1469 if not cloudinary.config().api_secret: 1470 raise Exception('Api secret key is empty') 1471 1472 parameters_to_sign = {'public_id': public_id, 1473 'version': version} 1474 1475 return signature == api_sign_request( 1476 parameters_to_sign, 1477 cloudinary.config().api_secret, 1478 algorithm or cloudinary.config().signature_algorithm 1479 ) 1480 1481 1482def verify_notification_signature(body, timestamp, signature, valid_for=7200, algorithm=None): 1483 """ 1484 Verifies the authenticity of a notification signature 1485 1486 :param body: Json of the request's body 1487 :param timestamp: Unix timestamp. Can be retrieved from the X-Cld-Timestamp header 1488 :param signature: Actual signature. Can be retrieved from the X-Cld-Signature header 1489 :param valid_for: The desired time in seconds for considering the request valid 1490 :param algorithm: Name of hashing algorithm to use for calculation of HMACs. 1491 By default uses `cloudinary.config().signature_algorithm` 1492 1493 :return: Boolean result of the validation 1494 """ 1495 if not cloudinary.config().api_secret: 1496 raise Exception('Api secret key is empty') 1497 1498 if timestamp < time.time() - valid_for: 1499 return False 1500 1501 if not isinstance(body, str): 1502 raise ValueError('Body should be type of string') 1503 1504 return signature == compute_hex_hash( 1505 '{}{}{}'.format(body, timestamp, cloudinary.config().api_secret), 1506 algorithm or cloudinary.config().signature_algorithm) 1507 1508 1509def get_http_connector(conf, options): 1510 """ 1511 Used to create http connector, depends on api_proxy configuration parameter 1512 1513 :param conf: configuration object 1514 :param options: additional options 1515 1516 :return: ProxyManager if api_proxy is set, otherwise PoolManager object 1517 """ 1518 if conf.api_proxy: 1519 return ProxyManager(conf.api_proxy, **options) 1520 else: 1521 return PoolManager(**options) 1522 1523 1524def encode_list(obj): 1525 if isinstance(obj, list): 1526 return ",".join(obj) 1527 return obj 1528 1529 1530def safe_cast(val, casting_fn, default=None): 1531 """ 1532 Attempts to cast a value to another using a given casting function 1533 Will return a default value if casting fails (configurable, defaults to None) 1534 1535 :param val: The value to cast 1536 :param casting_fn: The casting function that will receive the value to cast 1537 :param default: The return value if casting fails 1538 1539 :return: Result of casting the value or the value of the default parameter 1540 """ 1541 try: 1542 return casting_fn(val) 1543 except (ValueError, TypeError): 1544 return default 1545 1546 1547def __id(x): 1548 """ 1549 Identity function. Returns the passed in values. 1550 """ 1551 return x 1552 1553 1554def unique(collection, key=None): 1555 """ 1556 Removes duplicates from collection using key function 1557 1558 :param collection: The collection to remove duplicates from 1559 :param key: The function to generate key from each element. If not passed, identity function is used 1560 """ 1561 if key is None: 1562 key = __id 1563 1564 to_return = OrderedDict() 1565 1566 for element in collection: 1567 to_return[key(element)] = element 1568 1569 return list(to_return.values()) 1570