1# Copyright 2015 MongoDB, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Tests for the MongoDB Driver Performance Benchmarking Spec."""
16
17import multiprocessing as mp
18import os
19import sys
20import tempfile
21import warnings
22
23try:
24    import simplejson as json
25except ImportError:
26    import json
27
28sys.path[0:0] = [""]
29
30from bson import decode, encode
31from bson.json_util import loads
32from gridfs import GridFSBucket
33from pymongo import MongoClient
34from pymongo.monotonic import time
35from test import client_context, host, port, unittest
36
37NUM_ITERATIONS = 100
38MAX_ITERATION_TIME = 300
39NUM_DOCS = 10000
40
41TEST_PATH = os.environ.get('TEST_PATH', os.path.join(
42    os.path.dirname(os.path.realpath(__file__)),
43    os.path.join('data')))
44
45OUTPUT_FILE = os.environ.get('OUTPUT_FILE')
46
47result_data = []
48
49def tearDownModule():
50    output = json.dumps({
51        'results': result_data
52        }, indent=4)
53    if OUTPUT_FILE:
54        with open(OUTPUT_FILE, 'w') as opf:
55            opf.write(output)
56    else:
57        print(output)
58
59
60class Timer(object):
61    def __enter__(self):
62        self.start = time()
63        return self
64
65    def __exit__(self, *args):
66        self.end = time()
67        self.interval = self.end - self.start
68
69
70class PerformanceTest(object):
71
72    @classmethod
73    def setUpClass(cls):
74        client_context.init()
75
76    def setUp(self):
77        pass
78
79    def tearDown(self):
80        name = self.__class__.__name__
81        median = self.percentile(50)
82        result = self.data_size / median
83        print('Running %s. MEDIAN=%s' % (self.__class__.__name__,
84                                         self.percentile(50)))
85        result_data.append({
86            'name': name,
87            'results': {
88                '1': {
89                    'ops_per_sec': result
90                }
91            }
92        })
93
94    def before(self):
95        pass
96
97    def after(self):
98        pass
99
100    def percentile(self, percentile):
101        if hasattr(self, 'results'):
102            sorted_results = sorted(self.results)
103            percentile_index = int(len(sorted_results) * percentile / 100) - 1
104            return sorted_results[percentile_index]
105        else:
106            self.fail('Test execution failed')
107
108    def runTest(self):
109        results = []
110        start = time()
111        self.max_iterations = NUM_ITERATIONS
112        for i in range(NUM_ITERATIONS):
113            if time() - start > MAX_ITERATION_TIME:
114                warnings.warn('Test timed out, completed %s iterations.' % i)
115                break
116            self.before()
117            with Timer() as timer:
118                self.do_task()
119            self.after()
120            results.append(timer.interval)
121
122        self.results = results
123
124
125# BSON MICRO-BENCHMARKS
126class BsonEncodingTest(PerformanceTest):
127    def setUp(self):
128        # Location of test data.
129        with open(
130            os.path.join(TEST_PATH,
131                         os.path.join('extended_bson', self.dataset))) as data:
132            self.document = loads(data.read())
133
134    def do_task(self):
135        for _ in range(NUM_DOCS):
136            encode(self.document)
137
138
139class BsonDecodingTest(PerformanceTest):
140    def setUp(self):
141        # Location of test data.
142        with open(
143            os.path.join(TEST_PATH,
144                         os.path.join('extended_bson', self.dataset))) as data:
145            self.document = encode(json.loads(data.read()))
146
147    def do_task(self):
148        for _ in range(NUM_DOCS):
149            decode(self.document)
150
151
152class TestFlatEncoding(BsonEncodingTest, unittest.TestCase):
153    dataset = 'flat_bson.json'
154    data_size = 75310000
155
156
157class TestFlatDecoding(BsonDecodingTest, unittest.TestCase):
158    dataset = 'flat_bson.json'
159    data_size = 75310000
160
161
162class TestDeepEncoding(BsonEncodingTest, unittest.TestCase):
163    dataset = 'deep_bson.json'
164    data_size = 19640000
165
166
167class TestDeepDecoding(BsonDecodingTest, unittest.TestCase):
168    dataset = 'deep_bson.json'
169    data_size = 19640000
170
171
172class TestFullEncoding(BsonEncodingTest, unittest.TestCase):
173    dataset = 'full_bson.json'
174    data_size = 57340000
175
176
177class TestFullDecoding(BsonDecodingTest, unittest.TestCase):
178    dataset = 'full_bson.json'
179    data_size = 57340000
180
181
182# SINGLE-DOC BENCHMARKS
183class TestRunCommand(PerformanceTest, unittest.TestCase):
184    data_size = 160000
185    def setUp(self):
186        self.client = client_context.client
187        self.client.drop_database('perftest')
188
189    def do_task(self):
190        command = self.client.perftest.command
191        for _ in range(NUM_DOCS):
192            command("ping")
193
194
195class TestDocument(PerformanceTest):
196    def setUp(self):
197        # Location of test data.
198        with open(
199            os.path.join(
200                TEST_PATH, os.path.join(
201                    'single_and_multi_document', self.dataset)), 'r') as data:
202            self.document = json.loads(data.read())
203
204        self.client = client_context.client
205        self.client.drop_database('perftest')
206
207    def tearDown(self):
208        super(TestDocument, self).tearDown()
209        self.client.drop_database('perftest')
210
211    def before(self):
212        self.corpus = self.client.perftest.create_collection('corpus')
213
214    def after(self):
215        self.client.perftest.drop_collection('corpus')
216
217
218class TestFindOneByID(TestDocument, unittest.TestCase):
219    data_size = 16220000
220    def setUp(self):
221        self.dataset = 'tweet.json'
222        super(TestFindOneByID, self).setUp()
223
224        documents = [self.document.copy() for _ in range(NUM_DOCS)]
225        self.corpus = self.client.perftest.corpus
226        result = self.corpus.insert_many(documents)
227        self.inserted_ids = result.inserted_ids
228
229    def do_task(self):
230        find_one = self.corpus.find_one
231        for _id in self.inserted_ids:
232            find_one({'_id': _id})
233
234    def before(self):
235        pass
236
237    def after(self):
238        pass
239
240
241class TestSmallDocInsertOne(TestDocument, unittest.TestCase):
242    data_size = 2750000
243    def setUp(self):
244        self.dataset = 'small_doc.json'
245        super(TestSmallDocInsertOne, self).setUp()
246
247        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]
248
249    def do_task(self):
250        insert_one = self.corpus.insert_one
251        for doc in self.documents:
252            insert_one(doc)
253
254
255class TestLargeDocInsertOne(TestDocument, unittest.TestCase):
256    data_size = 27310890
257    def setUp(self):
258        self.dataset = 'large_doc.json'
259        super(TestLargeDocInsertOne, self).setUp()
260
261        self.documents = [self.document.copy() for _ in range(10)]
262
263    def do_task(self):
264        insert_one = self.corpus.insert_one
265        for doc in self.documents:
266            insert_one(doc)
267
268
269# MULTI-DOC BENCHMARKS
270class TestFindManyAndEmptyCursor(TestDocument, unittest.TestCase):
271    data_size = 16220000
272    def setUp(self):
273        self.dataset = 'tweet.json'
274        super(TestFindManyAndEmptyCursor, self).setUp()
275
276        for _ in range(10):
277            self.client.perftest.command(
278                'insert', 'corpus',
279                documents=[self.document] * 1000)
280        self.corpus = self.client.perftest.corpus
281
282    def do_task(self):
283        list(self.corpus.find())
284
285    def before(self):
286        pass
287
288    def after(self):
289        pass
290
291
292class TestSmallDocBulkInsert(TestDocument, unittest.TestCase):
293    data_size = 2750000
294    def setUp(self):
295        self.dataset = 'small_doc.json'
296        super(TestSmallDocBulkInsert, self).setUp()
297        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]
298
299    def before(self):
300        self.corpus = self.client.perftest.create_collection('corpus')
301
302    def do_task(self):
303        self.corpus.insert_many(self.documents, ordered=True)
304
305
306class TestLargeDocBulkInsert(TestDocument, unittest.TestCase):
307    data_size = 27310890
308    def setUp(self):
309        self.dataset = 'large_doc.json'
310        super(TestLargeDocBulkInsert, self).setUp()
311        self.documents = [self.document.copy() for _ in range(10)]
312
313    def before(self):
314        self.corpus = self.client.perftest.create_collection('corpus')
315
316    def do_task(self):
317        self.corpus.insert_many(self.documents, ordered=True)
318
319
320class TestGridFsUpload(PerformanceTest, unittest.TestCase):
321    data_size = 52428800
322    def setUp(self):
323        self.client = client_context.client
324        self.client.drop_database('perftest')
325
326        gridfs_path = os.path.join(
327            TEST_PATH,
328            os.path.join('single_and_multi_document', 'gridfs_large.bin'))
329        with open(gridfs_path, 'rb') as data:
330            self.document = data.read()
331
332        self.bucket = GridFSBucket(self.client.perftest)
333
334    def tearDown(self):
335        super(TestGridFsUpload, self).tearDown()
336        self.client.drop_database('perftest')
337
338    def before(self):
339        self.bucket.upload_from_stream('init', b'x')
340
341    def do_task(self):
342        self.bucket.upload_from_stream('gridfstest', self.document)
343
344
345class TestGridFsDownload(PerformanceTest, unittest.TestCase):
346    data_size = 52428800
347    def setUp(self):
348        self.client = client_context.client
349        self.client.drop_database('perftest')
350
351        gridfs_path = os.path.join(
352            TEST_PATH,
353            os.path.join('single_and_multi_document', 'gridfs_large.bin'))
354
355        self.bucket = GridFSBucket(self.client.perftest)
356        with open(gridfs_path, 'rb') as gfile:
357            self.uploaded_id = self.bucket.upload_from_stream(
358                'gridfstest', gfile)
359
360    def tearDown(self):
361        super(TestGridFsDownload, self).tearDown()
362        self.client.drop_database('perftest')
363
364    def do_task(self):
365        self.bucket.open_download_stream(self.uploaded_id).read()
366
367
368proc_client = None
369
370
371def proc_init(*dummy):
372    global proc_client
373    proc_client = MongoClient(host, port)
374
375
376# PARALLEL BENCHMARKS
377def mp_map(map_func, files):
378    pool = mp.Pool(initializer=proc_init)
379    pool.map(map_func, files)
380    pool.close()
381
382
383def insert_json_file(filename):
384    with open(filename, 'r') as data:
385        coll = proc_client.perftest.corpus
386        coll.insert_many([json.loads(line) for line in data])
387
388
389def insert_json_file_with_file_id(filename):
390    documents = []
391    with open(filename, 'r') as data:
392        for line in data:
393            doc = json.loads(line)
394            doc['file'] = filename
395            documents.append(doc)
396    coll = proc_client.perftest.corpus
397    coll.insert_many(documents)
398
399
400def read_json_file(filename):
401    coll = proc_client.perftest.corpus
402    temp = tempfile.TemporaryFile()
403    try:
404        temp.writelines(
405            [json.dumps(doc) + '\n' for
406             doc in coll.find({'file': filename}, {'_id': False})])
407    finally:
408        temp.close()
409
410
411def insert_gridfs_file(filename):
412    bucket = GridFSBucket(proc_client.perftest)
413
414    with open(filename, 'rb') as gfile:
415        bucket.upload_from_stream(filename, gfile)
416
417
418def read_gridfs_file(filename):
419    bucket = GridFSBucket(proc_client.perftest)
420
421    temp = tempfile.TemporaryFile()
422    try:
423        bucket.download_to_stream_by_name(filename, temp)
424    finally:
425        temp.close()
426
427
428class TestJsonMultiImport(PerformanceTest, unittest.TestCase):
429    data_size = 565000000
430    def setUp(self):
431        self.client = client_context.client
432        self.client.drop_database('perftest')
433
434    def before(self):
435        self.client.perftest.command({'create': 'corpus'})
436        self.corpus = self.client.perftest.corpus
437
438        ldjson_path = os.path.join(
439            TEST_PATH, os.path.join('parallel', 'ldjson_multi'))
440        self.files = [os.path.join(
441            ldjson_path, s) for s in os.listdir(ldjson_path)]
442
443    def do_task(self):
444        mp_map(insert_json_file, self.files)
445
446    def after(self):
447        self.client.perftest.drop_collection('corpus')
448
449    def tearDown(self):
450        super(TestJsonMultiImport, self).tearDown()
451        self.client.drop_database('perftest')
452
453
454class TestJsonMultiExport(PerformanceTest, unittest.TestCase):
455    data_size = 565000000
456    def setUp(self):
457        self.client = client_context.client
458        self.client.drop_database('perftest')
459        self.client.perfest.corpus.create_index('file')
460
461        ldjson_path = os.path.join(
462            TEST_PATH, os.path.join('parallel', 'ldjson_multi'))
463        self.files = [os.path.join(
464            ldjson_path, s) for s in os.listdir(ldjson_path)]
465
466        mp_map(insert_json_file_with_file_id, self.files)
467
468    def do_task(self):
469        mp_map(read_json_file, self.files)
470
471    def tearDown(self):
472        super(TestJsonMultiExport, self).tearDown()
473        self.client.drop_database('perftest')
474
475
476class TestGridFsMultiFileUpload(PerformanceTest, unittest.TestCase):
477    data_size = 262144000
478    def setUp(self):
479        self.client = client_context.client
480        self.client.drop_database('perftest')
481
482    def before(self):
483        self.client.perftest.drop_collection('fs.files')
484        self.client.perftest.drop_collection('fs.chunks')
485
486        self.bucket = GridFSBucket(self.client.perftest)
487        gridfs_path = os.path.join(
488            TEST_PATH, os.path.join('parallel', 'gridfs_multi'))
489        self.files = [os.path.join(
490            gridfs_path, s) for s in os.listdir(gridfs_path)]
491
492    def do_task(self):
493        mp_map(insert_gridfs_file, self.files)
494
495    def tearDown(self):
496        super(TestGridFsMultiFileUpload, self).tearDown()
497        self.client.drop_database('perftest')
498
499
500class TestGridFsMultiFileDownload(PerformanceTest, unittest.TestCase):
501    data_size = 262144000
502    def setUp(self):
503        self.client = client_context.client
504        self.client.drop_database('perftest')
505
506        bucket = GridFSBucket(self.client.perftest)
507
508        gridfs_path = os.path.join(
509            TEST_PATH, os.path.join('parallel', 'gridfs_multi'))
510        self.files = [os.path.join(
511            gridfs_path, s) for s in os.listdir(gridfs_path)]
512
513        for fname in self.files:
514            with open(fname, 'rb') as gfile:
515                bucket.upload_from_stream(fname, gfile)
516
517    def do_task(self):
518        mp_map(read_gridfs_file, self.files)
519
520    def tearDown(self):
521        super(TestGridFsMultiFileDownload, self).tearDown()
522        self.client.drop_database('perftest')
523
524
525if __name__ == "__main__":
526    unittest.main()
527