1"""Experimental support for external memory.  This is similar to the one in
2`quantile_data_iterator.py`, but for external memory instead of Quantile DMatrix.  The
3feature is not ready for production use yet.
4
5    .. versionadded:: 1.5.0
6
7"""
8import os
9import xgboost
10from typing import Callable, List, Tuple
11import tempfile
12import numpy as np
13
14
def make_batches(
    n_samples_per_batch: int, n_features: int, n_batches: int
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Create ``n_batches`` pairs of random feature matrices and label vectors.

    A ``RandomState`` seeded with 1994 is used, so repeated calls with the same
    arguments return identical values.  The result is a tuple of two lists of equal
    length: the feature matrices, each of shape ``(n_samples_per_batch, n_features)``,
    and the label vectors, each of shape ``(n_samples_per_batch,)``.

    """
    generator = np.random.RandomState(1994)
    # The tuple is evaluated left to right, so for every batch the features are drawn
    # from the generator before the labels.
    pairs = [
        (
            generator.randn(n_samples_per_batch, n_features),
            generator.randn(n_samples_per_batch),
        )
        for _ in range(n_batches)
    ]
    features = [batch_X for batch_X, _ in pairs]
    labels = [batch_y for _, batch_y in pairs]
    return features, labels
28
29
class Iterator(xgboost.DataIter):
    """A custom iterator for loading files in batches.

    Each entry of ``file_paths`` is an ``(X_path, y_path)`` pair of text files that are
    read with ``numpy.loadtxt``.  One pair is loaded and handed to XGBoost per call to
    :py:meth:`next`.

    """

    def __init__(self, file_paths: List[Tuple[str, str]]):
        self._file_paths = file_paths
        # Index into ``file_paths`` of the next batch to load.
        self._it = 0
        # XGBoost will generate some cache files under current directory with the prefix
        # "cache"
        super().__init__(cache_prefix=os.path.join(".", "cache"))

    def load_file(self) -> Tuple[np.ndarray, np.ndarray]:
        """Load the feature/label pair at the current iterator position.

        Raises
        ------
        ValueError
            If the two files do not contain the same number of rows.

        """
        X_path, y_path = self._file_paths[self._it]
        X = np.loadtxt(X_path)
        y = np.loadtxt(y_path)
        # Raise explicitly instead of using ``assert``: assertions are removed when
        # Python runs with ``-O``, which would let mismatched data through silently.
        if X.shape[0] != y.shape[0]:
            raise ValueError(
                f"Mismatched number of rows: {X_path} has {X.shape[0]} while "
                f"{y_path} has {y.shape[0]}."
            )
        return X, y

    def next(self, input_data: Callable) -> int:
        """Advance the iterator by 1 step and pass the data to XGBoost.  This function is
        called by XGBoost during the construction of ``DMatrix``

        Returns 1 after a batch has been passed to ``input_data`` and 0 once all files
        have been consumed.

        """
        if self._it == len(self._file_paths):
            # return 0 to let XGBoost know this is the end of iteration
            return 0

        # input_data is a function passed in by XGBoost which has a signature similar
        # to the ``DMatrix`` constructor.
        X, y = self.load_file()
        input_data(data=X, label=y)
        self._it += 1
        return 1

    def reset(self) -> None:
        """Reset the iterator to its beginning"""
        self._it = 0
65
66
def main(tmpdir: str) -> xgboost.Booster:
    """Train a booster on randomly generated data using the external memory iterator.

    The random batches are written as text files under ``tmpdir``, loaded back batch by
    batch through :py:class:`Iterator` to build a ``DMatrix``, and the booster trained
    on that matrix is returned.

    """
    # generate some random data for demo
    batches = make_batches(1024, 17, 31)
    files = []
    for i, (X, y) in enumerate(zip(*batches)):
        X_path = os.path.join(tmpdir, f"X-{i}.txt")
        np.savetxt(X_path, X)
        y_path = os.path.join(tmpdir, f"y-{i}.txt")
        np.savetxt(y_path, y)
        files.append((X_path, y_path))

    it = Iterator(files)
    # For non-data arguments, specify them here once instead of passing them through
    # the `next` method.
    # Use ``np.nan``: the ``np.NaN`` alias was removed in NumPy 2.0.
    missing = np.nan
    Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False)

    # Other tree methods including ``hist`` and ``gpu_hist`` also work, but have some
    # caveats.  This is still an experimental feature.
    booster = xgboost.train({"tree_method": "approx"}, Xy)
    return booster
88
89
if __name__ == "__main__":
    # Write the demo's generated text files into a scratch directory that is removed
    # when the ``with`` block exits.
    with tempfile.TemporaryDirectory() as workdir:
        main(workdir)
93