1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18import ctypes
19
20import os
21import time
22import argparse
23import subprocess
24import scipy.sparse as sp
25
26import mxnet as mx
27import numpy as np
28import numpy.random as rnd
29from mxnet.test_utils import rand_ndarray, set_default_context, assert_almost_equal, get_bz2_data
30from mxnet.base import check_call, _LIB
31from util import estimate_density
32
33PARSER = argparse.ArgumentParser(description="Benchmark sparse operators",
34                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
35PARSER.add_argument('--num-omp-threads', type=int,
36                    default=1, help='number of omp threads to set in MXNet')
37PARSER.add_argument('--gpu', action='store_true',
38                    help="to be run on gpu")
39# TODO: Use logging later
40PARSER.add_argument('--verbose', action='store_true',
41                    help="Verbose output")
42ARGS = PARSER.parse_args()
43
44# some data information
45KDDA = {
46    'data_mini': 'kdda.t.mini',
47    'data_name': 'kdda.t',
48    'data_origin_name': 'kdda.t.bz2',
49    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2",
50    'feature_dim': 20216830,
51    'm': [1, 8, 32],
52    'batch_size': [64],
53    'default_index': {'batch_size': 0,
54                      'output_dim': 2},
55    'num_batches': 10
56}
57
58AVAZU = {
59    'data_mini': 'avazu-app.t.mini',
60    'data_name': 'avazu-app.t',
61    'data_origin_name': 'avazu-app.t.bz2',
62    'url': "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/avazu-app.t.bz2",
63    'feature_dim': 1000000,
64    'm': [1, 1000, 2000],
65    'batch_size': [128, 256],
66    'default_index': {'batch_size': 0,
67                      'output_dim': 1},
68    'num_batches': 10
69}
70
71CRITEO = {
72    'data_mini': 'criteo.t.mini',
73    'data_name': 'criteo.t',
74    'data_origin_name': 'criteo.t.bz2',
75    'url' : "https://s3-us-west-2.amazonaws.com/sparse-dataset/criteo.t.bz2",
76    'feature_dim': 8388621,
77    'm': [1, 8, 16, 32, 64],
78    'batch_size': [64, 128],
79    'default_index': {'batch_size': 1,
80                      'output_dim': 3},
81    'num_batches': 10
82}
83
84SYNTHETIC1 = {
85    'feature_dim': [1000000],
86    'm': [256, 1000],
87    'density': [0.001, 0.005, 0.01, 0.02, 0.05,
88                0.1, 0.2, 0.5, 0.65],
89    'batch_size': [64, 128],
90    'default_index': {'batch_size': 1,
91                      'density': 2,
92                      'output_dim': 1,
93                      'feature_dim': 0},
94    'num_repeat': 10
95}
96
97SYNTHETIC2 = {
98    'feature_dim': [8000000, 16000000],
99    'm': [1, 32],
100    'density': [0.001, 0.005, 0.01, 0.02, 0.05,
101                0.1, 0.2, 0.5, 0.65],
102    'batch_size': [64, 128],
103    'default_index': {'batch_size': 1,
104                      'density': 2,
105                      'output_dim': 1,
106                      'feature_dim': 0},
107    'num_repeat': 10
108}
109
110def measure_cost(repeat, scipy_trans_lhs, scipy_dns_lhs, func_name, *args, **kwargs):
111    """Measure time cost of running a function
112    """
113    mx.nd.waitall()
114    args_list = []
115    for arg in args:
116        args_list.append(arg)
117    start = time.time()
118    if scipy_trans_lhs:
119        args_list[0] = np.transpose(args_list[0]) if scipy_dns_lhs else sp.spmatrix.transpose(args_list[0])
120    for _ in range(repeat):
121        func_name(*args_list, **kwargs)
122    mx.nd.waitall()
123    end = time.time()
124    diff = end - start
125    return diff / repeat
126
127
128def _get_iter(path, data_shape, batch_size):
129    data_train = mx.io.LibSVMIter(data_libsvm=path,
130                                  data_shape=data_shape,
131                                  batch_size=batch_size)
132    data_iter = iter(data_train)
133    return data_iter
134
135
136def _line_count(path):
137    return int(subprocess.check_output('wc -l {}'.format(path), shell=True).split()[0])
138
139
140def _compare_sparse_dense(data_dir, file_name, mini_file_name, feature_dim,
141                          output_dim, density, batch_size, num_batches=3, num_repeat=5, transpose=False,
142                          rsp=False):
143
144    def create_mini_path(mini_path, path, num_batches):
145        """Samples batches of size: batch_size, total number: num_batches
146        from the dataset files for running benchmarks"""
147        if not os.path.exists(mini_path):
148            last = _line_count(path) - num_batches * batch_size
149            last = last if last >= 1 else 1
150            start = int(rnd.uniform(1, last))
151            os.system("sed -n '%d,%dp' %r > %r"
152                      %(start, start + num_batches * batch_size, path, mini_path))
153            assert os.path.exists(mini_path)
154
155
156    def run_benchmark(mini_path):
157        """Run benchmarks
158        """
159        data_shape = (feature_dim, )
160        train_iter = _get_iter(mini_path, data_shape, batch_size)
161        weight_row_dim = batch_size if transpose else feature_dim
162        weight_shape = (weight_row_dim, output_dim)
163        if not rsp:
164            weight = mx.nd.random.uniform(low=0, high=1, shape=weight_shape)
165        else:
166            weight = rand_ndarray(weight_shape, "row_sparse", density=0.05, distribution="uniform")
167        total_cost = {}
168        average_cost = {}
169        count = 0
170        total_cost["sparse"] = 0.
171        total_cost["dense"] = 0.
172        for _ in train_iter:
173            csr_data = train_iter.getdata()
174            dns_data = csr_data.tostype('default')
175            cost_sparse = measure_cost(num_repeat, False, False, mx.nd.sparse.dot, csr_data, weight, transpose_a=transpose)
176            cost_dense = measure_cost(num_repeat, False, False, mx.nd.dot, dns_data, weight, transpose_a=transpose)
177            total_cost["sparse"] += cost_sparse
178            total_cost["dense"] += cost_dense
179            count = count + 1
180        average_cost["sparse"] = total_cost["sparse"] / count
181        average_cost["dense"] = total_cost["dense"] / count
182        return (average_cost["sparse"], average_cost["dense"])
183
184
185    def print_result(average_cost_sparse, average_cost_dense):
186        """Print result of comparison between sparse and dense
187        """
188        ratio = average_cost_dense / average_cost_sparse
189        fmt = '{:15.4f} {:10d} {:10d} {:10d} {:20.2f} {:15.2f} {:15.2f} {:10} {:10}'
190        print(fmt.format(density * 100, batch_size, output_dim, feature_dim,
191                         ratio, average_cost_dense*1000, average_cost_sparse*1000,
192                         transpose, rsp))
193
194    mini_path = os.path.join(data_dir, mini_file_name)
195    path = os.path.join(data_dir, file_name)
196    create_mini_path(mini_path, path, num_batches)
197    average_cost_sparse, average_cost_dense = run_benchmark(mini_path)
198    print_result(average_cost_sparse, average_cost_dense)
199
200
201def test_dot_real(data_dict):
202    """Dot operator testing with real datasets"""
203    data_dir = os.path.join(os.getcwd(), 'data')
204
205    path = os.path.join(data_dir, data_dict['data_name'])
206    if not os.path.exists(path):
207        get_bz2_data(
208            data_dir,
209            data_dict['data_name'],
210            data_dict['url'],
211            data_dict['data_origin_name']
212        )
213        assert os.path.exists(path)
214
215    k = data_dict['feature_dim']
216    m = data_dict['m']
217    batch_size_list = data_dict['batch_size']
218
219    default_output_index = data_dict['default_index']['output_dim']
220    default_batch_size_index = data_dict['default_index']['batch_size']
221    density = estimate_density(path, data_dict['feature_dim'])
222    num_batches = data_dict['num_batches']
223
224    assert default_batch_size_index < len(batch_size_list)
225    assert default_output_index < len(m)
226    if ARGS.verbose:
227        print("Running Benchmarking on %r data") % data_dict['data_mini']
228    print('{:>15} {:>10} {:>10} {:>10} {:>20} {:>15} {:>15} {:>10} {:>10}'.format('density(%)',
229                                                                                 'n',
230                                                                                 'm',
231                                                                                 'k',
232                                                                                 't_dense/t_sparse',
233                                                                                 't_dense(ms)',
234                                                                                 't_sparse(ms)',
235                                                                                 'is_transpose',
236                                                                                 'rhs_rsp'))
237
238
239    for output_dim in m:
240        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
241                              k, output_dim, density,
242                              batch_size_list[default_batch_size_index], num_batches)
243        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
244                              k, output_dim, density,
245                              batch_size_list[default_batch_size_index], num_batches,
246                              transpose=True)
247        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
248                              k, output_dim, density,
249                              batch_size_list[default_batch_size_index], num_batches, rsp=True)
250
251    for batch_size in batch_size_list:
252        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
253                              k, m[default_output_index], density, batch_size, num_batches)
254        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
255                              k, m[default_output_index], density, batch_size, num_batches,
256                              transpose=True)
257        _compare_sparse_dense(data_dir, data_dict['data_name'], data_dict['data_mini'],
258                              k, output_dim, density,
259                              batch_size_list[default_batch_size_index], num_batches, rsp=True)
260
261
262def test_dot_synthetic(data_dict):
263    """benchmark sparse mxnet dot and scipy dot operator with matrices of given density.
264    `t_sparse` is the runtime of the invoked sparse dot operator in ms, while `t_dense` is the
265    runtime of dot(dns, dns), with the same matrices except that they are in default storage type.
266    """
267    # Benchmark MXNet and Scipys dot operator
268    def bench_dot(lhs_shape, rhs_shape, lhs_stype, rhs_stype,
269                  lhs_den, rhs_den, trans_lhs, ctx, num_repeat=10, fw="mxnet", distribution="uniform"):
270        set_default_context(ctx)
271        assert fw == "mxnet" or fw == "scipy"
272        # Set funcs
273        dot_func_sparse = mx.nd.sparse.dot if fw == "mxnet" else sp.spmatrix.dot
274        dot_func_dense = mx.nd.dot if fw == "mxnet" else np.dot
275        # Create matrix instances
276        lhs_nd = rand_ndarray(lhs_shape, lhs_stype, density=lhs_den, distribution=distribution)
277        # only uniform distribution supported for rhs
278        if rhs_stype == 'csr':
279            rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution=distribution)
280        else:
281            rhs_nd = rand_ndarray(rhs_shape, rhs_stype, density=rhs_den, distribution="uniform")
282        lhs_dns = None
283        rhs_dns = None
284        dense_cost = None
285        sparse_cost = None
286
287        if fw == "mxnet":
288            lhs_dns = lhs_nd if lhs_stype == 'default' else lhs_nd.tostype('default')
289            rhs_dns = rhs_nd if rhs_stype == 'default' else rhs_nd.tostype('default')
290            # One warm up run, verify correctness
291            out = dot_func_sparse(lhs_nd, rhs_dns, trans_lhs)
292            out_expected = dot_func_dense(lhs_dns, rhs_dns, trans_lhs)
293            assert_almost_equal(out.asnumpy(), out_expected.asnumpy(), rtol=1e-1, atol=1e-1)
294            sparse_cost = measure_cost(num_repeat, False, False, dot_func_sparse, lhs_nd, rhs_nd, trans_lhs)
295            dense_cost = measure_cost(num_repeat, False, False, dot_func_dense, lhs_dns, rhs_dns, trans_lhs)
296        else:
297            lhs_dns = lhs_nd.asnumpy()
298            rhs_dns = rhs_nd.asnumpy()
299            lhs_nd = sp.csr_matrix(lhs_nd.asnumpy())
300            rhs_nd = rhs_nd.asnumpy()
301            # One warm up run, verify correctness
302            lhs_nd_copy = sp.spmatrix.transpose(lhs_nd) if trans_lhs else lhs_nd
303            out = dot_func_sparse(lhs_nd_copy, rhs_dns)
304            sparse_cost = measure_cost(num_repeat, trans_lhs, False, dot_func_sparse, lhs_nd, rhs_nd)
305            dense_cost = measure_cost(num_repeat, trans_lhs, True, dot_func_dense, lhs_dns, rhs_dns)
306
307        speedup = dense_cost / sparse_cost
308        # Print results
309        m = lhs_shape[0]
310        k = lhs_shape[1]
311        n = rhs_shape[1]
312        result_pattern = '{:15.1f} {:15.1f} {:>10} {:8d} {:8d} {:8d} {:13.2f} {:13.2f} {:8.2f}'
313        results = result_pattern.format(lhs_den*100,
314                                        rhs_den*100,
315                                        str(ctx),
316                                        m,
317                                        k,
318                                        n,
319                                        sparse_cost*1000,
320                                        dense_cost*1000,
321                                        speedup)
322        print(results)
323
324    def print_benchmark_info(lhs, rhs, lhs_trans, fw):
325        trans_str = "^T" if lhs_trans else ""
326        print("========================================================")
327        print("  %s sparse dot benchmark: dot(%s, %s) = %s  ") % (fw, lhs, rhs, rhs)
328        print("  (matrix multiplication: (m x k)%s * (k x n) = m x n)  ") % (trans_str)
329        print("========================================================")
330        headline_pattern = '{:>15} {:>15} {:>10} {:>8} {:>8} {:>8} {:>13} {:>13} {:>8}'
331        headline = headline_pattern.format('lhs_density(%)',
332                                           'rhs_density(%)',
333                                           'context',
334                                           'm', 'k', 'n',
335                                           't_sparse(ms)',
336                                           't_dense(ms)',
337                                           'speedup')
338        print(headline)
339
340
341    def run_benchmark(ctx=None, lhs="csr", lhs_trans=False, rhs="dns", fw="mxnet", rhs_density=1,
342                      distribution="uniform"):
343
344        if rhs_density > 1 or rhs_density < 0:
345            raise ValueError("rhs_density has to be between 0 and 1")
346
347        print_benchmark_info(lhs, rhs, lhs_trans, fw)
348
349        if rhs == "csr":
350            lhs_stype = "default"
351            rhs_stype = "csr"
352            assert (lhs_stype == 'default'), "Only dot(default, csr) supported"
353            # Arrange dimensions according to use case. For below csr will have num_rows << num_cols
354            feature_dim_list = data_dict['batch_size']
355            batch_size_list = data_dict['m']
356            output_dim_list = data_dict['feature_dim']
357            density_list = data_dict['density']
358            default_output_index = data_dict['default_index']['feature_dim']
359            default_density_index = data_dict['default_index']['density']
360            default_feature_index = data_dict['default_index']['batch_size']
361            default_batch_size_index = data_dict['default_index']['output_dim']
362            num_repeat = data_dict['num_repeat']
363
364        else:
365            lhs_stype = "csr"
366            rhs_stype = "row_sparse" if rhs == "rsp" else "default"
367
368            feature_dim_list = data_dict['feature_dim']
369            output_dim_list = data_dict['m']
370            batch_size_list = data_dict['batch_size']
371            density_list = data_dict['density']
372
373            default_output_index = data_dict['default_index']['output_dim']
374            default_batch_size_index = data_dict['default_index']['batch_size']
375            default_feature_index = data_dict['default_index']['feature_dim']
376            default_density_index = data_dict['default_index']['density']
377            num_repeat = data_dict['num_repeat']
378
379        for output_dim in output_dim_list:
380            if lhs_trans:
381                output_row_dim = batch_size_list[default_batch_size_index]
382            else:
383                output_row_dim = feature_dim_list[default_feature_index]
384            bench_dot((batch_size_list[default_batch_size_index],
385                       feature_dim_list[default_feature_index]),
386                      (output_row_dim, output_dim),
387                      lhs_stype, rhs_stype,
388                      density_list[default_density_index], rhs_density,
389                      lhs_trans, ctx, num_repeat=num_repeat,
390                      fw=fw, distribution=distribution)
391
392        for feature_dim in feature_dim_list:
393            if lhs_trans:
394                output_row_dim = batch_size_list[default_batch_size_index]
395            else:
396                output_row_dim = feature_dim
397            bench_dot((batch_size_list[default_batch_size_index], feature_dim),
398                      (output_row_dim, output_dim_list[default_output_index]),
399                      lhs_stype, rhs_stype, density_list[default_density_index], rhs_density,
400                      lhs_trans, ctx, num_repeat=num_repeat, fw=fw, distribution=distribution)
401
402        for batch_size in batch_size_list:
403            if lhs_trans:
404                output_row_dim = batch_size
405            else:
406                output_row_dim = feature_dim_list[default_feature_index]
407            bench_dot((batch_size, feature_dim_list[default_feature_index]),
408                      (output_row_dim,
409                       output_dim_list[default_output_index]),
410                      lhs_stype, rhs_stype, density_list[default_density_index],
411                      rhs_density, lhs_trans, ctx, num_repeat=num_repeat,
412                      fw=fw, distribution=distribution)
413
414        for density in density_list:
415            if lhs_trans:
416                output_row_dim = batch_size_list[default_batch_size_index]
417            else:
418                output_row_dim = feature_dim_list[default_feature_index]
419            bench_dot((batch_size_list[default_batch_size_index],
420                       feature_dim_list[default_feature_index]),
421                      (output_row_dim,
422                       output_dim_list[default_output_index]),
423                      lhs_stype, rhs_stype, density, density, lhs_trans, ctx,
424                      num_repeat=num_repeat, fw=fw, distribution=distribution)
425
426    check_call(_LIB.MXSetNumOMPThreads(ctypes.c_int(ARGS.num_omp_threads)))
427    context = mx.gpu() if ARGS.gpu else mx.cpu()
428    # TODO(anirudh): make the data dicts to config which can be passed at runtime
429    distributions = ["uniform", "powerlaw"]
430    for distribution in distributions:
431        run_benchmark(context, lhs="csr",
432                      rhs="default", lhs_trans=False,
433                      fw="mxnet", rhs_density=1,
434                      distribution=distribution)
435        run_benchmark(context, lhs="csr",
436                      rhs="default", lhs_trans=True,
437                      fw="mxnet", rhs_density=1,
438                      distribution=distribution)
439        run_benchmark(context, lhs="csr",
440                      rhs="rsp", lhs_trans=False,
441                      fw="mxnet", rhs_density=0.05,
442                      distribution=distribution)
443        run_benchmark(context, lhs="default",
444                      rhs="csr", lhs_trans=False,
445                      fw="mxnet", rhs_density=0.001,
446                      distribution=distribution)
447        if not ARGS.gpu:
448            run_benchmark(context, lhs="csr",
449                          rhs="default", lhs_trans=False,
450                          fw="scipy", rhs_density=1,
451                          distribution=distribution)
452            run_benchmark(context, lhs="csr",
453                          rhs="default", lhs_trans=True,
454                          fw="scipy", rhs_density=1,
455                          distribution=distribution)
456
457
458if __name__ == "__main__":
459    begin_time = time.time()
460    test_dot_real(KDDA)
461    test_dot_real(AVAZU)
462    test_dot_real(CRITEO)
463    test_dot_synthetic(SYNTHETIC1)
464    test_dot_synthetic(SYNTHETIC2)
465    total_time = time.time() - begin_time
466    print("total time is %f") % total_time
467