1 #ifndef BMSPARSEVEC_PARALLEL__H__INCLUDED__ 2 #define BMSPARSEVEC_PARALLEL__H__INCLUDED__ 3 /* 4 Copyright(c) 2020 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com) 5 6 Licensed under the Apache License, Version 2.0 (the "License"); 7 you may not use this file except in compliance with the License. 8 You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12 Unless required by applicable law or agreed to in writing, software 13 distributed under the License is distributed on an "AS IS" BASIS, 14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 See the License for the specific language governing permissions and 16 limitations under the License. 17 18 For more information please visit: http://bitmagic.io 19 */ 20 21 /*! \file bmsparsevec_parallel.h 22 \brief Parallel planner for operations with sparse vectors 23 */ 24 25 #include "bmsparsevec_serial.h" 26 27 namespace bm 28 { 29 30 /** 31 Builder class to prepare a batch of tasks for parallel optimization of 32 a sparse vector 33 @ingroup bmtasks 34 */ 35 template<typename SVect, typename Lock> 36 class optimize_plan_builder 37 { 38 public: 39 typedef SVect sparse_vector_type; 40 typedef Lock lock_type; 41 42 typedef typename sparse_vector_type::bvector_type bvector_type; 43 typedef typename bvector_type::allocator_type allocator_type; 44 typedef typename bvector_type::optmode optmode_type; 45 typedef typename sparse_vector_type::statistics sv_statistics_type; 46 47 struct task_batch : public bm::task_batch<allocator_type> 48 { 49 typedef bm::task_batch<allocator_type> parent_type; 50 typedef typename parent_type::task_vector_type task_vector_type; 51 }; 52 build_plan(task_batch & batch,sparse_vector_type & sv,typename bvector_type::optmode opt_mode,typename sparse_vector_type::statistics * st)53 void build_plan(task_batch& batch, 54 sparse_vector_type& sv, 55 typename bvector_type::optmode opt_mode, 56 typename sparse_vector_type::statistics* st) 57 { 58 typename task_batch::task_vector_type& tv = batch.get_task_vector(); 59 auto rsize = sv.get_bmatrix().rows(); 60 tv.reserve(rsize); 61 for (unsigned k = 0; k < rsize; ++k) 62 { 63 bvector_type* bv = sv.get_bmatrix().get_row(k); 64 if (bv) 65 { 66 bm::task_description& tdescr = tv.add(); 67 tdescr.init(task_run, (void*)&tdescr, 68 (void*)bv, (void*)st, opt_mode); 69 } 70 } // for 71 } 72 73 74 protected: 75 /// Task execution Entry Point 76 /// @internal task_run(void * argp)77 static void* task_run(void* argp) 78 { 79 if (!argp) 80 return 0; 81 bm::task_description* tdescr = static_cast<bm::task_description*>(argp); 82 83 bvector_type* bv = static_cast<bvector_type*>(tdescr->ctx0); 84 sv_statistics_type* st = static_cast<sv_statistics_type*>(tdescr->ctx1); 85 optmode_type opt_mode = static_cast<optmode_type>(tdescr->param0); 86 87 typename bvector_type::statistics stbv; 88 stbv.reset(); 89 BM_DECLARE_TEMP_BLOCK(tb) 90 bv->optimize(tb, opt_mode, &stbv); 91 92 if (st) 93 { 94 static lock_type lk; 95 bm::lock_guard<lock_type> lg(lk); 96 st->add(stbv); 97 } 98 return 0; 99 } 100 }; 101 102 /** 103 Parallel plan builder for the XOR filter scanner 104 @ingroup bmtasks 105 */ 106 template<typename BV> 107 class compute_sim_matrix_plan_builder 108 { 109 public: 110 typedef BV bvector_type; 111 typedef typename BV::size_type size_type; 112 typedef typename bvector_type::allocator_type allocator_type; 113 typedef bm::bv_ref_vector<BV> bv_ref_vector_type; 114 115 116 struct task_batch : public bm::task_batch<allocator_type> 117 { 118 typedef bm::task_batch<allocator_type> parent_type; 119 typedef typename parent_type::task_vector_type task_vector_type; 120 }; 121 build_plan(task_batch & batch,bm::xor_sim_model<BV> & sim_model,const bv_ref_vector_type & ref_vect,const bm::xor_sim_params & xs_params)122 void build_plan(task_batch& batch, 123 bm::xor_sim_model<BV>& sim_model, 124 const bv_ref_vector_type& ref_vect, 125 const bm::xor_sim_params& xs_params) 126 { 127 sim_model.bv_blocks.clear(true); 128 ref_vect.build_nb_digest_and_xor_matrix(sim_model.matr, 129 sim_model.bv_blocks); 130 131 typename bvector_type::size_type nb_count = sim_model.bv_blocks.count(); 132 typename task_batch::task_vector_type& tv = batch.get_task_vector(); 133 tv.reserve(nb_count); 134 135 typename bvector_type::enumerator en(sim_model.bv_blocks); 136 for (size_type col = 0; en.valid(); ++en, ++col) 137 { 138 size_type nb = *en; 139 bm::task_description& tdescr = tv.add(); 140 tdescr.init(task_run, (void*)&tdescr, 141 (void*)&sim_model, (void*)&ref_vect, nb); 142 tdescr.payload0.u64 = col; // rank 143 tdescr.payload1.void_ptr = (void*)&xs_params; 144 } // for en 145 } 146 147 protected: 148 149 /// Task execution Entry Point 150 /// @internal task_run(void * argp)151 static void* task_run(void* argp) 152 { 153 thread_local bm::xor_scanner<BV> xor_scan; 154 155 if (!argp) 156 return 0; 157 bm::task_description* tdescr = 158 static_cast<bm::task_description*>(argp); 159 bm::xor_sim_model<BV>* sim_model = 160 static_cast<bm::xor_sim_model<BV>*>(tdescr->ctx0); 161 const bv_ref_vector_type* bv_ref = 162 static_cast<const bv_ref_vector_type*>(tdescr->ctx1); 163 164 xor_scan.set_ref_vector(bv_ref); 165 size_type nb = (size_type) tdescr->param0; 166 size_type rank = (size_type) tdescr->payload0.u64; 167 const bm::xor_sim_params* params = 168 static_cast<const bm::xor_sim_params*>(tdescr->payload1.void_ptr); 169 170 xor_scan.sync_nb_vect(); 171 xor_scan.compute_sim_model(*sim_model, nb, rank, *params); 172 return 0; 173 } 174 }; 175 176 177 /** 178 Parallel plan builder for succinct sparse vector serialization 179 180 @sa sparse_vector_serializer 181 @ingroup bmtasks 182 */ 183 template<typename SV> 184 class sv_serialization_plan_builder 185 { 186 public: 187 typedef SV sparse_vector_type; 188 typedef typename SV::bvector_type bvector_type; 189 typedef typename SV::size_type size_type; 190 typedef typename bvector_type::allocator_type allocator_type; 191 typedef bm::bv_ref_vector<bvector_type> bv_ref_vector_type; 192 typedef bm::xor_sim_model<bvector_type> xor_sim_model_type; 193 194 195 struct serialization_params 196 { serialization_paramsserialization_params197 serialization_params() 198 : sb_bookmarks_(false), 199 sb_range_(0), 200 compression_level_(bm::set_compression_default), 201 bv_ref_ptr_(0), sim_model_ptr_(0) 202 {} 203 204 bool sb_bookmarks_; ///< Bookmarks flag 205 unsigned sb_range_; ///< Desired bookmarks interval 206 unsigned compression_level_; 207 208 const bv_ref_vector_type* bv_ref_ptr_; 209 const xor_sim_model_type* sim_model_ptr_; 210 }; 211 212 struct task_batch : public bm::task_batch<allocator_type> 213 { 214 typedef bm::task_batch<allocator_type> parent_type; 215 typedef typename parent_type::task_vector_type task_vector_type; 216 217 serialization_params s_params; 218 }; 219 220 public: sv_serialization_plan_builder()221 sv_serialization_plan_builder() 222 {} 223 224 void set_bookmarks(bool enable, unsigned bm_interval = 256) BMNOEXCEPT 225 { s_params_.sb_bookmarks_ = enable; s_params_.sb_range_ = bm_interval; } 226 set_xor_ref(const bv_ref_vector_type * bv_ref_ptr)227 void set_xor_ref(const bv_ref_vector_type* bv_ref_ptr) BMNOEXCEPT 228 { s_params_.bv_ref_ptr_ = bv_ref_ptr; } 229 set_sim_model(const xor_sim_model_type * sim_model)230 void set_sim_model(const xor_sim_model_type* sim_model) BMNOEXCEPT 231 { s_params_.sim_model_ptr_ = sim_model; } 232 233 build_plan(task_batch & batch,sparse_vector_serial_layout<SV> & sv_layout,const sparse_vector_type & sv)234 void build_plan(task_batch& batch, 235 sparse_vector_serial_layout<SV>& sv_layout, 236 const sparse_vector_type& sv) 237 { 238 typename task_batch::task_vector_type& tv = batch.get_task_vector(); 239 unsigned planes = sv.stored_planes(); 240 tv.reserve(planes + 1); // +1 for finalization task 241 242 batch.s_params = s_params_; 243 244 for (unsigned i = 0; i < planes; ++i) 245 { 246 typename SV::bvector_type_const_ptr bv = sv.get_plane(i); 247 if (!bv) // empty plane 248 { 249 sv_layout.set_plane(i, 0, 0); 250 continue; 251 } 252 253 bm::task_description& tdescr = tv.add(); 254 tdescr.init(task_run, (void*)&tdescr, 255 (void*)bv, (void*)&batch.s_params, 0); 256 tdescr.ret = (void*)&sv_layout; 257 258 if (s_params_.bv_ref_ptr_) 259 { 260 BM_ASSERT(batch.s_params.sim_model_ptr_); 261 tdescr.payload0.u32 = 262 (unsigned)s_params_.bv_ref_ptr_->find_bv(bv); 263 BM_ASSERT(tdescr.payload0.u32 264 != s_params_.bv_ref_ptr_->not_found()); 265 } 266 else 267 { 268 // ref vector not set: see set_xor_ref() 269 BM_ASSERT(!batch.s_params.sim_model_ptr_); 270 } 271 272 } // for i 273 274 // Add barrier task at the end to finalize the compression 275 276 bm::task_description& tdescr = tv.add(); 277 tdescr.init(task_run_final, (void*)&tdescr, 278 (void*)0, (void*)&batch.s_params, 0); 279 tdescr.flags = bm::task_description::barrier_ok; 280 tdescr.ret = (void*)&sv_layout; 281 } 282 protected: 283 /// Task execution Entry Point 284 /// @internal task_run(void * argp)285 static void* task_run(void* argp) 286 { 287 if (!argp) 288 return 0; 289 //TODO: full implementation 290 return 0; 291 } 292 task_run_final(void * argp)293 static void* task_run_final(void* argp) 294 { 295 if (!argp) 296 return 0; 297 //TODO: full implementation 298 return 0; 299 } 300 protected: 301 serialization_params s_params_; 302 }; 303 304 } // namespace bm 305 306 #endif 307