import os
import tempfile
from typing import Union

import pytest
import xgboost as xgb

import testing as tm

# The scikit-learn breast cancer dataset is used throughout these tests.
pytestmark = pytest.mark.skipif(**tm.no_sklearn())


class TestCallbacks:
    @classmethod
    def setup_class(cls):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        cls.X = X
        cls.y = y

        split = int(X.shape[0] * 0.8)
        cls.X_train = X[:split, ...]
        cls.y_train = y[:split, ...]
        cls.X_valid = X[split:, ...]
        cls.y_valid = y[split:, ...]

    def run_evaluation_monitor(
        self,
        D_train: xgb.DMatrix,
        D_valid: xgb.DMatrix,
        rounds: int,
        verbose_eval: Union[bool, int]
    ):
        def check_output(output: str) -> None:
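            # For example, rounds=10 with verbose_eval=4 prints iterations
            # 0, 4, and 8 plus the final iteration 9: 1 + 10 // 4 + 1 == 4 lines.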
            if int(verbose_eval) == 1:
                # Should print one line per iteration.
                assert len(output.split('\n')) == rounds
            elif int(verbose_eval) > rounds:
                # Should print only the first and the last iterations.
                assert len(output.split('\n')) == 2
            else:
                # Should print one line per period, in addition to the first
                # and the last iterations.
                num_periods = rounds // int(verbose_eval)
                # An extra line is needed when the last iteration does not
                # fall on a period boundary.
                is_extra_info_required = num_periods * int(verbose_eval) < (rounds - 1)
                assert len(output.split('\n')) == (
                    1 + num_periods + int(is_extra_info_required)
                )

        evals_result: xgb.callback.TrainingCallback.EvalsLog = {}
        params = {'objective': 'binary:logistic', 'eval_metric': 'error'}
        with tm.captured_output() as (out, err):
            xgb.train(
                params, D_train,
                evals=[(D_train, 'Train'), (D_valid, 'Valid')],
                num_boost_round=rounds,
                evals_result=evals_result,
                verbose_eval=verbose_eval,
            )
            output: str = out.getvalue().strip()
            check_output(output)

        with tm.captured_output() as (out, err):
            xgb.cv(params, D_train, num_boost_round=rounds, verbose_eval=verbose_eval)
            output = out.getvalue().strip()
            check_output(output)

    def test_evaluation_monitor(self):
        D_train = xgb.DMatrix(self.X_train, self.y_train)
        D_valid = xgb.DMatrix(self.X_valid, self.y_valid)
        evals_result = {}
        rounds = 10
        xgb.train({'objective': 'binary:logistic',
                   'eval_metric': 'error'}, D_train,
                  evals=[(D_train, 'Train'), (D_valid, 'Valid')],
                  num_boost_round=rounds,
                  evals_result=evals_result,
                  verbose_eval=True)
        assert len(evals_result['Train']['error']) == rounds
        assert len(evals_result['Valid']['error']) == rounds

        self.run_evaluation_monitor(D_train, D_valid, rounds, True)
        self.run_evaluation_monitor(D_train, D_valid, rounds, 2)
        self.run_evaluation_monitor(D_train, D_valid, rounds, 4)
        self.run_evaluation_monitor(D_train, D_valid, rounds, rounds + 1)

    def test_early_stopping(self):
        D_train = xgb.DMatrix(self.X_train, self.y_train)
        D_valid = xgb.DMatrix(self.X_valid, self.y_valid)
        evals_result = {}
        rounds = 30
        early_stopping_rounds = 5
        booster = xgb.train({'objective': 'binary:logistic',
                             'eval_metric': 'error'}, D_train,
                            evals=[(D_train, 'Train'), (D_valid, 'Valid')],
                            num_boost_round=rounds,
                            evals_result=evals_result,
                            verbose_eval=True,
                            early_stopping_rounds=early_stopping_rounds)
        dump = booster.get_dump(dump_format='json')
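        # best_iteration is zero-based, so the dump contains the trees up to
        # the best round plus one tree per early-stopping round of overshoot.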
        assert len(dump) - booster.best_iteration == early_stopping_rounds + 1

        # Without early stopping, best_iteration should point to the last round.
        booster = xgb.train({'objective': 'binary:logistic',
                             'eval_metric': 'error'}, D_train,
                            evals=[(D_train, 'Train'), (D_valid, 'Valid')],
                            num_boost_round=10,
                            evals_result=evals_result,
                            verbose_eval=True)
        assert booster.num_boosted_rounds() - 1 == booster.best_iteration

    def test_early_stopping_custom_eval(self):
        D_train = xgb.DMatrix(self.X_train, self.y_train)
        D_valid = xgb.DMatrix(self.X_valid, self.y_valid)
        early_stopping_rounds = 5
        booster = xgb.train({'objective': 'binary:logistic',
                             'eval_metric': 'error',
                             'tree_method': 'hist'}, D_train,
                            evals=[(D_train, 'Train'), (D_valid, 'Valid')],
                            feval=tm.eval_error_metric,
                            num_boost_round=1000,
                            early_stopping_rounds=early_stopping_rounds,
                            verbose_eval=False)
        dump = booster.get_dump(dump_format='json')
        assert len(dump) - booster.best_iteration == early_stopping_rounds + 1

    def test_early_stopping_customize(self):
        D_train = xgb.DMatrix(self.X_train, self.y_train)
        D_valid = xgb.DMatrix(self.X_valid, self.y_valid)
        early_stopping_rounds = 5
        # Specify which dataset and which metric should be used for early stopping.
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds,
                                                metric_name='CustomErr',
                                                data_name='Train')
        booster = xgb.train(
            {'objective': 'binary:logistic',
             'eval_metric': ['error', 'rmse'],
             'tree_method': 'hist'}, D_train,
            evals=[(D_train, 'Train'), (D_valid, 'Valid')],
            feval=tm.eval_error_metric,
            num_boost_round=1000,
            callbacks=[early_stop],
            verbose_eval=False)
        dump = booster.get_dump(dump_format='json')
        assert len(dump) - booster.best_iteration == early_stopping_rounds + 1
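        # The callback keeps one record of the monitored metric per boosted round.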
        assert len(early_stop.stopping_history['Train']['CustomErr']) == len(dump)

        rounds = 100
        early_stop = xgb.callback.EarlyStopping(
            rounds=early_stopping_rounds,
            metric_name='CustomErr',
            data_name='Train',
            min_delta=100,
            save_best=True,
        )
        booster = xgb.train(
            {
                'objective': 'binary:logistic',
                'eval_metric': ['error', 'rmse'],
                'tree_method': 'hist'
            },
            D_train,
            evals=[(D_train, 'Train'), (D_valid, 'Valid')],
            feval=tm.eval_error_metric,
            num_boost_round=rounds,
            callbacks=[early_stop],
            verbose_eval=False
        )
        # With min_delta == 100, no round can count as an improvement, so
        # training stops after the first one.
        assert booster.best_iteration == 0
        assert booster.num_boosted_rounds() == 1

    def test_early_stopping_skl(self):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        cls = xgb.XGBClassifier()
        early_stopping_rounds = 5
        cls.fit(X, y, eval_set=[(X, y)],
                early_stopping_rounds=early_stopping_rounds, eval_metric='error')
        booster = cls.get_booster()
        dump = booster.get_dump(dump_format='json')
        assert len(dump) - booster.best_iteration == early_stopping_rounds + 1

    def test_early_stopping_custom_eval_skl(self):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        cls = xgb.XGBClassifier()
        early_stopping_rounds = 5
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds)
        cls.fit(X, y, eval_set=[(X, y)],
                eval_metric=tm.eval_error_metric,
                callbacks=[early_stop])
        booster = cls.get_booster()
        dump = booster.get_dump(dump_format='json')
        assert len(dump) - booster.best_iteration == early_stopping_rounds + 1

    def test_early_stopping_save_best_model(self):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        n_estimators = 100
        cls = xgb.XGBClassifier(n_estimators=n_estimators)
        early_stopping_rounds = 5
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds,
                                                save_best=True)
        cls.fit(X, y, eval_set=[(X, y)],
                eval_metric=tm.eval_error_metric, callbacks=[early_stop])
        booster = cls.get_booster()
        dump = booster.get_dump(dump_format='json')
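        # save_best=True keeps only the trees up to the best iteration, so no
        # overshoot trees remain in the dump.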
        assert len(dump) == booster.best_iteration + 1

        # save_best requires slicing the booster, which gblinear does not
        # support, so this raises a ValueError.
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds,
                                                save_best=True)
        cls = xgb.XGBClassifier(booster='gblinear', n_estimators=10)
        with pytest.raises(ValueError):
            cls.fit(X, y, eval_set=[(X, y)], eval_metric=tm.eval_error_metric,
                    callbacks=[early_stop])

        # With save_best=False, gblinear trains without error.
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds,
                                                save_best=False)
        xgb.XGBClassifier(booster='gblinear', n_estimators=10).fit(
            X, y, eval_set=[(X, y)],
            eval_metric=tm.eval_error_metric,
            callbacks=[early_stop])

    def test_early_stopping_continuation(self):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        cls = xgb.XGBClassifier()
        early_stopping_rounds = 5
        early_stop = xgb.callback.EarlyStopping(rounds=early_stopping_rounds,
                                                save_best=True)
        cls.fit(X, y, eval_set=[(X, y)],
                eval_metric=tm.eval_error_metric,
                callbacks=[early_stop])
        booster = cls.get_booster()
        assert booster.num_boosted_rounds() == booster.best_iteration + 1

        with tempfile.TemporaryDirectory() as tmpdir:
            path = os.path.join(tmpdir, 'model.json')
            cls.save_model(path)
            cls = xgb.XGBClassifier()
            cls.load_model(path)
            assert cls._Booster is not None
            early_stopping_rounds = 3
            cls.fit(X, y, eval_set=[(X, y)], eval_metric=tm.eval_error_metric,
                    early_stopping_rounds=early_stopping_rounds)
            booster = cls.get_booster()
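            # After loading and refitting, the standard early-stopping
            # relation between total rounds and best_iteration should hold.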
            assert booster.num_boosted_rounds() == \
                booster.best_iteration + early_stopping_rounds + 1

    def run_eta_decay(self, tree_method, deprecated_callback):
        """Test learning rate scheduler, used by both CPU and GPU tests."""
        if deprecated_callback:
            scheduler = xgb.callback.reset_learning_rate
        else:
            scheduler = xgb.callback.LearningRateScheduler

        dpath = os.path.join(tm.PROJECT_ROOT, 'demo/data/')
        dtrain = xgb.DMatrix(dpath + 'agaricus.txt.train')
        dtest = xgb.DMatrix(dpath + 'agaricus.txt.test')
        watchlist = [(dtest, 'eval'), (dtrain, 'train')]
        num_round = 4

        if deprecated_callback:
            warning_check = pytest.warns(UserWarning)
        else:
            warning_check = tm.noop_context()
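        # The deprecated reset_learning_rate API emits a UserWarning;
        # LearningRateScheduler runs without one, hence the no-op context.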

        # Pass learning_rates as a list; eta is initialized to 0 to verify
        # that the scheduled rates actually take effect.
        param = {'max_depth': 2, 'eta': 0, 'verbosity': 0,
                 'objective': 'binary:logistic', 'eval_metric': 'error',
                 'tree_method': tree_method}
        evals_result = {}
        with warning_check:
            bst = xgb.train(param, dtrain, num_round, watchlist,
                            callbacks=[scheduler([
                                0.8, 0.7, 0.6, 0.5
                            ])],
                            evals_result=evals_result)
        eval_errors_0 = list(map(float, evals_result['eval']['error']))
        assert isinstance(bst, xgb.core.Booster)
        # The validation error should decrease when eta > 0.
        assert eval_errors_0[0] > eval_errors_0[-1]

        # Initialize learning_rate to 0 to verify that the scheduled rates
        # take effect under this parameter alias as well.
        param = {'max_depth': 2, 'learning_rate': 0, 'verbosity': 0,
                 'objective': 'binary:logistic', 'eval_metric': 'error',
                 'tree_method': tree_method}
        evals_result = {}
        with warning_check:
            bst = xgb.train(param, dtrain, num_round, watchlist,
                            callbacks=[scheduler(
                                [0.8, 0.7, 0.6, 0.5])],
                            evals_result=evals_result)
        eval_errors_1 = list(map(float, evals_result['eval']['error']))
        assert isinstance(bst, xgb.core.Booster)
        # The validation error should decrease when learning_rate > 0.
        assert eval_errors_1[0] > eval_errors_1[-1]

        # Check that learning_rates overrides the default value of
        # eta/learning_rate.
        param = {
            'max_depth': 2, 'verbosity': 0, 'objective': 'binary:logistic',
            'eval_metric': 'error', 'tree_method': tree_method
        }
        evals_result = {}
        with warning_check:
            bst = xgb.train(param, dtrain, num_round, watchlist,
                            callbacks=[scheduler(
                                [0, 0, 0, 0]
                            )],
                            evals_result=evals_result)
        eval_errors_2 = list(map(float, evals_result['eval']['error']))
        assert isinstance(bst, xgb.core.Booster)
        # The validation error should not decrease when eta/learning_rate == 0.
        assert eval_errors_2[0] == eval_errors_2[-1]

        # learning_rates as a customized decay function
        def eta_decay(ithround, num_boost_round=num_round):
            return num_boost_round / (ithround + 1)
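        # With num_round == 4 this yields rates 4.0, 2.0, ~1.33, and 1.0
        # for rounds 0 through 3.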

        evals_result = {}
        with warning_check:
            bst = xgb.train(param, dtrain, num_round, watchlist,
                            callbacks=[
                                scheduler(eta_decay)
                            ],
                            evals_result=evals_result)
        eval_errors_3 = list(map(float, evals_result['eval']['error']))

        assert isinstance(bst, xgb.core.Booster)

        assert eval_errors_3[0] == eval_errors_2[0]

        for i in range(1, len(eval_errors_0)):
            assert eval_errors_3[i] != eval_errors_2[i]

        with warning_check:
            xgb.cv(param, dtrain, num_round, callbacks=[scheduler(eta_decay)])

    @pytest.mark.parametrize(
        "tree_method, deprecated_callback",
        [
            ("hist", True),
            ("hist", False),
            ("approx", True),
            ("approx", False),
            ("exact", True),
            ("exact", False),
        ],
    )
    def test_eta_decay(self, tree_method, deprecated_callback):
        self.run_eta_decay(tree_method, deprecated_callback)

    def test_check_point(self):
        from sklearn.datasets import load_breast_cancer
        X, y = load_breast_cancer(return_X_y=True)
        m = xgb.DMatrix(X, y)
        with tempfile.TemporaryDirectory() as tmpdir:
            check_point = xgb.callback.TrainingCheckPoint(directory=tmpdir,
                                                          iterations=1,
                                                          name='model')
            xgb.train({'objective': 'binary:logistic'}, m,
                      num_boost_round=10,
                      verbose_eval=False,
                      callbacks=[check_point])
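            # One checkpoint is written per iteration: model_1.json through
            # model_9.json.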
            for i in range(1, 10):
                assert os.path.exists(
                    os.path.join(tmpdir, 'model_' + str(i) + '.json'))

            check_point = xgb.callback.TrainingCheckPoint(directory=tmpdir,
                                                          iterations=1,
                                                          as_pickle=True,
                                                          name='model')
            xgb.train({'objective': 'binary:logistic'}, m,
                      num_boost_round=10,
                      verbose_eval=False,
                      callbacks=[check_point])
            for i in range(1, 10):
                assert os.path.exists(
                    os.path.join(tmpdir, 'model_' + str(i) + '.pkl'))

    def test_callback_list(self):
        X, y = tm.get_boston()
        m = xgb.DMatrix(X, y)
        callbacks = [xgb.callback.EarlyStopping(rounds=10)]
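        # train() must not mutate the user-supplied callback list, even when
        # it is reused across several calls.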
        for i in range(4):
            xgb.train({'objective': 'reg:squarederror',
                       'eval_metric': 'rmse'}, m,
                      evals=[(m, 'Train')],
                      num_boost_round=1,
                      verbose_eval=True,
                      callbacks=callbacks)
        assert len(callbacks) == 1