# Copyright 2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the MongoDB Driver Performance Benchmarking Spec."""

import multiprocessing as mp
import os
import sys
import tempfile
import warnings

try:
    import simplejson as json
except ImportError:
    import json

sys.path[0:0] = [""]

from bson import decode, encode
from bson.json_util import loads
from gridfs import GridFSBucket
from pymongo import MongoClient
from pymongo.monotonic import time
from test import client_context, host, port, unittest

NUM_ITERATIONS = 100
MAX_ITERATION_TIME = 300
NUM_DOCS = 10000

TEST_PATH = os.environ.get('TEST_PATH', os.path.join(
    os.path.dirname(os.path.realpath(__file__)), 'data'))

OUTPUT_FILE = os.environ.get('OUTPUT_FILE')

result_data = []


def tearDownModule():
    output = json.dumps({'results': result_data}, indent=4)
    if OUTPUT_FILE:
        with open(OUTPUT_FILE, 'w') as opf:
            opf.write(output)
    else:
        print(output)
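
# The JSON written (or printed) by tearDownModule() has roughly the following
# shape; the name and number below are hypothetical:
#
#   {
#       "results": [
#           {
#               "name": "TestFlatEncoding",
#               "results": {"1": {"ops_per_sec": 150620000.0}}
#           }
#       ]
#   }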


class Timer(object):
    def __enter__(self):
        self.start = time()
        return self

    def __exit__(self, *args):
        self.end = time()
        self.interval = self.end - self.start


class PerformanceTest(object):

    @classmethod
    def setUpClass(cls):
        client_context.init()

    def setUp(self):
        pass

    def tearDown(self):
        name = self.__class__.__name__
        median = self.percentile(50)
        print('Running %s. MEDIAN=%s' % (name, median))
        result_data.append({
            'name': name,
            'results': {
                '1': {
                    'ops_per_sec': self.data_size / median
                }
            }
        })

    def before(self):
        pass

    def after(self):
        pass

    def percentile(self, percentile):
        if hasattr(self, 'results'):
            sorted_results = sorted(self.results)
            percentile_index = int(len(sorted_results) * percentile / 100) - 1
            return sorted_results[percentile_index]
        else:
            self.fail('Test execution failed')

    def runTest(self):
        results = []
        start = time()
        self.max_iterations = NUM_ITERATIONS
        for i in range(NUM_ITERATIONS):
            if time() - start > MAX_ITERATION_TIME:
                warnings.warn('Test timed out, completed %s iterations.' % i)
                break
            # Only do_task() is timed; before() and after() run outside
            # the Timer.
            self.before()
            with Timer() as timer:
                self.do_task()
            self.after()
            results.append(timer.interval)

        self.results = results
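
# Concrete benchmarks subclass PerformanceTest together with
# unittest.TestCase, set data_size (bytes processed per do_task() call), and
# implement do_task(); before()/after() are optional untimed per-iteration
# hooks. A minimal sketch (hypothetical benchmark, not part of the spec):
#
#   class TestNoOp(PerformanceTest, unittest.TestCase):
#       data_size = 1
#
#       def do_task(self):
#           pass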


# BSON MICRO-BENCHMARKS
class BsonEncodingTest(PerformanceTest):
    def setUp(self):
        # Load the test data.
        with open(
                os.path.join(
                    TEST_PATH, 'extended_bson', self.dataset)) as data:
            self.document = loads(data.read())

    def do_task(self):
        for _ in range(NUM_DOCS):
            encode(self.document)


class BsonDecodingTest(PerformanceTest):
    def setUp(self):
        # Load the test data. Parse it as extended JSON (json_util.loads) so
        # the encoded document matches the one used by the corresponding
        # encoding benchmark.
        with open(
                os.path.join(
                    TEST_PATH, 'extended_bson', self.dataset)) as data:
            self.document = encode(loads(data.read()))

    def do_task(self):
        for _ in range(NUM_DOCS):
            decode(self.document)


class TestFlatEncoding(BsonEncodingTest, unittest.TestCase):
    dataset = 'flat_bson.json'
    data_size = 75310000


class TestFlatDecoding(BsonDecodingTest, unittest.TestCase):
    dataset = 'flat_bson.json'
    data_size = 75310000


class TestDeepEncoding(BsonEncodingTest, unittest.TestCase):
    dataset = 'deep_bson.json'
    data_size = 19640000


class TestDeepDecoding(BsonDecodingTest, unittest.TestCase):
    dataset = 'deep_bson.json'
    data_size = 19640000


class TestFullEncoding(BsonEncodingTest, unittest.TestCase):
    dataset = 'full_bson.json'
    data_size = 57340000


class TestFullDecoding(BsonDecodingTest, unittest.TestCase):
    dataset = 'full_bson.json'
    data_size = 57340000


# SINGLE-DOC BENCHMARKS
class TestRunCommand(PerformanceTest, unittest.TestCase):
    data_size = 160000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

    def do_task(self):
        command = self.client.perftest.command
        for _ in range(NUM_DOCS):
            command("ping")


class TestDocument(PerformanceTest):
    def setUp(self):
        # Load the test data.
        with open(
                os.path.join(
                    TEST_PATH, 'single_and_multi_document',
                    self.dataset), 'r') as data:
            self.document = json.loads(data.read())

        self.client = client_context.client
        self.client.drop_database('perftest')

    def tearDown(self):
        super(TestDocument, self).tearDown()
        self.client.drop_database('perftest')

    def before(self):
        self.corpus = self.client.perftest.create_collection('corpus')

    def after(self):
        self.client.perftest.drop_collection('corpus')


class TestFindOneByID(TestDocument, unittest.TestCase):
    data_size = 16220000

    def setUp(self):
        self.dataset = 'tweet.json'
        super(TestFindOneByID, self).setUp()

        documents = [self.document.copy() for _ in range(NUM_DOCS)]
        self.corpus = self.client.perftest.corpus
        result = self.corpus.insert_many(documents)
        self.inserted_ids = result.inserted_ids

    def do_task(self):
        find_one = self.corpus.find_one
        for _id in self.inserted_ids:
            find_one({'_id': _id})

    def before(self):
        pass

    def after(self):
        pass


class TestSmallDocInsertOne(TestDocument, unittest.TestCase):
    data_size = 2750000

    def setUp(self):
        self.dataset = 'small_doc.json'
        super(TestSmallDocInsertOne, self).setUp()

        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]

    def do_task(self):
        insert_one = self.corpus.insert_one
        for doc in self.documents:
            insert_one(doc)


class TestLargeDocInsertOne(TestDocument, unittest.TestCase):
    data_size = 27310890

    def setUp(self):
        self.dataset = 'large_doc.json'
        super(TestLargeDocInsertOne, self).setUp()

        self.documents = [self.document.copy() for _ in range(10)]

    def do_task(self):
        insert_one = self.corpus.insert_one
        for doc in self.documents:
            insert_one(doc)


# MULTI-DOC BENCHMARKS
class TestFindManyAndEmptyCursor(TestDocument, unittest.TestCase):
    data_size = 16220000

    def setUp(self):
        self.dataset = 'tweet.json'
        super(TestFindManyAndEmptyCursor, self).setUp()

        for _ in range(10):
            self.client.perftest.command(
                'insert', 'corpus',
                documents=[self.document] * 1000)
        self.corpus = self.client.perftest.corpus

    def do_task(self):
        list(self.corpus.find())

    def before(self):
        pass

    def after(self):
        pass


class TestSmallDocBulkInsert(TestDocument, unittest.TestCase):
    data_size = 2750000

    def setUp(self):
        self.dataset = 'small_doc.json'
        super(TestSmallDocBulkInsert, self).setUp()
        self.documents = [self.document.copy() for _ in range(NUM_DOCS)]

    def before(self):
        self.corpus = self.client.perftest.create_collection('corpus')

    def do_task(self):
        self.corpus.insert_many(self.documents, ordered=True)


class TestLargeDocBulkInsert(TestDocument, unittest.TestCase):
    data_size = 27310890

    def setUp(self):
        self.dataset = 'large_doc.json'
        super(TestLargeDocBulkInsert, self).setUp()
        self.documents = [self.document.copy() for _ in range(10)]

    def before(self):
        self.corpus = self.client.perftest.create_collection('corpus')

    def do_task(self):
        self.corpus.insert_many(self.documents, ordered=True)


class TestGridFsUpload(PerformanceTest, unittest.TestCase):
    data_size = 52428800

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        gridfs_path = os.path.join(
            TEST_PATH, 'single_and_multi_document', 'gridfs_large.bin')
        with open(gridfs_path, 'rb') as data:
            self.document = data.read()

        self.bucket = GridFSBucket(self.client.perftest)

    def tearDown(self):
        super(TestGridFsUpload, self).tearDown()
        self.client.drop_database('perftest')

    def before(self):
        self.bucket.upload_from_stream('init', b'x')

    def do_task(self):
        self.bucket.upload_from_stream('gridfstest', self.document)


class TestGridFsDownload(PerformanceTest, unittest.TestCase):
    data_size = 52428800

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        gridfs_path = os.path.join(
            TEST_PATH, 'single_and_multi_document', 'gridfs_large.bin')

        self.bucket = GridFSBucket(self.client.perftest)
        with open(gridfs_path, 'rb') as gfile:
            self.uploaded_id = self.bucket.upload_from_stream(
                'gridfstest', gfile)

    def tearDown(self):
        super(TestGridFsDownload, self).tearDown()
        self.client.drop_database('perftest')

    def do_task(self):
        self.bucket.open_download_stream(self.uploaded_id).read()


proc_client = None


def proc_init(*dummy):
    # MongoClient is not fork-safe, so each worker process creates its own
    # client in the pool initializer instead of inheriting one from the
    # parent process.
    global proc_client
    proc_client = MongoClient(host, port)


# PARALLEL BENCHMARKS
def mp_map(map_func, files):
    pool = mp.Pool(initializer=proc_init)
    pool.map(map_func, files)
    pool.close()
    pool.join()


def insert_json_file(filename):
    with open(filename, 'r') as data:
        coll = proc_client.perftest.corpus
        coll.insert_many([json.loads(line) for line in data])


def insert_json_file_with_file_id(filename):
    documents = []
    with open(filename, 'r') as data:
        for line in data:
            doc = json.loads(line)
            doc['file'] = filename
            documents.append(doc)
    coll = proc_client.perftest.corpus
    coll.insert_many(documents)
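
# The ldjson_multi datasets consumed by the helpers above and below are
# newline-delimited JSON: one document per line, for example (contents
# hypothetical):
#
#   {"a": 1, "b": "x"}
#   {"a": 2, "b": "y"}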


def read_json_file(filename):
    coll = proc_client.perftest.corpus
    # Write matching documents out as line-delimited JSON; open the temp
    # file in text mode because json.dumps returns str, not bytes.
    temp = tempfile.TemporaryFile(mode='w')
    try:
        temp.writelines(
            [json.dumps(doc) + '\n' for
             doc in coll.find({'file': filename}, {'_id': False})])
    finally:
        temp.close()


def insert_gridfs_file(filename):
    bucket = GridFSBucket(proc_client.perftest)

    with open(filename, 'rb') as gfile:
        bucket.upload_from_stream(filename, gfile)


def read_gridfs_file(filename):
    bucket = GridFSBucket(proc_client.perftest)

    temp = tempfile.TemporaryFile()
    try:
        bucket.download_to_stream_by_name(filename, temp)
    finally:
        temp.close()


class TestJsonMultiImport(PerformanceTest, unittest.TestCase):
    data_size = 565000000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

    def before(self):
        self.client.perftest.command({'create': 'corpus'})
        self.corpus = self.client.perftest.corpus

        ldjson_path = os.path.join(TEST_PATH, 'parallel', 'ldjson_multi')
        self.files = [os.path.join(ldjson_path, s)
                      for s in os.listdir(ldjson_path)]

    def do_task(self):
        mp_map(insert_json_file, self.files)

    def after(self):
        self.client.perftest.drop_collection('corpus')

    def tearDown(self):
        super(TestJsonMultiImport, self).tearDown()
        self.client.drop_database('perftest')


class TestJsonMultiExport(PerformanceTest, unittest.TestCase):
    data_size = 565000000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')
        self.client.perftest.corpus.create_index('file')

        ldjson_path = os.path.join(TEST_PATH, 'parallel', 'ldjson_multi')
        self.files = [os.path.join(ldjson_path, s)
                      for s in os.listdir(ldjson_path)]

        mp_map(insert_json_file_with_file_id, self.files)

    def do_task(self):
        mp_map(read_json_file, self.files)

    def tearDown(self):
        super(TestJsonMultiExport, self).tearDown()
        self.client.drop_database('perftest')


class TestGridFsMultiFileUpload(PerformanceTest, unittest.TestCase):
    data_size = 262144000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

    def before(self):
        self.client.perftest.drop_collection('fs.files')
        self.client.perftest.drop_collection('fs.chunks')

        self.bucket = GridFSBucket(self.client.perftest)
        gridfs_path = os.path.join(TEST_PATH, 'parallel', 'gridfs_multi')
        self.files = [os.path.join(gridfs_path, s)
                      for s in os.listdir(gridfs_path)]

    def do_task(self):
        mp_map(insert_gridfs_file, self.files)

    def tearDown(self):
        super(TestGridFsMultiFileUpload, self).tearDown()
        self.client.drop_database('perftest')


class TestGridFsMultiFileDownload(PerformanceTest, unittest.TestCase):
    data_size = 262144000

    def setUp(self):
        self.client = client_context.client
        self.client.drop_database('perftest')

        bucket = GridFSBucket(self.client.perftest)

        gridfs_path = os.path.join(TEST_PATH, 'parallel', 'gridfs_multi')
        self.files = [os.path.join(gridfs_path, s)
                      for s in os.listdir(gridfs_path)]

        for fname in self.files:
            with open(fname, 'rb') as gfile:
                bucket.upload_from_stream(fname, gfile)

    def do_task(self):
        mp_map(read_gridfs_file, self.files)

    def tearDown(self):
        super(TestGridFsMultiFileDownload, self).tearDown()
        self.client.drop_database('perftest')
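
# Running the suite (a sketch; the data path below is hypothetical). The
# spec's datasets must be available under TEST_PATH, and a mongod must be
# reachable via the test harness:
#
#   TEST_PATH=/path/to/benchmarking/data OUTPUT_FILE=results.json \
#       python perf_test.py                     # all benchmarks
#   python perf_test.py TestFlatEncoding        # a single benchmark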

if __name__ == "__main__":
    unittest.main()