1# ------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for 4# license information. 5# -------------------------------------------------------------------------- 6from azure.common import ( 7 AzureConflictHttpError, 8 AzureHttpError, 9) 10 11from ..common._auth import ( 12 _StorageSASAuthentication, 13 _StorageSharedKeyAuthentication, 14) 15from ..common._common_conversion import ( 16 _int_to_str, 17 _to_str, 18) 19from ..common._connection import _ServiceParameters 20from ..common._constants import ( 21 SERVICE_HOST_BASE, 22 DEFAULT_PROTOCOL, 23) 24from ..common._deserialization import ( 25 _convert_xml_to_service_properties, 26 _convert_xml_to_signed_identifiers, 27 _convert_xml_to_service_stats, 28) 29from ..common._error import ( 30 _dont_fail_not_exist, 31 _dont_fail_on_exist, 32 _validate_not_none, 33 _ERROR_CONFLICT, 34 _ERROR_STORAGE_MISSING_INFO, 35 _validate_access_policies, 36 _validate_encryption_required, 37 _validate_decryption_required, 38) 39from ..common._http import ( 40 HTTPRequest, 41) 42from ..common._serialization import ( 43 _convert_signed_identifiers_to_xml, 44 _convert_service_properties_to_xml, 45) 46from ..common._serialization import ( 47 _get_request_body, 48 _add_metadata_headers, 49) 50from ..common.models import ( 51 Services, 52 ListGenerator, 53 _OperationContext, 54) 55from .sharedaccesssignature import ( 56 QueueSharedAccessSignature, 57) 58from ..common.storageclient import StorageClient 59from ._deserialization import ( 60 _convert_xml_to_queues, 61 _convert_xml_to_queue_messages, 62 _parse_queue_message_from_headers, 63 _parse_metadata_and_message_count, 64) 65from ._serialization import ( 66 _convert_queue_message_xml, 67 _get_path, 68) 69from .models import ( 70 QueueMessageFormat, 71) 72from ._constants import ( 73 X_MS_VERSION, 74 __version__ as package_version, 75) 76 77_HTTP_RESPONSE_NO_CONTENT = 204 78 79 80class QueueService(StorageClient): 81 ''' 82 This is the main class managing queue resources. 83 84 The Queue service stores messages. A queue can contain an unlimited number of 85 messages, each of which can be up to 64KB in size. Messages are generally added 86 to the end of the queue and retrieved from the front of the queue, although 87 first in, first out (FIFO) behavior is not guaranteed. 88 89 :ivar function(data) encode_function: 90 A function used to encode queue messages. Takes as 91 a parameter the data passed to the put_message API and returns the encoded 92 message. Defaults to take text and xml encode, but bytes and other 93 encodings can be used. For example, base64 may be preferable for developing 94 across multiple Azure Storage libraries in different languages. See the 95 :class:`~azure.storage.queue.models.QueueMessageFormat` for xml, base64 and 96 no encoding methods as well as binary equivalents. 97 :ivar function(data) decode_function: 98 A function used to encode decode messages. Takes as 99 a parameter the data returned by the get_messages and peek_messages APIs and 100 returns the decoded message. Defaults to return text and xml decode, but 101 bytes and other decodings can be used. For example, base64 may be preferable 102 for developing across multiple Azure Storage libraries in different languages. 103 See the :class:`~azure.storage.queue.models.QueueMessageFormat` for xml, base64 104 and no decoding methods as well as binary equivalents. 105 :ivar object key_encryption_key: 106 The key-encryption-key optionally provided by the user. If provided, will be used to 107 encrypt/decrypt in supported methods. 108 For methods requiring decryption, either the key_encryption_key OR the resolver must be provided. 109 If both are provided, the resolver will take precedence. 110 Must implement the following methods for APIs requiring encryption: 111 wrap_key(key)--wraps the specified key (bytes) using an algorithm of the user's choice. Returns the encrypted key as bytes. 112 get_key_wrap_algorithm()--returns the algorithm used to wrap the specified symmetric key. 113 get_kid()--returns a string key id for this key-encryption-key. 114 Must implement the following methods for APIs requiring decryption: 115 unwrap_key(key, algorithm)--returns the unwrapped form of the specified symmetric key using the string-specified algorithm. 116 get_kid()--returns a string key id for this key-encryption-key. 117 :ivar function key_resolver_function(kid): 118 A function to resolve keys optionally provided by the user. If provided, will be used to decrypt in supported methods. 119 For methods requiring decryption, either the key_encryption_key OR 120 the resolver must be provided. If both are provided, the resolver will take precedence. 121 It uses the kid string to return a key-encryption-key implementing the interface defined above. 122 :ivar bool require_encryption: 123 A flag that may be set to ensure that all messages successfully uploaded to the queue and all those downloaded and 124 successfully read from the queue are/were encrypted while on the server. If this flag is set, all required 125 parameters for encryption/decryption must be provided. See the above comments on the key_encryption_key and resolver. 126 ''' 127 128 def __init__(self, account_name=None, account_key=None, sas_token=None, 129 is_emulated=False, protocol=DEFAULT_PROTOCOL, endpoint_suffix=SERVICE_HOST_BASE, 130 request_session=None, connection_string=None, socket_timeout=None): 131 ''' 132 :param str account_name: 133 The storage account name. This is used to authenticate requests 134 signed with an account key and to construct the storage endpoint. It 135 is required unless a connection string is given. 136 :param str account_key: 137 The storage account key. This is used for shared key authentication. 138 :param str sas_token: 139 A shared access signature token to use to authenticate requests 140 instead of the account key. If account key and sas token are both 141 specified, account key will be used to sign. 142 :param bool is_emulated: 143 Whether to use the emulator. Defaults to False. If specified, will 144 override all other parameters besides connection string and request 145 session. 146 :param str protocol: 147 The protocol to use for requests. Defaults to https. 148 :param str endpoint_suffix: 149 The host base component of the url, minus the account name. Defaults 150 to Azure (core.windows.net). Override this to use the China cloud 151 (core.chinacloudapi.cn). 152 :param requests.Session request_session: 153 The session object to use for http requests. 154 :param str connection_string: 155 If specified, this will override all other parameters besides 156 request session. See 157 http://azure.microsoft.com/en-us/documentation/articles/storage-configure-connection-string/ 158 for the connection string format. 159 :param int socket_timeout: 160 If specified, this will override the default socket timeout. The timeout specified is in seconds. 161 See DEFAULT_SOCKET_TIMEOUT in _constants.py for the default value. 162 ''' 163 service_params = _ServiceParameters.get_service_parameters( 164 'queue', 165 account_name=account_name, 166 account_key=account_key, 167 sas_token=sas_token, 168 is_emulated=is_emulated, 169 protocol=protocol, 170 endpoint_suffix=endpoint_suffix, 171 request_session=request_session, 172 connection_string=connection_string, 173 socket_timeout=socket_timeout) 174 175 super(QueueService, self).__init__(service_params) 176 177 if self.account_key: 178 self.authentication = _StorageSharedKeyAuthentication( 179 self.account_name, 180 self.account_key, 181 self.is_emulated 182 ) 183 elif self.sas_token: 184 self.authentication = _StorageSASAuthentication(self.sas_token) 185 else: 186 raise ValueError(_ERROR_STORAGE_MISSING_INFO) 187 188 self.encode_function = QueueMessageFormat.text_xmlencode 189 self.decode_function = QueueMessageFormat.text_xmldecode 190 self.key_encryption_key = None 191 self.key_resolver_function = None 192 self.require_encryption = False 193 self._X_MS_VERSION = X_MS_VERSION 194 self._update_user_agent_string(package_version) 195 196 def generate_account_shared_access_signature(self, resource_types, permission, 197 expiry, start=None, ip=None, protocol=None): 198 ''' 199 Generates a shared access signature for the queue service. 200 Use the returned signature with the sas_token parameter of QueueService. 201 202 :param ResourceTypes resource_types: 203 Specifies the resource types that are accessible with the account SAS. 204 :param AccountPermissions permission: 205 The permissions associated with the shared access signature. The 206 user is restricted to operations allowed by the permissions. 207 Required unless an id is given referencing a stored access policy 208 which contains this field. This field must be omitted if it has been 209 specified in an associated stored access policy. 210 :param expiry: 211 The time at which the shared access signature becomes invalid. 212 Required unless an id is given referencing a stored access policy 213 which contains this field. This field must be omitted if it has 214 been specified in an associated stored access policy. Azure will always 215 convert values to UTC. If a date is passed in without timezone info, it 216 is assumed to be UTC. 217 :type expiry: datetime or str 218 :param start: 219 The time at which the shared access signature becomes valid. If 220 omitted, start time for this call is assumed to be the time when the 221 storage service receives the request. Azure will always convert values 222 to UTC. If a date is passed in without timezone info, it is assumed to 223 be UTC. 224 :type start: datetime or str 225 :param str ip: 226 Specifies an IP address or a range of IP addresses from which to accept requests. 227 If the IP address from which the request originates does not match the IP address 228 or address range specified on the SAS token, the request is not authenticated. 229 For example, specifying sip=168.1.5.65 or sip=168.1.5.60-168.1.5.70 on the SAS 230 restricts the request to those IP addresses. 231 :param str protocol: 232 Specifies the protocol permitted for a request made. The default value 233 is https,http. See :class:`~azure.storage.common.models.Protocol` for possible values. 234 :return: A Shared Access Signature (sas) token. 235 :rtype: str 236 ''' 237 _validate_not_none('self.account_name', self.account_name) 238 _validate_not_none('self.account_key', self.account_key) 239 240 sas = QueueSharedAccessSignature(self.account_name, self.account_key) 241 return sas.generate_account(Services.QUEUE, resource_types, permission, 242 expiry, start=start, ip=ip, protocol=protocol) 243 244 def generate_queue_shared_access_signature(self, queue_name, 245 permission=None, 246 expiry=None, 247 start=None, 248 id=None, 249 ip=None, protocol=None, ): 250 ''' 251 Generates a shared access signature for the queue. 252 Use the returned signature with the sas_token parameter of QueueService. 253 254 :param str queue_name: 255 The name of the queue to create a SAS token for. 256 :param QueuePermissions permission: 257 The permissions associated with the shared access signature. The 258 user is restricted to operations allowed by the permissions. 259 Required unless an id is given referencing a stored access policy 260 which contains this field. This field must be omitted if it has been 261 specified in an associated stored access policy. 262 :param expiry: 263 The time at which the shared access signature becomes invalid. 264 Required unless an id is given referencing a stored access policy 265 which contains this field. This field must be omitted if it has 266 been specified in an associated stored access policy. Azure will always 267 convert values to UTC. If a date is passed in without timezone info, it 268 is assumed to be UTC. 269 :type expiry: datetime or str 270 :param start: 271 The time at which the shared access signature becomes valid. If 272 omitted, start time for this call is assumed to be the time when the 273 storage service receives the request. Azure will always convert values 274 to UTC. If a date is passed in without timezone info, it is assumed to 275 be UTC. 276 :type start: datetime or str 277 :param str id: 278 A unique value up to 64 characters in length that correlates to a 279 stored access policy. To create a stored access policy, use :func:`~set_queue_acl`. 280 :param str ip: 281 Specifies an IP address or a range of IP addresses from which to accept requests. 282 If the IP address from which the request originates does not match the IP address 283 or address range specified on the SAS token, the request is not authenticated. 284 For example, specifying sip='168.1.5.65' or sip='168.1.5.60-168.1.5.70' on the SAS 285 restricts the request to those IP addresses. 286 :param str protocol: 287 Specifies the protocol permitted for a request made. The default value 288 is https,http. See :class:`~azure.storage.common.models.Protocol` for possible values. 289 :return: A Shared Access Signature (sas) token. 290 :rtype: str 291 ''' 292 _validate_not_none('queue_name', queue_name) 293 _validate_not_none('self.account_name', self.account_name) 294 _validate_not_none('self.account_key', self.account_key) 295 296 sas = QueueSharedAccessSignature(self.account_name, self.account_key) 297 return sas.generate_queue( 298 queue_name, 299 permission=permission, 300 expiry=expiry, 301 start=start, 302 id=id, 303 ip=ip, 304 protocol=protocol, 305 ) 306 307 def get_queue_service_stats(self, timeout=None): 308 ''' 309 Retrieves statistics related to replication for the Queue service. It is 310 only available when read-access geo-redundant replication is enabled for 311 the storage account. 312 313 With geo-redundant replication, Azure Storage maintains your data durable 314 in two locations. In both locations, Azure Storage constantly maintains 315 multiple healthy replicas of your data. The location where you read, 316 create, update, or delete data is the primary storage account location. 317 The primary location exists in the region you choose at the time you 318 create an account via the Azure Management Azure classic portal, for 319 example, North Central US. The location to which your data is replicated 320 is the secondary location. The secondary location is automatically 321 determined based on the location of the primary; it is in a second data 322 center that resides in the same region as the primary location. Read-only 323 access is available from the secondary location, if read-access geo-redundant 324 replication is enabled for your storage account. 325 326 :param int timeout: 327 The timeout parameter is expressed in seconds. 328 :return: The queue service stats. 329 :rtype: :class:`~azure.storage.common.models.ServiceStats` 330 ''' 331 request = HTTPRequest() 332 request.method = 'GET' 333 request.host_locations = self._get_host_locations(primary=False, secondary=True) 334 request.path = _get_path() 335 request.query = { 336 'restype': 'service', 337 'comp': 'stats', 338 'timeout': _int_to_str(timeout), 339 } 340 341 return self._perform_request(request, _convert_xml_to_service_stats) 342 343 def get_queue_service_properties(self, timeout=None): 344 ''' 345 Gets the properties of a storage account's Queue service, including 346 logging, analytics and CORS rules. 347 348 :param int timeout: 349 The server timeout, expressed in seconds. 350 :return: The queue service properties. 351 :rtype: :class:`~azure.storage.common.models.ServiceProperties` 352 ''' 353 request = HTTPRequest() 354 request.method = 'GET' 355 request.host_locations = self._get_host_locations(secondary=True) 356 request.path = _get_path() 357 request.query = { 358 'restype': 'service', 359 'comp': 'properties', 360 'timeout': _int_to_str(timeout), 361 } 362 363 return self._perform_request(request, _convert_xml_to_service_properties) 364 365 def set_queue_service_properties(self, logging=None, hour_metrics=None, 366 minute_metrics=None, cors=None, timeout=None): 367 ''' 368 Sets the properties of a storage account's Queue service, including 369 Azure Storage Analytics. If an element (ex Logging) is left as None, the 370 existing settings on the service for that functionality are preserved. 371 For more information on Azure Storage Analytics, see 372 https://msdn.microsoft.com/en-us/library/azure/hh343270.aspx. 373 374 :param Logging logging: 375 The logging settings provide request logs. 376 :param Metrics hour_metrics: 377 The hour metrics settings provide a summary of request 378 statistics grouped by API in hourly aggregates for queuess. 379 :param Metrics minute_metrics: 380 The minute metrics settings provide request statistics 381 for each minute for queues. 382 :param cors: 383 You can include up to five CorsRule elements in the 384 list. If an empty list is specified, all CORS rules will be deleted, 385 and CORS will be disabled for the service. For detailed information 386 about CORS rules and evaluation logic, see 387 https://msdn.microsoft.com/en-us/library/azure/dn535601.aspx. 388 :type cors: list(:class:`~azure.storage.common.models.CorsRule`) 389 :param int timeout: 390 The server timeout, expressed in seconds. 391 ''' 392 request = HTTPRequest() 393 request.method = 'PUT' 394 request.host_locations = self._get_host_locations() 395 request.path = _get_path() 396 request.query = { 397 'restype': 'service', 398 'comp': 'properties', 399 'timeout': _int_to_str(timeout), 400 } 401 request.body = _get_request_body( 402 _convert_service_properties_to_xml(logging, hour_metrics, minute_metrics, cors)) 403 self._perform_request(request) 404 405 def list_queues(self, prefix=None, num_results=None, include_metadata=False, 406 marker=None, timeout=None): 407 ''' 408 Returns a generator to list the queues. The generator will lazily follow 409 the continuation tokens returned by the service and stop when all queues 410 have been returned or num_results is reached. 411 412 If num_results is specified and the account has more than that number of 413 queues, the generator will have a populated next_marker field once it 414 finishes. This marker can be used to create a new generator if more 415 results are desired. 416 417 :param str prefix: 418 Filters the results to return only queues with names that begin 419 with the specified prefix. 420 :param int num_results: 421 The maximum number of queues to return. 422 :param bool include_metadata: 423 Specifies that container metadata be returned in the response. 424 :param str marker: 425 An opaque continuation token. This value can be retrieved from the 426 next_marker field of a previous generator object if num_results was 427 specified and that generator has finished enumerating results. If 428 specified, this generator will begin returning results from the point 429 where the previous generator stopped. 430 :param int timeout: 431 The server timeout, expressed in seconds. This function may make multiple 432 calls to the service in which case the timeout value specified will be 433 applied to each individual call. 434 ''' 435 include = 'metadata' if include_metadata else None 436 operation_context = _OperationContext(location_lock=True) 437 kwargs = {'prefix': prefix, 'max_results': num_results, 'include': include, 438 'marker': marker, 'timeout': timeout, '_context': operation_context} 439 resp = self._list_queues(**kwargs) 440 441 return ListGenerator(resp, self._list_queues, (), kwargs) 442 443 def _list_queues(self, prefix=None, marker=None, max_results=None, 444 include=None, timeout=None, _context=None): 445 ''' 446 Returns a list of queues under the specified account. Makes a single list 447 request to the service. Used internally by the list_queues method. 448 449 :param str prefix: 450 Filters the results to return only queues with names that begin 451 with the specified prefix. 452 :param str marker: 453 A token which identifies the portion of the query to be 454 returned with the next query operation. The operation returns a 455 next_marker element within the response body if the list returned 456 was not complete. This value may then be used as a query parameter 457 in a subsequent call to request the next portion of the list of 458 queues. The marker value is opaque to the client. 459 :param int max_results: 460 The maximum number of queues to return. A single list request may 461 return up to 1000 queues and potentially a continuation token which 462 should be followed to get additional resutls. 463 :param str include: 464 Include this parameter to specify that the container's 465 metadata be returned as part of the response body. 466 :param int timeout: 467 The server timeout, expressed in seconds. 468 ''' 469 request = HTTPRequest() 470 request.method = 'GET' 471 request.host_locations = self._get_host_locations(secondary=True) 472 request.path = _get_path() 473 request.query = { 474 'comp': 'list', 475 'prefix': _to_str(prefix), 476 'marker': _to_str(marker), 477 'maxresults': _int_to_str(max_results), 478 'include': _to_str(include), 479 'timeout': _int_to_str(timeout) 480 } 481 482 return self._perform_request(request, _convert_xml_to_queues, operation_context=_context) 483 484 def create_queue(self, queue_name, metadata=None, fail_on_exist=False, timeout=None): 485 ''' 486 Creates a queue under the given account. 487 488 :param str queue_name: 489 The name of the queue to create. A queue name must be from 3 through 490 63 characters long and may only contain lowercase letters, numbers, 491 and the dash (-) character. The first and last letters in the queue 492 must be alphanumeric. The dash (-) character cannot be the first or 493 last character. Consecutive dash characters are not permitted in the 494 queue name. 495 :param metadata: 496 A dict containing name-value pairs to associate with the queue as 497 metadata. Note that metadata names preserve the case with which they 498 were created, but are case-insensitive when set or read. 499 :type metadata: dict(str, str) 500 :param bool fail_on_exist: 501 Specifies whether to throw an exception if the queue already exists. 502 :param int timeout: 503 The server timeout, expressed in seconds. 504 :return: 505 A boolean indicating whether the queue was created. If fail_on_exist 506 was set to True, this will throw instead of returning false. 507 :rtype: bool 508 ''' 509 _validate_not_none('queue_name', queue_name) 510 request = HTTPRequest() 511 request.method = 'PUT' 512 request.host_locations = self._get_host_locations() 513 request.path = _get_path(queue_name) 514 request.query = {'timeout': _int_to_str(timeout)} 515 _add_metadata_headers(metadata, request) 516 517 def _return_request(request): 518 return request 519 520 if not fail_on_exist: 521 try: 522 response = self._perform_request(request, parser=_return_request) 523 if response.status == _HTTP_RESPONSE_NO_CONTENT: 524 return False 525 return True 526 except AzureHttpError as ex: 527 _dont_fail_on_exist(ex) 528 return False 529 else: 530 response = self._perform_request(request, parser=_return_request) 531 if response.status == _HTTP_RESPONSE_NO_CONTENT: 532 raise AzureConflictHttpError( 533 _ERROR_CONFLICT.format(response.message), response.status) 534 return True 535 536 def delete_queue(self, queue_name, fail_not_exist=False, timeout=None): 537 ''' 538 Deletes the specified queue and any messages it contains. 539 540 When a queue is successfully deleted, it is immediately marked for deletion 541 and is no longer accessible to clients. The queue is later removed from 542 the Queue service during garbage collection. 543 544 Note that deleting a queue is likely to take at least 40 seconds to complete. 545 If an operation is attempted against the queue while it was being deleted, 546 an :class:`AzureConflictHttpError` will be thrown. 547 548 :param str queue_name: 549 The name of the queue to delete. 550 :param bool fail_not_exist: 551 Specifies whether to throw an exception if the queue doesn't exist. 552 :param int timeout: 553 The server timeout, expressed in seconds. 554 :return: 555 A boolean indicating whether the queue was deleted. If fail_not_exist 556 was set to True, this will throw instead of returning false. 557 :rtype: bool 558 ''' 559 _validate_not_none('queue_name', queue_name) 560 request = HTTPRequest() 561 request.method = 'DELETE' 562 request.host_locations = self._get_host_locations() 563 request.path = _get_path(queue_name) 564 request.query = {'timeout': _int_to_str(timeout)} 565 if not fail_not_exist: 566 try: 567 self._perform_request(request) 568 return True 569 except AzureHttpError as ex: 570 _dont_fail_not_exist(ex) 571 return False 572 else: 573 self._perform_request(request) 574 return True 575 576 def get_queue_metadata(self, queue_name, timeout=None): 577 ''' 578 Retrieves user-defined metadata and queue properties on the specified 579 queue. Metadata is associated with the queue as name-value pairs. 580 581 :param str queue_name: 582 The name of an existing queue. 583 :param int timeout: 584 The server timeout, expressed in seconds. 585 :return: 586 A dictionary representing the queue metadata with an 587 approximate_message_count int property on the dict estimating the 588 number of messages in the queue. 589 :rtype: dict(str, str) 590 ''' 591 _validate_not_none('queue_name', queue_name) 592 request = HTTPRequest() 593 request.method = 'GET' 594 request.host_locations = self._get_host_locations(secondary=True) 595 request.path = _get_path(queue_name) 596 request.query = { 597 'comp': 'metadata', 598 'timeout': _int_to_str(timeout), 599 } 600 601 return self._perform_request(request, _parse_metadata_and_message_count) 602 603 def set_queue_metadata(self, queue_name, metadata=None, timeout=None): 604 ''' 605 Sets user-defined metadata on the specified queue. Metadata is 606 associated with the queue as name-value pairs. 607 608 :param str queue_name: 609 The name of an existing queue. 610 :param dict metadata: 611 A dict containing name-value pairs to associate with the 612 queue as metadata. 613 :param int timeout: 614 The server timeout, expressed in seconds. 615 ''' 616 _validate_not_none('queue_name', queue_name) 617 request = HTTPRequest() 618 request.method = 'PUT' 619 request.host_locations = self._get_host_locations() 620 request.path = _get_path(queue_name) 621 request.query = { 622 'comp': 'metadata', 623 'timeout': _int_to_str(timeout), 624 } 625 _add_metadata_headers(metadata, request) 626 627 self._perform_request(request) 628 629 def exists(self, queue_name, timeout=None): 630 ''' 631 Returns a boolean indicating whether the queue exists. 632 633 :param str queue_name: 634 The name of queue to check for existence. 635 :param int timeout: 636 The server timeout, expressed in seconds. 637 :return: A boolean indicating whether the queue exists. 638 :rtype: bool 639 ''' 640 try: 641 self.get_queue_metadata(queue_name, timeout=timeout) 642 return True 643 except AzureHttpError as ex: 644 _dont_fail_not_exist(ex) 645 return False 646 647 def get_queue_acl(self, queue_name, timeout=None): 648 ''' 649 Returns details about any stored access policies specified on the 650 queue that may be used with Shared Access Signatures. 651 652 :param str queue_name: 653 The name of an existing queue. 654 :param int timeout: 655 The server timeout, expressed in seconds. 656 :return: A dictionary of access policies associated with the queue. 657 :rtype: dict(str, :class:`~azure.storage.common.models.AccessPolicy`) 658 ''' 659 _validate_not_none('queue_name', queue_name) 660 request = HTTPRequest() 661 request.method = 'GET' 662 request.host_locations = self._get_host_locations(secondary=True) 663 request.path = _get_path(queue_name) 664 request.query = { 665 'comp': 'acl', 666 'timeout': _int_to_str(timeout), 667 } 668 669 return self._perform_request(request, _convert_xml_to_signed_identifiers) 670 671 def set_queue_acl(self, queue_name, signed_identifiers=None, timeout=None): 672 ''' 673 Sets stored access policies for the queue that may be used with Shared 674 Access Signatures. 675 676 When you set permissions for a queue, the existing permissions are replaced. 677 To update the queue's permissions, call :func:`~get_queue_acl` to fetch 678 all access policies associated with the queue, modify the access policy 679 that you wish to change, and then call this function with the complete 680 set of data to perform the update. 681 682 When you establish a stored access policy on a queue, it may take up to 683 30 seconds to take effect. During this interval, a shared access signature 684 that is associated with the stored access policy will throw an 685 :class:`AzureHttpError` until the access policy becomes active. 686 687 :param str queue_name: 688 The name of an existing queue. 689 :param signed_identifiers: 690 A dictionary of access policies to associate with the queue. The 691 dictionary may contain up to 5 elements. An empty dictionary 692 will clear the access policies set on the service. 693 :type signed_identifiers: dict(str, :class:`~azure.storage.common.models.AccessPolicy`) 694 :param int timeout: 695 The server timeout, expressed in seconds. 696 ''' 697 _validate_not_none('queue_name', queue_name) 698 _validate_access_policies(signed_identifiers) 699 request = HTTPRequest() 700 request.method = 'PUT' 701 request.host_locations = self._get_host_locations() 702 request.path = _get_path(queue_name) 703 request.query = { 704 'comp': 'acl', 705 'timeout': _int_to_str(timeout), 706 } 707 request.body = _get_request_body( 708 _convert_signed_identifiers_to_xml(signed_identifiers)) 709 self._perform_request(request) 710 711 def put_message(self, queue_name, content, visibility_timeout=None, 712 time_to_live=None, timeout=None): 713 ''' 714 Adds a new message to the back of the message queue. 715 716 The visibility timeout specifies the time that the message will be 717 invisible. After the timeout expires, the message will become visible. 718 If a visibility timeout is not specified, the default value of 0 is used. 719 720 The message time-to-live specifies how long a message will remain in the 721 queue. The message will be deleted from the queue when the time-to-live 722 period expires. 723 724 If the key-encryption-key field is set on the local service object, this method will 725 encrypt the content before uploading. 726 727 :param str queue_name: 728 The name of the queue to put the message into. 729 :param obj content: 730 Message content. Allowed type is determined by the encode_function 731 set on the service. Default is str. The encoded message can be up to 732 64KB in size. 733 :param int visibility_timeout: 734 If not specified, the default value is 0. Specifies the 735 new visibility timeout value, in seconds, relative to server time. 736 The value must be larger than or equal to 0, and cannot be 737 larger than 7 days. The visibility timeout of a message cannot be 738 set to a value later than the expiry time. visibility_timeout 739 should be set to a value smaller than the time-to-live value. 740 :param int time_to_live: 741 Specifies the time-to-live interval for the message, in 742 seconds. The time-to-live may be any positive number or -1 for infinity. If this 743 parameter is omitted, the default time-to-live is 7 days. 744 :param int timeout: 745 The server timeout, expressed in seconds. 746 :return: 747 A :class:`~azure.storage.queue.models.QueueMessage` object. 748 This object is also populated with the content although it is not 749 returned from the service. 750 :rtype: :class:`~azure.storage.queue.models.QueueMessage` 751 ''' 752 753 _validate_encryption_required(self.require_encryption, self.key_encryption_key) 754 755 _validate_not_none('queue_name', queue_name) 756 _validate_not_none('content', content) 757 request = HTTPRequest() 758 request.method = 'POST' 759 request.host_locations = self._get_host_locations() 760 request.path = _get_path(queue_name, True) 761 request.query = { 762 'visibilitytimeout': _to_str(visibility_timeout), 763 'messagettl': _to_str(time_to_live), 764 'timeout': _int_to_str(timeout) 765 } 766 767 request.body = _get_request_body(_convert_queue_message_xml(content, self.encode_function, 768 self.key_encryption_key)) 769 770 message_list = self._perform_request(request, _convert_xml_to_queue_messages, 771 [self.decode_function, False, 772 None, None, content]) 773 return message_list[0] 774 775 def get_messages(self, queue_name, num_messages=None, 776 visibility_timeout=None, timeout=None): 777 ''' 778 Retrieves one or more messages from the front of the queue. 779 780 When a message is retrieved from the queue, the response includes the message 781 content and a pop_receipt value, which is required to delete the message. 782 The message is not automatically deleted from the queue, but after it has 783 been retrieved, it is not visible to other clients for the time interval 784 specified by the visibility_timeout parameter. 785 786 If the key-encryption-key or resolver field is set on the local service object, the messages will be 787 decrypted before being returned. 788 789 :param str queue_name: 790 The name of the queue to get messages from. 791 :param int num_messages: 792 A nonzero integer value that specifies the number of 793 messages to retrieve from the queue, up to a maximum of 32. If 794 fewer are visible, the visible messages are returned. By default, 795 a single message is retrieved from the queue with this operation. 796 :param int visibility_timeout: 797 Specifies the new visibility timeout value, in seconds, relative 798 to server time. The new value must be larger than or equal to 1 799 second, and cannot be larger than 7 days. The visibility timeout of 800 a message can be set to a value later than the expiry time. 801 :param int timeout: 802 The server timeout, expressed in seconds. 803 :return: A :class:`~azure.storage.queue.models.QueueMessage` object representing the information passed. 804 :rtype: list(:class:`~azure.storage.queue.models.QueueMessage`) 805 ''' 806 _validate_decryption_required(self.require_encryption, self.key_encryption_key, 807 self.key_resolver_function) 808 809 _validate_not_none('queue_name', queue_name) 810 request = HTTPRequest() 811 request.method = 'GET' 812 request.host_locations = self._get_host_locations() 813 request.path = _get_path(queue_name, True) 814 request.query = { 815 'numofmessages': _to_str(num_messages), 816 'visibilitytimeout': _to_str(visibility_timeout), 817 'timeout': _int_to_str(timeout) 818 } 819 820 return self._perform_request(request, _convert_xml_to_queue_messages, 821 [self.decode_function, self.require_encryption, 822 self.key_encryption_key, self.key_resolver_function]) 823 824 def peek_messages(self, queue_name, num_messages=None, timeout=None): 825 ''' 826 Retrieves one or more messages from the front of the queue, but does 827 not alter the visibility of the message. 828 829 Only messages that are visible may be retrieved. When a message is retrieved 830 for the first time with a call to get_messages, its dequeue_count property 831 is set to 1. If it is not deleted and is subsequently retrieved again, the 832 dequeue_count property is incremented. The client may use this value to 833 determine how many times a message has been retrieved. Note that a call 834 to peek_messages does not increment the value of DequeueCount, but returns 835 this value for the client to read. 836 837 If the key-encryption-key or resolver field is set on the local service object, the messages will be 838 decrypted before being returned. 839 840 :param str queue_name: 841 The name of the queue to peek messages from. 842 :param int num_messages: 843 A nonzero integer value that specifies the number of 844 messages to peek from the queue, up to a maximum of 32. By default, 845 a single message is peeked from the queue with this operation. 846 :param int timeout: 847 The server timeout, expressed in seconds. 848 :return: 849 A list of :class:`~azure.storage.queue.models.QueueMessage` objects. Note that 850 time_next_visible and pop_receipt will not be populated as peek does 851 not pop the message and can only retrieve already visible messages. 852 :rtype: list(:class:`~azure.storage.queue.models.QueueMessage`) 853 ''' 854 855 _validate_decryption_required(self.require_encryption, self.key_encryption_key, 856 self.key_resolver_function) 857 858 _validate_not_none('queue_name', queue_name) 859 request = HTTPRequest() 860 request.method = 'GET' 861 request.host_locations = self._get_host_locations(secondary=True) 862 request.path = _get_path(queue_name, True) 863 request.query = { 864 'peekonly': 'true', 865 'numofmessages': _to_str(num_messages), 866 'timeout': _int_to_str(timeout) 867 } 868 869 return self._perform_request(request, _convert_xml_to_queue_messages, 870 [self.decode_function, self.require_encryption, 871 self.key_encryption_key, self.key_resolver_function]) 872 873 def delete_message(self, queue_name, message_id, pop_receipt, timeout=None): 874 ''' 875 Deletes the specified message. 876 877 Normally after a client retrieves a message with the get_messages operation, 878 the client is expected to process and delete the message. To delete the 879 message, you must have two items of data: id and pop_receipt. The 880 id is returned from the previous get_messages operation. The 881 pop_receipt is returned from the most recent :func:`~get_messages` or 882 :func:`~update_message` operation. In order for the delete_message operation 883 to succeed, the pop_receipt specified on the request must match the 884 pop_receipt returned from the :func:`~get_messages` or :func:`~update_message` 885 operation. 886 887 :param str queue_name: 888 The name of the queue from which to delete the message. 889 :param str message_id: 890 The message id identifying the message to delete. 891 :param str pop_receipt: 892 A valid pop receipt value returned from an earlier call 893 to the :func:`~get_messages` or :func:`~update_message`. 894 :param int timeout: 895 The server timeout, expressed in seconds. 896 ''' 897 _validate_not_none('queue_name', queue_name) 898 _validate_not_none('message_id', message_id) 899 _validate_not_none('pop_receipt', pop_receipt) 900 request = HTTPRequest() 901 request.method = 'DELETE' 902 request.host_locations = self._get_host_locations() 903 request.path = _get_path(queue_name, True, message_id) 904 request.query = { 905 'popreceipt': _to_str(pop_receipt), 906 'timeout': _int_to_str(timeout) 907 } 908 self._perform_request(request) 909 910 def clear_messages(self, queue_name, timeout=None): 911 ''' 912 Deletes all messages from the specified queue. 913 914 :param str queue_name: 915 The name of the queue whose messages to clear. 916 :param int timeout: 917 The server timeout, expressed in seconds. 918 ''' 919 _validate_not_none('queue_name', queue_name) 920 request = HTTPRequest() 921 request.method = 'DELETE' 922 request.host_locations = self._get_host_locations() 923 request.path = _get_path(queue_name, True) 924 request.query = {'timeout': _int_to_str(timeout)} 925 self._perform_request(request) 926 927 def update_message(self, queue_name, message_id, pop_receipt, visibility_timeout, 928 content=None, timeout=None): 929 ''' 930 Updates the visibility timeout of a message. You can also use this 931 operation to update the contents of a message. 932 933 This operation can be used to continually extend the invisibility of a 934 queue message. This functionality can be useful if you want a worker role 935 to "lease" a queue message. For example, if a worker role calls get_messages 936 and recognizes that it needs more time to process a message, it can 937 continually extend the message's invisibility until it is processed. If 938 the worker role were to fail during processing, eventually the message 939 would become visible again and another worker role could process it. 940 941 If the key-encryption-key field is set on the local service object, this method will 942 encrypt the content before uploading. 943 944 :param str queue_name: 945 The name of the queue containing the message to update. 946 :param str message_id: 947 The message id identifying the message to update. 948 :param str pop_receipt: 949 A valid pop receipt value returned from an earlier call 950 to the :func:`~get_messages` or :func:`~update_message` operation. 951 :param int visibility_timeout: 952 Specifies the new visibility timeout value, in seconds, 953 relative to server time. The new value must be larger than or equal 954 to 0, and cannot be larger than 7 days. The visibility timeout of a 955 message cannot be set to a value later than the expiry time. A 956 message can be updated until it has been deleted or has expired. 957 :param obj content: 958 Message content. Allowed type is determined by the encode_function 959 set on the service. Default is str. 960 :param int timeout: 961 The server timeout, expressed in seconds. 962 :return: 963 A list of :class:`~azure.storage.queue.models.QueueMessage` objects. For convenience, 964 this object is also populated with the content, although it is not returned by the service. 965 :rtype: list(:class:`~azure.storage.queue.models.QueueMessage`) 966 ''' 967 968 _validate_encryption_required(self.require_encryption, self.key_encryption_key) 969 970 _validate_not_none('queue_name', queue_name) 971 _validate_not_none('message_id', message_id) 972 _validate_not_none('pop_receipt', pop_receipt) 973 _validate_not_none('visibility_timeout', visibility_timeout) 974 request = HTTPRequest() 975 request.method = 'PUT' 976 request.host_locations = self._get_host_locations() 977 request.path = _get_path(queue_name, True, message_id) 978 request.query = { 979 'popreceipt': _to_str(pop_receipt), 980 'visibilitytimeout': _int_to_str(visibility_timeout), 981 'timeout': _int_to_str(timeout) 982 } 983 984 if content is not None: 985 request.body = _get_request_body(_convert_queue_message_xml(content, self.encode_function, 986 self.key_encryption_key)) 987 988 return self._perform_request(request, _parse_queue_message_from_headers) 989