1# Licensed to the Apache Software Foundation (ASF) under one or more
2# contributor license agreements.  See the NOTICE file distributed with
3# this work for additional information regarding copyright ownership.
4# The ASF licenses this file to You under the Apache License, Version 2.0
5# (the "License"); you may not use this file except in compliance with
6# the License.  You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import sys
17import unittest
18
19try:
20    import simplejson as json
21except ImportError:
22    import json
23
24from libcloud.utils.py3 import httplib
25from libcloud.utils.py3 import urlparse
26from libcloud.utils.py3 import b
27from libcloud.utils.py3 import parse_qsl
28
29from libcloud.common.cloudstack import CloudStackConnection
30from libcloud.common.types import MalformedResponseError
31
32from libcloud.test import MockHttp
33
34
# Number of queryAsyncJobResult polls that CloudStackMockHttp._async_delayed
# still answers with jobstatus 0 before it returns the finished job. It is set
# by test_async_request_delayed and decremented by the mock handler.
async_delay = 0
36
37
class CloudStackMockDriver(object):
    """
    Bare attribute holder used as the connection's ``driver`` in these tests.

    An instance is assigned to ``connection.driver`` by the test ``setUp``;
    individual tests then overwrite ``path`` on that instance.
    """

    # Identification of the fake driver.
    name = 'fake'

    # Endpoint attributes; ``path`` is only a default and is replaced per test.
    host, path = 'nonexistent.', '/path'

    # Both timing knobs are zero.
    # NOTE(review): presumably so async polling never waits - confirm against
    # CloudStackConnection.
    async_poll_frequency = async_delay = 0
46
47
class CloudStackCommonTest(unittest.TestCase):
    """
    Tests for the CloudStackConnection request helpers and request signing.

    The connection is wired to CloudStackMockHttp and each request test sets
    ``self.driver.path`` before issuing the request (presumably this is what
    selects the mock handler - confirm against MockHttp).
    """

    def setUp(self):
        # conn_class is a class attribute, so remember the previous value and
        # restore it in tearDown; otherwise the mock stays installed on
        # CloudStackConnection after this test case has finished.
        self._saved_conn_class = CloudStackConnection.conn_class
        CloudStackConnection.conn_class = CloudStackMockHttp
        self.connection = CloudStackConnection('apikey', 'secret',
                                               host=CloudStackMockDriver.host)
        self.connection.poll_interval = 0.0
        self.driver = self.connection.driver = CloudStackMockDriver()

    def tearDown(self):
        CloudStackConnection.conn_class = self._saved_conn_class

    def test_sync_request_bad_response(self):
        """A response without the expected key raises MalformedResponseError."""
        self.driver.path = '/bad/response'
        # Callable form of assertRaises: it fails the test when nothing is
        # raised and lets any other exception type surface as an error.
        self.assertRaises(MalformedResponseError,
                          self.connection._sync_request, 'fake')

    def test_sync_request(self):
        """A well-formed synchronous response is accepted without raising."""
        self.driver.path = '/sync'
        self.connection._sync_request('fake')

    def test_async_request_successful(self):
        """_async_request returns the 'jobresult' payload served by the mock."""
        self.driver.path = '/async/success'
        result = self.connection._async_request('fake')
        self.assertEqual(result, {'fake': 'result'})

    def test_async_request_unsuccessful(self):
        """The mock's error text becomes the message of the raised exception."""
        self.driver.path = '/async/fail'
        try:
            self.connection._async_request('fake')
        except Exception as e:
            self.assertEqual(CloudStackMockHttp.ERROR_TEXT, str(e))
        else:
            self.fail('_async_request did not raise for a failed job')

    def test_async_request_delayed(self):
        """The request keeps polling until the mock's delay counter hits 0."""
        global async_delay
        self.driver.path = '/async/delayed'
        # The mock answers this many polls with jobstatus 0, decrementing the
        # counter each time, before it returns the finished job.
        async_delay = 2
        self.connection._async_request('fake')
        self.assertEqual(async_delay, 0)

    def test_signature_algorithm(self):
        """_make_signature yields the expected value for known parameters."""
        cases = [
            (
                {
                    'command': 'listVirtualMachines'
                }, 'z/a9Y7J52u48VpqIgiwaGUMCso0='
            ), (
                {
                    'command': 'deployVirtualMachine',
                    'name': 'fred',
                    'displayname': 'George',
                    'serviceofferingid': 5,
                    'templateid': 17,
                    'zoneid': 23,
                    'networkids': 42
                }, 'gHTo7mYmadZ+zluKHzlEKb1i/QU='
            ), (
                {
                    'command': 'deployVirtualMachine',
                    'name': 'fred',
                    'displayname': 'George+Ringo',
                    'serviceofferingid': 5,
                    'templateid': 17,
                    'zoneid': 23,
                    'networkids': 42
                }, 'tAgfrreI1ZvWlWLClD3gu4+aKv4='
            )
        ]

        connection = CloudStackConnection('fnord', 'abracadabra')
        for raw_params, expected_signature in cases:
            params = connection.add_default_params(raw_params)
            self.assertEqual(connection._make_signature(params),
                             b(expected_signature))
123
124
class CloudStackMockHttp(MockHttp, unittest.TestCase):
    """
    Mock HTTP backend serving canned CloudStack JSON responses.

    Every handler takes ``(method, url, body, headers)``, validates the query
    string with ``_check_request`` and returns the tuple built by
    ``_response``. Response payloads are keyed by the lower-cased command
    name followed by ``'response'``.
    """

    ERROR_TEXT = 'ERROR TEXT'

    # Job id handed out for the initial async call and expected on each poll.
    JOB_ID = '42'

    def _response(self, status, result, response):
        """Return (status, JSON-encoded result, empty headers, response)."""
        return (status, json.dumps(result), {}, response)

    def _json_ok(self, result):
        """Wrap ``result`` in a 200 OK response tuple."""
        return self._response(httplib.OK, result,
                              httplib.responses[httplib.OK])

    def _response_key(self, query):
        """Return the top-level response key for the command in ``query``."""
        return query['command'].lower() + 'response'

    def _is_job_poll(self, query):
        """
        Tell whether ``query`` is a queryAsyncJobResult poll.

        For a poll, also assert that it carries the job id this mock issued.
        """
        if query['command'].lower() != 'queryasyncjobresult':
            return False
        self.assertEqual(query['jobid'], self.JOB_ID)
        return True

    def _check_request(self, url):
        """
        Parse ``url`` and assert the mandatory query parameters are present.

        :return: The query string parameters as a ``dict``.
        """
        url = urlparse.urlparse(url)
        query = dict(parse_qsl(url.query))

        for required in ('apiKey', 'command', 'response', 'signature'):
            self.assertTrue(required in query,
                            'missing query parameter: %s' % required)

        # assertEqual (rather than assertTrue on a comparison) reports the
        # offending value when the check fails.
        self.assertEqual(query['response'], 'json')

        return query

    def _bad_response(self, method, url, body, headers):
        """Serve a body that lacks the '<command>response' key."""
        self._check_request(url)
        return self._json_ok({'success': True})

    def _sync(self, method, url, body, headers):
        """Serve an empty, well-formed response for a synchronous command."""
        query = self._check_request(url)
        return self._json_ok({self._response_key(query): {}})

    def _async_success(self, method, url, body, headers):
        """Issue the job id, then answer polls with jobstatus 1 and a result."""
        query = self._check_request(url)
        if self._is_job_poll(query):
            payload = {
                'jobstatus': 1,
                'jobresult': {'fake': 'result'}
            }
        else:
            payload = {'jobid': self.JOB_ID}
        return self._json_ok({self._response_key(query): payload})

    def _async_fail(self, method, url, body, headers):
        """Issue the job id, then answer polls with jobstatus 2 and an error."""
        query = self._check_request(url)
        if self._is_job_poll(query):
            payload = {
                'jobstatus': 2,
                'jobresult': {'errortext': self.ERROR_TEXT}
            }
        else:
            payload = {'jobid': self.JOB_ID}
        return self._json_ok({self._response_key(query): payload})

    def _async_delayed(self, method, url, body, headers):
        """
        Issue the job id, then answer polls with jobstatus 0 while the
        module-level ``async_delay`` counter is non-zero (decrementing it on
        each such poll) and with jobstatus 1 plus a result once it is 0.
        """
        global async_delay

        query = self._check_request(url)
        if not self._is_job_poll(query):
            payload = {'jobid': self.JOB_ID}
        elif async_delay == 0:
            payload = {
                'jobstatus': 1,
                'jobresult': {'fake': 'result'}
            }
        else:
            payload = {'jobstatus': 0}
            async_delay -= 1
        return self._json_ok({self._response_key(query): payload})
206
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    sys.exit(unittest.main())
209