1from __future__ import absolute_import
2
3import json
4import logging
5import os
6from io import BytesIO
7from io import StringIO
8
9import elasticsearch
10import py
11import pytest
12from freezegun import freeze_time
13
14from pytest_benchmark import plugin
15from pytest_benchmark.plugin import BenchmarkSession
16from pytest_benchmark.plugin import pytest_benchmark_compare_machine_info
17from pytest_benchmark.plugin import pytest_benchmark_generate_json
18from pytest_benchmark.plugin import pytest_benchmark_group_stats
19from pytest_benchmark.storage.elasticsearch import ElasticsearchStorage
20from pytest_benchmark.storage.elasticsearch import _mask_hosts
21from pytest_benchmark.utils import parse_elasticsearch_storage
22
23try:
24    import unittest.mock as mock
25except ImportError:
26    import mock
27
28logger = logging.getLogger(__name__)
29
30THIS = py.path.local(__file__)
31BENCHFILE = THIS.dirpath('test_storage/0030_5b78858eb718649a31fb93d8dc96ca2cee41a4cd_20150815_030419_uncommitted-changes.json')
32SAVE_DATA = json.loads(BENCHFILE.read_text(encoding='utf8'))
33SAVE_DATA["machine_info"] = {'foo': 'bar'}
34SAVE_DATA["commit_info"] = {'foo': 'bar'}
35
36tmp = SAVE_DATA.copy()
37
38ES_DATA = tmp.pop("benchmarks")[0]
39ES_DATA.update(tmp)
40ES_DATA["benchmark_id"] = "FoobarOS_commitId"
41
42
43class Namespace(object):
44    def __init__(self, **kwargs):
45        self.__dict__.update(kwargs)
46
47    def __getitem__(self, item):
48        return self.__dict__[item]
49
50
51class LooseFileLike(BytesIO):
52    def close(self):
53        value = self.getvalue()
54        super(LooseFileLike, self).close()
55        self.getvalue = lambda: value
56
57
58class MockStorage(ElasticsearchStorage):
59    def __init__(self):
60        self._es = mock.Mock(spec=elasticsearch.Elasticsearch)
61        self._es_hosts = self._es_index = self._es_doctype = 'mocked'
62        self.logger = logger
63        self.default_machine_id = "FoobarOS"
64
65
66class MockSession(BenchmarkSession):
67    def __init__(self):
68        self.verbose = False
69        self.quiet = False
70        self.histogram = True
71        self.benchmarks = []
72        self.performance_regressions = []
73        self.sort = u"min"
74        self.compare = '0001'
75        self.logger = logging.getLogger(__name__)
76        self.machine_id = "FoobarOS"
77        self.machine_info = {'foo': 'bar'}
78        self.save = self.autosave = self.json = False
79        self.options = {
80            'min_rounds': 123,
81            'min_time': 234,
82            'max_time': 345,
83        }
84        self.compare_fail = []
85        self.config = Namespace(hook=Namespace(
86            pytest_benchmark_group_stats=pytest_benchmark_group_stats,
87            pytest_benchmark_generate_machine_info=lambda **kwargs: {'foo': 'bar'},
88            pytest_benchmark_update_machine_info=lambda **kwargs: None,
89            pytest_benchmark_compare_machine_info=pytest_benchmark_compare_machine_info,
90            pytest_benchmark_generate_json=pytest_benchmark_generate_json,
91            pytest_benchmark_update_json=lambda **kwargs: None,
92            pytest_benchmark_generate_commit_info=lambda **kwargs: {'foo': 'bar'},
93            pytest_benchmark_update_commit_info=lambda **kwargs: None,
94        ))
95        self.elasticsearch_host = "localhost:9200"
96        self.elasticsearch_index = "benchmark"
97        self.elasticsearch_doctype = "benchmark"
98        self.storage = MockStorage()
99        self.group_by = 'group'
100        self.columns = ['min', 'max', 'mean', 'stddev', 'median', 'iqr',
101                        'outliers', 'rounds', 'iterations']
102        self.benchmarks = []
103        data = json.loads(BENCHFILE.read_text(encoding='utf8'))
104        self.benchmarks.extend(
105            Namespace(
106                as_dict=lambda include_data=False, stats=True, flat=False, _bench=bench:
107                    dict(_bench, **_bench["stats"]) if flat else dict(_bench),
108                name=bench['name'],
109                fullname=bench['fullname'],
110                group=bench['group'],
111                options=bench['options'],
112                has_error=False,
113                params=None,
114                **bench['stats']
115            )
116            for bench in data['benchmarks']
117        )
118
119
120try:
121    text_type = unicode
122except NameError:
123    text_type = str
124
125
126def force_text(text):
127    if isinstance(text, text_type):
128        return text
129    else:
130        return text.decode('utf-8')
131
132
133def force_bytes(text):
134    if isinstance(text, text_type):
135        return text.encode('utf-8')
136    else:
137        return text
138
139
140def make_logger(sess):
141    output = StringIO()
142    sess.logger = Namespace(
143        info=lambda text, **opts: output.write(force_text(text) + u'\n'),
144        error=lambda text: output.write(force_text(text) + u'\n'),
145    )
146    sess.storage.logger = Namespace(
147        info=lambda text, **opts: output.write(force_text(text) + u'\n'),
148        error=lambda text: output.write(force_text(text) + u'\n'),
149    )
150    return output
151
152
153@pytest.fixture
154def sess():
155    return MockSession()
156
157
158@pytest.fixture
159def logger_output(sess):
160    return make_logger(sess)
161
162
163@freeze_time("2015-08-15T00:04:18.687119")
164def test_handle_saving(sess, logger_output, monkeypatch):
165    monkeypatch.setattr(plugin, '__version__', '2.5.0')
166    sess.save = "commitId"
167    sess.autosave = True
168    sess.json = None
169    sess.save_data = False
170    sess.handle_saving()
171    sess.storage._es.index.assert_called_with(
172        index='mocked',
173        doc_type='mocked',
174        body=ES_DATA,
175        id='FoobarOS_commitId_tests/test_normal.py::test_xfast_parametrized[0]',
176    )
177
178
179def test_parse_with_no_creds():
180    string = 'https://example.org,another.org'
181    hosts, _, _, _ = parse_elasticsearch_storage(string)
182    assert len(hosts) == 2
183    assert 'https://example.org' in hosts
184    assert 'https://another.org' in hosts
185
186
187def test_parse_with_creds_in_first_host_of_url():
188    string = 'https://user:pass@example.org,another.org'
189    hosts, _, _, _ = parse_elasticsearch_storage(string)
190    assert len(hosts) == 2
191    assert 'https://user:pass@example.org' in hosts
192    assert 'https://another.org' in hosts
193
194
195def test_parse_with_creds_in_second_host_of_url():
196    string = 'https://example.org,user:pass@another.org'
197    hosts, _, _, _ = parse_elasticsearch_storage(string)
198    assert len(hosts) == 2
199    assert 'https://example.org' in hosts
200    assert 'https://user:pass@another.org' in hosts
201
202
203def test_parse_with_creds_in_netrc(tmpdir):
204    netrc_file = os.path.join(tmpdir.strpath, 'netrc')
205    with open(netrc_file, 'w') as f:
206        f.write('machine example.org login user1 password pass1\n')
207        f.write('machine another.org login user2 password pass2\n')
208    string = 'https://example.org,another.org'
209    hosts, _, _, _ = parse_elasticsearch_storage(string, netrc_file=netrc_file)
210    assert len(hosts) == 2
211    assert 'https://user1:pass1@example.org' in hosts
212    assert 'https://user2:pass2@another.org' in hosts
213
214
215def test_parse_url_creds_supersedes_netrc_creds(tmpdir):
216    netrc_file = os.path.join(tmpdir.strpath, 'netrc')
217    with open(netrc_file, 'w') as f:
218        f.write('machine example.org login user1 password pass1\n')
219        f.write('machine another.org login user2 password pass2\n')
220    string = 'https://user3:pass3@example.org,another.org'
221    hosts, _, _, _ = parse_elasticsearch_storage(string, netrc_file=netrc_file)
222    assert len(hosts) == 2
223    assert 'https://user3:pass3@example.org' in hosts  # superseded by creds in url
224    assert 'https://user2:pass2@another.org' in hosts  # got creds from netrc file
225
226
227def test__mask_hosts():
228    hosts = ['https://user1:pass1@example.org', 'https://user2:pass2@another.org']
229    masked_hosts = _mask_hosts(hosts)
230    assert len(masked_hosts) == len(hosts)
231    assert 'https://***:***@example.org' in masked_hosts
232    assert 'https://***:***@another.org' in masked_hosts
233