1# Licensed to the Apache Software Foundation (ASF) under one 2# or more contributor license agreements. See the NOTICE file 3# distributed with this work for additional information 4# regarding copyright ownership. The ASF licenses this file 5# to you under the Apache License, Version 2.0 (the 6# "License"); you may not use this file except in compliance 7# with the License. You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, 12# software distributed under the License is distributed on an 13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14# KIND, either express or implied. See the License for the 15# specific language governing permissions and limitations 16# under the License. 17 18import ctypes 19 20import os 21import time 22import argparse 23import subprocess 24import scipy.sparse as sp 25 26import mxnet as mx 27import numpy as np 28import numpy.random as rnd 29from mxnet.test_utils import rand_ndarray, set_default_context, assert_almost_equal, get_bz2_data 30from mxnet.base import check_call, _LIB 31from util import estimate_density 32 33PARSER = argparse.ArgumentParser(description="Benchmark sparse operators", 34 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 35PARSER.add_argument('--num-omp-threads', type=int, 36 default=1, help='number of omp threads to set in MXNet') 37PARSER.add_argument('--gpu', action='store_true', 38 help="to be run on gpu") 39# TODO: Use logging later 40PARSER.add_argument('--verbose', action='store_true', 41 help="Verbose output") 42ARGS = PARSER.parse_args() 43 44# some data information 45KDDA = { 46 'data_mini': 'kdda.t.mini', 47 'data_name': 'kdda.t', 48 'data_origin_name': 'kdda.t.bz2', 49 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2", 50 'feature_dim': 20216830, 51 'm': [1, 8, 32], 52 'batch_size': [64], 53 'default_index': {'batch_size': 0, 54 'output_dim': 2}, 55 'num_batches': 10 56} 57 58AVAZU = { 59 'data_mini': 'avazu-app.t.mini', 60 'data_name': 'avazu-app.t', 61 'data_origin_name': 'avazu-app.t.bz2', 62 'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2", 63 'feature_dim': 1000000, 64 'm': [1, 1000, 2000], 65 'batch_size': [128, 256], 66 'default_index': {'batch_size': 0, 67 'output_dim': 1}, 68 'num_batches': 10 69} 70 71CRITEO = { 72 'data_mini': 'criteo.t.mini', 73 'data_name': 'criteo.t', 74 'data_origin_name': 'criteo.t.bz2', 75 'url' : "https://s3-us-west-2.amazonaws.com/sparse-dataset/criteo.t.bz2", 76 'feature_dim': 8388621, 77 'm': [1, 8, 16, 32, 64], 78 'batch_size': [64, 128], 79 'default_index': {'batch_size': 1, 80 'output_dim': 3}, 81 'num_batches': 10 82} 83 84SYNTHETIC1 = { 85 'feature_dim': [1000000], 86 'm': [256, 1000], 87 'density': [0.001, 0.005, 0.01, 0.02, 0.05, 88 0.1, 0.2, 0.5, 0.65], 89 'batch_size': [64, 128], 90 'default_index': {'batch_size': 1, 91 'density': 2, 92 'output_dim': 1, 93 'feature_dim': 0}, 94 'num_repeat': 10 95} 96 97SYNTHETIC2 = { 98 'feature_dim': [8000000, 16000000], 99 'm': [1, 32], 100 'density': [0.001, 0.005, 0.01, 0.02, 0.05, 101 0.1, 0.2, 0.5, 0.65], 102 'batch_size': [64, 128], 103 'default_index': {'batch_size': 1, 104 'density': 2, 105 'output_dim': 1, 106 'feature_dim': 0}, 107 'num_repeat': 10 108} 109 110def measure_cost(repeat, scipy_trans_lhs, scipy_dns_lhs, func_name, *args, **kwargs): 111 """Measure time cost of running a function 112 """ 113 mx.nd.waitall() 114 args_list = [] 115 for arg in args: 116 args_list.append(arg) 117 start = time.time() 118 if scipy_trans_lhs: 119 args_list[0] = np.transpose(args_list[0]) if scipy_dns_lhs else sp.spmatrix.transpose(args_list[0]) 120 for _ in range(repeat): 121 func_name(*args_list, **kwargs) 122 mx.nd.waitall() 123 end = time.time() 124 diff = end - start 125 return diff / repeat 126 127 128def _get_iter(path, data_shape, batch_size): 129 data_train = mx.io.LibSVMIter(data_libsvm=path, 130 data_shape=data_shape, 131 batch_size=batch_size) 132 data_iter = iter(data_train) 133 return data_iter 134 135 136def _line_count(path): 137 return int(subprocess.check_output('wc -l {}'.format(path), shell=True).split()[0]) 138 139 140def _compare_sparse_dense(data_dir, file_name, mini_file_name, feature_dim, 141 output_dim, density, batch_size, num_batches=3, num_repeat=5, transpose=False, 142 rsp=False): 143 144 def create_mini_path(mini_path, path, num_batches): 145 """Samples batches of size: batch_size, total number: num_batches 146 from the dataset files for running benchmarks""" 147 if not os.path.exists(mini_path): 148 last = _line_count(path) - num_batches * batch_size 149 last = last if last >= 1 else 1 150 start = int(rnd.uniform(1, last)) 151 os.system("sed -n '%d,%dp' %r > %r" 152 %(start, start + num_batches * batch_size, path, mini_path)) 153 assert os.path.exists(mini_path) 154 155 156 def run_benchmark(mini_path): 157 """Run benchmarks 158 """ 159 data_shape = (feature_dim, ) 160 train_iter = _get_iter(mini_path, data_shape, batch_size) 161 weight_row_dim = batch_size if transpose else feature_dim 162 weight_shape = (weight_row_dim, output_dim) 163 if not rsp: 164 weight = mx.nd.random.uniform(low=0, high=1, shape=weight_shape) 165 else: 166 weight = rand_ndarray(weight_shape, "row_sparse", density=0.05, distribution="uniform") 167 total_cost = {} 168 average_cost = {} 169 count = 0 170 total_cost["sparse"] = 0. 171 total_cost["dense"] = 0. 172 for _ in train_iter: 173 csr_data = train_iter.getdata() 174 dns_data = csr_data.tostype('default') 175 cost_sparse = measure_cost(num_repeat, False, False, mx.nd.sparse.dot, csr_data, weight, transpose_a=transpose) 176 cost_dense = measure_cost(num_repeat, False, False, mx.nd.dot, dns_data, weight, transpose_a=transpose) 177 total_cost["sparse"] += cost_sparse 178 total_cost["dense"] += cost_dense 179 count = count + 1 180 average_cost["sparse"] = total_cost["sparse"] / count 181 average_cost["dense"] = total_cost["dense"] / count 182 return (average_cost["sparse"], average_cost["dense"]) 183 184 185 def print_result(average_cost_sparse, average_cost_dense): 186 """Print result of comparison between sparse and dense 187 """ 188 ratio = average_cost_dense / average_cost_sparse 189 fmt = '{:15.4f} {:10d} {:10d} {:10d} {:20.2f} {:15.2f} {:15.2f} {:10} {:10}' 190 print(fmt.format(density * 100, batch_size, output_dim, feature_dim, 191 ratio, average_cost_dense*1000, average_cost_sparse*1000, 192 transpose, rsp)) 193 194 mini_path = os.path.join(data_dir, mini_file_name) 195 path = os.path.join(data_dir, file_name) 196 create_mini_path(mini_path, path, num_batches) 197 average_cost_sparse, average_cost_dense = run_benchmark(mini_path) 198 print_result(average_cost_sparse, average_cost_dense) 199 200 201def test_dot_real(data_dict): 202 """Dot operator testing with real datasets""" 203 data_dir = os.path.join(os.getcwd(), 'data') 204 205 path = os.path.join(data_dir, data_dict['data_name']) 206 if not os.path.exists(path): 207 get_bz2_data( 208 data_dir, 209 data_dict['data_name'], 210 data_dict['url'], 211 data_dict['data_origin_name'] 212 ) 213 assert os.path.exists(path) 214 215 k = data_dict['feature_dim'] 216 m = data_dict['m'] 217 batch_size_list = data_dict['batch_size'] 218 219 default_output_index = data_dict['default_index']['output_dim'] 220 default_batch_size_index = data_dict['default_index']['batch_size'] 221 density = estimate_density(path, data_dict['feature_dim']) 222 num_batches = data_dict['num_batches'] 223 224 assert default_batch_size_index < len(batch_size_list) 225 assert default_output_index < len(m) 226 if ARGS.verbose: 227 print("Running Benchmarking on %r data") % data_dict['data_mini'] 228 print('{:>15} {:>10} {:>10} {:>10} {:>20} {:>15} {:>15} {:>10} {:>10}'.format('density(%)', 229 'n', 230 'm', 231 'k', 232 't_dense/t_sparse', 233 't_dense(ms)', 234 't_sparse(ms)', 235 'is_transpose', 236 'rhs_rsp')) 237 238 239 for output_dim in m: 240 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 241 k, output_dim, density, 242 batch_size_list[default_batch_size_index], num_batches) 243 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 244 k, output_dim, density, 245 batch_size_list[default_batch_size_index], num_batches, 246 transpose=True) 247 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 248 k, output_dim, density, 249 batch_size_list[default_batch_size_index], num_batches, rsp=True) 250 251 for batch_size in batch_size_list: 252 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 253 k, m[default_output_index], density, batch_size, num_batches) 254 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 255 k, m[default_output_index], density, batch_size, num_batches, 256 transpose=True) 257 _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'], 258 k, output_dim, density, 259 batch_size_list[default_batch_size_index], num_batches, rsp=True) 260 261 262def test_dot_synthetic(data_dict): 263 """benchmark sparse mxnet dot and scipy dot operator with matrices of given density. 264 `t_sparse` is the runtime of the invoked sparse dot operator in ms, while `t_dense` is the 265 runtime of dot(dns, dns), with the same matrices except that they are in default storage type. 266 """ 267 # Benchmark MXNet and Scipys dot operator 268 def bench_dot(lhs_shape, rhs_shape, lhs_stype, rhs_stype, 269 lhs_den, rhs_den, trans_lhs, ctx, num_repeat=10, fw="mxnet", distribution="uniform"): 270 set_default_context(ctx) 271 assert fw == "mxnet" or fw == "scipy" 272 # Set funcs 273 dot_func_sparse = mx.nd.sparse.dot if fw == "mxnet" else sp.spmatrix.dot 274 dot_func_dense = mx.nd.dot if fw == "mxnet" else np.dot 275 # Create matrix instances 276 lhs_nd = rand_ndarray(lhs_shape, lhs_stype, density=lhs_den, distribution=distribution) 277 # only uniform distribution supported for rhs 278 if rhs_stype == 'csr': 279 rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution=distribution) 280 else: 281 rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution="uniform") 282 lhs_dns = None 283 rhs_dns = None 284 dense_cost = None 285 sparse_cost = None 286 287 if fw == "mxnet": 288 lhs_dns = lhs_nd if lhs_stype == 'default' else lhs_nd.tostype('default') 289 rhs_dns = rhs_nd if rhs_stype == 'default' else rhs_nd.tostype('default') 290 # One warm up run, verify correctness 291 out = dot_func_sparse(lhs_nd, rhs_dns, trans_lhs) 292 out_expected = dot_func_dense(lhs_dns, rhs_dns, trans_lhs) 293 assert_almost_equal(out.asnumpy(), out_expected.asnumpy(), rtol=1e-1, atol=1e-1) 294 sparse_cost = measure_cost(num_repeat, False, False, dot_func_sparse, lhs_nd, rhs_nd, trans_lhs) 295 dense_cost = measure_cost(num_repeat, False, False, dot_func_dense, lhs_dns, rhs_dns, trans_lhs) 296 else: 297 lhs_dns = lhs_nd.asnumpy() 298 rhs_dns = rhs_nd.asnumpy() 299 lhs_nd = sp.csr_matrix(lhs_nd.asnumpy()) 300 rhs_nd = rhs_nd.asnumpy() 301 # One warm up run, verify correctness 302 lhs_nd_copy = sp.spmatrix.transpose(lhs_nd) if trans_lhs else lhs_nd 303 out = dot_func_sparse(lhs_nd_copy, rhs_dns) 304 sparse_cost = measure_cost(num_repeat, trans_lhs, False, dot_func_sparse, lhs_nd, rhs_nd) 305 dense_cost = measure_cost(num_repeat, trans_lhs, True, dot_func_dense, lhs_dns, rhs_dns) 306 307 speedup = dense_cost / sparse_cost 308 # Print results 309 m = lhs_shape[0] 310 k = lhs_shape[1] 311 n = rhs_shape[1] 312 result_pattern = '{:15.1f} {:15.1f} {:>10} {:8d} {:8d} {:8d} {:13.2f} {:13.2f} {:8.2f}' 313 results = result_pattern.format(lhs_den*100, 314 rhs_den*100, 315 str(ctx), 316 m, 317 k, 318 n, 319 sparse_cost*1000, 320 dense_cost*1000, 321 speedup) 322 print(results) 323 324 def print_benchmark_info(lhs, rhs, lhs_trans, fw): 325 trans_str = "^T" if lhs_trans else "" 326 print("========================================================") 327 print(" %s sparse dot benchmark: dot(%s, %s) = %s ") % (fw, lhs, rhs, rhs) 328 print(" (matrix multiplication: (m x k)%s * (k x n) = m x n) ") % (trans_str) 329 print("========================================================") 330 headline_pattern = '{:>15} {:>15} {:>10} {:>8} {:>8} {:>8} {:>13} {:>13} {:>8}' 331 headline = headline_pattern.format('lhs_density(%)', 332 'rhs_density(%)', 333 'context', 334 'm', 'k', 'n', 335 't_sparse(ms)', 336 't_dense(ms)', 337 'speedup') 338 print(headline) 339 340 341 def run_benchmark(ctx=None, lhs="csr", lhs_trans=False, rhs="dns", fw="mxnet", rhs_density=1, 342 distribution="uniform"): 343 344 if rhs_density > 1 or rhs_density < 0: 345 raise ValueError("rhs_density has to be between 0 and 1") 346 347 print_benchmark_info(lhs, rhs, lhs_trans, fw) 348 349 if rhs == "csr": 350 lhs_stype = "default" 351 rhs_stype = "csr" 352 assert (lhs_stype == 'default'), "Only dot(default, csr) supported" 353 # Arrange dimensions according to use case. For below csr will have num_rows << num_cols 354 feature_dim_list = data_dict['batch_size'] 355 batch_size_list = data_dict['m'] 356 output_dim_list = data_dict['feature_dim'] 357 density_list = data_dict['density'] 358 default_output_index = data_dict['default_index']['feature_dim'] 359 default_density_index = data_dict['default_index']['density'] 360 default_feature_index = data_dict['default_index']['batch_size'] 361 default_batch_size_index = data_dict['default_index']['output_dim'] 362 num_repeat = data_dict['num_repeat'] 363 364 else: 365 lhs_stype = "csr" 366 rhs_stype = "row_sparse" if rhs == "rsp" else "default" 367 368 feature_dim_list = data_dict['feature_dim'] 369 output_dim_list = data_dict['m'] 370 batch_size_list = data_dict['batch_size'] 371 density_list = data_dict['density'] 372 373 default_output_index = data_dict['default_index']['output_dim'] 374 default_batch_size_index = data_dict['default_index']['batch_size'] 375 default_feature_index = data_dict['default_index']['feature_dim'] 376 default_density_index = data_dict['default_index']['density'] 377 num_repeat = data_dict['num_repeat'] 378 379 for output_dim in output_dim_list: 380 if lhs_trans: 381 output_row_dim = batch_size_list[default_batch_size_index] 382 else: 383 output_row_dim = feature_dim_list[default_feature_index] 384 bench_dot((batch_size_list[default_batch_size_index], 385 feature_dim_list[default_feature_index]), 386 (output_row_dim, output_dim), 387 lhs_stype, rhs_stype, 388 density_list[default_density_index], rhs_density, 389 lhs_trans, ctx, num_repeat=num_repeat, 390 fw=fw, distribution=distribution) 391 392 for feature_dim in feature_dim_list: 393 if lhs_trans: 394 output_row_dim = batch_size_list[default_batch_size_index] 395 else: 396 output_row_dim = feature_dim 397 bench_dot((batch_size_list[default_batch_size_index], feature_dim), 398 (output_row_dim, output_dim_list[default_output_index]), 399 lhs_stype, rhs_stype, density_list[default_density_index], rhs_density, 400 lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution) 401 402 for batch_size in batch_size_list: 403 if lhs_trans: 404 output_row_dim = batch_size 405 else: 406 output_row_dim = feature_dim_list[default_feature_index] 407 bench_dot((batch_size, feature_dim_list[default_feature_index]), 408 (output_row_dim, 409 output_dim_list[default_output_index]), 410 lhs_stype, rhs_stype, density_list[default_density_index], 411 rhs_density, lhs_trans, ctx, num_repeat=num_repeat, 412 fw=fw, distribution=distribution) 413 414 for density in density_list: 415 if lhs_trans: 416 output_row_dim = batch_size_list[default_batch_size_index] 417 else: 418 output_row_dim = feature_dim_list[default_feature_index] 419 bench_dot((batch_size_list[default_batch_size_index], 420 feature_dim_list[default_feature_index]), 421 (output_row_dim, 422 output_dim_list[default_output_index]), 423 lhs_stype, rhs_stype, density, density, lhs_trans, ctx, 424 num_repeat=num_repeat, fw=fw, distribution=distribution) 425 426 check_call(_LIB.MXSetNumOMPThreads(ctypes.c_int(ARGS.num_omp_threads))) 427 context = mx.gpu() if ARGS.gpu else mx.cpu() 428 # TODO(anirudh): make the data dicts to config which can be passed at runtime 429 distributions = ["uniform", "powerlaw"] 430 for distribution in distributions: 431 run_benchmark(context, lhs="csr", 432 rhs="default", lhs_trans=False, 433 fw="mxnet", rhs_density=1, 434 distribution=distribution) 435 run_benchmark(context, lhs="csr", 436 rhs="default", lhs_trans=True, 437 fw="mxnet", rhs_density=1, 438 distribution=distribution) 439 run_benchmark(context, lhs="csr", 440 rhs="rsp", lhs_trans=False, 441 fw="mxnet", rhs_density=0.05, 442 distribution=distribution) 443 run_benchmark(context, lhs="default", 444 rhs="csr", lhs_trans=False, 445 fw="mxnet", rhs_density=0.001, 446 distribution=distribution) 447 if not ARGS.gpu: 448 run_benchmark(context, lhs="csr", 449 rhs="default", lhs_trans=False, 450 fw="scipy", rhs_density=1, 451 distribution=distribution) 452 run_benchmark(context, lhs="csr", 453 rhs="default", lhs_trans=True, 454 fw="scipy", rhs_density=1, 455 distribution=distribution) 456 457 458if __name__ == "__main__": 459 begin_time = time.time() 460 test_dot_real(KDDA) 461 test_dot_real(AVAZU) 462 test_dot_real(CRITEO) 463 test_dot_synthetic(SYNTHETIC1) 464 test_dot_synthetic(SYNTHETIC2) 465 total_time = time.time() - begin_time 466 print("total time is %f") % total_time 467