1 /*!
2 * Copyright 2019-2020 XGBoost contributors
3 */
4 #include <xgboost/data.h>
5 #include <thrust/iterator/discard_iterator.h>
6 #include <thrust/iterator/transform_output_iterator.h>
7 #include "../common/categorical.h"
8 #include "../common/hist_util.cuh"
9 #include "../common/random.h"
10 #include "./ellpack_page.cuh"
11 #include "device_adapter.cuh"
12
13 namespace xgboost {
14
// Default constructor: the page owns a default-constructed implementation.
EllpackPage::EllpackPage()
    : impl_(new EllpackPageImpl()) {}
16
// Build a page from `dmat`; all work is delegated to the EllpackPageImpl
// constructor taking the same arguments.
EllpackPage::EllpackPage(DMatrix* dmat, const BatchParam& param)
    : impl_(new EllpackPageImpl(dmat, param)) {}
19
// Defined out of line in this translation unit, where EllpackPageImpl is a
// complete type.
EllpackPage::~EllpackPage() = default;
21
// Move constructor: exchanges this page's implementation handle with the one
// held by `that`.
EllpackPage::EllpackPage(EllpackPage&& that) {
  using std::swap;
  swap(impl_, that.impl_);
}
23
// Forward to the implementation's Size().
size_t EllpackPage::Size() const {
  return this->impl_->Size();
}
25
// Forward the base row id to the implementation.
void EllpackPage::SetBaseRowId(size_t row_id) {
  this->impl_->SetBaseRowId(row_id);
}
27
// Bin each input data entry, store the bin indices in compressed form.
//
// Expects a 2-D launch: the x dimension enumerates the rows of the current
// batch and the y dimension enumerates the slots [0, row_stride) of each
// ELLPACK row.  Every in-range thread writes exactly one symbol at
// (irow + base_row) * row_stride + slot: the global bin index of the entry
// stored in that slot, or `null_gidx_value` when the row has fewer entries
// than the slot index.
__global__ void CompressBinEllpackKernel(
    common::CompressedBufferWriter wr,
    common::CompressedByteT* __restrict__ buffer,  // gidx_buffer
    const size_t* __restrict__ row_ptrs,           // row offset of input data
    const Entry* __restrict__ entries,             // One batch of input data
    const float* __restrict__ cuts,                // HistogramCuts::cut_values_
    const uint32_t* __restrict__ cut_rows,         // HistogramCuts::cut_ptrs_
    common::Span<FeatureType const> feature_types,
    size_t base_row,                               // batch_row_begin
    size_t n_rows,
    size_t row_stride,
    unsigned int null_gidx_value) {
  // Widen before multiplying so the row index cannot wrap in 32-bit arithmetic.
  size_t irow = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  // Slot position inside the ELLPACK row; NOT the feature (column) index.
  int ifeature = threadIdx.y + blockIdx.y * blockDim.y;
  if (irow >= n_rows || ifeature >= row_stride) {
    return;
  }
  int row_length = static_cast<int>(row_ptrs[irow + 1] - row_ptrs[irow]);
  unsigned int bin = null_gidx_value;
  if (ifeature < row_length) {
    // row_ptrs holds offsets relative to the whole page, entries only holds
    // this batch, hence the subtraction of row_ptrs[0].
    Entry entry = entries[row_ptrs[irow] - row_ptrs[0] + ifeature];
    int feature = entry.index;
    float fvalue = entry.fvalue;
    // {feature_cuts, ncuts} forms the array of cuts of `feature'.
    const float* feature_cuts = &cuts[cut_rows[feature]];
    int ncuts = cut_rows[feature + 1] - cut_rows[feature];
    // Feature types are indexed by column, so look up the entry's feature
    // index.  The slot position only coincides with it for dense rows.
    bool is_cat = common::IsCat(feature_types, feature);
    // Assigning the bin in current entry.
    // S.t.: fvalue < feature_cuts[bin]
    if (is_cat) {
      auto it = dh::MakeTransformIterator<int>(
          feature_cuts, [](float v) { return common::AsCat(v); });
      bin = thrust::lower_bound(thrust::seq, it, it + ncuts,
                                common::AsCat(fvalue)) -
            it;
    } else {
      bin = thrust::upper_bound(thrust::seq, feature_cuts,
                                feature_cuts + ncuts, fvalue) -
            feature_cuts;
    }

    // Values beyond the last cut fall into the last bin of the feature.
    if (bin >= ncuts) {
      bin = ncuts - 1;
    }
    // Add the number of bins in previous features.
    bin += cut_rows[feature];
  }
  // Write to gidx buffer.
  wr.AtomicWriteSymbol(buffer, bin, (irow + base_row) * row_stride + ifeature);
}
77
// Construct an ELLPACK matrix with the given number of empty rows.
// Only the compressed buffer is set up here; no bin indices are written.
EllpackPageImpl::EllpackPageImpl(int device, common::HistogramCuts cuts,
                                 bool is_dense, size_t row_stride,
                                 size_t n_rows)
    : is_dense(is_dense),
      cuts_(std::move(cuts)),
      row_stride(row_stride),
      n_rows(n_rows) {
  this->monitor_.Init("ellpack_page");
  dh::safe_cuda(cudaSetDevice(device));

  this->monitor_.Start("InitCompressedData");
  this->InitCompressedData(device);
  this->monitor_.Stop("InitCompressedData");
}
93
// Construct an ELLPACK matrix from a single SparsePage using precomputed cuts.
// The number of rows is taken from the page; the buffer is initialized and
// then filled with the binned entries of `page`.
EllpackPageImpl::EllpackPageImpl(int device, common::HistogramCuts cuts,
                                 const SparsePage &page, bool is_dense,
                                 size_t row_stride,
                                 common::Span<FeatureType const> feature_types)
    : cuts_(std::move(cuts)),
      is_dense(is_dense),
      n_rows(page.Size()),
      row_stride(row_stride) {
  InitCompressedData(device);
  CreateHistIndices(device, page, feature_types);
}
103
// Construct an ELLPACK matrix in memory.
// Steps: sketch the quantile cuts, allocate the compressed buffer, then bin
// every SparsePage batch of `dmat` into it.
EllpackPageImpl::EllpackPageImpl(DMatrix* dmat, const BatchParam& param)
    : is_dense(dmat->IsDense()) {
  auto const device = param.gpu_id;

  monitor_.Init("ellpack_page");
  dh::safe_cuda(cudaSetDevice(device));

  n_rows = dmat->Info().num_row_;

  // Create the quantile sketches for the dmatrix and initialize HistogramCuts.
  monitor_.Start("Quantiles");
  row_stride = GetRowStride(dmat);
  cuts_ = common::DeviceSketch(device, dmat, param.max_bin);
  monitor_.Stop("Quantiles");

  monitor_.Start("InitCompressedData");
  InitCompressedData(device);
  monitor_.Stop("InitCompressedData");

  // Feature types are read on the device while binning.
  dmat->Info().feature_types.SetDevice(device);
  auto ft = dmat->Info().feature_types.ConstDeviceSpan();

  monitor_.Start("BinningCompression");
  CHECK(dmat->SingleColBlock());
  for (auto const& sparse_page : dmat->GetBatches<SparsePage>()) {
    CreateHistIndices(device, sparse_page, ft);
  }
  monitor_.Stop("BinningCompression");
}
131
// Output functor for the segmented scan in CopyDataToEllpack.  Each scanned
// tuple identifies one input element; valid elements are binned and written
// into the compressed ELLPACK buffer, invalid ones are ignored.
template <typename AdapterBatchT>
struct WriteCompressedEllpackFunctor {
  WriteCompressedEllpackFunctor(common::CompressedByteT* buffer,
                                const common::CompressedBufferWriter& writer,
                                AdapterBatchT batch,
                                EllpackDeviceAccessor accessor,
                                common::Span<FeatureType const> feature_types,
                                const data::IsValidFunctor& is_valid)
      : d_buffer(buffer),
        writer(writer),
        batch(std::move(batch)),
        accessor(std::move(accessor)),
        feature_types(std::move(feature_types)),
        is_valid(is_valid) {}

  common::CompressedByteT* d_buffer;
  common::CompressedBufferWriter writer;
  AdapterBatchT batch;
  EllpackDeviceAccessor accessor;
  common::Span<FeatureType const> feature_types;
  data::IsValidFunctor is_valid;

  // <row key, scanned valid-flag count, index into the input batch>
  using Tuple = thrust::tuple<size_t, size_t, size_t>;

  // The return value carries no information; it only satisfies the
  // output-iterator interface.
  __device__ size_t operator()(Tuple out) {
    auto e = batch.GetElement(out.get<2>());
    if (!is_valid(e)) {
      return 0;
    }
    // -1 because the scan is inclusive
    size_t const slot_in_row = out.get<1>() - 1;
    size_t const output_position = accessor.row_stride * e.row_idx + slot_in_row;
    uint32_t const bin_idx =
        common::IsCat(feature_types, e.column_idx)
            ? accessor.SearchBin<true>(e.value, e.column_idx)
            : accessor.SearchBin<false>(e.value, e.column_idx);
    writer.AtomicWriteSymbol(d_buffer, bin_idx, output_position);
    return 0;
  }
};
172
// Binary operator for a segmented inclusive scan over tuples.  Element 0 is
// the segment key; element 1 is accumulated only while the key stays the same.
// The right-hand operand is always the one returned.
template <typename Tuple>
struct TupleScanOp {
  __device__ Tuple operator()(Tuple a, Tuple b) {
    bool const same_segment = a.template get<0>() == b.template get<0>();
    if (same_segment) {
      // Same key: carry the running count forward.
      b.template get<1>() += a.template get<1>();
    }
    // Different key: `b` starts a new segment and is returned unchanged.
    return b;
  }
};
185
// Here the data is already correctly ordered and simply needs to be compacted
// to remove missing data
template <typename AdapterBatchT>
void CopyDataToEllpack(const AdapterBatchT &batch,
                       common::Span<FeatureType const> feature_types,
                       EllpackPageImpl *dst, int device_idx, float missing) {
  // The goal is to copy the valid elements of the input into an ELLPACK matrix
  // with a given row stride without allocating extra working memory.  Standard
  // stream compaction cannot do this directly, so a segmented stream
  // compaction is expressed as an inclusive scan with a custom operator.  The
  // scan output is routed through a functor that computes the output position
  // and performs the write.

  // Tuple[0] = The row index of the input, used as a key to define segments
  // Tuple[1] = Scanned flags of valid elements for each row
  // Tuple[2] = The index in the input data
  using Tuple = thrust::tuple<size_t, size_t, size_t>;

  data::IsValidFunctor is_valid(missing);
  auto index_iter = thrust::make_counting_iterator(0llu);
  auto row_key_iter = dh::MakeTransformIterator<size_t>(
      index_iter,
      [=] __device__(size_t idx) {
        return batch.GetElement(idx).row_idx;
      });
  auto valid_flag_iter = dh::MakeTransformIterator<size_t>(
      index_iter,
      [=] __device__(size_t idx) -> size_t {
        return is_valid(batch.GetElement(idx));
      });
  auto scan_input = thrust::make_zip_iterator(
      thrust::make_tuple(row_key_iter, valid_flag_iter, index_iter));

  auto accessor = dst->GetDeviceAccessor(device_idx);
  common::CompressedBufferWriter writer(accessor.NumSymbols());
  auto d_gidx = dst->gidx_buffer.DevicePointer();

  // We redirect the scan output into this functor to do the actual writing
  using WriteFn = WriteCompressedEllpackFunctor<AdapterBatchT>;
  WriteFn write_fn(d_gidx, writer, batch, accessor, feature_types, is_valid);
  dh::TypedDiscard<Tuple> discard;
  thrust::transform_output_iterator<WriteFn, decltype(discard)> scan_output(
      discard, write_fn);

  // Go one level down into cub::DeviceScan API to set OffsetT as 64 bit
  // So we don't crash on n > 2^31
  using DispatchScan =
      cub::DispatchScan<decltype(scan_input), decltype(scan_output),
                        TupleScanOp<Tuple>, cub::NullType, int64_t>;

  // First call with a null workspace only queries the required size.
  size_t workspace_bytes = 0;
  DispatchScan::Dispatch(nullptr, workspace_bytes, scan_input, scan_output,
                         TupleScanOp<Tuple>(), cub::NullType(), batch.Size(),
                         nullptr, false);
  // Second call runs the scan with the allocated workspace.
  dh::TemporaryArray<char> workspace(workspace_bytes);
  DispatchScan::Dispatch(workspace.data().get(), workspace_bytes, scan_input,
                         scan_output, TupleScanOp<Tuple>(), cub::NullType(),
                         batch.Size(), nullptr, false);
}
246
// Fill the unused tail of every ELLPACK row with the null symbol.
// `row_counts[r]` is the number of occupied slots in row `r`; every slot at or
// beyond that count is overwritten with the accessor's NullValue().
void WriteNullValues(EllpackPageImpl* dst, int device_idx,
                     common::Span<size_t> row_counts) {
  auto accessor = dst->GetDeviceAccessor(device_idx);
  common::CompressedBufferWriter writer(accessor.NumSymbols());
  auto d_gidx = dst->gidx_buffer.DevicePointer();
  auto stride = dst->row_stride;
  auto n_slots = stride * dst->n_rows;

  // One work item per slot of the ELLPACK matrix.
  dh::LaunchN(n_slots, [=] __device__(size_t idx) {
    size_t const ridx = idx / stride;
    size_t const slot = idx % stride;
    if (slot < row_counts[ridx]) {
      return;  // slot holds real data
    }
    // The captured writer is const inside the lambda, so write through a copy.
    auto local_writer = writer;
    local_writer.AtomicWriteSymbol(d_gidx, accessor.NullValue(), idx);
  });
}
265
// Construct an ELLPACK matrix from a device adapter batch with precomputed
// cuts: start from an empty page of the right shape, copy in the valid
// elements, then pad the remainder of every row with null values.
// `nthread` and `n_cols` are not used by this constructor.
template <typename AdapterBatch>
EllpackPageImpl::EllpackPageImpl(AdapterBatch batch, float missing, int device,
                                 bool is_dense, int nthread,
                                 common::Span<size_t> row_counts_span,
                                 common::Span<FeatureType const> feature_types,
                                 size_t row_stride, size_t n_rows, size_t n_cols,
                                 common::HistogramCuts const& cuts) {
  dh::safe_cuda(cudaSetDevice(device));

  // Step 1: empty page with an initialized compressed buffer.
  *this = EllpackPageImpl(device, cuts, is_dense, row_stride, n_rows);
  // Step 2: write the bin index of every valid element.
  CopyDataToEllpack(batch, feature_types, this, device, missing);
  // Step 3: mark the unused slots.
  WriteNullValues(this, device, row_counts_span);
}
279
// Explicitly instantiate the adapter-batch constructor for each supported
// device adapter batch type.
#define ELLPACK_BATCH_SPECIALIZE(BATCH_T)                                    \
  template EllpackPageImpl::EllpackPageImpl(                                 \
      BATCH_T batch, float missing, int device, bool is_dense, int nthread,  \
      common::Span<size_t> row_counts_span,                                  \
      common::Span<FeatureType const> feature_types, size_t row_stride,      \
      size_t n_rows, size_t n_cols, common::HistogramCuts const &cuts);

ELLPACK_BATCH_SPECIALIZE(data::CudfAdapterBatch)
ELLPACK_BATCH_SPECIALIZE(data::CupyAdapterBatch)
289
// A functor that copies the data from one EllpackPage to another.
// Element `i` of the source is written to element `i + offset` of the
// destination.
struct CopyPage {
  common::CompressedBufferWriter cbw;
  common::CompressedByteT* dst_data_d;
  common::CompressedIterator<uint32_t> src_iterator_d;
  // The number of elements to skip.
  size_t offset;

  CopyPage(EllpackPageImpl *dst, EllpackPageImpl const *src, size_t offset)
      : cbw{dst->NumSymbols()},
        dst_data_d{dst->gidx_buffer.DevicePointer()},
        src_iterator_d{src->gidx_buffer.DevicePointer(), src->NumSymbols()},
        offset(offset) {}

  __device__ void operator()(size_t element_id) {
    auto const symbol = src_iterator_d[element_id];
    size_t const dst_id = element_id + offset;
    cbw.AtomicWriteSymbol(dst_data_d, symbol, dst_id);
  }
};
308
// Copy the data from the given EllpackPage to the current page.
// The source elements are written starting at element `offset` of this page.
// Both pages must share the row stride and symbol count, and this page must
// be large enough to hold the copied range.  Returns the number of elements
// copied.
size_t EllpackPageImpl::Copy(int device, EllpackPageImpl const *page,
                             size_t offset) {
  monitor_.Start("Copy");
  size_t const n_copied = page->n_rows * page->row_stride;
  CHECK_EQ(row_stride, page->row_stride);
  CHECK_EQ(NumSymbols(), page->NumSymbols());
  CHECK_GE(n_rows * row_stride, offset + n_copied);
  if (this == page) {
    LOG(FATAL) << "Concatenating the same Ellpack.";
    return this->n_rows * this->row_stride;
  }
  gidx_buffer.SetDevice(device);
  page->gidx_buffer.SetDevice(device);

  CopyPage copier(this, page, offset);
  dh::LaunchN(n_copied, copier);
  monitor_.Stop("Copy");
  return n_copied;
}
327
// A functor that compacts the rows from one EllpackPage into another.
// Invoked once per source row; rows mapped to SIZE_MAX are dropped.
struct CompactPage {
  common::CompressedBufferWriter cbw;
  common::CompressedByteT* dst_data_d;
  common::CompressedIterator<uint32_t> src_iterator_d;
  /*! \brief An array that maps the rows from the full DMatrix to the compacted
   * page.
   *
   * The total size is the number of rows in the original, uncompacted DMatrix.
   * Elements are the row ids in the compacted page. Rows not needed are set to
   * SIZE_MAX.
   *
   * An example compacting 16 rows to 8 rows:
   * [SIZE_MAX, 0, 1, SIZE_MAX, SIZE_MAX, 2, SIZE_MAX, 3, 4, 5, SIZE_MAX, 6,
   * SIZE_MAX, 7, SIZE_MAX, SIZE_MAX]
   */
  common::Span<size_t> row_indexes;
  size_t base_rowid;
  size_t row_stride;

  CompactPage(EllpackPageImpl* dst, EllpackPageImpl const* src,
              common::Span<size_t> row_indexes)
      : cbw{dst->NumSymbols()},
        dst_data_d{dst->gidx_buffer.DevicePointer()},
        src_iterator_d{src->gidx_buffer.DevicePointer(), src->NumSymbols()},
        row_indexes(row_indexes),
        base_rowid{src->base_rowid},
        row_stride{src->row_stride} {}

  __device__ void operator()(size_t row_id) {
    // `row_id` is local to the source page; the mapping is indexed by the
    // row id offset by the source page's base row id.
    size_t const dst_row = row_indexes[base_rowid + row_id];
    if (dst_row == SIZE_MAX) {
      return;
    }
    size_t const src_begin = row_id * row_stride;
    size_t const dst_begin = dst_row * row_stride;
    // Copy the whole row, one symbol at a time.
    for (size_t k = 0; k < row_stride; ++k) {
      cbw.AtomicWriteSymbol(dst_data_d, src_iterator_d[src_begin + k],
                            dst_begin + k);
    }
  }
};
369
// Compacts the data from the given EllpackPage into the current page.
// `row_indexes` maps each source row (offset by the source page's base row id)
// to its row in this page; it must cover every row of `page`.
void EllpackPageImpl::Compact(int device, EllpackPageImpl const* page,
                              common::Span<size_t> row_indexes) {
  monitor_.Start("Compact");
  CHECK_EQ(row_stride, page->row_stride);
  CHECK_EQ(NumSymbols(), page->NumSymbols());
  CHECK_LE(page->base_rowid + page->n_rows, row_indexes.size());
  gidx_buffer.SetDevice(device);
  page->gidx_buffer.SetDevice(device);

  // One work item per source row.
  CompactPage compactor(this, page, row_indexes);
  dh::LaunchN(page->n_rows, compactor);
  monitor_.Stop("Compact");
}
382
// Initialize the buffer that stores the compressed features.
// The buffer is resized to hold row_stride * n_rows symbols.  If it already
// held data, the whole buffer is additionally reset to zero.
void EllpackPageImpl::InitCompressedData(int device) {
  // Required buffer size for storing data matrix in ELLPack format.
  size_t const compressed_size_bytes =
      common::CompressedBufferWriter::CalculateBufferSize(row_stride * n_rows,
                                                          NumSymbols());
  gidx_buffer.SetDevice(device);

  // Don't call fill unnecessarily: a buffer that was empty before the resize
  // needs no explicit fill.
  bool const had_data = gidx_buffer.Size() != 0;
  gidx_buffer.Resize(compressed_size_bytes, 0);
  if (had_data) {
    thrust::fill(dh::tbegin(gidx_buffer), dh::tend(gidx_buffer), 0);
  }
}
400
// Compress a CSR page into ELLPACK.
//
// The page is processed in chunks of rows.  For each chunk the row offsets and
// entries are copied to the device and CompressBinEllpackKernel writes one
// symbol per (row, slot) into gidx_buffer.
//
// \param device         CUDA device ordinal used for the memory query and the
//                       device accessor.
// \param row_batch      The CSR page to compress.
// \param feature_types  Feature types forwarded to the kernel.
void EllpackPageImpl::CreateHistIndices(int device,
                                        const SparsePage& row_batch,
                                        common::Span<FeatureType const> feature_types) {
  // Nothing to write when the page has no rows or the ELLPACK rows have no
  // slots.  A zero row stride would otherwise divide by zero in the chunk-size
  // computation below and produce a launch grid with a zero dimension.
  if (row_batch.Size() == 0 || row_stride == 0) return;
  unsigned int null_gidx_value = NumSymbols() - 1;

  const auto& offset_vec = row_batch.offset.ConstHostVector();

  // bin and compress entries in batches of rows
  size_t gpu_batch_nrows =
      std::min(dh::TotalMemory(device) / (16 * row_stride * sizeof(Entry)),
               static_cast<size_t>(row_batch.Size()));
  // A very large row stride can round the memory-derived chunk size down to
  // zero; always make progress with at least one row per chunk.
  gpu_batch_nrows = std::max(gpu_batch_nrows, static_cast<size_t>(1));

  size_t gpu_nbatches = common::DivRoundUp(row_batch.Size(), gpu_batch_nrows);

  for (size_t gpu_batch = 0; gpu_batch < gpu_nbatches; ++gpu_batch) {
    size_t batch_row_begin = gpu_batch * gpu_batch_nrows;
    size_t batch_row_end =
        std::min((gpu_batch + 1) * gpu_batch_nrows, row_batch.Size());
    size_t batch_nrows = batch_row_end - batch_row_begin;

    const auto ent_cnt_begin = offset_vec[batch_row_begin];
    const auto ent_cnt_end = offset_vec[batch_row_end];

    /*! \brief row offset in SparsePage (the input data). */
    dh::device_vector<size_t> row_ptrs(batch_nrows + 1);
    thrust::copy(offset_vec.data() + batch_row_begin,
                 offset_vec.data() + batch_row_end + 1, row_ptrs.begin());

    // number of entries in this batch.
    size_t n_entries = ent_cnt_end - ent_cnt_begin;
    dh::device_vector<Entry> entries_d(n_entries);
    // copy data entries to device.
    if (row_batch.data.DeviceCanRead()) {
      auto const& d_data = row_batch.data.ConstDeviceSpan();
      dh::safe_cuda(cudaMemcpyAsync(
          entries_d.data().get(), d_data.data() + ent_cnt_begin,
          n_entries * sizeof(Entry), cudaMemcpyDefault));
    } else {
      const std::vector<Entry>& data_vec = row_batch.data.ConstHostVector();
      dh::safe_cuda(cudaMemcpyAsync(
          entries_d.data().get(), data_vec.data() + ent_cnt_begin,
          n_entries * sizeof(Entry), cudaMemcpyDefault));
    }

    // x covers the rows of this chunk, y covers the slots of a row.
    const dim3 block3(32, 8, 1);  // 256 threads
    const dim3 grid3(common::DivRoundUp(batch_nrows, block3.x),
                     common::DivRoundUp(row_stride, block3.y), 1);
    auto device_accessor = GetDeviceAccessor(device);
    dh::LaunchKernel {grid3, block3}(
        CompressBinEllpackKernel, common::CompressedBufferWriter(NumSymbols()),
        gidx_buffer.DevicePointer(), row_ptrs.data().get(),
        entries_d.data().get(), device_accessor.gidx_fvalue_map.data(),
        device_accessor.feature_segments.data(), feature_types,
        batch_row_begin, batch_nrows, row_stride,
        null_gidx_value);
  }
}
460
// Number of rows held by this page.
size_t EllpackPageImpl::Size() const {
  return this->n_rows;
}
463
// Return the memory cost for storing the compressed features.
// The page holds row_stride * num_rows symbols drawn from an alphabet of
// cuts.TotalBins() + 1 values.
size_t EllpackPageImpl::MemCostBytes(size_t num_rows, size_t row_stride,
                                     const common::HistogramCuts& cuts) {
  size_t const num_elements = row_stride * num_rows;
  // Required buffer size for storing the data matrix in ELLPACK format.
  return common::CompressedBufferWriter::CalculateBufferSize(
      num_elements, cuts.TotalBins() + 1);
}
473
// Build a device accessor for this page.  The compressed buffer is first
// assigned to `device`, then wrapped in a read-only compressed iterator
// together with the page's cuts, layout fields and the given feature types.
EllpackDeviceAccessor EllpackPageImpl::GetDeviceAccessor(
    int device, common::Span<FeatureType const> feature_types) const {
  gidx_buffer.SetDevice(device);
  common::CompressedIterator<uint32_t> gidx_iter(
      gidx_buffer.ConstDevicePointer(), NumSymbols());
  return {device,     cuts_,  is_dense,  row_stride,
          base_rowid, n_rows, gidx_iter, feature_types};
}
487 } // namespace xgboost
488