1import numpy as np 2from numpy.testing import (run_module_suite, 3 assert_, 4 assert_equal, 5 assert_almost_equal, 6 assert_array_equal, 7 assert_array_almost_equal, 8 assert_raises) 9from dipy.align.streamlinear import (compose_matrix44, 10 decompose_matrix44, 11 BundleSumDistanceMatrixMetric, 12 BundleMinDistanceMatrixMetric, 13 BundleMinDistanceMetric, 14 StreamlineLinearRegistration, 15 StreamlineDistanceMetric) 16 17from dipy.tracking.streamline import (center_streamlines, 18 unlist_streamlines, 19 relist_streamlines, 20 transform_streamlines, 21 set_number_of_points, 22 Streamlines) 23from dipy.io.streamline import load_tractogram 24from dipy.core.geometry import compose_matrix 25 26from dipy.data import get_fnames, two_cingulum_bundles 27from dipy.align.bundlemin import (_bundle_minimum_distance_matrix, 28 _bundle_minimum_distance, 29 distance_matrix_mdf) 30 31 32def simulated_bundle(no_streamlines=10, waves=False, no_pts=12): 33 t = np.linspace(-10, 10, 200) 34 # parallel waves or parallel lines 35 bundle = [] 36 for i in np.linspace(-5, 5, no_streamlines): 37 if waves: 38 pts = np.vstack((np.cos(t), t, i * np.ones(t.shape))).T 39 else: 40 pts = np.vstack((np.zeros(t.shape), t, i * np.ones(t.shape))).T 41 pts = set_number_of_points(pts, no_pts) 42 bundle.append(pts) 43 44 return bundle 45 46 47def fornix_streamlines(no_pts=12): 48 fname = get_fnames('fornix') 49 50 fornix = load_tractogram(fname, 'same', 51 bbox_valid_check=False).streamlines 52 53 fornix_streamlines = Streamlines(fornix) 54 streamlines = set_number_of_points(fornix_streamlines, no_pts) 55 return streamlines 56 57 58def evaluate_convergence(bundle, new_bundle2): 59 pts_static = np.concatenate(bundle, axis=0) 60 pts_moved = np.concatenate(new_bundle2, axis=0) 61 assert_array_almost_equal(pts_static, pts_moved, 3) 62 63 64def test_rigid_parallel_lines(): 65 66 bundle_initial = simulated_bundle() 67 bundle, shift = center_streamlines(bundle_initial) 68 mat = compose_matrix44([20, 0, 10, 0, 40, 0]) 69 70 bundle2 = transform_streamlines(bundle, mat) 71 72 bundle_sum_distance = BundleSumDistanceMatrixMetric() 73 options = {'maxcor': 100, 'ftol': 1e-9, 'gtol': 1e-16, 'eps': 1e-3} 74 srr = StreamlineLinearRegistration(metric=bundle_sum_distance, 75 x0=np.zeros(6), 76 method='L-BFGS-B', 77 bounds=None, 78 options=options) 79 80 new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2) 81 evaluate_convergence(bundle, new_bundle2) 82 83 84def test_rigid_real_bundles(): 85 86 bundle_initial = fornix_streamlines()[:20] 87 bundle, shift = center_streamlines(bundle_initial) 88 89 mat = compose_matrix44([0, 0, 20, 45., 0, 0]) 90 91 bundle2 = transform_streamlines(bundle, mat) 92 93 bundle_sum_distance = BundleSumDistanceMatrixMetric() 94 srr = StreamlineLinearRegistration(bundle_sum_distance, 95 x0=np.zeros(6), 96 method='Powell') 97 new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2) 98 99 evaluate_convergence(bundle, new_bundle2) 100 101 bundle_min_distance = BundleMinDistanceMatrixMetric() 102 srr = StreamlineLinearRegistration(bundle_min_distance, 103 x0=np.zeros(6), 104 method='Powell') 105 new_bundle2 = srr.optimize(bundle, bundle2).transform(bundle2) 106 107 evaluate_convergence(bundle, new_bundle2) 108 109 assert_raises(ValueError, StreamlineLinearRegistration, method='Whatever') 110 111 112def test_rigid_partial_real_bundles(): 113 114 static = fornix_streamlines()[:20] 115 moving = fornix_streamlines()[20:40] 116 static_center, shift = center_streamlines(static) 117 moving_center, shift2 = center_streamlines(moving) 118 119 print(shift2) 120 mat = compose_matrix(translate=np.array([0, 0, 0.]), 121 angles=np.deg2rad([40, 0, 0.])) 122 moved = transform_streamlines(moving_center, mat) 123 124 srr = StreamlineLinearRegistration() 125 126 srm = srr.optimize(static_center, moved) 127 print(srm.fopt) 128 print(srm.iterations) 129 print(srm.funcs) 130 131 moving_back = srm.transform(moved) 132 print(srm.matrix) 133 134 static_center = set_number_of_points(static_center, 100) 135 moving_center = set_number_of_points(moving_back, 100) 136 137 vol = np.zeros((100, 100, 100)) 138 spts = np.concatenate(static_center, axis=0) 139 spts = np.round(spts).astype(int) + np.array([50, 50, 50]) 140 141 mpts = np.concatenate(moving_center, axis=0) 142 mpts = np.round(mpts).astype(int) + np.array([50, 50, 50]) 143 144 for index in spts: 145 i, j, k = index 146 vol[i, j, k] = 1 147 148 vol2 = np.zeros((100, 100, 100)) 149 for index in mpts: 150 i, j, k = index 151 vol2[i, j, k] = 1 152 153 overlap = np.sum(np.logical_and(vol, vol2)) / float(np.sum(vol2)) 154 155 assert_equal(overlap * 100 > 40, True) 156 157 158def test_stream_rigid(): 159 160 static = fornix_streamlines()[:20] 161 moving = fornix_streamlines()[20:40] 162 center_streamlines(static) 163 164 mat = compose_matrix44([0, 0, 0, 0, 40, 0]) 165 moving = transform_streamlines(moving, mat) 166 167 srr = StreamlineLinearRegistration() 168 sr_params = srr.optimize(static, moving) 169 moved = transform_streamlines(moving, sr_params.matrix) 170 171 srr = StreamlineLinearRegistration(verbose=True) 172 srm = srr.optimize(static, moving) 173 moved2 = transform_streamlines(moving, srm.matrix) 174 moved3 = srm.transform(moving) 175 176 assert_array_almost_equal(moved[0], moved2[0], decimal=3) 177 assert_array_almost_equal(moved2[0], moved3[0], decimal=3) 178 179 180def test_min_vs_min_fast_precision(): 181 182 static = fornix_streamlines()[:20] 183 moving = fornix_streamlines()[:20] 184 185 static = [s.astype('f8') for s in static] 186 moving = [m.astype('f8') for m in moving] 187 188 bmd = BundleMinDistanceMatrixMetric() 189 bmd.setup(static, moving) 190 191 bmdf = BundleMinDistanceMetric() 192 bmdf.setup(static, moving) 193 194 x_test = [0.01, 0, 0, 0, 0, 0] 195 196 print(bmd.distance(x_test)) 197 print(bmdf.distance(x_test)) 198 assert_equal(bmd.distance(x_test), bmdf.distance(x_test)) 199 200 201def test_same_number_of_points(): 202 203 A = [np.random.rand(10, 3), np.random.rand(20, 3)] 204 B = [np.random.rand(21, 3), np.random.rand(30, 3)] 205 C = [np.random.rand(10, 3), np.random.rand(10, 3)] 206 D = [np.random.rand(20, 3), np.random.rand(20, 3)] 207 208 slr = StreamlineLinearRegistration() 209 assert_raises(ValueError, slr.optimize, A, B) 210 assert_raises(ValueError, slr.optimize, C, D) 211 assert_raises(ValueError, slr.optimize, C, B) 212 213 214def test_efficient_bmd(): 215 216 a = np.array([[1, 1, 1], 217 [2, 2, 2], 218 [3, 3, 3]]) 219 220 streamlines = [a, a + 2, a + 4] 221 222 points, offsets = unlist_streamlines(streamlines) 223 points = points.astype(np.double) 224 points2 = points.copy() 225 226 D = np.zeros((len(offsets), len(offsets)), dtype='f8') 227 228 _bundle_minimum_distance_matrix(points, points2, 229 len(offsets), len(offsets), 230 a.shape[0], D) 231 232 assert_equal(np.sum(np.diag(D)), 0) 233 234 points2 += 2 235 236 _bundle_minimum_distance_matrix(points, points2, 237 len(offsets), len(offsets), 238 a.shape[0], D) 239 240 streamlines2 = relist_streamlines(points2, offsets) 241 D2 = distance_matrix_mdf(streamlines, streamlines2) 242 243 assert_array_almost_equal(D, D2) 244 245 cols = D2.shape[1] 246 rows = D2.shape[0] 247 248 dist = 0.25 * (np.sum(np.min(D2, axis=0)) / float(cols) + 249 np.sum(np.min(D2, axis=1)) / float(rows)) ** 2 250 251 dist2 = _bundle_minimum_distance(points, points2, 252 len(offsets), len(offsets), 253 a.shape[0]) 254 assert_almost_equal(dist, dist2) 255 256 257def test_openmp_locks(): 258 259 static = [] 260 moving = [] 261 pts = 20 262 263 for i in range(1000): 264 s = np.random.rand(pts, 3) 265 static.append(s) 266 moving.append(s + 2) 267 268 moving = moving[2:] 269 270 points, offsets = unlist_streamlines(static) 271 points2, offsets2 = unlist_streamlines(moving) 272 273 D = np.zeros((len(offsets), len(offsets2)), dtype='f8') 274 275 _bundle_minimum_distance_matrix(points, points2, 276 len(offsets), len(offsets2), 277 pts, D) 278 279 dist1 = 0.25 * (np.sum(np.min(D, axis=0)) / float(D.shape[1]) + 280 np.sum(np.min(D, axis=1)) / float(D.shape[0])) ** 2 281 282 dist2 = _bundle_minimum_distance(points, points2, 283 len(offsets), len(offsets2), 284 pts) 285 286 assert_almost_equal(dist1, dist2, 6) 287 288 289def test_from_to_rigid(): 290 291 t = np.array([10, 2, 3, 0.1, 20., 30.]) 292 mat = compose_matrix44(t) 293 vec = decompose_matrix44(mat, 6) 294 295 assert_array_almost_equal(t, vec) 296 297 t = np.array([0, 0, 0, 180, 0., 0.]) 298 299 mat = np.eye(4) 300 mat[0, 0] = -1 301 302 vec = decompose_matrix44(mat, 6) 303 304 assert_array_almost_equal(-t, vec) 305 306 307def test_matrix44(): 308 309 assert_raises(ValueError, compose_matrix44, np.ones(5)) 310 assert_raises(ValueError, compose_matrix44, np.ones(13)) 311 assert_raises(ValueError, compose_matrix44, np.ones(16)) 312 313 314def test_abstract_metric_class(): 315 316 class DummyStreamlineMetric(StreamlineDistanceMetric): 317 def test(): 318 pass 319 assert_raises(TypeError, DummyStreamlineMetric) 320 321 322def test_evolution_of_previous_iterations(): 323 324 static = fornix_streamlines()[:20] 325 moving = fornix_streamlines()[:20] 326 327 moving = [m + np.array([10., 0., 0.]) for m in moving] 328 329 slr = StreamlineLinearRegistration(evolution=True) 330 331 slm = slr.optimize(static, moving) 332 333 assert_equal(len(slm.matrix_history), slm.iterations) 334 335 336def test_similarity_real_bundles(): 337 338 bundle_initial = fornix_streamlines() 339 bundle_initial, shift = center_streamlines(bundle_initial) 340 bundle = bundle_initial[:20] 341 xgold = [0, 0, 10, 0, 0, 0, 1.5] 342 mat = compose_matrix44(xgold) 343 bundle2 = transform_streamlines(bundle_initial[:20], mat) 344 345 metric = BundleMinDistanceMatrixMetric() 346 x0 = np.array([0, 0, 0, 0, 0, 0, 1], 'f8') 347 348 slr = StreamlineLinearRegistration(metric=metric, 349 x0=x0, 350 method='Powell', 351 bounds=None, 352 verbose=False) 353 354 slm = slr.optimize(bundle, bundle2) 355 new_bundle2 = slm.transform(bundle2) 356 evaluate_convergence(bundle, new_bundle2) 357 358 359def test_affine_real_bundles(): 360 361 bundle_initial = fornix_streamlines() 362 bundle_initial, shift = center_streamlines(bundle_initial) 363 bundle = bundle_initial[:20] 364 xgold = [0, 4, 2, 0, 10, 10, 1.2, 1.1, 1., 0., 0.2, 0.] 365 mat = compose_matrix44(xgold) 366 bundle2 = transform_streamlines(bundle_initial[:20], mat) 367 368 x0 = np.array([0, 0, 0, 0, 0, 0, 1., 1., 1., 0, 0, 0]) 369 370 x = 25 371 372 bounds = [(-x, x), (-x, x), (-x, x), 373 (-x, x), (-x, x), (-x, x), 374 (0.1, 1.5), (0.1, 1.5), (0.1, 1.5), 375 (-1, 1), (-1, 1), (-1, 1)] 376 377 options = {'maxcor': 10, 'ftol': 1e-7, 'gtol': 1e-5, 'eps': 1e-8} 378 379 metric = BundleMinDistanceMatrixMetric() 380 381 slr = StreamlineLinearRegistration(metric=metric, 382 x0=x0, 383 method='L-BFGS-B', 384 bounds=bounds, 385 verbose=True, 386 options=options) 387 slm = slr.optimize(bundle, bundle2) 388 389 new_bundle2 = slm.transform(bundle2) 390 391 slr2 = StreamlineLinearRegistration(metric=metric, 392 x0=x0, 393 method='Powell', 394 bounds=None, 395 verbose=True, 396 options=None) 397 398 slm2 = slr2.optimize(bundle, new_bundle2) 399 400 new_bundle2 = slm2.transform(new_bundle2) 401 402 evaluate_convergence(bundle, new_bundle2) 403 404 405def test_vectorize_streamlines(): 406 407 cingulum_bundles = two_cingulum_bundles() 408 409 cb_subj1 = cingulum_bundles[0] 410 cb_subj1 = set_number_of_points(cb_subj1, 10) 411 cb_subj1_pts_no = np.array([s.shape[0] for s in cb_subj1]) 412 413 assert_equal(np.all(cb_subj1_pts_no == 10), True) 414 415 416def test_x0_input(): 417 418 for x0 in [6, 7, 12, "Rigid", 'rigid', "similarity", "Affine"]: 419 StreamlineLinearRegistration(x0=x0) 420 421 for x0 in [np.random.rand(6), np.random.rand(7), np.random.rand(12)]: 422 StreamlineLinearRegistration(x0=x0) 423 424 for x0 in [8, 20, "Whatever", np.random.rand(20), np.random.rand(20, 3)]: 425 assert_raises(ValueError, StreamlineLinearRegistration, x0=x0) 426 427 x0 = np.random.rand(4, 3) 428 assert_raises(ValueError, StreamlineLinearRegistration, x0=x0) 429 430 x0_6 = np.zeros(6) 431 x0_7 = np.array([0, 0, 0, 0, 0, 0, 1.]) 432 x0_12 = np.array([0, 0, 0, 0, 0, 0, 1., 1., 1., 0, 0, 0]) 433 434 x0_s = [x0_6, x0_7, x0_12, x0_6, x0_7, x0_12] 435 436 for i, x0 in enumerate([6, 7, 12, "Rigid", "similarity", "Affine"]): 437 slr = StreamlineLinearRegistration(x0=x0) 438 assert_equal(slr.x0, x0_s[i]) 439 440 441def test_compose_decompose_matrix44(): 442 443 for i in range(20): 444 x0 = np.random.rand(12) 445 mat = compose_matrix44(x0[:6]) 446 assert_array_almost_equal(x0[:6], decompose_matrix44(mat, size=6)) 447 mat = compose_matrix44(x0[:7]) 448 assert_array_almost_equal(x0[:7], decompose_matrix44(mat, size=7)) 449 mat = compose_matrix44(x0[:12]) 450 assert_array_almost_equal(x0[:12], decompose_matrix44(mat, size=12)) 451 452 assert_raises(ValueError, decompose_matrix44, mat, 20) 453 454 455def test_cascade_of_optimizations_and_threading(): 456 457 cingulum_bundles = two_cingulum_bundles() 458 459 cb1 = cingulum_bundles[0] 460 cb1 = set_number_of_points(cb1, 20) 461 462 test_x0 = np.array([10, 4, 3, 0, 20, 10, 1.5, 1.5, 1.5, 0., 0.2, 0]) 463 464 cb2 = transform_streamlines(cingulum_bundles[0], 465 compose_matrix44(test_x0)) 466 cb2 = set_number_of_points(cb2, 20) 467 468 print('first rigid') 469 slr = StreamlineLinearRegistration(x0=6, num_threads=1) 470 slm = slr.optimize(cb1, cb2) 471 472 print('then similarity') 473 slr2 = StreamlineLinearRegistration(x0=7, num_threads=2) 474 slm2 = slr2.optimize(cb1, cb2, slm.matrix) 475 476 print('then affine') 477 slr3 = StreamlineLinearRegistration(x0=12, options={'maxiter': 50}, 478 num_threads=None) 479 slm3 = slr3.optimize(cb1, cb2, slm2.matrix) 480 481 assert_(slm2.fopt < slm.fopt) 482 assert_(slm3.fopt < slm2.fopt) 483 484 485def test_wrong_num_threads(): 486 A = [np.random.rand(10, 3), np.random.rand(10, 3)] 487 B = [np.random.rand(10, 3), np.random.rand(10, 3)] 488 489 slr = StreamlineLinearRegistration(num_threads=0) 490 assert_raises(ValueError, slr.optimize, A, B) 491 492 493if __name__ == '__main__': 494 495 run_module_suite() 496