1import numpy as np
2from numpy.testing import (run_module_suite,
3                           assert_,
4                           assert_equal,
5                           assert_almost_equal,
6                           assert_array_equal,
7                           assert_array_almost_equal,
8                           assert_raises)
9from dipy.align.streamlinear import (compose_matrix44,
10                                     decompose_matrix44,
11                                     BundleSumDistanceMatrixMetric,
12                                     BundleMinDistanceMatrixMetric,
13                                     BundleMinDistanceMetric,
14                                     StreamlineLinearRegistration,
15                                     StreamlineDistanceMetric)
16
17from dipy.tracking.streamline import (center_streamlines,
18                                      unlist_streamlines,
19                                      relist_streamlines,
20                                      transform_streamlines,
21                                      set_number_of_points,
22                                      Streamlines)
23from dipy.io.streamline import load_tractogram
24from dipy.core.geometry import compose_matrix
25
26from dipy.data import get_fnames, two_cingulum_bundles
27from dipy.align.bundlemin import (_bundle_minimum_distance_matrix,
28                                  _bundle_minimum_distance,
29                                  distance_matrix_mdf)
30
31
32def simulated_bundle(no_streamlines=10, waves=False, no_pts=12):
33    t = np.linspace(-10, 10, 200)
34    # parallel waves or parallel lines
35    bundle = []
36    for i in np.linspace(-5, 5, no_streamlines):
37        if waves:
38            pts = np.vstack((np.cos(t), t, i * np.ones(t.shape))).T
39        else:
40            pts = np.vstack((np.zeros(t.shape), t, i * np.ones(t.shape))).T
41        pts = set_number_of_points(pts, no_pts)
42        bundle.append(pts)
43
44    return bundle
45
46
47def fornix_streamlines(no_pts=12):
48    fname = get_fnames('fornix')
49
50    fornix = load_tractogram(fname, 'same',
51                             bbox_valid_check=False).streamlines
52
53    fornix_streamlines = Streamlines(fornix)
54    streamlines = set_number_of_points(fornix_streamlines, no_pts)
55    return streamlines
56
57
58def evaluate_convergence(bundle, new_bundle2):
59    pts_static = np.concatenate(bundle, axis=0)
60    pts_moved = np.concatenate(new_bundle2, axis=0)
61    assert_array_almost_equal(pts_static, pts_moved, 3)
62
63
64def test_rigid_parallel_lines():
65
66    bundle_initial = simulated_bundle()
67    bundle, shift = center_streamlines(bundle_initial)
68    mat = compose_matrix44([20, 0, 10, 0, 40, 0])
69
70    bundle2 = transform_streamlines(bundle, mat)
71
72    bundle_sum_distance = BundleSumDistanceMatrixMetric()
73    options = {'maxcor': 100, 'ftol': 1e-9, 'gtol': 1e-16, 'eps': 1e-3}
74    srr = StreamlineLinearRegistration(metric=bundle_sum_distance,
75                                       x0=np.zeros(6),
76                                       method='L-BFGS-B',
77                                       bounds=None,
78                                       options=options)
79
80    new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2)
81    evaluate_convergence(bundle, new_bundle2)
82
83
84def test_rigid_real_bundles():
85
86    bundle_initial = fornix_streamlines()[:20]
87    bundle, shift = center_streamlines(bundle_initial)
88
89    mat = compose_matrix44([0, 0, 20, 45., 0, 0])
90
91    bundle2 = transform_streamlines(bundle, mat)
92
93    bundle_sum_distance = BundleSumDistanceMatrixMetric()
94    srr = StreamlineLinearRegistration(bundle_sum_distance,
95                                       x0=np.zeros(6),
96                                       method='Powell')
97    new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2)
98
99    evaluate_convergence(bundle, new_bundle2)
100
101    bundle_min_distance = BundleMinDistanceMatrixMetric()
102    srr = StreamlineLinearRegistration(bundle_min_distance,
103                                       x0=np.zeros(6),
104                                       method='Powell')
105    new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2)
106
107    evaluate_convergence(bundle, new_bundle2)
108
109    assert_raises(ValueError, StreamlineLinearRegistration, method='Whatever')
110
111
112def test_rigid_partial_real_bundles():
113
114    static = fornix_streamlines()[:20]
115    moving = fornix_streamlines()[20:40]
116    static_center, shift = center_streamlines(static)
117    moving_center, shift2 = center_streamlines(moving)
118
119    print(shift2)
120    mat = compose_matrix(translate=np.array([0, 0, 0.]),
121                         angles=np.deg2rad([40, 0, 0.]))
122    moved = transform_streamlines(moving_center, mat)
123
124    srr = StreamlineLinearRegistration()
125
126    srm = srr.optimize(static_center, moved)
127    print(srm.fopt)
128    print(srm.iterations)
129    print(srm.funcs)
130
131    moving_back = srm.transform(moved)
132    print(srm.matrix)
133
134    static_center = set_number_of_points(static_center, 100)
135    moving_center = set_number_of_points(moving_back, 100)
136
137    vol = np.zeros((100, 100, 100))
138    spts = np.concatenate(static_center, axis=0)
139    spts = np.round(spts).astype(int) + np.array([50, 50, 50])
140
141    mpts = np.concatenate(moving_center, axis=0)
142    mpts = np.round(mpts).astype(int) + np.array([50, 50, 50])
143
144    for index in spts:
145        i, j, k = index
146        vol[i, j, k] = 1
147
148    vol2 = np.zeros((100, 100, 100))
149    for index in mpts:
150        i, j, k = index
151        vol2[i, j, k] = 1
152
153    overlap = np.sum(np.logical_and(vol, vol2)) / float(np.sum(vol2))
154
155    assert_equal(overlap * 100 > 40, True)
156
157
158def test_stream_rigid():
159
160    static = fornix_streamlines()[:20]
161    moving = fornix_streamlines()[20:40]
162    center_streamlines(static)
163
164    mat = compose_matrix44([0, 0, 0, 0, 40, 0])
165    moving = transform_streamlines(moving, mat)
166
167    srr = StreamlineLinearRegistration()
168    sr_params = srr.optimize(static, moving)
169    moved = transform_streamlines(moving, sr_params.matrix)
170
171    srr = StreamlineLinearRegistration(verbose=True)
172    srm = srr.optimize(static, moving)
173    moved2 = transform_streamlines(moving, srm.matrix)
174    moved3 = srm.transform(moving)
175
176    assert_array_almost_equal(moved[0], moved2[0], decimal=3)
177    assert_array_almost_equal(moved2[0], moved3[0], decimal=3)
178
179
180def test_min_vs_min_fast_precision():
181
182    static = fornix_streamlines()[:20]
183    moving = fornix_streamlines()[:20]
184
185    static = [s.astype('f8') for s in static]
186    moving = [m.astype('f8') for m in moving]
187
188    bmd = BundleMinDistanceMatrixMetric()
189    bmd.setup(static, moving)
190
191    bmdf = BundleMinDistanceMetric()
192    bmdf.setup(static, moving)
193
194    x_test = [0.01, 0, 0, 0, 0, 0]
195
196    print(bmd.distance(x_test))
197    print(bmdf.distance(x_test))
198    assert_equal(bmd.distance(x_test), bmdf.distance(x_test))
199
200
201def test_same_number_of_points():
202
203    A = [np.random.rand(10, 3), np.random.rand(20, 3)]
204    B = [np.random.rand(21, 3), np.random.rand(30, 3)]
205    C = [np.random.rand(10, 3), np.random.rand(10, 3)]
206    D = [np.random.rand(20, 3), np.random.rand(20, 3)]
207
208    slr = StreamlineLinearRegistration()
209    assert_raises(ValueError, slr.optimize, A, B)
210    assert_raises(ValueError, slr.optimize, C, D)
211    assert_raises(ValueError, slr.optimize, C, B)
212
213
214def test_efficient_bmd():
215
216    a = np.array([[1, 1, 1],
217                  [2, 2, 2],
218                  [3, 3, 3]])
219
220    streamlines = [a, a + 2, a + 4]
221
222    points, offsets = unlist_streamlines(streamlines)
223    points = points.astype(np.double)
224    points2 = points.copy()
225
226    D = np.zeros((len(offsets), len(offsets)), dtype='f8')
227
228    _bundle_minimum_distance_matrix(points, points2,
229                                    len(offsets), len(offsets),
230                                    a.shape[0], D)
231
232    assert_equal(np.sum(np.diag(D)), 0)
233
234    points2 += 2
235
236    _bundle_minimum_distance_matrix(points, points2,
237                                    len(offsets), len(offsets),
238                                    a.shape[0], D)
239
240    streamlines2 = relist_streamlines(points2, offsets)
241    D2 = distance_matrix_mdf(streamlines, streamlines2)
242
243    assert_array_almost_equal(D, D2)
244
245    cols = D2.shape[1]
246    rows = D2.shape[0]
247
248    dist = 0.25 * (np.sum(np.min(D2, axis=0)) / float(cols) +
249                   np.sum(np.min(D2, axis=1)) / float(rows)) ** 2
250
251    dist2 = _bundle_minimum_distance(points, points2,
252                                     len(offsets), len(offsets),
253                                     a.shape[0])
254    assert_almost_equal(dist, dist2)
255
256
257def test_openmp_locks():
258
259    static = []
260    moving = []
261    pts = 20
262
263    for i in range(1000):
264        s = np.random.rand(pts, 3)
265        static.append(s)
266        moving.append(s + 2)
267
268    moving = moving[2:]
269
270    points, offsets = unlist_streamlines(static)
271    points2, offsets2 = unlist_streamlines(moving)
272
273    D = np.zeros((len(offsets), len(offsets2)), dtype='f8')
274
275    _bundle_minimum_distance_matrix(points, points2,
276                                    len(offsets), len(offsets2),
277                                    pts, D)
278
279    dist1 = 0.25 * (np.sum(np.min(D, axis=0)) / float(D.shape[1]) +
280                    np.sum(np.min(D, axis=1)) / float(D.shape[0])) ** 2
281
282    dist2 = _bundle_minimum_distance(points, points2,
283                                     len(offsets), len(offsets2),
284                                     pts)
285
286    assert_almost_equal(dist1, dist2, 6)
287
288
289def test_from_to_rigid():
290
291    t = np.array([10, 2, 3, 0.1, 20., 30.])
292    mat = compose_matrix44(t)
293    vec = decompose_matrix44(mat, 6)
294
295    assert_array_almost_equal(t, vec)
296
297    t = np.array([0, 0, 0, 180, 0., 0.])
298
299    mat = np.eye(4)
300    mat[0, 0] = -1
301
302    vec = decompose_matrix44(mat, 6)
303
304    assert_array_almost_equal(-t, vec)
305
306
307def test_matrix44():
308
309    assert_raises(ValueError, compose_matrix44, np.ones(5))
310    assert_raises(ValueError, compose_matrix44, np.ones(13))
311    assert_raises(ValueError, compose_matrix44, np.ones(16))
312
313
314def test_abstract_metric_class():
315
316    class DummyStreamlineMetric(StreamlineDistanceMetric):
317        def test():
318            pass
319    assert_raises(TypeError, DummyStreamlineMetric)
320
321
322def test_evolution_of_previous_iterations():
323
324    static = fornix_streamlines()[:20]
325    moving = fornix_streamlines()[:20]
326
327    moving = [m + np.array([10., 0., 0.]) for m in moving]
328
329    slr = StreamlineLinearRegistration(evolution=True)
330
331    slm = slr.optimize(static, moving)
332
333    assert_equal(len(slm.matrix_history), slm.iterations)
334
335
336def test_similarity_real_bundles():
337
338    bundle_initial = fornix_streamlines()
339    bundle_initial, shift = center_streamlines(bundle_initial)
340    bundle = bundle_initial[:20]
341    xgold = [0, 0, 10, 0, 0, 0, 1.5]
342    mat = compose_matrix44(xgold)
343    bundle2 = transform_streamlines(bundle_initial[:20], mat)
344
345    metric = BundleMinDistanceMatrixMetric()
346    x0 = np.array([0, 0, 0, 0, 0, 0, 1], 'f8')
347
348    slr = StreamlineLinearRegistration(metric=metric,
349                                       x0=x0,
350                                       method='Powell',
351                                       bounds=None,
352                                       verbose=False)
353
354    slm = slr.optimize(bundle, bundle2)
355    new_bundle2 = slm.transform(bundle2)
356    evaluate_convergence(bundle, new_bundle2)
357
358
359def test_affine_real_bundles():
360
361    bundle_initial = fornix_streamlines()
362    bundle_initial, shift = center_streamlines(bundle_initial)
363    bundle = bundle_initial[:20]
364    xgold = [0, 4, 2, 0, 10, 10, 1.2, 1.1, 1., 0., 0.2, 0.]
365    mat = compose_matrix44(xgold)
366    bundle2 = transform_streamlines(bundle_initial[:20], mat)
367
368    x0 = np.array([0, 0, 0, 0, 0, 0, 1., 1., 1., 0, 0, 0])
369
370    x = 25
371
372    bounds = [(-x, x), (-x, x), (-x, x),
373              (-x, x), (-x, x), (-x, x),
374              (0.1, 1.5), (0.1, 1.5), (0.1, 1.5),
375              (-1, 1), (-1, 1), (-1, 1)]
376
377    options = {'maxcor': 10, 'ftol': 1e-7, 'gtol': 1e-5, 'eps': 1e-8}
378
379    metric = BundleMinDistanceMatrixMetric()
380
381    slr = StreamlineLinearRegistration(metric=metric,
382                                       x0=x0,
383                                       method='L-BFGS-B',
384                                       bounds=bounds,
385                                       verbose=True,
386                                       options=options)
387    slm = slr.optimize(bundle, bundle2)
388
389    new_bundle2 = slm.transform(bundle2)
390
391    slr2 = StreamlineLinearRegistration(metric=metric,
392                                        x0=x0,
393                                        method='Powell',
394                                        bounds=None,
395                                        verbose=True,
396                                        options=None)
397
398    slm2 = slr2.optimize(bundle, new_bundle2)
399
400    new_bundle2 = slm2.transform(new_bundle2)
401
402    evaluate_convergence(bundle, new_bundle2)
403
404
405def test_vectorize_streamlines():
406
407    cingulum_bundles = two_cingulum_bundles()
408
409    cb_subj1 = cingulum_bundles[0]
410    cb_subj1 = set_number_of_points(cb_subj1, 10)
411    cb_subj1_pts_no = np.array([s.shape[0] for s in cb_subj1])
412
413    assert_equal(np.all(cb_subj1_pts_no == 10), True)
414
415
416def test_x0_input():
417
418    for x0 in [6, 7, 12, "Rigid", 'rigid', "similarity", "Affine"]:
419        StreamlineLinearRegistration(x0=x0)
420
421    for x0 in [np.random.rand(6), np.random.rand(7), np.random.rand(12)]:
422        StreamlineLinearRegistration(x0=x0)
423
424    for x0 in [8, 20, "Whatever", np.random.rand(20), np.random.rand(20, 3)]:
425        assert_raises(ValueError, StreamlineLinearRegistration, x0=x0)
426
427    x0 = np.random.rand(4, 3)
428    assert_raises(ValueError, StreamlineLinearRegistration, x0=x0)
429
430    x0_6 = np.zeros(6)
431    x0_7 = np.array([0, 0, 0, 0, 0, 0, 1.])
432    x0_12 = np.array([0, 0, 0, 0, 0, 0, 1., 1., 1., 0, 0, 0])
433
434    x0_s = [x0_6, x0_7, x0_12, x0_6, x0_7, x0_12]
435
436    for i, x0 in enumerate([6, 7, 12, "Rigid", "similarity", "Affine"]):
437        slr = StreamlineLinearRegistration(x0=x0)
438        assert_equal(slr.x0, x0_s[i])
439
440
441def test_compose_decompose_matrix44():
442
443    for i in range(20):
444        x0 = np.random.rand(12)
445        mat = compose_matrix44(x0[:6])
446        assert_array_almost_equal(x0[:6], decompose_matrix44(mat, size=6))
447        mat = compose_matrix44(x0[:7])
448        assert_array_almost_equal(x0[:7], decompose_matrix44(mat, size=7))
449        mat = compose_matrix44(x0[:12])
450        assert_array_almost_equal(x0[:12], decompose_matrix44(mat, size=12))
451
452    assert_raises(ValueError, decompose_matrix44, mat, 20)
453
454
455def test_cascade_of_optimizations_and_threading():
456
457    cingulum_bundles = two_cingulum_bundles()
458
459    cb1 = cingulum_bundles[0]
460    cb1 = set_number_of_points(cb1, 20)
461
462    test_x0 = np.array([10, 4, 3, 0, 20, 10, 1.5, 1.5, 1.5, 0., 0.2, 0])
463
464    cb2 = transform_streamlines(cingulum_bundles[0],
465                                compose_matrix44(test_x0))
466    cb2 = set_number_of_points(cb2, 20)
467
468    print('first rigid')
469    slr = StreamlineLinearRegistration(x0=6, num_threads=1)
470    slm = slr.optimize(cb1, cb2)
471
472    print('then similarity')
473    slr2 = StreamlineLinearRegistration(x0=7, num_threads=2)
474    slm2 = slr2.optimize(cb1, cb2, slm.matrix)
475
476    print('then affine')
477    slr3 = StreamlineLinearRegistration(x0=12, options={'maxiter': 50},
478                                        num_threads=None)
479    slm3 = slr3.optimize(cb1, cb2, slm2.matrix)
480
481    assert_(slm2.fopt < slm.fopt)
482    assert_(slm3.fopt < slm2.fopt)
483
484
485def test_wrong_num_threads():
486    A = [np.random.rand(10, 3), np.random.rand(10, 3)]
487    B = [np.random.rand(10, 3), np.random.rand(10, 3)]
488
489    slr = StreamlineLinearRegistration(num_threads=0)
490    assert_raises(ValueError, slr.optimize, A, B)
491
492
493if __name__ == '__main__':
494
495    run_module_suite()
496