1 /*!
2  * Copyright 2018~2020 XGBoost contributors
3  */
4 
5 #include <xgboost/logging.h>
6 
7 #include <thrust/copy.h>
8 #include <thrust/functional.h>
9 #include <thrust/iterator/counting_iterator.h>
10 #include <thrust/iterator/transform_iterator.h>
11 #include <thrust/iterator/discard_iterator.h>
12 #include <thrust/reduce.h>
13 #include <thrust/sort.h>
14 #include <thrust/binary_search.h>
15 #include <thrust/execution_policy.h>
16 
17 #include <memory>
18 #include <mutex>
19 #include <utility>
20 #include <vector>
21 
22 #include "device_helpers.cuh"
23 #include "hist_util.h"
24 #include "hist_util.cuh"
25 #include "math.h"  // NOLINT
26 #include "quantile.h"
27 #include "categorical.h"
28 #include "xgboost/host_device_vector.h"
29 
30 
31 namespace xgboost {
32 namespace common {
33 
// Out-of-line definition of the static constexpr member so it can be
// odr-used (required before C++17 inline variables).
constexpr float SketchContainer::kFactor;
35 
36 namespace detail {
RequiredSampleCutsPerColumn(int max_bins,size_t num_rows)37 size_t RequiredSampleCutsPerColumn(int max_bins, size_t num_rows) {
38   double eps = 1.0 / (WQSketch::kFactor * max_bins);
39   size_t dummy_nlevel;
40   size_t num_cuts;
41   WQuantileSketch<bst_float, bst_float>::LimitSizeLevel(
42       num_rows, eps, &dummy_nlevel, &num_cuts);
43   return std::min(num_cuts, num_rows);
44 }
45 
RequiredSampleCuts(bst_row_t num_rows,bst_feature_t num_columns,size_t max_bins,size_t nnz)46 size_t RequiredSampleCuts(bst_row_t num_rows, bst_feature_t num_columns,
47                           size_t max_bins, size_t nnz) {
48   auto per_column = RequiredSampleCutsPerColumn(max_bins, num_rows);
49   auto if_dense = num_columns * per_column;
50   auto result = std::min(nnz, if_dense);
51   return result;
52 }
53 
RequiredMemory(bst_row_t num_rows,bst_feature_t num_columns,size_t nnz,size_t num_bins,bool with_weights)54 size_t RequiredMemory(bst_row_t num_rows, bst_feature_t num_columns, size_t nnz,
55                       size_t num_bins, bool with_weights) {
56   size_t peak = 0;
57   // 0. Allocate cut pointer in quantile container by increasing: n_columns + 1
58   size_t total = (num_columns + 1) * sizeof(SketchContainer::OffsetT);
59   // 1. Copy and sort: 2 * bytes_per_element * shape
60   total += BytesPerElement(with_weights) * num_rows * num_columns;
61   peak = std::max(peak, total);
62   // 2. Deallocate bytes_per_element * shape due to reusing memory in sort.
63   total -= BytesPerElement(with_weights) * num_rows * num_columns / 2;
64   // 3. Allocate colomn size scan by increasing: n_columns + 1
65   total += (num_columns + 1) * sizeof(SketchContainer::OffsetT);
66   // 4. Allocate cut pointer by increasing: n_columns + 1
67   total += (num_columns + 1) * sizeof(SketchContainer::OffsetT);
68   // 5. Allocate cuts: assuming rows is greater than bins: n_columns * limit_size
69   total += RequiredSampleCuts(num_rows, num_bins, num_bins, nnz) * sizeof(SketchEntry);
70   // 6. Deallocate copied entries by reducing: bytes_per_element * shape.
71   peak = std::max(peak, total);
72   total -= (BytesPerElement(with_weights) * num_rows * num_columns) / 2;
73   // 7. Deallocate column size scan.
74   peak = std::max(peak, total);
75   total -= (num_columns + 1) * sizeof(SketchContainer::OffsetT);
76   // 8. Deallocate cut size scan.
77   total -= (num_columns + 1) * sizeof(SketchContainer::OffsetT);
78   // 9. Allocate final cut values, min values, cut ptrs: std::min(rows, bins + 1) *
79   //    n_columns + n_columns + n_columns + 1
80   total += std::min(num_rows, num_bins) * num_columns * sizeof(float);
81   total += num_columns *
82            sizeof(std::remove_reference_t<decltype(
83                       std::declval<HistogramCuts>().MinValues())>::value_type);
84   total += (num_columns + 1) *
85            sizeof(std::remove_reference_t<decltype(
86                       std::declval<HistogramCuts>().Ptrs())>::value_type);
87   peak = std::max(peak, total);
88 
89   return peak;
90 }
91 
// Decide how many entries each sketching sub-batch should contain.
//
// A value of 0 for `sketch_batch_num_elements` means "auto": size the batch
// from the estimated memory requirement versus the device's free memory.
// Any non-zero value passed in is returned unchanged (caller override).
size_t SketchBatchNumElements(size_t sketch_batch_num_elements,
                              bst_row_t num_rows, bst_feature_t columns,
                              size_t nnz, int device,
                              size_t num_cuts, bool has_weight) {
#if defined(XGBOOST_USE_RMM) && XGBOOST_USE_RMM == 1
  // device available memory is not accurate when rmm is used.
  // NOTE: with RMM enabled everything below is compiled out; the whole page
  // is processed as a single batch.
  return nnz;
#endif  // defined(XGBOOST_USE_RMM) && XGBOOST_USE_RMM == 1

  if (sketch_batch_num_elements == 0) {
    auto required_memory = RequiredMemory(num_rows, columns, nnz, num_cuts, has_weight);
    // use up to 80% of available space
    auto avail = dh::AvailableMemory(device) * 0.8;
    if (required_memory > avail) {
      // Not enough memory for the full batch: cap by how many elements fit.
      sketch_batch_num_elements = avail / BytesPerElement(has_weight);
    } else {
      // Everything fits: process the whole (dense-bounded or nnz-bounded) set.
      sketch_batch_num_elements = std::min(num_rows * static_cast<size_t>(columns), nnz);
    }
  }
  return sketch_batch_num_elements;
}
113 
// Sort entries (and their weights, carried along as values) by the entry
// comparator, then turn the weights into per-feature running sums.
void SortByWeight(dh::device_vector<float>* weights,
                  dh::device_vector<Entry>* sorted_entries) {
  // Key/value sort: entries are keys, weights follow their entry.
  dh::XGBDeviceAllocator<char> sort_alloc;
  thrust::sort_by_key(thrust::cuda::par(sort_alloc), sorted_entries->begin(),
                      sorted_entries->end(), weights->begin(),
                      detail::EntryCompareOp());

  // Inclusive scan of weights, restarting at each feature-index boundary.
  dh::XGBCachingDeviceAllocator<char> scan_alloc;
  auto same_feature = [=] __device__(const Entry& a, const Entry& b) {
    return a.index == b.index;
  };
  thrust::inclusive_scan_by_key(thrust::cuda::par(scan_alloc),
                                sorted_entries->begin(), sorted_entries->end(),
                                weights->begin(), weights->begin(),
                                same_feature);
}
131 
// Remove duplicated values inside categorical features from the sorted
// entries, then rewrite the column size scan and the cuts pointer so they
// describe the deduplicated data.  Numerical features are left untouched.
void RemoveDuplicatedCategories(
    int32_t device, MetaInfo const &info, Span<bst_row_t> d_cuts_ptr,
    dh::device_vector<Entry> *p_sorted_entries,
    dh::caching_device_vector<size_t> *p_column_sizes_scan) {
  info.feature_types.SetDevice(device);
  auto d_feature_types = info.feature_types.ConstDeviceSpan();
  CHECK(!d_feature_types.empty());
  auto &column_sizes_scan = *p_column_sizes_scan;
  auto &sorted_entries = *p_sorted_entries;
  // Removing duplicated entries in categorical features.  The predicate only
  // merges equal values within the same categorical column.
  dh::caching_device_vector<size_t> new_column_scan(column_sizes_scan.size());
  dh::SegmentedUnique(column_sizes_scan.data().get(),
                      column_sizes_scan.data().get() + column_sizes_scan.size(),
                      sorted_entries.begin(), sorted_entries.end(),
                      new_column_scan.data().get(), sorted_entries.begin(),
                      [=] __device__(Entry const &l, Entry const &r) {
                        if (l.index == r.index) {
                          if (IsCat(d_feature_types, l.index)) {
                            return l.fvalue == r.fvalue;
                          }
                        }
                        return false;
                      });

  // Renew the column scan and cut scan based on categorical data.
  // NOTE: a previously-declared outer `d_old_column_sizes_scan` local was
  // removed — it was shadowed by the lambda init-capture below and unused.
  dh::caching_device_vector<SketchContainer::OffsetT> new_cuts_size(
      info.num_col_ + 1);
  CHECK_EQ(new_column_scan.size(), new_cuts_size.size());
  dh::LaunchN(
      new_column_scan.size(),
      [=, d_new_cuts_size = dh::ToSpan(new_cuts_size),
       d_old_column_sizes_scan = dh::ToSpan(column_sizes_scan),
       d_new_columns_ptr = dh::ToSpan(new_column_scan)] __device__(size_t idx) {
        // Overwrite the old column scan with the deduplicated one.
        d_old_column_sizes_scan[idx] = d_new_columns_ptr[idx];
        if (idx == d_new_columns_ptr.size() - 1) {
          return;
        }
        if (IsCat(d_feature_types, idx)) {
          // Cut size is the same as number of categories in input.
          d_new_cuts_size[idx] =
              d_new_columns_ptr[idx + 1] - d_new_columns_ptr[idx];
        } else {
          d_new_cuts_size[idx] = d_cuts_ptr[idx + 1] - d_cuts_ptr[idx];
        }
      });
  // Turn size into ptr.
  thrust::exclusive_scan(thrust::device, new_cuts_size.cbegin(),
                         new_cuts_size.cend(), d_cuts_ptr.data());
}
182 }  // namespace detail
183 
// Sketch one sub-range [begin, end) of a SparsePage's entries (unweighted
// case) and push the resulting summaries into `sketch_container`.
void ProcessBatch(int device, MetaInfo const &info, const SparsePage &page,
                  size_t begin, size_t end, SketchContainer *sketch_container,
                  int num_cuts_per_feature, size_t num_columns) {
  dh::XGBCachingDeviceAllocator<char> alloc;
  dh::device_vector<Entry> sorted_entries;
  // Copy the requested entry range to a device vector, avoiding a host
  // round-trip when the page data is already readable on device.
  if (page.data.DeviceCanRead()) {
    const auto& device_data = page.data.ConstDevicePointer();
    sorted_entries = dh::device_vector<Entry>(device_data + begin, device_data + end);
  } else {
    const auto& host_data = page.data.ConstHostVector();
    sorted_entries = dh::device_vector<Entry>(host_data.begin() + begin,
                                              host_data.begin() + end);
  }
  // Sort entries so that each column's values are contiguous.
  thrust::sort(thrust::cuda::par(alloc), sorted_entries.begin(),
               sorted_entries.end(), detail::EntryCompareOp());

  HostDeviceVector<SketchContainer::OffsetT> cuts_ptr;
  dh::caching_device_vector<size_t> column_sizes_scan;
  // Stored entries are already valid; a NaN-missing functor rejects nothing.
  data::IsValidFunctor dummy_is_valid(std::numeric_limits<float>::quiet_NaN());
  auto batch_it = dh::MakeTransformIterator<data::COOTuple>(
      sorted_entries.data().get(),
      [] __device__(Entry const &e) -> data::COOTuple {
        return {0, e.index, e.fvalue};  // row_idx is not needed for scanning column size.
      });
  // Compute the per-column entry count scan and the per-column cut pointer.
  detail::GetColumnSizesScan(device, num_columns, num_cuts_per_feature,
                             batch_it, dummy_is_valid,
                             0, sorted_entries.size(),
                             &cuts_ptr, &column_sizes_scan);
  auto d_cuts_ptr = cuts_ptr.DeviceSpan();

  if (sketch_container->HasCategorical()) {
    // Categorical columns keep one entry per category; dedupe entries and
    // adjust the scans before pushing.
    detail::RemoveDuplicatedCategories(device, info, d_cuts_ptr,
                                       &sorted_entries, &column_sizes_scan);
  }

  auto const& h_cuts_ptr = cuts_ptr.ConstHostVector();
  CHECK_EQ(d_cuts_ptr.size(), column_sizes_scan.size());

  // add cuts into sketches
  sketch_container->Push(dh::ToSpan(sorted_entries), dh::ToSpan(column_sizes_scan),
                         d_cuts_ptr, h_cuts_ptr.back());
  // Release device memory eagerly so the next sub-batch can reuse it.
  sorted_entries.clear();
  sorted_entries.shrink_to_fit();
  CHECK_EQ(sorted_entries.capacity(), 0);
  CHECK_NE(cuts_ptr.Size(), 0);
}
230 
// Like ProcessBatch, but assigns a weight to every entry before sketching.
// When `is_ranking` is set, weights are per query group and resolved through
// `d_group_ptr`; otherwise weights are per row.
void ProcessWeightedBatch(int device, const SparsePage& page,
                          MetaInfo const& info, size_t begin, size_t end,
                          SketchContainer* sketch_container, int num_cuts_per_feature,
                          size_t num_columns,
                          bool is_ranking, Span<bst_group_t const> d_group_ptr) {
  auto weights = info.weights_.ConstDeviceSpan();

  dh::XGBCachingDeviceAllocator<char> alloc;
  const auto& host_data = page.data.ConstHostVector();
  dh::device_vector<Entry> sorted_entries(host_data.begin() + begin,
                                          host_data.begin() + end);

  // Binary search to assign weights to each element
  dh::device_vector<float> temp_weights(sorted_entries.size());
  auto d_temp_weights = temp_weights.data().get();
  page.offset.SetDevice(device);
  auto row_ptrs = page.offset.ConstDeviceSpan();
  // base_rowid translates page-local row indices to dataset row indices.
  size_t base_rowid = page.base_rowid;
  if (is_ranking) {
    CHECK_GE(d_group_ptr.size(), 2)
        << "Must have at least 1 group for ranking.";
    CHECK_EQ(weights.size(), d_group_ptr.size() - 1)
        << "Weight size should equal to number of groups.";
    dh::LaunchN(temp_weights.size(), [=] __device__(size_t idx) {
        size_t element_idx = idx + begin;
        // Find the row owning this element, then the group owning that row.
        size_t ridx = dh::SegmentId(row_ptrs, element_idx);
        bst_group_t group_idx = dh::SegmentId(d_group_ptr, ridx + base_rowid);
        d_temp_weights[idx] = weights[group_idx];
      });
  } else {
    dh::LaunchN(temp_weights.size(), [=] __device__(size_t idx) {
        size_t element_idx = idx + begin;
        size_t ridx = dh::SegmentId(row_ptrs, element_idx);
        d_temp_weights[idx] = weights[ridx + base_rowid];
      });
  }
  // Sort entries and convert weights into per-column running sums.
  detail::SortByWeight(&temp_weights, &sorted_entries);

  HostDeviceVector<SketchContainer::OffsetT> cuts_ptr;
  dh::caching_device_vector<size_t> column_sizes_scan;
  // Stored entries are already valid; a NaN-missing functor rejects nothing.
  data::IsValidFunctor dummy_is_valid(std::numeric_limits<float>::quiet_NaN());
  auto batch_it = dh::MakeTransformIterator<data::COOTuple>(
      sorted_entries.data().get(),
      [] __device__(Entry const &e) -> data::COOTuple {
        return {0, e.index, e.fvalue};  // row_idx is not needed for scanning column size.
      });
  detail::GetColumnSizesScan(device, num_columns, num_cuts_per_feature,
                             batch_it, dummy_is_valid,
                             0, sorted_entries.size(),
                             &cuts_ptr, &column_sizes_scan);
  auto d_cuts_ptr = cuts_ptr.DeviceSpan();
  if (sketch_container->HasCategorical()) {
    detail::RemoveDuplicatedCategories(device, info, d_cuts_ptr,
                                       &sorted_entries, &column_sizes_scan);
  }

  auto const& h_cuts_ptr = cuts_ptr.ConstHostVector();

  // Extract cuts
  sketch_container->Push(dh::ToSpan(sorted_entries),
                         dh::ToSpan(column_sizes_scan), d_cuts_ptr,
                         h_cuts_ptr.back(), dh::ToSpan(temp_weights));
  // Release device memory before the next sub-batch.
  sorted_entries.clear();
  sorted_entries.shrink_to_fit();
}
296 
DeviceSketch(int device,DMatrix * dmat,int max_bins,size_t sketch_batch_num_elements)297 HistogramCuts DeviceSketch(int device, DMatrix* dmat, int max_bins,
298                            size_t sketch_batch_num_elements) {
299   dmat->Info().feature_types.SetDevice(device);
300   dmat->Info().feature_types.ConstDevicePointer();  // pull to device early
301   // Configure batch size based on available memory
302   bool has_weights = dmat->Info().weights_.Size() > 0;
303   size_t num_cuts_per_feature =
304       detail::RequiredSampleCutsPerColumn(max_bins, dmat->Info().num_row_);
305   sketch_batch_num_elements = detail::SketchBatchNumElements(
306       sketch_batch_num_elements,
307       dmat->Info().num_row_,
308       dmat->Info().num_col_,
309       dmat->Info().num_nonzero_,
310       device, num_cuts_per_feature, has_weights);
311 
312   HistogramCuts cuts;
313   SketchContainer sketch_container(dmat->Info().feature_types, max_bins, dmat->Info().num_col_,
314                                    dmat->Info().num_row_, device);
315 
316   dmat->Info().weights_.SetDevice(device);
317   for (const auto& batch : dmat->GetBatches<SparsePage>()) {
318     size_t batch_nnz = batch.data.Size();
319     auto const& info = dmat->Info();
320     for (auto begin = 0ull; begin < batch_nnz; begin += sketch_batch_num_elements) {
321       size_t end = std::min(batch_nnz, size_t(begin + sketch_batch_num_elements));
322       if (has_weights) {
323         bool is_ranking = HostSketchContainer::UseGroup(dmat->Info());
324         dh::caching_device_vector<uint32_t> groups(info.group_ptr_.cbegin(),
325                                                    info.group_ptr_.cend());
326         ProcessWeightedBatch(
327             device, batch, dmat->Info(), begin, end,
328             &sketch_container,
329             num_cuts_per_feature,
330             dmat->Info().num_col_,
331             is_ranking, dh::ToSpan(groups));
332       } else {
333         ProcessBatch(device, dmat->Info(), batch, begin, end, &sketch_container,
334                      num_cuts_per_feature, dmat->Info().num_col_);
335       }
336     }
337   }
338   sketch_container.MakeCuts(&cuts);
339   return cuts;
340 }
341 }  // namespace common
342 }  // namespace xgboost
343