1 #include <gtest/gtest.h>
2 #include "test_quantile.h"
3 #include "../../../src/common/quantile.h"
4 #include "../../../src/common/hist_util.h"
5 
6 namespace xgboost {
7 namespace common {
8 
// Checks that the column partition produced by
// HostSketchContainer::LoadBalance accounts for exactly kCols columns.
TEST(Quantile, LoadBalance) {
  constexpr size_t kRows = 1000;
  constexpr size_t kCols = 100;
  auto p_fmat = RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix();

  // Reassigned for every batch, so only the partition computed from the last
  // batch is inspected below.
  std::vector<bst_feature_t> partition;
  for (auto const &batch : p_fmat->GetBatches<SparsePage>()) {
    partition = HostSketchContainer::LoadBalance(batch, kCols, 13);
  }

  // Accumulate the width of every consecutive [partition[k], partition[k + 1])
  // range; the widths must add up to the total number of columns.
  size_t total_cols = 0;
  for (size_t k = 0; k + 1 < partition.size(); ++k) {
    total_cols += partition[k + 1] - partition[k];
  }
  CHECK_EQ(total_cols, kCols);
}
22 
TestDistributedQuantile(size_t rows,size_t cols)23 void TestDistributedQuantile(size_t rows, size_t cols) {
24   std::string msg {"Skipping AllReduce test"};
25   int32_t constexpr kWorkers = 4;
26   InitRabitContext(msg, kWorkers);
27   auto world = rabit::GetWorldSize();
28   if (world != 1) {
29     ASSERT_EQ(world, kWorkers);
30   } else {
31     return;
32   }
33 
34   std::vector<MetaInfo> infos(2);
35   auto& h_weights = infos.front().weights_.HostVector();
36   h_weights.resize(rows);
37   SimpleLCG lcg;
38   SimpleRealUniformDistribution<float> dist(3, 1000);
39   std::generate(h_weights.begin(), h_weights.end(), [&]() { return dist(&lcg); });
40   std::vector<bst_row_t> column_size(cols, rows);
41   size_t n_bins = 64;
42 
43   // Generate cuts for distributed environment.
44   auto sparsity = 0.5f;
45   auto rank = rabit::GetRank();
46   auto m = RandomDataGenerator{rows, cols, sparsity}
47                .Seed(rank)
48                .Lower(.0f)
49                .Upper(1.0f)
50                .GenerateDMatrix();
51   HostSketchContainer sketch_distributed(
52       column_size, n_bins, m->Info().feature_types.ConstHostSpan(), false,
53       OmpGetNumThreads(0));
54   for (auto const &page : m->GetBatches<SparsePage>()) {
55     sketch_distributed.PushRowPage(page, m->Info());
56   }
57   HistogramCuts distributed_cuts;
58   sketch_distributed.MakeCuts(&distributed_cuts);
59 
60   // Generate cuts for single node environment
61   rabit::Finalize();
62   CHECK_EQ(rabit::GetWorldSize(), 1);
63   std::for_each(column_size.begin(), column_size.end(), [=](auto& size) { size *= world; });
64   HostSketchContainer sketch_on_single_node(
65       column_size, n_bins, m->Info().feature_types.ConstHostSpan(), false,
66       OmpGetNumThreads(0));
67   for (auto rank = 0; rank < world; ++rank) {
68     auto m = RandomDataGenerator{rows, cols, sparsity}
69                  .Seed(rank)
70                  .Lower(.0f)
71                  .Upper(1.0f)
72                  .GenerateDMatrix();
73     for (auto const &page : m->GetBatches<SparsePage>()) {
74       sketch_on_single_node.PushRowPage(page, m->Info());
75     }
76   }
77 
78   HistogramCuts single_node_cuts;
79   sketch_on_single_node.MakeCuts(&single_node_cuts);
80 
81   auto const& sptrs = single_node_cuts.Ptrs();
82   auto const& dptrs = distributed_cuts.Ptrs();
83   auto const& svals = single_node_cuts.Values();
84   auto const& dvals = distributed_cuts.Values();
85   auto const& smins = single_node_cuts.MinValues();
86   auto const& dmins = distributed_cuts.MinValues();
87 
88   ASSERT_EQ(sptrs.size(), dptrs.size());
89   for (size_t i = 0; i < sptrs.size(); ++i) {
90     ASSERT_EQ(sptrs[i], dptrs[i]);
91   }
92 
93   ASSERT_EQ(svals.size(), dvals.size());
94   for (size_t i = 0; i < svals.size(); ++i) {
95     ASSERT_NEAR(svals[i], dvals[i], 2e-2f);
96   }
97 
98   ASSERT_EQ(smins.size(), dmins.size());
99   for (size_t i = 0; i < smins.size(); ++i) {
100     ASSERT_FLOAT_EQ(smins[i], dmins[i]);
101   }
102 }
103 
// Runs the distributed quantile comparison on a small 10 x 10 input.
// The body is compiled only when `__unix__` is defined.
TEST(Quantile, DistributedBasic) {
#if defined(__unix__)
  size_t constexpr kRows = 10;
  size_t constexpr kCols = 10;
  TestDistributedQuantile(kRows, kCols);
#endif  // defined(__unix__)
}
110 
// Runs the distributed quantile comparison on a 1000 x 200 input.
// The body is compiled only when `__unix__` is defined.
TEST(Quantile, Distributed) {
#if defined(__unix__)
  size_t constexpr kRows = 1000;
  size_t constexpr kCols = 200;
  TestDistributedQuantile(kRows, kCols);
#endif  // defined(__unix__)
}
117 
TEST(Quantile,SameOnAllWorkers)118 TEST(Quantile, SameOnAllWorkers) {
119 #if defined(__unix__)
120   std::string msg{"Skipping Quantile AllreduceBasic test"};
121   int32_t constexpr kWorkers = 4;
122   InitRabitContext(msg, kWorkers);
123   auto world = rabit::GetWorldSize();
124   if (world != 1) {
125     CHECK_EQ(world, kWorkers);
126   } else {
127     LOG(WARNING) << msg;
128     return;
129   }
130 
131   constexpr size_t kRows = 1000, kCols = 100;
132   RunWithSeedsAndBins(
133       kRows, [=](int32_t seed, size_t n_bins, MetaInfo const &info) {
134         auto rank = rabit::GetRank();
135         HostDeviceVector<float> storage;
136         auto m = RandomDataGenerator{kRows, kCols, 0}
137                      .Device(0)
138                      .Seed(rank + seed)
139                      .GenerateDMatrix();
140         auto cuts = SketchOnDMatrix(m.get(), n_bins);
141         std::vector<float> cut_values(cuts.Values().size() * world, 0);
142         std::vector<
143             typename std::remove_reference_t<decltype(cuts.Ptrs())>::value_type>
144             cut_ptrs(cuts.Ptrs().size() * world, 0);
145         std::vector<float> cut_min_values(cuts.MinValues().size() * world, 0);
146 
147         size_t value_size = cuts.Values().size();
148         rabit::Allreduce<rabit::op::Max>(&value_size, 1);
149         size_t ptr_size = cuts.Ptrs().size();
150         rabit::Allreduce<rabit::op::Max>(&ptr_size, 1);
151         CHECK_EQ(ptr_size, kCols + 1);
152         size_t min_value_size = cuts.MinValues().size();
153         rabit::Allreduce<rabit::op::Max>(&min_value_size, 1);
154         CHECK_EQ(min_value_size, kCols);
155 
156         size_t value_offset = value_size * rank;
157         std::copy(cuts.Values().begin(), cuts.Values().end(),
158                   cut_values.begin() + value_offset);
159         size_t ptr_offset = ptr_size * rank;
160         std::copy(cuts.Ptrs().cbegin(), cuts.Ptrs().cend(),
161                   cut_ptrs.begin() + ptr_offset);
162         size_t min_values_offset = min_value_size * rank;
163         std::copy(cuts.MinValues().cbegin(), cuts.MinValues().cend(),
164                   cut_min_values.begin() + min_values_offset);
165 
166         rabit::Allreduce<rabit::op::Sum>(cut_values.data(), cut_values.size());
167         rabit::Allreduce<rabit::op::Sum>(cut_ptrs.data(), cut_ptrs.size());
168         rabit::Allreduce<rabit::op::Sum>(cut_min_values.data(), cut_min_values.size());
169 
170         for (int32_t i = 0; i < world; i++) {
171           for (size_t j = 0; j < value_size; ++j) {
172             size_t idx = i * value_size + j;
173             ASSERT_NEAR(cuts.Values().at(j), cut_values.at(idx), kRtEps);
174           }
175 
176           for (size_t j = 0; j < ptr_size; ++j) {
177             size_t idx = i * ptr_size + j;
178             ASSERT_EQ(cuts.Ptrs().at(j), cut_ptrs.at(idx));
179           }
180 
181           for (size_t j = 0; j < min_value_size; ++j) {
182             size_t idx = i * min_value_size + j;
183             ASSERT_EQ(cuts.MinValues().at(j), cut_min_values.at(idx));
184           }
185         }
186       });
187   rabit::Finalize();
188 #endif  // defined(__unix__)
189 }
190 }  // namespace common
191 }  // namespace xgboost
192