1 #ifndef BMSPARSEVEC_PARALLEL__H__INCLUDED__
2 #define BMSPARSEVEC_PARALLEL__H__INCLUDED__
3 /*
4 Copyright(c) 2020 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com)
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10     http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 
18 For more information please visit:  http://bitmagic.io
19 */
20 
21 /*! \file bmsparsevec_parallel.h
22     \brief Parallel planner for operations with sparse vectors
23 */
24 
25 #include "bmsparsevec_serial.h"
26 
27 namespace bm
28 {
29 
30 /**
31     Builder class to prepare a batch of tasks for parallel optimization of
32     a sparse vector
33     @ingroup bmtasks
34  */
35 template<typename SVect, typename Lock>
36 class optimize_plan_builder
37 {
38 public:
39     typedef SVect                                     sparse_vector_type;
40     typedef Lock                                      lock_type;
41 
42     typedef typename sparse_vector_type::bvector_type bvector_type;
43     typedef typename bvector_type::allocator_type     allocator_type;
44     typedef typename bvector_type::optmode            optmode_type;
45     typedef typename sparse_vector_type::statistics   sv_statistics_type;
46 
47     struct task_batch : public bm::task_batch<allocator_type>
48     {
49         typedef bm::task_batch<allocator_type>          parent_type;
50         typedef typename parent_type::task_vector_type  task_vector_type;
51     };
52 
build_plan(task_batch & batch,sparse_vector_type & sv,typename bvector_type::optmode opt_mode,typename sparse_vector_type::statistics * st)53     void build_plan(task_batch& batch,
54                     sparse_vector_type& sv,
55                     typename bvector_type::optmode opt_mode,
56                     typename sparse_vector_type::statistics* st)
57     {
58         typename task_batch::task_vector_type& tv = batch.get_task_vector();
59         auto rsize = sv.get_bmatrix().rows();
60         tv.reserve(rsize);
61         for (unsigned k = 0; k < rsize; ++k)
62         {
63             bvector_type* bv = sv.get_bmatrix().get_row(k);
64             if (bv)
65             {
66                 bm::task_description& tdescr = tv.add();
67                 tdescr.init(task_run, (void*)&tdescr,
68                             (void*)bv, (void*)st, opt_mode);
69             }
70         } // for
71     }
72 
73 
74 protected:
75     /// Task execution Entry Point
76     /// @internal
task_run(void * argp)77     static void* task_run(void* argp)
78     {
79         if (!argp)
80             return 0;
81         bm::task_description* tdescr = static_cast<bm::task_description*>(argp);
82 
83         bvector_type* bv = static_cast<bvector_type*>(tdescr->ctx0);
84         sv_statistics_type* st = static_cast<sv_statistics_type*>(tdescr->ctx1);
85         optmode_type opt_mode = static_cast<optmode_type>(tdescr->param0);
86 
87         typename bvector_type::statistics stbv;
88         stbv.reset();
89         BM_DECLARE_TEMP_BLOCK(tb)
90         bv->optimize(tb, opt_mode, &stbv);
91 
92         if (st)
93         {
94             static lock_type lk;
95             bm::lock_guard<lock_type> lg(lk);
96             st->add(stbv);
97         }
98         return 0;
99     }
100 };
101 
102 /**
103     Parallel plan builder for the XOR filter scanner
104     @ingroup bmtasks
105  */
106 template<typename BV>
107 class compute_sim_matrix_plan_builder
108 {
109 public:
110     typedef BV                                       bvector_type;
111     typedef typename BV::size_type                   size_type;
112     typedef typename bvector_type::allocator_type    allocator_type;
113     typedef bm::bv_ref_vector<BV>                    bv_ref_vector_type;
114 
115 
116     struct task_batch : public bm::task_batch<allocator_type>
117     {
118         typedef bm::task_batch<allocator_type>          parent_type;
119         typedef typename parent_type::task_vector_type  task_vector_type;
120     };
121 
build_plan(task_batch & batch,bm::xor_sim_model<BV> & sim_model,const bv_ref_vector_type & ref_vect,const bm::xor_sim_params & xs_params)122     void build_plan(task_batch& batch,
123                     bm::xor_sim_model<BV>& sim_model,
124                     const bv_ref_vector_type& ref_vect,
125                     const bm::xor_sim_params& xs_params)
126     {
127         sim_model.bv_blocks.clear(true);
128         ref_vect.build_nb_digest_and_xor_matrix(sim_model.matr,
129                                                  sim_model.bv_blocks);
130 
131         typename bvector_type::size_type nb_count = sim_model.bv_blocks.count();
132         typename task_batch::task_vector_type& tv = batch.get_task_vector();
133         tv.reserve(nb_count);
134 
135         typename bvector_type::enumerator en(sim_model.bv_blocks);
136         for (size_type col = 0; en.valid(); ++en, ++col)
137         {
138             size_type nb = *en;
139             bm::task_description& tdescr = tv.add();
140             tdescr.init(task_run, (void*)&tdescr,
141                         (void*)&sim_model, (void*)&ref_vect, nb);
142             tdescr.payload0.u64 = col; // rank
143             tdescr.payload1.void_ptr = (void*)&xs_params;
144         } // for en
145     }
146 
147 protected:
148 
149     /// Task execution Entry Point
150     /// @internal
task_run(void * argp)151     static void* task_run(void* argp)
152     {
153         thread_local bm::xor_scanner<BV> xor_scan;
154 
155         if (!argp)
156             return 0;
157         bm::task_description* tdescr =
158             static_cast<bm::task_description*>(argp);
159         bm::xor_sim_model<BV>* sim_model =
160             static_cast<bm::xor_sim_model<BV>*>(tdescr->ctx0);
161         const bv_ref_vector_type* bv_ref =
162             static_cast<const bv_ref_vector_type*>(tdescr->ctx1);
163 
164         xor_scan.set_ref_vector(bv_ref);
165         size_type nb   = (size_type) tdescr->param0;
166         size_type rank = (size_type) tdescr->payload0.u64;
167         const bm::xor_sim_params* params =
168             static_cast<const bm::xor_sim_params*>(tdescr->payload1.void_ptr);
169 
170         xor_scan.sync_nb_vect();
171         xor_scan.compute_sim_model(*sim_model, nb, rank, *params);
172         return 0;
173     }
174 };
175 
176 
177 /**
178     Parallel plan builder for succinct sparse vector serialization
179 
180     @sa sparse_vector_serializer
181     @ingroup bmtasks
182  */
183 template<typename SV>
184 class sv_serialization_plan_builder
185 {
186 public:
187     typedef SV                                       sparse_vector_type;
188     typedef typename SV::bvector_type                bvector_type;
189     typedef typename SV::size_type                   size_type;
190     typedef typename bvector_type::allocator_type    allocator_type;
191     typedef bm::bv_ref_vector<bvector_type>          bv_ref_vector_type;
192     typedef bm::xor_sim_model<bvector_type>          xor_sim_model_type;
193 
194 
195     struct serialization_params
196     {
serialization_paramsserialization_params197         serialization_params()
198         : sb_bookmarks_(false),
199           sb_range_(0),
200           compression_level_(bm::set_compression_default),
201           bv_ref_ptr_(0), sim_model_ptr_(0)
202         {}
203 
204         bool            sb_bookmarks_; ///< Bookmarks flag
205         unsigned        sb_range_;     ///< Desired bookmarks interval
206         unsigned        compression_level_;
207 
208         const bv_ref_vector_type*   bv_ref_ptr_;
209         const xor_sim_model_type*   sim_model_ptr_;
210     };
211 
212     struct task_batch : public bm::task_batch<allocator_type>
213     {
214         typedef bm::task_batch<allocator_type>          parent_type;
215         typedef typename parent_type::task_vector_type  task_vector_type;
216 
217         serialization_params s_params;
218     };
219 
220 public:
sv_serialization_plan_builder()221     sv_serialization_plan_builder()
222     {}
223 
224     void set_bookmarks(bool enable, unsigned bm_interval = 256) BMNOEXCEPT
225         { s_params_.sb_bookmarks_ = enable; s_params_.sb_range_ = bm_interval; }
226 
set_xor_ref(const bv_ref_vector_type * bv_ref_ptr)227     void set_xor_ref(const bv_ref_vector_type* bv_ref_ptr) BMNOEXCEPT
228         {  s_params_.bv_ref_ptr_ = bv_ref_ptr; }
229 
set_sim_model(const xor_sim_model_type * sim_model)230     void set_sim_model(const xor_sim_model_type* sim_model) BMNOEXCEPT
231         { s_params_.sim_model_ptr_ = sim_model; }
232 
233 
build_plan(task_batch & batch,sparse_vector_serial_layout<SV> & sv_layout,const sparse_vector_type & sv)234     void build_plan(task_batch& batch,
235                     sparse_vector_serial_layout<SV>&  sv_layout,
236                     const sparse_vector_type& sv)
237     {
238         typename task_batch::task_vector_type& tv = batch.get_task_vector();
239         unsigned planes = sv.stored_planes();
240         tv.reserve(planes + 1); // +1 for finalization task
241 
242         batch.s_params = s_params_;
243 
244         for (unsigned i = 0; i < planes; ++i)
245         {
246             typename SV::bvector_type_const_ptr bv = sv.get_plane(i);
247             if (!bv)  // empty plane
248             {
249                 sv_layout.set_plane(i, 0, 0);
250                 continue;
251             }
252 
253             bm::task_description& tdescr = tv.add();
254             tdescr.init(task_run, (void*)&tdescr,
255                         (void*)bv, (void*)&batch.s_params, 0);
256             tdescr.ret = (void*)&sv_layout;
257 
258             if (s_params_.bv_ref_ptr_)
259             {
260                 BM_ASSERT(batch.s_params.sim_model_ptr_);
261                 tdescr.payload0.u32 =
262                     (unsigned)s_params_.bv_ref_ptr_->find_bv(bv);
263                 BM_ASSERT(tdescr.payload0.u32
264                           != s_params_.bv_ref_ptr_->not_found());
265             }
266             else
267             {
268                 // ref vector not set: see set_xor_ref()
269                 BM_ASSERT(!batch.s_params.sim_model_ptr_);
270             }
271 
272         } // for i
273 
274         // Add barrier task at the end to finalize the compression
275 
276         bm::task_description& tdescr = tv.add();
277         tdescr.init(task_run_final, (void*)&tdescr,
278                     (void*)0, (void*)&batch.s_params, 0);
279         tdescr.flags = bm::task_description::barrier_ok;
280         tdescr.ret = (void*)&sv_layout;
281     }
282 protected:
283     /// Task execution Entry Point
284     /// @internal
task_run(void * argp)285     static void* task_run(void* argp)
286     {
287         if (!argp)
288             return 0;
289         //TODO: full implementation
290         return 0;
291     }
292 
task_run_final(void * argp)293     static void* task_run_final(void* argp)
294     {
295         if (!argp)
296             return 0;
297         //TODO: full implementation
298         return 0;
299     }
300 protected:
301     serialization_params s_params_;
302 };
303 
304 } // namespace bm
305 
306 #endif
307