"""Experimental support for external memory.

This is similar to the one in `quantile_data_iterator.py`, but for external memory
instead of Quantile DMatrix.  The feature is not ready for production use yet.

    .. versionadded:: 1.5.0

"""
import os
import tempfile
from typing import Callable, List, Tuple

import numpy as np
import xgboost


def make_batches(
    n_samples_per_batch: int, n_features: int, n_batches: int
) -> Tuple[List[np.ndarray], List[np.ndarray]]:
    """Generate random batches.

    Parameters
    ----------
    n_samples_per_batch :
        Number of rows in every batch.
    n_features :
        Number of columns in every feature matrix.
    n_batches :
        Number of batches to generate.

    Returns
    -------
    A pair ``(X, y)`` of lists with ``n_batches`` entries each.  ``X[i]`` has shape
    ``(n_samples_per_batch, n_features)`` and ``y[i]`` has shape
    ``(n_samples_per_batch,)``.

    """
    X = []
    y = []
    # Fixed seed so that the demo data is reproducible.
    rng = np.random.RandomState(1994)
    for _ in range(n_batches):
        # Draw the features before the labels; the order determines the values
        # produced by the seeded generator.
        X.append(rng.randn(n_samples_per_batch, n_features))
        y.append(rng.randn(n_samples_per_batch))
    return X, y


class Iterator(xgboost.DataIter):
    """A custom iterator for loading files in batches.

    Parameters
    ----------
    file_paths :
        One ``(X_path, y_path)`` pair per batch.  Both files are read with
        ``numpy.loadtxt``.

    """

    def __init__(self, file_paths: List[Tuple[str, str]]):
        self._file_paths = file_paths
        # Index of the next batch to be loaded.
        self._it = 0
        # XGBoost will generate some cache files under current directory with the
        # prefix "cache"
        super().__init__(cache_prefix=os.path.join(".", "cache"))

    def load_file(self) -> Tuple[np.ndarray, np.ndarray]:
        """Load the feature matrix and the labels of the current batch.

        Raises
        ------
        ValueError
            If the two files do not contain the same number of rows.

        """
        X_path, y_path = self._file_paths[self._it]
        X = np.loadtxt(X_path)
        y = np.loadtxt(y_path)
        # Explicit check instead of ``assert`` so that it is not stripped by ``-O``.
        if X.shape[0] != y.shape[0]:
            raise ValueError(
                f"Row count mismatch: {X_path} has {X.shape[0]} rows, "
                f"{y_path} has {y.shape[0]} rows"
            )
        return X, y

    def next(self, input_data: Callable) -> int:
        """Advance the iterator by 1 step and pass the data to XGBoost.  This function
        is called by XGBoost during the construction of ``DMatrix``

        Returns 1 when a batch has been passed to ``input_data`` and 0 when there are
        no more batches.

        """
        if self._it == len(self._file_paths):
            # return 0 to let XGBoost know this is the end of iteration
            return 0

        # input_data is a function passed in by XGBoost which has a signature similar
        # to the ``DMatrix`` constructor.
        X, y = self.load_file()
        input_data(data=X, label=y)
        self._it += 1
        return 1

    def reset(self) -> None:
        """Reset the iterator to its beginning"""
        self._it = 0


def main(tmpdir: str) -> xgboost.Booster:
    """Write random batches into ``tmpdir`` and train a booster from them.

    Parameters
    ----------
    tmpdir :
        Existing directory that receives the generated ``X-<i>.txt`` and
        ``y-<i>.txt`` files.

    Returns
    -------
    The trained booster.

    """
    # generate some random data for demo
    batches = make_batches(1024, 17, 31)
    files = []
    for i, (X, y) in enumerate(zip(*batches)):
        X_path = os.path.join(tmpdir, f"X-{i}.txt")
        np.savetxt(X_path, X)
        y_path = os.path.join(tmpdir, f"y-{i}.txt")
        np.savetxt(y_path, y)
        files.append((X_path, y_path))

    it = Iterator(files)
    # For non-data arguments, specify them here once instead of passing them through
    # the `next` method.
    # ``np.nan`` is used because the ``np.NaN`` alias was removed in NumPy 2.0.
    missing = np.nan
    Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False)

    # Other tree methods including ``hist`` and ``gpu_hist`` also work, but have some
    # caveats.  This is still an experimental feature.
    booster = xgboost.train({"tree_method": "approx"}, Xy)
    return booster


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmpdir:
        main(tmpdir)