1# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"). You 4# may not use this file except in compliance with the License. A copy of 5# the License is located at 6# 7# http://aws.amazon.com/apache2.0/ 8# 9# or in the "license" file accompanying this file. This file is 10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 11# ANY KIND, either express or implied. See the License for the specific 12# language governing permissions and limitations under the License. 13from tests import unittest 14 15from botocore.exceptions import ClientError 16from botocore.vendored import six 17import botocore.session 18 19 20class TestGlacier(unittest.TestCase): 21 # We have to use a single vault for all the integration tests. 22 # This is because if we create a vault and upload then delete 23 # an archive, we cannot immediately clean up and delete the vault. 24 # The compromise is that we'll use a single vault and use 25 # get_or_create semantics for the integ tests. This does mean you 26 # need to be careful when writing tests. Assume that other code 27 # is also using this vault in parallel, so don't rely on things like 28 # number of archives in a vault. 29 30 VAULT_NAME = 'botocore-integ-test-vault' 31 32 def setUp(self): 33 self.session = botocore.session.get_session() 34 self.client = self.session.create_client('glacier', 'us-west-2') 35 # There's no error if the vault already exists so we don't 36 # need to catch any exceptions here. 37 self.client.create_vault(vaultName=self.VAULT_NAME) 38 39 def test_can_list_vaults_without_account_id(self): 40 response = self.client.list_vaults() 41 self.assertIn('VaultList', response) 42 43 def test_can_handle_error_responses(self): 44 with self.assertRaises(ClientError): 45 self.client.list_vaults(accountId='asdf') 46 47 def test_can_upload_archive(self): 48 body = six.BytesIO(b"bytes content") 49 response = self.client.upload_archive(vaultName=self.VAULT_NAME, 50 archiveDescription='test upload', 51 body=body) 52 self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 201) 53 archive_id = response['archiveId'] 54 response = self.client.delete_archive(vaultName=self.VAULT_NAME, 55 archiveId=archive_id) 56 self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 204) 57 58 def test_can_upload_archive_from_bytes(self): 59 response = self.client.upload_archive(vaultName=self.VAULT_NAME, 60 archiveDescription='test upload', 61 body=b'bytes body') 62 self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 201) 63 archive_id = response['archiveId'] 64 response = self.client.delete_archive(vaultName=self.VAULT_NAME, 65 archiveId=archive_id) 66 self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 204) 67 68 69if __name__ == '__main__': 70 unittest.main() 71