1"""
2Test the decoder module
3"""
4
5# Author: Andres Hoyos-Idrobo
6#         Binh Nguyen
7#         Thomas Bazeiile
8#
9# License: simplified BSD
10
import warnings

import numpy as np
import pytest
from nilearn._utils.param_validation import check_feature_screening
from nilearn.decoding.decoder import (Decoder, DecoderRegressor,
                                      FREMClassifier, FREMRegressor,
                                      _BaseDecoder, _check_estimator,
                                      _check_param_grid, _parallel_fit)
from nilearn.decoding.tests.test_same_api import to_niimgs
from nilearn.input_data import NiftiMasker
from sklearn.datasets import load_iris, make_classification, make_regression
from sklearn.dummy import DummyClassifier, DummyRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LogisticRegression, RidgeClassifierCV, RidgeCV
from sklearn.metrics import accuracy_score, get_scorer, r2_score, roc_auc_score
from sklearn.model_selection import KFold, LeaveOneGroupOut
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR, LinearSVC

try:
    from sklearn.metrics import check_scoring
except ImportError:
    # for scikit-learn 0.18 and 0.19
    from sklearn.metrics.scorer import check_scoring

# Regression
ridge = RidgeCV()
svr = SVR(kernel='linear')
# Classification
svc = LinearSVC()
logistic_l1 = LogisticRegression(penalty='l1')
logistic_l2 = LogisticRegression(penalty='l2')
ridge_classifier = RidgeClassifierCV()
random_forest = RandomForestClassifier()

dummy_classifier = DummyClassifier(random_state=0)
dummy_regressor = DummyRegressor()

regressors = {'ridge': (ridge, []),
              'svr': (svr, 'C')}
classifiers = {'svc': (svc, 'C'),
               'logistic_l1': (logistic_l1, 'C'),
               'logistic_l2': (logistic_l2, 'C'),
               'ridge_classifier': (ridge_classifier, [])}
# Create a test dataset
rng = np.random.RandomState(0)
X = rng.rand(100, 10)
# Create different targets
y_regression = rng.rand(100)
y_classification = np.hstack([[-1] * 50, [1] * 50])
y_classification_str = np.hstack([['face'] * 50, ['house'] * 50])
y_multiclass = np.hstack([[0] * 35, [1] * 30, [2] * 35])


def test_check_param_grid():
    """Test several estimators, each with its specific regularization
    parameter.
    """

    # Regression
    for _, (regressor, param) in regressors.items():
        param_grid = _check_param_grid(regressor, X, y_regression, None)
        assert list(param_grid.keys()) == list(param)
    # Classification
    for _, (classifier, param) in classifiers.items():
        param_grid = _check_param_grid(classifier, X, y_classification, None)
        assert list(param_grid.keys()) == list(param)

    # An unsupported string or a non-linear estimator raises a ValueError
    for estimator in ['log_l1', random_forest]:
        pytest.raises(ValueError, _check_param_grid, estimator, X,
                      y_classification, None)

    # The returned parameter grid should be empty for the dummy classifier
    param_grid = _check_param_grid(dummy_classifier, X, y_classification, None)
    assert param_grid == {}
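

# Illustrative sketch, not part of the test suite: a dict parameter grid of
# the form checked above ({'C': array}) has the same structure as a
# scikit-learn grid, so it can be consumed by GridSearchCV. The grid values
# below are arbitrary placeholders, not anything computed by _check_param_grid.
def _sketch_param_grid_usage():
    from sklearn.model_selection import GridSearchCV

    param_grid = {'C': np.array([0.1, 1.0, 10.0])}
    grid_search = GridSearchCV(svc, param_grid=param_grid, cv=3)
    grid_search.fit(X, y_classification)
    return grid_search.best_params_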


def test_check_inputs_length():
    iris = load_iris()
    X, y = iris.data, iris.target
    y = 2 * (y > 0) - 1
    X_, mask = to_niimgs(X, (2, 2, 2))

    # Remove ten samples from y
    y = y[:-10]

    for model in [DecoderRegressor, Decoder, FREMRegressor, FREMClassifier]:
        pytest.raises(ValueError, model(mask=mask,
                                        screening_percentile=100.).fit, X_, y)


def test_check_estimator():
    """Check that supported estimator strings are accepted without warning,
    that unsupported strings raise a ValueError, and that a custom estimator
    object raises a UserWarning.
    """

    supported_estimators = ['svc', 'svc_l2', 'svc_l1',
                            'logistic', 'logistic_l1', 'logistic_l2',
                            'ridge', 'ridge_classifier',
                            'ridge_regressor', 'svr', 'dummy_classifier',
                            'dummy_regressor']
    unsupported_estimators = ['ridgo', 'svb']
    expected_warning = ('Use a custom estimator at your own risk '
                        'of the process not working as intended.')

    with warnings.catch_warnings(record=True) as raised_warnings:
        for estimator in supported_estimators:
            _check_estimator(_BaseDecoder(estimator=estimator).estimator)
    warning_messages = [str(warning.message) for warning in raised_warnings]
    assert expected_warning not in warning_messages

    for estimator in unsupported_estimators:
        pytest.raises(ValueError, _check_estimator,
                      _BaseDecoder(estimator=estimator).estimator)
    custom_estimator = random_forest
    pytest.warns(UserWarning, _check_estimator,
                 _BaseDecoder(estimator=custom_estimator).estimator)


def test_parallel_fit():
    """Check that the results of _parallel_fit are the same for two
    controlled param_grid settings.
    """

    X, y = make_regression(n_samples=100, n_features=20,
                           n_informative=5, noise=0.2, random_state=42)
    train = range(80)
    test = range(80, len(y))
    outputs = []
    estimator = svr
    svr_params = [[1e-1, 1e0, 1e1], [1e-1, 1e0, 5e0, 1e1]]
    scorer = check_scoring(estimator, 'r2')  # Define a scorer
    # Define a screening selector
    selector = check_feature_screening(screening_percentile=None,
                                       mask_img=None, is_classification=False)
    for params in svr_params:
        param_grid = {}
        param_grid['C'] = np.array(params)
        outputs.append(list(_parallel_fit(estimator=estimator, X=X, y=y,
                                          train=train, test=test,
                                          param_grid=param_grid,
                                          is_classification=False,
                                          scorer=scorer, mask_img=None,
                                          class_index=1,
                                          selector=selector,
                                          clustering_percentile=100)))
    # Check that every element of the output tuple is the same for both runs
    for a, b in zip(outputs[0], outputs[1]):
        if isinstance(a, np.ndarray):
            np.testing.assert_array_almost_equal(a, b)
        else:
            assert a == b
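

# Illustrative sketch, not collected by pytest: roughly the kind of work a
# single _parallel_fit job performs for a regressor -- fit each candidate C
# on the training split and score it on the held-out split with the r2
# scorer. The candidate values below are arbitrary, not the ones used above.
def _sketch_fit_and_score_one_split():
    X_arr, y_arr = make_regression(n_samples=100, n_features=20,
                                   n_informative=5, noise=0.2, random_state=42)
    train, test = np.arange(80), np.arange(80, 100)
    scorer = check_scoring(svr, 'r2')
    scores = {}
    for C in [0.1, 1.0, 10.0]:
        estimator = SVR(kernel='linear', C=C)
        estimator.fit(X_arr[train], y_arr[train])
        scores[C] = scorer(estimator, X_arr[test], y_arr[test])
    return scores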


def test_decoder_binary_classification():
    X, y = make_classification(n_samples=200, n_features=125, scale=3.0,
                               n_informative=5, n_classes=2, random_state=42)
    X, mask = to_niimgs(X, [5, 5, 5])

    # check classification with masker object
    model = Decoder(mask=NiftiMasker())
    model.fit(X, y)
    y_pred = model.predict(X)
    assert model.scoring == "roc_auc"
    assert model.score(X, y) == 1.0
    assert accuracy_score(y, y_pred) > 0.95

    # the decoder uses predict_proba for scoring with a logistic model
    model = Decoder(estimator='logistic_l2', mask=mask)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert accuracy_score(y, y_pred) > 0.95

    # check different screening_percentile values
    for screening_percentile in [100, 20, None]:
        model = Decoder(mask=mask, screening_percentile=screening_percentile)
        model.fit(X, y)
        y_pred = model.predict(X)
        assert accuracy_score(y, y_pred) > 0.95

    for clustering_percentile in [100, 99]:
        model = FREMClassifier(estimator='logistic_l2', mask=mask,
                               clustering_percentile=clustering_percentile,
                               screening_percentile=90, cv=5)
        model.fit(X, y)
        y_pred = model.predict(X)
        assert accuracy_score(y, y_pred) > 0.9

    # check cross-validation scheme and fit attribute with groups enabled
    rand_local = np.random.RandomState(42)
    for cv in [KFold(n_splits=5), LeaveOneGroupOut()]:
        model = Decoder(estimator='svc', mask=mask,
                        standardize=True, cv=cv)
        if isinstance(cv, LeaveOneGroupOut):
            groups = rand_local.binomial(2, 0.3, size=len(y))
        else:
            groups = None
        model.fit(X, y, groups=groups)
        y_pred = model.predict(X)
        assert accuracy_score(y, y_pred) > 0.9


def test_decoder_dummy_classifier():
    n_samples = 400
    X, y = make_classification(n_samples=n_samples, n_features=125, scale=3.0,
                               n_informative=5, n_classes=2, random_state=42)
    X, mask = to_niimgs(X, [5, 5, 5])

    # Set 80% of y to 1.0 to check that the stratified strategy predicts
    # the value 1.0 in roughly 80% of cases
    proportion = 0.8
    y = np.zeros(n_samples)
    y[:int(proportion * n_samples)] = 1.0

    model = Decoder(estimator='dummy_classifier', mask=mask)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert abs(np.sum(y_pred == 1.0) / n_samples - proportion) < 0.05

    # Set scoring of decoder with a callable
    accuracy_scorer = get_scorer('accuracy')
    model = Decoder(estimator='dummy_classifier', mask=mask,
                    scoring=accuracy_scorer)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert model.scoring == accuracy_scorer
    assert model.score(X, y) == accuracy_score(y, y_pred)

    # An error should be raised when trying to compute the score without
    # having called fit first.
    model = Decoder(estimator='dummy_classifier', mask=mask)
    with pytest.raises(
            NotFittedError, match="This Decoder instance is not fitted yet."):
        model.score(X, y)

    # The decoder should use the strategy set on the dummy classifier.
    param = dict(strategy='prior')
    dummy_classifier.set_params(**param)
    model = Decoder(estimator=dummy_classifier, mask=mask)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert np.all(y_pred == 1.0)
    assert roc_auc_score(y, y_pred) == 0.5

    # Same as above, but with the most_frequent strategy
    param = dict(strategy='most_frequent')
    dummy_classifier.set_params(**param)
    model = Decoder(estimator=dummy_classifier, mask=mask)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert np.all(y_pred == 1.0)

    # Model coefficients of dummy estimators should be None
    assert model.coef_ is None
    # Dummy outputs are simply the attributes of the dummy estimators
    assert model.dummy_output_ is not None
    assert model.cv_scores_ is not None

    # The decoder can use another scoring metric with the dummy classifier
    model = Decoder(estimator='dummy_classifier', mask=mask, scoring='roc_auc')
    model.fit(X, y)
    assert np.mean(model.cv_scores_[0]) >= 0.45

    # The constant strategy raises a NotImplementedError
    param = dict(strategy='constant')
    dummy_classifier.set_params(**param)
    model = Decoder(estimator=dummy_classifier, mask=mask)
    pytest.raises(NotImplementedError, model.fit, X, y)

    # Raises an error with an unknown scoring metric
    model = Decoder(estimator=dummy_classifier, mask=mask, scoring="foo")
    with pytest.raises(ValueError,
                       match="'foo' is not a valid scoring value"):
        model.fit(X, y)

    # Default scoring
    model = Decoder(estimator='dummy_classifier',
                    scoring=None)
    assert model.scoring is None
    model.fit(X, y)
    assert model.scorer_ == get_scorer("accuracy")
    assert model.score(X, y) > 0.5
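

# Illustrative sketch, not collected by pytest: the DummyClassifier strategies
# exercised above, shown on plain arrays. 'stratified' draws predictions from
# the training class distribution, while 'prior' and 'most_frequent' always
# predict the majority class. The tolerance below is arbitrary.
def _sketch_dummy_classifier_strategies():
    y_arr = np.array([1.0] * 80 + [0.0] * 20)
    X_arr = rng.rand(100, 3)

    stratified = DummyClassifier(strategy='stratified', random_state=0)
    stratified.fit(X_arr, y_arr)
    proportion_of_ones = np.mean(stratified.predict(X_arr) == 1.0)
    assert abs(proportion_of_ones - 0.8) < 0.15  # close to the class prior

    majority = DummyClassifier(strategy='most_frequent').fit(X_arr, y_arr)
    assert np.all(majority.predict(X_arr) == 1.0)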


def test_decoder_multiclass_classification():
    X, y = make_classification(n_samples=200, n_features=125, scale=3.0,
                               n_informative=5, n_classes=4, random_state=42)
    X, mask = to_niimgs(X, [5, 5, 5])

    # check classification with masker object
    model = Decoder(mask=NiftiMasker())
    model.fit(X, y)
    y_pred = model.predict(X)
    assert accuracy_score(y, y_pred) > 0.95

    # check classification with masker object and dummy classifier
    model = Decoder(estimator='dummy_classifier',
                    mask=NiftiMasker(),
                    scoring="accuracy")
    model.fit(X, y)
    y_pred = model.predict(X)
    assert model.scoring == "accuracy"
    # 4-class classification
    assert accuracy_score(y, y_pred) > 0.2
    assert model.score(X, y) == accuracy_score(y, y_pred)

    # check different screening_percentile values
    for screening_percentile in [100, 20, None]:
        model = Decoder(mask=mask, screening_percentile=screening_percentile)
        model.fit(X, y)
        y_pred = model.predict(X)
        assert accuracy_score(y, y_pred) > 0.95

    # check FREM with and without clustering
    for clustering_percentile in [100, 99]:
        for estimator in ['svc_l2', 'svc_l1']:
            model = FREMClassifier(estimator=estimator, mask=mask,
                                   clustering_percentile=clustering_percentile,
                                   screening_percentile=90,
                                   cv=5)
            model.fit(X, y)
            y_pred = model.predict(X)
            assert model.scoring == "roc_auc"
            assert accuracy_score(y, y_pred) > 0.9

    # check cross-validation scheme and fit attribute with groups enabled
    rand_local = np.random.RandomState(42)
    for cv in [KFold(n_splits=5), LeaveOneGroupOut()]:
        model = Decoder(estimator='svc', mask=mask,
                        standardize=True, cv=cv)
        if isinstance(cv, LeaveOneGroupOut):
            groups = rand_local.binomial(2, 0.3, size=len(y))
        else:
            groups = None
        model.fit(X, y, groups=groups)
        y_pred = model.predict(X)
        assert accuracy_score(y, y_pred) > 0.9


def test_decoder_classification_string_label():
    iris = load_iris()
    X, y = iris.data, iris.target
    X, mask = to_niimgs(X, [2, 2, 2])
    labels = ['red', 'blue', 'green']
    y_str = [labels[y[i]] for i in range(len(y))]

    model = Decoder(mask=mask)
    model.fit(X, y_str)
    y_pred = model.predict(X)
    assert accuracy_score(y_str, y_pred) > 0.95
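

# Illustrative sketch, not collected by pytest and not a claim about the
# Decoder internals: string targets are just a relabelling of integer classes.
# scikit-learn's LabelEncoder shows the round trip the string-label test
# relies on conceptually.
def _sketch_string_label_round_trip():
    from sklearn.preprocessing import LabelEncoder

    y_str = np.array(['red', 'blue', 'green', 'blue'])
    encoder = LabelEncoder().fit(y_str)
    y_int = encoder.transform(y_str)
    assert list(encoder.inverse_transform(y_int)) == list(y_str)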


def test_decoder_regression():
    dim = 30
    X, y = make_regression(n_samples=100, n_features=dim**3, n_informative=dim,
                           noise=1.5, bias=1.0, random_state=42)
    X = StandardScaler().fit_transform(X)
    X, mask = to_niimgs(X, [dim, dim, dim])
    for reg in regressors:
        for screening_percentile in [100, 20, 1, None]:
            model = DecoderRegressor(estimator=reg, mask=mask,
                                     screening_percentile=screening_percentile)
            model.fit(X, y)
            y_pred = model.predict(X)
            assert r2_score(y, y_pred) > 0.95

    dim = 5
    X, y = make_regression(n_samples=100, n_features=dim**3, n_informative=dim,
                           noise=1.5, bias=1.0, random_state=42)
    X = StandardScaler().fit_transform(X)
    X, mask = to_niimgs(X, [dim, dim, dim])

    for clustering_percentile in [100, 99]:
        model = FREMRegressor(estimator=reg, mask=mask,
                              clustering_percentile=clustering_percentile,
                              screening_percentile=90,
                              cv=10)
        model.fit(X, y)
        y_pred = model.predict(X)
        assert model.scoring == "r2"
        assert r2_score(y, y_pred) > 0.95
        assert model.score(X, y) == r2_score(y, y_pred)


def test_decoder_dummy_regression():
    dim = 30
    X, y = make_regression(n_samples=100, n_features=dim**3, n_informative=dim,
                           noise=1.5, bias=1.0, random_state=42)
    X = StandardScaler().fit_transform(X)
    X, mask = to_niimgs(X, [dim, dim, dim])

    # Regression with dummy estimator
    model = DecoderRegressor(estimator='dummy_regressor', mask=mask,
                             scoring='r2', screening_percentile=1)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert model.scoring == "r2"
    assert r2_score(y, y_pred) <= 0.
    assert model.score(X, y) == r2_score(y, y_pred)

    # Check that the default scoring metric for regression is r2
    model = DecoderRegressor(estimator='dummy_regressor',
                             mask=mask,
                             scoring=None)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert model.score(X, y) == r2_score(y, y_pred)

    # The decoder should use the strategy set on the dummy regressor
    param = dict(strategy='median')
    dummy_regressor.set_params(**param)
    model = DecoderRegressor(estimator=dummy_regressor, mask=mask)
    model.fit(X, y)
    y_pred = model.predict(X)
    assert r2_score(y, y_pred) <= 0.
    # Model coefficients of dummy estimators should be None
    assert model.coef_ is None
    # Dummy outputs are simply the attributes of the dummy estimators
    assert model.dummy_output_ is not None
    assert model.cv_scores_ is not None
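

# Illustrative sketch, not collected by pytest: a constant prediction such as
# the one produced by DummyRegressor's mean or median strategy can never beat
# the evaluation target's own mean, so its R^2 score is at most 0. This is
# the baseline the dummy-regression assertions above rely on.
def _sketch_dummy_regressor_baseline():
    X_arr, y_arr = make_regression(n_samples=100, n_features=10,
                                   noise=1.0, random_state=0)
    train, test = slice(0, 80), slice(80, 100)
    for strategy in ['mean', 'median']:
        dummy = DummyRegressor(strategy=strategy)
        dummy.fit(X_arr[train], y_arr[train])
        assert r2_score(y_arr[test], dummy.predict(X_arr[test])) <= 0.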


def test_decoder_apply_mask():
    X_init, y = make_classification(n_samples=200, n_features=125, scale=3.0,
                                    n_informative=5, n_classes=4,
                                    random_state=42)
    X, _ = to_niimgs(X_init, [5, 5, 5])
    model = Decoder(mask=NiftiMasker())

    X_masked = model._apply_mask(X)

    # test that the output of _apply_mask has the same shape as the
    # original matrix
    assert X_masked.shape == X_init.shape

    # test that, after calling _apply_mask, model.masker_ carries the
    # attributes set manually; by default these parameters are set to None
    target_affine = 2 * np.eye(4)
    target_shape = (1, 1, 1)
    t_r = 1
    high_pass = 1
    low_pass = 2
    smoothing_fwhm = 0.5
    model = Decoder(
        target_affine=target_affine,
        target_shape=target_shape,
        t_r=t_r,
        high_pass=high_pass,
        low_pass=low_pass,
        smoothing_fwhm=smoothing_fwhm
    )

    model._apply_mask(X)

    assert np.all(model.masker_.target_affine == target_affine)
    assert model.masker_.target_shape == target_shape
    assert model.masker_.t_r == t_r
    assert model.masker_.high_pass == high_pass
    assert model.masker_.low_pass == low_pass
    assert model.masker_.smoothing_fwhm == smoothing_fwhm
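

# Illustrative sketch, not collected by pytest: masking a small synthetic
# image with NiftiMasker, the kind of transformation _apply_mask performs.
# to_niimgs is assumed to return an image whose 8 features exactly fill the
# (2, 2, 2) mask, so the masked matrix keeps the original shape.
def _sketch_nifti_masker_shapes():
    X_arr = rng.rand(10, 8)
    img, mask_img = to_niimgs(X_arr, (2, 2, 2))
    X_masked = NiftiMasker(mask_img=mask_img).fit_transform(img)
    assert X_masked.shape == X_arr.shape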


def test_decoder_split_cv():
    X, y = make_classification(n_samples=200, n_features=125, scale=3.0,
                               n_informative=5, n_classes=4, random_state=42)
    X, mask = to_niimgs(X, [5, 5, 5])
    rand_local = np.random.RandomState(42)
    groups = rand_local.binomial(2, 0.3, size=len(y))

    # Check whether ValueError is raised when cv is not set correctly
    for cv in ['abc', LinearSVC()]:
        model = Decoder(mask=NiftiMasker(), cv=cv)
        pytest.raises(ValueError, model.fit, X, y)

    # Check that the decoder raises a warning when groups is specified but
    # cv is not set to a custom CV splitter
    expected_warning = (
        'groups parameter is specified but '
        'cv parameter is not set to custom CV splitter. '
        'Using default object LeaveOneGroupOut().'
    )
    with pytest.warns(UserWarning, match=expected_warning):
        model = Decoder(mask=NiftiMasker())
        model.fit(X, y, groups=groups)

    # Check that a warning is raised when fewer than 50 features remain
    # after screening and clustering for FREM
    with pytest.warns(UserWarning, match=".*screening_percentile parameters"):
        model = FREMClassifier(clustering_percentile=10,
                               screening_percentile=10, mask=NiftiMasker(),
                               cv=1)
        model.fit(X, y)

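

# Illustrative sketch, not collected by pytest: how the groups argument drives
# a LeaveOneGroupOut split -- each fold holds out all samples of one group.
# Plain arrays are used here and the group labels are arbitrary.
def _sketch_leave_one_group_out():
    X_arr = rng.rand(9, 4)
    y_arr = np.array([0, 1, 0, 1, 0, 1, 0, 1, 0])
    groups = np.array([0, 0, 0, 1, 1, 1, 2, 2, 2])
    folds = list(LeaveOneGroupOut().split(X_arr, y_arr, groups))
    assert len(folds) == 3  # one fold per distinct group
    for train_index, test_index in folds:
        # the held-out samples all come from a single group
        assert len(np.unique(groups[test_index])) == 1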