1 #ifndef STREAMITER__HPP
2 #define STREAMITER__HPP
3
4 /* $Id: streamiter.hpp 609562 2020-06-03 19:09:59Z whlavina $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Andrei Gourianov, Alexander Astashyn
30 *
31 * File Description:
32 * Input stream iterators
33 * Please note:
34 * This API requires multi-threading
35 */
36
37 #include <corelib/ncbistd.hpp>
38 #include <corelib/ncbithr.hpp>
39 #include <serial/objistr.hpp>
40 #include <serial/objectio.hpp>
41
42 #include <queue>
43 #include <future>
44 #include <thread>
45 #include <mutex>
46 #include <condition_variable>
47
48
49 /** @addtogroup ObjStreamSupport
50 *
51 * @{
52 */
53
54
55 BEGIN_NCBI_SCOPE
56
57 #if defined(NCBI_THREADS)
58
// Forward declarations needed by the CParams helper classes below.
namespace ns_ObjectIStreamFilterIterator {
/// Look up the index of the member named mem_name in type TRoot.
/// Defined later in this header, in the "helpers" section.
template<typename TRoot>
TMemberIndex xxx_MemberIndex(const string& mem_name);
}
// Declared early so that CObjectIStreamIterator<>::CParams can befriend it.
template<typename...>
class CObjectIStreamAsyncIterator;
65
66 /////////////////////////////////////////////////////////////////////////////
67 /// CObjectIStreamIterator
68 ///
69 /// Synchronously read multiple same-type data objects from an input stream
70 /// with optional filtering.
71 /// @sa CObjectIStreamAsyncIterator
72 ///
73 /// The algorithm assumes that the input stream on its top level consists
74 /// exclusively of one or more serial objects of type TRoot.
75 ///
76 /// There are two flavors of this template:
77 /// - CObjectIStreamIterator<TRoot> - iterate through the top-level serial
78 /// objects of type TRoot.
79 /// - CObjectIStreamIterator<TRoot, TChild> - iterate through serial objects
80 /// of type TChild which are contained within the top-level serial objects
81 /// of type TRoot.
82 ///
83 /// Usage:
84 /// @code
85 ///
86 /// CObjectIStream istr ....;
87 ///
88 /// for (CSeq_entry& obj : CObjectIStreamIterator<CSeq_entry>(istr)) {
89 /// // ...do something with "obj" here...
90 /// }
91 ///
92 /// for (CBioseq& obj : CObjectIStreamIterator<CSeq_entry,CBioseq>(istr)) {
93 /// // ...do something with "obj" here...
94 /// }
95 ///
96 ///
97 /// CObjectIStreamIterator<CSeq_entry> it(istr);
98 /// CObjectIStreamIterator<CSeq_entry> eos;
99 /// for_each (it, eos, [](CSeq_entry& obj) { ... });
100 ///
101 /// CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
102 /// CObjectIStreamIterator<CSeq_entry,CBioseq> eos;
103 /// for_each (it, eos, [](CBioseq& obj) { ... });
104 ///
105 ///
106 /// for (CObjectIStreamIterator<CSeq_entry> it(istr); it.IsValid(); ++it) {
107 /// CSeq_entry& obj = *it;
108 /// // ...do something with "obj" here...
109 /// }
110 ///
111 /// for (CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
112 /// it.IsValid(); ++it) {
113 /// CRef<CBioseq> obj(&*it);
114 /// // ...do something with "obj" here...
115 /// }
116 ///
117 /// for (CObjectIStreamIterator<CSeq_entry, string> it(istr);
118 /// it.IsValid(); ++it) {
119 /// string& obj = *it;
120 /// }
121 ///
122 /// with filtering (only CTaxon1_data objects with optional 'org' member set are valid):
123 /// CObjectIStreamIterator<CTaxon1_data> i(istr, eNoOwnership,
124 /// CObjectIStreamIterator<CTaxon1_data>::CParams().FilterByMember("org",
125 /// [](const CObjectIStream& istr, CTaxon1_data& obj,
126 /// TMemberIndex mem_index, CObjectInfo* mem, void* extra)->bool {
127 /// return mem != nullptr;
128 /// }));
129 ///
130 /// @endcode
131 ///
132 /// @attention
133 /// Input iterators only guarantee validity for single pass algorithms:
134 /// once an input iterator has been incremented, all copies of its previous
135 /// value may be invalidated. It is still possible to keep data objects
136 /// for future use by placing them into CRef containers, when applicable.
137
// Primary (variadic) template. It declares the common interface and hosts
// the shared FMemberFilter / CParams helper types; its constructors are
// deleted, the usable iterators are the CObjectIStreamIterator<TRoot> and
// CObjectIStreamIterator<TRoot, TChild> specializations.
template<typename...>
class CObjectIStreamIterator
{
public:

    /// Object member filtering function
    ///
    /// @param istr
    ///   Serial object stream
    /// @param obj
    ///   Object being checked. It is being populated and is incomplete.
    /// @param mem_index
    ///   Member index
    /// @param mem
    ///   Member information. If mem is nullptr, the member is missing in the stream.
    /// @param extra
    ///   Extra information provided by the caller when constructing iterator.
    /// @return
    ///   Boolean verdict of the filter for the object being checked.
    ///
    /// @attention
    ///   When using filtering with CObjectIStreamAsyncIterator, please note
    ///   that the function may be called from different threads.
    ///   Synchronization of access to shared data, if required, is the responsibility of the client.
    template<typename TObj>
    using FMemberFilter = function<bool(const CObjectIStream& istr, TObj& obj,
                                        TMemberIndex mem_index, CObjectInfo* mem,
                                        void* extra)>;
    /// Filtering parameters
    ///
    /// A default-constructed object carries no filter function, an invalid
    /// member index (kInvalidMember) and no extra data.
    template<typename TObj>
    class CParams
    {
    public:
        /// Construct parameters with no filtering.
        CParams(void)
            : m_Index(kInvalidMember)
            , m_FnFilter(nullptr)
            , m_Extra(nullptr) {
        }
        /// Filter by member index
        ///
        /// @param index
        ///   Index of the member to pass to the filter function
        /// @param fn
        ///   Filter function
        /// @param extra
        ///   Opaque pointer handed back to fn on every call
        /// @return
        ///   Reference to this object, so that calls can be chained
        CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
            m_Index = index; m_FnFilter = fn; m_Extra = extra; return *this;
        }
        /// Filter by member name
        ///
        /// The name is translated into a member index of TObj with
        /// ns_ObjectIStreamFilterIterator::xxx_MemberIndex.
        ///
        /// @param mem_name
        ///   Name of the member to pass to the filter function
        /// @param fn
        ///   Filter function
        /// @param extra
        ///   Opaque pointer handed back to fn on every call
        /// @return
        ///   Reference to this object, so that calls can be chained
        CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
            m_Index = ns_ObjectIStreamFilterIterator::xxx_MemberIndex<TObj>(mem_name);
            m_FnFilter = fn; m_Extra = extra; return *this;
        }

    private:
        // (a member version of xxx_MemberIndex used to be declared here;
        //  the lookup is now a free function in ns_ObjectIStreamFilterIterator)
        TMemberIndex        m_Index;     ///< member to check, or kInvalidMember
        FMemberFilter<TObj> m_FnFilter;  ///< filter function; empty means "no filtering"
        void*               m_Extra;     ///< caller data passed through to m_FnFilter
        template<typename...> friend class CObjectIStreamIterator;
        template<typename...> friend class CObjectIStreamAsyncIterator;
    };

    /// Construct iterator upon an object serialization stream
    ///
    /// @param istr
    ///   Serial object stream
    /// @param deleteInStream
    ///   eTakeOwnership means that the input stream will be deleted
    ///   automatically when the iterator gets destroyed
    /// @param params
    ///   Filtering parameters (default is no filtering)
    template<typename TObj>
    CObjectIStreamIterator( CObjectIStream& istr,
                            EOwnership deleteInStream = eNoOwnership,
                            const CParams<TObj>& params = CParams<TObj>()) = delete;

    /// Construct end-of-stream (invalid) iterator
    /// @sa IsValid()
    CObjectIStreamIterator(void) = delete;

    // Copy-ctor and assignment
    CObjectIStreamIterator(const CObjectIStreamIterator&);
    CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);

    /// Advance to the next data object
    CObjectIStreamIterator& operator++(void);

    // Comparison
    bool operator==(const CObjectIStreamIterator&) const;
    bool operator!=(const CObjectIStreamIterator&) const;

    /// Check whether the iterator points to a data
    /// TRUE if the iterator is constructed upon a serialization stream AND
    /// if it's not end-of-stream or error-in-stream
    bool IsValid(void) const;

    /// Return the underlying serial object stream
    const CObjectIStream& GetObjectIStream(void) const;

    /// Return data object which is currently pointed to by the iterator.
    /// Throw an exception if the iterator does not point to a data, i.e.
    /// if IsValid() is FALSE.
    template<typename TObj> TObj& operator*();

    /// Return pointer to data object which is currently pointed to by the
    /// iterator.
    /// Return NULL if the iterator does not point to a data, i.e.
    /// if IsValid() is FALSE.
    template<typename TObj> TObj* operator->();

    /// Return self
    CObjectIStreamIterator& begin(void);

    /// Construct and return end-of-stream iterator
    CObjectIStreamIterator end(void);

    // dtor
    ~CObjectIStreamIterator();
};
250
251
252 /////////////////////////////////////////////////////////////////////////////
253 /// CObjectIStreamAsyncIterator
254 ///
255 /// Asynchronously read multiple same-type data objects from an input stream
256 /// with optional filtering
257 /// @sa CObjectIStreamIterator
258 ///
259 /// The algorithm assumes that the input stream on its top level consists
260 /// exclusively of one or more serial objects of type TRoot.
261 ///
262 /// There are two flavors of this template:
263 /// - CObjectIStreamAsyncIterator<TRoot> - iterate through the top-level
264 /// serial objects of type TRoot.
265 /// - CObjectIStreamAsyncIterator<TRoot, TChild> - iterate through serial
266 /// objects of type TChild which are contained within the top-level serial
267 /// objects of type TRoot.
268 ///
269 /// @attention
270 /// This iterator supports only the TChild types that are derived from
271 /// CSerialObject class
272 ///
273 /// Usage:
274 /// @code
275 ///
276 /// CObjectIStream istr ....;
277 ///
278 /// for (CSeq_entry& obj : CObjectIStreamAsyncIterator<CSeq_entry>(istr))
279 /// {
280 /// // ...do something with "obj" here...
281 /// }
282 ///
283 /// for (CBioseq& obj : CObjectIStreamAsyncIterator<CSeq_entry,CBioseq>(istr))
284 /// {
285 /// // ...do something with "obj" here...
286 /// }
287 ///
288 ///
289 /// CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
290 /// CObjectIStreamAsyncIterator<CSeq_entry> eos;
291 /// for_each (it, eos, [](CSeq_entry& obj) { ... });
292 ///
293 /// CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
294 /// CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> eos;
295 /// for_each (it, eos, [](CBioseq& obj) { ... });
296 ///
297 ///
298 /// for (CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
299 /// it.IsValid(); ++it) {
300 /// CSeq_entry& obj = *it;
301 /// // ...do something with "obj" here...
302 /// }
303 ///
304 /// for (CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
305 /// it.IsValid(); ++it) {
306 /// CRef<CBioseq> obj(&*it);
307 /// // ...do something with "obj" here...
308 /// }
309 ///
310 /// @endcode
311 ///
/// To speed up reading, the iterator offloads data reading, pre-parsing and
/// parsing into separate threads. If the data stream contains numerous TRoot
/// data records, CObjectIStreamAsyncIterator can give up to 2-4 times speed-up
/// (wall-clock wise) compared to the synchronous processing (such as with
/// CObjectIStreamIterator) of the same data.
317 ///
318 /// The reader has to read the whole object into memory. If such objects are
319 /// relatively small, then there will be several objects read into a single
320 /// buffer, which is good. If data object is big it still goes into a single
321 /// buffer no matter how big the object is.
322 /// To limit memory consumption, use MaxTotalRawSize parameter.
323 ///
324 /// The iterator does its job asynchronously. It starts working immediately
325 /// after its creation and stops only when it is destroyed.
326 /// Even if you do not use it, it still works in the background, reading and
327 /// parsing the data.
328 ///
329 /// @attention
330 /// Input iterators only guarantee validity for single pass algorithms:
331 /// once an input iterator has been incremented, all copies of its previous
332 /// value may be invalidated. It is still possible to keep data objects
333 /// for future use by placing them into CRef containers, when applicable.
334
// Primary (variadic) template: declares the common interface and hosts the
// CParams helper type. Its constructors are deleted.
template<typename...>
class CObjectIStreamAsyncIterator
{
public:

    /// Asynchronous parsing parameters
    ///
    /// Extends the filtering parameters of CObjectIStreamIterator with
    /// settings that control threading and raw-buffer sizes.
    template<typename TObj>
    class CParams : public CObjectIStreamIterator<>::CParams<TObj>
    {
    public:
        using CParent = CObjectIStreamIterator<>::CParams<TObj>;
        template<typename TR>
        using FMemberFilter = CObjectIStreamIterator<>::FMemberFilter<TR>;

        /// Construct parameters with default settings:
        /// launch::async policy, at most 16 parser threads, 16 MiB total raw
        /// data, 128 KiB minimum raw buffer, reading and pre-parsing done in
        /// separate threads; no filtering.
        CParams(void)
            : m_ThreadPolicy(launch::async)
            , m_MaxParserThreads (16)
            , m_MaxTotalRawSize (16 * 1024 * 1024)
            , m_MinRawBufferSize (128 * 1024)
            , m_SameThread(false) {
        }

        /// Filter by member index
        /// (forwards to the parent class; redeclared so that the call chain
        /// keeps returning the asynchronous CParams type)
        CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
            CParent::FilterByMember(index, fn, extra); return *this;
        }

        /// Filter by member name
        /// (forwards to the parent class; redeclared so that the call chain
        /// keeps returning the asynchronous CParams type)
        CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
            CParent::FilterByMember(mem_name, fn, extra); return *this;
        }

        /// Parsing thread launch policy
        CParams& LaunchPolicy(launch policy) {
            m_ThreadPolicy = policy; return *this;
        }

        /// Maximum number of parsing threads
        CParams& MaxParserThreads(unsigned max_parser_threads) {
            m_MaxParserThreads = max_parser_threads; return *this;
        }

        /// Total size of raw data buffers is allowed to grow to this value
        CParams& MaxTotalRawSize(size_t max_total_raw_size) {
            m_MaxTotalRawSize = max_total_raw_size; return *this;
        }

        /// Single raw data memory buffer size should be at least this big
        CParams& MinRawBufferSize(size_t min_raw_buffer_size) {
            m_MinRawBufferSize = min_raw_buffer_size; return *this;
        }

        /// Raw data read and its pre-parsing (storing the raw data pertaining
        /// to a single object and putting it into the parsing queue) to be
        /// done in the same thread.
        /// @note
        ///   The default is to do these two tasks in two separate threads,
        ///   which in some cases can give an additional 10-20% performance
        ///   boost, wall-clock time wise.
        CParams& ReadAndSkipInTheSameThread(bool same_thread) {
            m_SameThread = same_thread; return *this;
        }

    private:
        launch   m_ThreadPolicy;      ///< set by LaunchPolicy()
        unsigned m_MaxParserThreads;  ///< set by MaxParserThreads()
        size_t   m_MaxTotalRawSize;   ///< set by MaxTotalRawSize()
        size_t   m_MinRawBufferSize;  ///< set by MinRawBufferSize()
        bool     m_SameThread;        ///< set by ReadAndSkipInTheSameThread()

        template<typename...> friend class CObjectIStreamAsyncIterator;
    };


    /// Construct iterator upon an object serialization stream
    ///
    /// @param istr
    ///   Serial object stream
    /// @param own_istr
    ///   eTakeOwnership means that the input stream will be deleted
    ///   automatically when the iterator gets destroyed
    /// @param params
    ///   Filtering and parsing parameters (default is no filtering and
    ///   default parsing settings)
    template<typename TObj>
    CObjectIStreamAsyncIterator(CObjectIStream& istr,
                                EOwnership own_istr = eNoOwnership,
                                const CParams<TObj>& params = CParams<TObj>()) = delete;

    /// Construct end-of-stream (invalid) iterator
    /// @sa IsValid()
    CObjectIStreamAsyncIterator(void) = delete;

    // Copy-ctor and assignment
    CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
    CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);

    /// Advance to the next data object
    CObjectIStreamAsyncIterator& operator++(void);

    // Comparison
    bool operator==(const CObjectIStreamAsyncIterator&) const;
    bool operator!=(const CObjectIStreamAsyncIterator&) const;

    /// Check whether the iterator points to a data
    /// TRUE if the iterator is constructed upon a serialization stream AND
    /// if it's not end-of-stream or error-in-stream
    bool IsValid(void) const;

    /// Return data object which is currently pointed to by the iterator.
    /// Throw an exception if the iterator does not point to a data, i.e.
    /// if IsValid() is FALSE.
    template<typename TObj> TObj& operator*();

    /// Return pointer to data object which is currently pointed to by the
    /// iterator.
    /// Return NULL if the iterator does not point to a data, i.e.
    /// if IsValid() is FALSE.
    template<typename TObj> TObj* operator->();

    /// Return self
    CObjectIStreamAsyncIterator& begin(void);

    /// Construct and return end-of-stream iterator
    CObjectIStreamAsyncIterator end(void);

    // dtor
    ~CObjectIStreamAsyncIterator();
};
465
466
467
468 /////////////////////////////////////////////////////////////////////////////
469 /////////////////////////////////////////////////////////////////////////////
470 /// template specializations and implementation
471
472 /////////////////////////////////////////////////////////////////////////////
473 /// CObjectIStreamIterator<TRoot>
474
/// Iterator over top-level objects of type TRoot.
///
/// All copies of an iterator share one CData instance through m_Data.
template<typename TRoot>
class CObjectIStreamIterator<TRoot>
    : public iterator< input_iterator_tag, TRoot, ptrdiff_t, TRoot*, TRoot& >
{
public:
    /// Filtering parameters for objects of type TRoot
    using CParams = CObjectIStreamIterator<>::CParams<TRoot>;

    /// Construct iterator upon an object serialization stream.
    /// When a filter is set and the stream is not at its end, a reader
    /// thread is started; the iterator is then advanced once.
    CObjectIStreamIterator( CObjectIStream& istr,
                            EOwnership deleteInStream = eNoOwnership,
                            const CParams& params = CParams());

    /// Construct end-of-stream iterator (holds no shared data)
    CObjectIStreamIterator(void);
    /// Copy: the new iterator shares the data of the source iterator
    CObjectIStreamIterator(const CObjectIStreamIterator&);
    /// Assignment: this iterator starts sharing the data of the source
    CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);
    ~CObjectIStreamIterator();

    CObjectIStreamIterator& operator++(void);
    bool operator==(const CObjectIStreamIterator&) const;
    bool operator!=(const CObjectIStreamIterator&) const;
    bool IsValid(void) const;
    const CObjectIStream& GetObjectIStream(void) const;

    TRoot& operator*();
    TRoot* operator->();

    CObjectIStreamIterator& begin(void);
    CObjectIStreamIterator  end(void);

protected:
    /// State shared between iterator copies and the reader thread
    struct CData
    {
        /// Store the stream and parameters and select the filtering mode
        /// (m_FilterType) from the type family of tinfo.
        CData(CObjectIStream& istr, EOwnership deleteInStream,
              const CParams& params, TTypeInfo tinfo);
        /// Stop and join the reader thread (if any); delete the stream
        /// when it is owned.
        ~CData(void);

        /// Reader side: wait until m_Value holds no object
        void x_BeginRead(void);
        /// Reader side: clear m_Value, flag end of data, wake the consumer
        void x_EndRead(void);
        /// Reader side: read (and filter) one object; on acceptance publish
        /// it in m_Value and wait until the consumer releases it
        void x_AcceptData(CObjectIStream& in, const CObjectInfo& type);
        /// Consumer side: release the current object and wait for the next
        /// one or for end of data; rethrows an exception stored by the reader
        void x_Next(void);
        /// Read the object without filtering; always accepts
        bool x_NextNoFilter(const CObjectInfo& objinfo);
        /// Read a class object member by member, applying the filter
        bool x_NextSeqWithFilter(const CObjectInfo& objinfo);
        /// Used for the eOneChoice / eAllChoice filtering modes
        bool x_NextChoiceWithFilter(const CObjectInfo& objinfo);
        /// Used for the eOneContainer / eAllContainer filtering modes
        bool x_NextContainerWithFilter(const CObjectInfo& objinfo);

        CObjectIStream*       m_Istr;        ///< input stream
        EOwnership            m_Own;         ///< eTakeOwnership: m_Istr is deleted in the destructor
        CObjectTypeInfo       m_ValueType;   ///< type the skip/read hooks are installed on
        CObjectInfo           m_Value;       ///< current object; empty when none is available
        bool                  m_HasReader;   ///< set when a reader thread is started
        bool                  m_EndOfData;   ///< end of stream reached or reading aborted
        CParams               m_Params;      ///< filtering parameters
        mutex                 m_ReaderMutex; ///< locked around the m_Value hand-over in x_AcceptData / x_Next
        condition_variable    m_ReaderCv;    ///< signals changes of m_Value / m_EndOfData
        thread                m_Reader;      ///< reader thread
        exception_ptr         m_ReaderExpt;  ///< exception caught in the reader thread
        /// Filtering mode, selected in the constructor.
        /// "One" variants: a member index was given in the parameters;
        /// "All" variants: no member index was given (kInvalidMember).
        enum EFilter {
            eNone,          ///< no filter function
            eOneSeq,        ///< class, members not in random order
            eOneRandom,     ///< class, members in random order
            eAllSeq,        ///< class, members not in random order
            eAllRandom,     ///< class, members in random order
            eOneChoice,     ///< choice
            eAllChoice,     ///< choice
            eOneContainer,  ///< container, or implicit class whose first item is a container
            eAllContainer   ///< container, or implicit class whose first item is a container
        } m_FilterType;

        /// Skip hook which redirects skipping of an object into
        /// CData::x_AcceptData with a CObjectInfo built from the hooked type.
        template<typename TR>
        class x_CObjectIStreamIteratorHook : public CSkipObjectHook
        {
        public:
            x_CObjectIStreamIteratorHook(
                typename CObjectIStreamIterator<TR>::CData* pthis)
                    : m_This(pthis) {
            }
            virtual void SkipObject(CObjectIStream& in, const CObjectTypeInfo& type) override {
                m_This->x_AcceptData(in,CObjectInfo(type.GetTypeInfo()));
            }
        private:
            typename CObjectIStreamIterator<TR>::CData* m_This;  ///< not owned
        };
    };

    /// Create the shared data only: no reader thread is started and the
    /// iterator is not advanced. Note the different parameter order
    /// compared to the public constructor.
    CObjectIStreamIterator( CObjectIStream& istr,
                            const CParams& params, EOwnership deleteInStream);
    /// Body of the reader thread
    void x_ReaderThread(void);
    shared_ptr<CData> m_Data;  ///< shared state; null for end-of-stream iterators
};
563
564 /////////////////////////////////////////////////////////////////////////////
565 /// CObjectIStreamIterator<TRoot, TChild>
566
/// Iterator over objects of type TChild contained in top-level objects of
/// type TRoot. Reuses the data members and accessors of
/// CObjectIStreamIterator<TChild>.
template<typename TRoot, typename TChild>
class CObjectIStreamIterator<TRoot, TChild>
    : public CObjectIStreamIterator<TChild>
{
public:
    /// Filtering parameters; filters are applied to TChild objects
    using CParams = CObjectIStreamIterator<>::CParams<TChild>;

    /// Construct iterator upon an object serialization stream
    CObjectIStreamIterator( CObjectIStream& istr,
                            EOwnership deleteInStream = eNoOwnership,
                            const CParams& params = CParams());

    /// Construct end-of-stream iterator
    CObjectIStreamIterator(void);
    CObjectIStreamIterator(const CObjectIStreamIterator&);
    CObjectIStreamIterator& operator=(const CObjectIStreamIterator&);
    ~CObjectIStreamIterator();

    CObjectIStreamIterator& operator++(void);
    CObjectIStreamIterator& begin(void);
    CObjectIStreamIterator  end(void);

protected:
    using CParent = CObjectIStreamIterator<TChild>;
    /// Body of the reader thread: installs skip and read hooks on the TChild
    /// type and filter-skips TRoot objects from the stream.
    void x_ReaderThread(void);

    /// Read hook which redirects reading of an object into
    /// CData::x_AcceptData.
    template<typename TR>
    class x_CObjectIStreamIteratorReadHook : public CReadObjectHook
    {
    public:
        x_CObjectIStreamIteratorReadHook(
            typename CObjectIStreamIterator<TR>::CData* pthis)
                : m_This(pthis) {
        }
        virtual void ReadObject(CObjectIStream& in, const CObjectInfo& type) override {
            m_This->x_AcceptData(in,type);
        }
    private:
        typename CObjectIStreamIterator<TR>::CData* m_This;  ///< not owned
    };
};
606
607
608 /////////////////////////////////////////////////////////////////////////////
609 // helpers
610
611 namespace ns_ObjectIStreamFilterIterator {
612
613 template<typename TRoot>
614 typename enable_if< is_base_of< CSerialObject, TRoot>::value, TTypeInfo>::type
xxx_GetTypeInfo(void)615 xxx_GetTypeInfo(void)
616 {
617 return TRoot::GetTypeInfo();
618 }
619
620 template<typename TRoot>
621 //typename enable_if< !is_base_of< CSerialObject, TRoot>::value, TTypeInfo>::type
622 typename enable_if< is_pod<TRoot>::value || is_convertible<TRoot, std::string>::value, TTypeInfo>::type
xxx_GetTypeInfo(void)623 xxx_GetTypeInfo(void)
624 {
625 return CStdTypeInfo<TRoot>::GetTypeInfo();
626 }
627
628 template<typename TRoot>
xxx_MemberIndex(const string & mem_name)629 TMemberIndex xxx_MemberIndex(const string& mem_name)
630 {
631 TTypeInfo tinfo = xxx_GetTypeInfo<TRoot>();
632 ETypeFamily type = tinfo->GetTypeFamily();
633 if (type == eTypeFamilyClass || type == eTypeFamilyChoice) {
634 const CClassTypeInfoBase* cinfo = CTypeConverter<CClassTypeInfoBase>::SafeCast(tinfo);
635 return cinfo->GetItems().Find(mem_name);
636 }
637 return kInvalidMember;
638 }
639
640 } // ns_ObjectIStreamFilterIterator
641
// Disabled code, kept for reference only: a member-function variant of the
// member-name lookup. The implementation in use is the free function
// ns_ObjectIStreamFilterIterator::xxx_MemberIndex above.
// NOTE(review): as written this would not compile if enabled - it is
// declared 'void' but returns a value.
#if 0
template<typename...>
template<typename TObj>
void
CObjectIStreamIterator<>::CParams<TObj>::xxx_MemberIndex(const string& mem_name) {
    TTypeInfo tinfo = ns_ObjectIStreamFilterIterator::xxx_GetTypeInfo<TObj>();
    ETypeFamily type = tinfo->GetTypeFamily();
    if (type == eTypeFamilyClass || type == eTypeFamilyChoice) {
        const CClassTypeInfoBase* cinfo = CTypeConverter<CClassTypeInfoBase>::SafeCast(tinfo);
        return cinfo->GetItems().Find(mem_name);
    }
    return kInvalidMember;
}
#endif
656
657
658 /////////////////////////////////////////////////////////////////////////////
659 /// CObjectIStreamIterator<TRoot> implementation
660
661 template<typename TRoot>
CObjectIStreamIterator(void)662 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(void)
663 : m_Data(nullptr) {
664 }
665
666 template<typename TRoot>
CObjectIStreamIterator(CObjectIStream & istr,const CParams & params,EOwnership deleteInStream)667 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
668 CObjectIStream& istr, const CParams& params, EOwnership deleteInStream)
669 : m_Data( new CData(istr, deleteInStream, params,
670 ns_ObjectIStreamFilterIterator::xxx_GetTypeInfo<TRoot>())) {
671 }
672
673 template<typename TRoot>
CObjectIStreamIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)674 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
675 CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
676 : CObjectIStreamIterator(istr, params, deleteInStream)
677 {
678 if (m_Data->m_FilterType != CData::eNone && !m_Data->m_EndOfData) {
679 m_Data->m_HasReader = true;
680 m_Data->m_Reader = thread( mem_fun<void, CObjectIStreamIterator<TRoot> >(
681 &CObjectIStreamIterator<TRoot>::x_ReaderThread), this);
682 }
683 ++(*this);
684 }
685
686 template<typename TRoot>
CObjectIStreamIterator(const CObjectIStreamIterator & v)687 CObjectIStreamIterator<TRoot>::CObjectIStreamIterator(
688 const CObjectIStreamIterator& v) : m_Data(v.m_Data) {
689 }
690
691 template<typename TRoot>
692 CObjectIStreamIterator<TRoot>&
operator =(const CObjectIStreamIterator & v)693 CObjectIStreamIterator<TRoot>::operator=(const CObjectIStreamIterator& v) {
694 m_Data = v.m_Data;
695 return *this;
696 }
697
698 template<typename TRoot>
~CObjectIStreamIterator()699 CObjectIStreamIterator<TRoot>::~CObjectIStreamIterator() {
700 }
701
702 template<typename TRoot>
CData(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params,TTypeInfo tinfo)703 CObjectIStreamIterator<TRoot>::CData::CData(
704 CObjectIStream& istr, EOwnership deleteInStream,
705 const CParams& params, TTypeInfo tinfo)
706 : m_Istr(&istr), m_Own(deleteInStream)
707 , m_ValueType(tinfo), m_Value(tinfo), m_HasReader(false)
708 , m_EndOfData(m_Istr->EndOfData()), m_Params(params)
709 {
710 ETypeFamily type = tinfo->GetTypeFamily();
711 if (type != eTypeFamilyClass && type != eTypeFamilyChoice && type != eTypeFamilyContainer) {
712 m_Params.m_FnFilter = nullptr;
713 }
714 m_FilterType = eNone;
715 if (m_Params.m_FnFilter) {
716 if (type == eTypeFamilyClass) {
717 const CClassTypeInfo* cinfo = CTypeConverter<CClassTypeInfo>::SafeCast(tinfo);
718 if (cinfo->Implicit()) {
719 const CItemInfo* itemInfo =
720 cinfo->GetItems().GetItemInfo(cinfo->GetItems().FirstIndex());
721 if (itemInfo->GetTypeInfo()->GetTypeFamily() == eTypeFamilyContainer) {
722 m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
723 }
724 }
725 if (m_FilterType == eNone) {
726 bool is_random = cinfo->RandomOrder();
727 if (m_Params.m_Index != kInvalidMember) {
728 m_FilterType = is_random ? eOneRandom : eOneSeq;
729 } else {
730 m_FilterType = is_random ? eAllRandom : eAllSeq;
731 }
732 }
733 } else if (type == eTypeFamilyChoice) {
734 m_FilterType = m_Params.m_Index != kInvalidMember ? eOneChoice : eAllChoice;
735 } else if (type == eTypeFamilyContainer) {
736 m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
737 }
738 }
739 }
740
741 template<typename TRoot>
~CData(void)742 CObjectIStreamIterator<TRoot>::CData::~CData(void) {
743 if (m_Reader.joinable()) {
744 m_EndOfData = true;
745 m_ReaderCv.notify_all();
746 m_Reader.join();
747 }
748 if (m_Istr && m_Own == eTakeOwnership) {
749 delete m_Istr;
750 }
751 }
752
753 template<typename TRoot>
754 void
x_BeginRead(void)755 CObjectIStreamIterator<TRoot>::CData::x_BeginRead(void) {
756 unique_lock<mutex> lck( m_ReaderMutex);
757 while (m_Value.GetObjectPtr() != nullptr) {
758 m_ReaderCv.wait(lck);
759 }
760 }
761
762 template<typename TRoot>
763 void
x_EndRead(void)764 CObjectIStreamIterator<TRoot>::CData::x_EndRead(void) {
765 m_Value = CObjectInfo();
766 m_EndOfData = true;
767 m_ReaderCv.notify_one();
768 }
769
770 template<typename TRoot>
771 void
x_ReaderThread()772 CObjectIStreamIterator<TRoot>::x_ReaderThread() {
773 m_Data->x_BeginRead();
774 try {
775 m_Data->m_ValueType.SetLocalSkipHook(*(m_Data->m_Istr), new typename CData::template x_CObjectIStreamIteratorHook<TRoot>(m_Data.get()));
776 while (Serial_FilterSkip(*(m_Data->m_Istr),m_Data->m_ValueType))
777 ;
778 } catch (...) {
779 if (!m_Data->m_EndOfData) {
780 m_Data->m_ReaderExpt = current_exception();
781 }
782 }
783 m_Data->x_EndRead();
784 }
785
786 template<typename TRoot, typename TChild>
787 void
x_ReaderThread()788 CObjectIStreamIterator<TRoot,TChild>::x_ReaderThread() {
789 this->m_Data->x_BeginRead();
790 try {
791 this->m_Data->m_ValueType.SetLocalSkipHook(*(this->m_Data->m_Istr), new typename CParent::CData::template x_CObjectIStreamIteratorHook<TChild>(this->m_Data.get()));
792 this->m_Data->m_ValueType.SetLocalReadHook(*(this->m_Data->m_Istr), new x_CObjectIStreamIteratorReadHook<TChild>(this->m_Data.get()));
793 while (Serial_FilterSkip(*(this->m_Data->m_Istr),CObjectTypeInfo(CType<TRoot>())))
794 ;
795 } catch (...) {
796 if (!this->m_Data->m_EndOfData) {
797 this->m_Data->m_ReaderExpt = current_exception();
798 }
799 }
800 this->m_Data->x_EndRead();
801 }
802
803 template<typename TRoot>
804 void
x_AcceptData(CObjectIStream & in,const CObjectInfo & objinfo)805 CObjectIStreamIterator<TRoot>::CData::x_AcceptData(
806 CObjectIStream& in, const CObjectInfo& objinfo)
807 {
808 if (m_Istr->EndOfData()) {
809 m_EndOfData = true;
810 } else {
811 bool res = false;
812 switch ( m_FilterType) {
813 default:
814 case eNone:
815 res = x_NextNoFilter(objinfo);
816 break;
817 case eOneSeq:
818 case eOneRandom:
819 case eAllSeq:
820 case eAllRandom:
821 res = x_NextSeqWithFilter(objinfo);
822 break;
823 case eOneChoice:
824 case eAllChoice:
825 res = x_NextChoiceWithFilter(objinfo);
826 break;
827 case eOneContainer:
828 case eAllContainer:
829 res = x_NextContainerWithFilter(objinfo);
830 break;
831 }
832 if (res) {
833 unique_lock<mutex> lck(m_ReaderMutex);
834 m_Value = objinfo;
835 m_ReaderCv.notify_one();
836 while (m_Value.GetObjectPtr() != nullptr) {
837 if (m_EndOfData) {
838 NCBI_THROW( CEofException, eEof,
839 "CObjectIStreamIterator: abort data parsing");
840 }
841 m_ReaderCv.wait(lck);
842 }
843 } else {
844 in.SetDiscardCurrObject();
845 }
846 }
847 }
848
849 template<typename TRoot>
850 void
x_Next(void)851 CObjectIStreamIterator<TRoot>::CData::x_Next(void) {
852 unique_lock<mutex> lck(m_ReaderMutex);
853 m_Value = CObjectInfo();
854 m_ReaderCv.notify_one();
855 while (m_Value.GetObjectPtr() == nullptr && !m_EndOfData) {
856 m_ReaderCv.wait(lck);
857 }
858 if (m_ReaderExpt) {
859 rethrow_exception(m_ReaderExpt);
860 }
861 }
862
863 template<typename TRoot>
864 bool
x_NextNoFilter(const CObjectInfo & objinfo)865 CObjectIStreamIterator<TRoot>::CData::x_NextNoFilter(const CObjectInfo& objinfo)
866 {
867 objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
868 return true;
869 }
870
/// Read a class object member by member, applying the member filter.
///
/// Used for the eOneSeq, eOneRandom, eAllSeq and eAllRandom filtering modes.
/// The filter function is called with a non-null member pointer for members
/// found in the stream and with nullptr for members found to be missing.
/// As soon as the filter rejects the object, the remaining members are
/// skipped instead of read.
///
/// @param objinfo
///   Object to read the data into
/// @return
///   true if the object passed the filter, false otherwise
template<typename TRoot>
bool
CObjectIStreamIterator<TRoot>::CData::x_NextSeqWithFilter(const CObjectInfo& objinfo)
{
    TMemberIndex mi = kInvalidMember;   // index of the previously seen member
    set<TMemberIndex> done;             // members seen so far (eAllRandom only)
    bool checked = false;               // filter already called (eOneSeq / eOneRandom)
    bool valid = true;                  // object still passes the filter
    TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());

    for ( CIStreamClassMemberIterator i(*m_Istr, objinfo); i; ++i ) {

        TMemberIndex mi_now = (*i).GetMemberIndex();
        CObjectInfoMI minfo(objinfo, mi_now);

        if (valid) {
            // before read - validate missing members
            switch (m_FilterType) {
            case eOneRandom:
            case eAllRandom:
            default:
                // random order: absence can be judged only after the loop
                break;
            case eOneSeq:
                // the stream is already past the requested member,
                // so that member is missing
                if (mi_now > m_Params.m_Index && !checked) {
                    valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
                    checked = true;
                }
                break;
            case eAllSeq:
                // report every member between the previous and the current one
                for (++mi; valid && mi < mi_now; ++mi) {
                    valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
                }
                break;
            }
        }

        // if still valid
        if (valid) {
            // read next member
            i.ReadClassMember(objinfo);

            // after read - validate member
            switch (m_FilterType) {
            default: break;
            case eOneSeq:
            case eOneRandom:
                if (mi_now == m_Params.m_Index) {
                    // for pointer members pass the pointed-to object to the filter
                    CObjectInfo oi = minfo.GetMember().GetTypeFamily() == eTypeFamilyPointer ?
                        minfo.GetMember().GetPointedObject() : minfo.GetMember();
                    valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
                    checked = true;
                }
                break;
            case eAllRandom:
                done.insert(mi_now);
                // no break
                NCBI_FALLTHROUGH;
            case eAllSeq:
                {
                    // for pointer members pass the pointed-to object to the filter
                    CObjectInfo oi = minfo.GetMember().GetTypeFamily() == eTypeFamilyPointer ?
                        minfo.GetMember().GetPointedObject() : minfo.GetMember();
                    valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
                }
                break;
            }
        } else {
            // object invalid - skip remaining members
            i.SkipClassMember();
        }
        mi = mi_now;
    }

    // finally - validate missing members
    if (valid) {
        switch (m_FilterType) {
        default: break;
        case eOneSeq:
        case eOneRandom:
            // the requested member never showed up in the stream
            if (!checked) {
                valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
            }
            break;
        case eAllSeq:
            {
                // report the members after the last one seen
                TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
                for (++mi; valid && mi < mi_last; ++mi) {
                    valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
                }
            }
            break;
        case eAllRandom:
            {
                // report every member that was not seen in the stream
                mi = objinfo.GetClassTypeInfo()->GetItems().FirstIndex();
                TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
                for (; valid && mi < mi_last; ++mi) {
                    if (done.find(mi) == done.end()) {
                        valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
                    }
                }
            }
            break;
        }
    }
    return valid;
}
976
977 template<typename TRoot>
978 bool
x_NextChoiceWithFilter(const CObjectInfo & objinfo)979 CObjectIStreamIterator<TRoot>::CData::x_NextChoiceWithFilter(const CObjectInfo& objinfo)
980 {
981 bool valid = true;
982 objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
983 TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
984 CObjectInfoCV cv = objinfo.GetCurrentChoiceVariant();
985 TMemberIndex i = cv.GetVariantIndex();
986 if (i == m_Params.m_Index) {
987 CObjectInfo oi = cv.GetVariant().GetTypeFamily() == eTypeFamilyPointer ?
988 cv.GetVariant().GetPointedObject() : cv.GetVariant();
989 valid = m_Params.m_FnFilter( *m_Istr, obj, i, &oi, m_Params.m_Extra);
990 } else {
991 valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
992 }
993 return valid;
994 }
995
996 template<typename TRoot>
997 bool
x_NextContainerWithFilter(const CObjectInfo & objinfo)998 CObjectIStreamIterator<TRoot>::CData::x_NextContainerWithFilter(const CObjectInfo& objinfo)
999 {
1000 TMemberIndex mi = kInvalidMember;
1001 bool valid = true;
1002 TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
1003
1004 for ( CIStreamContainerIterator i(*m_Istr, objinfo); i; ++i ) {
1005 if (valid) {
1006 CObjectInfo oi(i.ReadElement(objinfo.GetObjectPtr()));
1007 ++mi;
1008 if (oi.GetObjectPtr()) {
1009 if (m_FilterType == eAllContainer || (m_FilterType == eOneContainer && mi == m_Params.m_Index)) {
1010 CObjectInfo oe = oi.GetTypeFamily() == eTypeFamilyPointer ? oi.GetPointedObject() : oi;
1011 valid = m_Params.m_FnFilter( *m_Istr, obj, mi, &oe, m_Params.m_Extra);
1012 }
1013 }
1014 } else {
1015 i.SkipElement();
1016 }
1017 }
1018 return valid;
1019 }
1020
1021 template<typename TRoot>
1022 CObjectIStreamIterator<TRoot>&
operator ++(void)1023 CObjectIStreamIterator<TRoot>::operator++(void) {
1024 if (m_Data.get() != nullptr) {
1025 if (!m_Data->m_HasReader) {
1026 if (m_Data->m_Istr->EndOfData()) {
1027 m_Data.reset();
1028 } else {
1029 m_Data->m_Value = CObjectInfo(m_Data->m_ValueType);
1030 m_Data->m_Istr->Read(m_Data->m_Value);
1031 }
1032 } else {
1033 m_Data->x_Next();
1034 if (m_Data->m_EndOfData) {
1035 m_Data.reset();
1036 }
1037 }
1038 }
1039 return *this;
1040 }
1041
1042 template<typename TRoot>
1043 bool
operator ==(const CObjectIStreamIterator & v) const1044 CObjectIStreamIterator<TRoot>::operator==(
1045 const CObjectIStreamIterator& v) const {
1046 return m_Data.get() == v.m_Data.get();
1047 }
1048
1049 template<typename TRoot>
1050 bool
operator !=(const CObjectIStreamIterator & v) const1051 CObjectIStreamIterator<TRoot>::operator!=(
1052 const CObjectIStreamIterator& v) const {
1053 return m_Data.get() != v.m_Data.get();
1054 }
1055
1056 template<typename TRoot>
IsValid() const1057 bool CObjectIStreamIterator<TRoot>::IsValid() const {
1058 return m_Data.get() != nullptr && m_Data->m_Value.GetObjectPtr() != nullptr;
1059 }
1060
1061 template<typename TRoot>
1062 const CObjectIStream&
GetObjectIStream(void) const1063 CObjectIStreamIterator<TRoot>::GetObjectIStream(void) const {
1064 return *(m_Data->m_Istr);
1065 }
1066
1067 template<typename TRoot>
1068 TRoot&
operator *()1069 CObjectIStreamIterator<TRoot>::operator*() {
1070 return *(TRoot*)(m_Data->m_Value.GetObjectPtr());
1071 }
1072
1073 template<typename TRoot>
1074 TRoot*
operator ->()1075 CObjectIStreamIterator<TRoot>::operator->() {
1076 return IsValid() ? (TRoot*)m_Data->m_Value.GetObjectPtr() : nullptr;
1077 }
1078
1079 template<typename TRoot>
1080 CObjectIStreamIterator<TRoot>&
begin(void)1081 CObjectIStreamIterator<TRoot>::begin(void) {
1082 return *this;
1083 }
1084
1085 template<typename TRoot>
1086 CObjectIStreamIterator<TRoot>
end(void)1087 CObjectIStreamIterator<TRoot>::end(void) {
1088 return CObjectIStreamIterator<TRoot>();
1089 }
1090
1091
1092 /////////////////////////////////////////////////////////////////////////////
1093 /// CObjectIStreamIterator<TRoot, TChild> implementation
1094
1095 template<typename TRoot, typename TChild>
CObjectIStreamIterator(void)1096 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(void)
1097 : CParent() {
1098 }
1099
1100 template<typename TRoot, typename TChild>
CObjectIStreamIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1101 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(
1102 CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
1103 : CParent(istr, params, deleteInStream)
1104 {
1105 if (!this->m_Data->m_EndOfData) {
1106 this->m_Data->m_HasReader = true;
1107 this->m_Data->m_Reader = thread( mem_fun<void, CObjectIStreamIterator<TRoot,TChild> >(
1108 &CObjectIStreamIterator<TRoot,TChild>::x_ReaderThread), this);
1109 }
1110 ++(*this);
1111 }
1112
1113 template<typename TRoot, typename TChild>
CObjectIStreamIterator(const CObjectIStreamIterator & v)1114 CObjectIStreamIterator<TRoot, TChild>::CObjectIStreamIterator(
1115 const CObjectIStreamIterator& v) : CParent(v) {
1116 }
1117
1118 template<typename TRoot, typename TChild>
1119 CObjectIStreamIterator<TRoot, TChild>&
operator =(const CObjectIStreamIterator & v)1120 CObjectIStreamIterator<TRoot, TChild>::operator=(
1121 const CObjectIStreamIterator& v) {
1122 CParent::operator=(v);
1123 return *this;
1124 }
1125
1126 template<typename TRoot, typename TChild>
~CObjectIStreamIterator()1127 CObjectIStreamIterator<TRoot, TChild>::~CObjectIStreamIterator() {
1128 }
1129
1130 template<typename TRoot, typename TChild>
1131 CObjectIStreamIterator<TRoot, TChild>&
operator ++(void)1132 CObjectIStreamIterator<TRoot, TChild>::operator++(void) {
1133 CParent::operator++();
1134 return *this;
1135 }
1136
1137 template<typename TRoot, typename TChild>
1138 CObjectIStreamIterator<TRoot, TChild>&
begin(void)1139 CObjectIStreamIterator<TRoot, TChild>::begin(void) {
1140 return *this;
1141 }
1142
1143 template<typename TRoot, typename TChild>
1144 CObjectIStreamIterator<TRoot, TChild>
end(void)1145 CObjectIStreamIterator<TRoot, TChild>::end(void) {
1146 return CObjectIStreamIterator<TRoot, TChild>();
1147 }
1148
1149
1150 /////////////////////////////////////////////////////////////////////////////
1151 /// CObjectIStreamAsyncIterator<TRoot>
1152
/// Input iterator over TRoot objects in a stream.
///
/// Raw chunks of the stream are handed to a parser function launched
/// through std::async; each task returns a queue of parsed objects. The
/// iterator walks those queues in order. Copies of the iterator share one
/// CData block through a shared_ptr.
template<typename TRoot>
class CObjectIStreamAsyncIterator<TRoot>
    : public iterator< input_iterator_tag, TRoot, ptrdiff_t, TRoot*, TRoot& >
{
public:
    using CParams = CObjectIStreamAsyncIterator<>::CParams<TRoot>;

    /// Start iterating over istr; positions the iterator on the first object.
    CObjectIStreamAsyncIterator( CObjectIStream& istr,
        EOwnership deleteInStream = eNoOwnership,
        const CParams& params = CParams());
    /// Construct the end-of-range iterator (no data).
    CObjectIStreamAsyncIterator(void);
    CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
    CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);
    ~CObjectIStreamAsyncIterator();

    /// Advance to the next object; becomes equal to end() when exhausted.
    CObjectIStreamAsyncIterator& operator++(void);
    /// Iterators compare by identity of their shared data block.
    bool operator==(const CObjectIStreamAsyncIterator&) const;
    bool operator!=(const CObjectIStreamAsyncIterator&) const;
    /// True when there is a current object to dereference.
    bool IsValid(void) const;

    TRoot& operator*();
    TRoot* operator->();

    /// Range-for support: begin() is the iterator itself,
    /// end() is a default-constructed iterator.
    CObjectIStreamAsyncIterator& begin(void);
    CObjectIStreamAsyncIterator end(void);


protected:
    typedef queue< CRef<TRoot> > TObjectsQueue;
    /// Parser task signature: (raw bytes, data format, params, garbage queue)
    /// -> queue of parsed objects. The garbage queue is taken by value on
    /// older MSVC and by rvalue reference elsewhere.
#if NCBI_COMPILER_MSVC && _MSC_VER < 1900
    typedef function<TObjectsQueue(CRef<CByteSource>, ESerialDataFormat, const CParams&, TObjectsQueue)> FParserFunction;
#else
    typedef function<TObjectsQueue(CRef<CByteSource>, ESerialDataFormat, const CParams&, TObjectsQueue&&)> FParserFunction;
#endif
    /// Same as the public constructor, but with an explicit parser function.
    CObjectIStreamAsyncIterator( CObjectIStream& istr,
        EOwnership deleteInStream,
        FParserFunction parser,
        const CParams& params);
private:
    /// Default parser: releases the garbage queue, then deserializes TRoot
    /// objects from bytesource.
    static TObjectsQueue sx_ClearGarbageAndParse(
        CRef<CByteSource> bytesource, ESerialDataFormat format,
        const CParams& params,
#if NCBI_COMPILER_MSVC && _MSC_VER < 1900
        TObjectsQueue garbage
#else
        TObjectsQueue&& garbage
#endif
        );

    /// State shared by all copies of an iterator.
    struct CData {
        CData(CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
            const CParams& params);
        ~CData(void);

        using future_queue_t = future<TObjectsQueue>;
        using futures_queue_t = queue<future_queue_t>;

        /// Retire the current object and, if needed, fetch the next parsed queue.
        void x_UpdateObjectsQueue();
        /// Launch another parser task if there is room and raw data.
        void x_UpdateFuturesQueue();
        /// Get the next chunk of raw data; null CRef means no more data.
        CRef< CByteSource > x_GetNextData(void);
        /// Body of the raw-data reader thread.
        void x_ReaderThread(void);

        TObjectsQueue m_ObjectsQueue; // current queue of objects
        TObjectsQueue m_GarbageQueue; // popped so-far from objects-queue
        futures_queue_t m_FuturesQueue; // queue-of-futures-of-object-queues

        CObjectIStream* m_Istr;          // input stream
        EOwnership m_Own;                // eTakeOwnership: m_Istr is deleted in ~CData
        FParserFunction m_Parser;        // function run by each async task
        size_t m_ParserCount;            // upper limit on m_FuturesQueue size
        size_t m_RawBufferSize;          // target size of one raw chunk, in bytes
        size_t m_MaxRawSize;             // 0: raw data is read in the calling thread;
                                         // otherwise: cap on m_CurrentRawSize
        size_t m_CurrentRawSize;         // total size of chunks queued in m_ReaderData
        launch m_Policy;                 // launch policy passed to async()
        bool m_EndOfData;                // no more raw data will be requested
        CParams m_Params;                // parameters forwarded to the parser

        mutex m_ReaderMutex;             // guards m_ReaderData, m_ReaderDataSize, m_CurrentRawSize
        condition_variable m_ReaderCv;   // signalled on changes to the above
        thread m_Reader;                 // reader thread (used when m_MaxRawSize != 0)
        queue< CRef< CByteSource > > m_ReaderData;  // raw chunks produced by the reader
        queue< size_t > m_ReaderDataSize;           // sizes matching m_ReaderData entries
    };
    shared_ptr<CData> m_Data;
};
1238
1239
1240 /////////////////////////////////////////////////////////////////////////////
1241 /// CObjectIStreamAsyncIterator<TRoot, TChild>
1242
/// Input iterator over TChild objects contained in a stream of TRoot.
///
/// Reuses the machinery of CObjectIStreamAsyncIterator<TChild> and only
/// supplies its own parser function, which extracts TChild objects via
/// CObjectIStreamIterator<TRoot, TChild>.
template<typename TRoot, typename TChild>
class CObjectIStreamAsyncIterator<TRoot, TChild>
    : public CObjectIStreamAsyncIterator<TChild>
{
public:
    using CParams = CObjectIStreamAsyncIterator<>::CParams<TChild>;

    /// Start iterating over istr; positions the iterator on the first object.
    CObjectIStreamAsyncIterator( CObjectIStream& istr,
        EOwnership deleteInStream = eNoOwnership,
        const CParams& params = CParams());
    /// Construct the end-of-range iterator (no data).
    CObjectIStreamAsyncIterator(void);
    CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator&);
    CObjectIStreamAsyncIterator& operator=(const CObjectIStreamAsyncIterator&);
    ~CObjectIStreamAsyncIterator();

    /// Same as in the parent, but returning this (derived) type.
    CObjectIStreamAsyncIterator& operator++(void);
    CObjectIStreamAsyncIterator& begin(void);
    CObjectIStreamAsyncIterator end(void);


private:
    using CParent = CObjectIStreamAsyncIterator<TChild>;
    using TObjectsQueue = typename CParent::TObjectsQueue;

    /// Parser passed to the parent: releases the garbage queue, then
    /// collects TChild objects from bytesource.
    static TObjectsQueue sx_ClearGarbageAndParse(
        CRef<CByteSource> bytesource, ESerialDataFormat format,
        const CParams& params,
#if NCBI_COMPILER_MSVC && _MSC_VER < 1900
        TObjectsQueue garbage
#else
        TObjectsQueue&& garbage
#endif
        );
};
1277
1278 /////////////////////////////////////////////////////////////////////////////
1279 /// CObjectIStreamAsyncIterator<TRoot> implementation
1280
1281 template<typename TRoot>
CObjectIStreamAsyncIterator(void)1282 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(void)
1283 : m_Data(nullptr)
1284 {
1285 }
1286
1287 template<typename TRoot>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1288 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1289 CObjectIStream& istr, EOwnership deleteInStream,
1290 const CParams& params)
1291 : CObjectIStreamAsyncIterator( istr, deleteInStream,
1292 &CObjectIStreamAsyncIterator<TRoot>::sx_ClearGarbageAndParse, params)
1293 {
1294 }
1295
1296 template<typename TRoot>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,FParserFunction parser,const CParams & params)1297 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1298 CObjectIStream& istr, EOwnership deleteInStream,
1299 FParserFunction parser, const CParams& params)
1300 : m_Data(new CData(istr, deleteInStream, parser, params))
1301 {
1302 ++(*this);
1303 }
1304
1305 template<typename TRoot>
CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator & v)1306 CObjectIStreamAsyncIterator<TRoot>::CObjectIStreamAsyncIterator(
1307 const CObjectIStreamAsyncIterator& v)
1308 : m_Data(v.m_Data)
1309 {
1310 }
1311
1312 template<typename TRoot>
1313 CObjectIStreamAsyncIterator<TRoot>&
operator =(const CObjectIStreamAsyncIterator & v)1314 CObjectIStreamAsyncIterator<TRoot>::operator=(
1315 const CObjectIStreamAsyncIterator& v) {
1316 m_Data = v.m_Data;
1317 return *this;
1318 }
1319
1320 template<typename TRoot>
~CObjectIStreamAsyncIterator()1321 CObjectIStreamAsyncIterator<TRoot>::~CObjectIStreamAsyncIterator() {
1322 }
1323
1324 template<typename TRoot>
1325 CObjectIStreamAsyncIterator<TRoot>&
operator ++()1326 CObjectIStreamAsyncIterator<TRoot>::operator++() {
1327 if (m_Data.get() != nullptr) {
1328 do {
1329 m_Data->x_UpdateFuturesQueue();
1330 m_Data->x_UpdateObjectsQueue();
1331 } while (!IsValid() && !m_Data->m_EndOfData);
1332 if (!IsValid()) {
1333 m_Data.reset();
1334 }
1335 }
1336 return *this;
1337 }
1338
1339 template<typename TRoot>
1340 bool
operator ==(const CObjectIStreamAsyncIterator & v) const1341 CObjectIStreamAsyncIterator<TRoot>::operator==(
1342 const CObjectIStreamAsyncIterator& v) const {
1343 return m_Data.get() == v.m_Data.get();
1344 }
1345
1346 template<typename TRoot>
1347 bool
operator !=(const CObjectIStreamAsyncIterator & v) const1348 CObjectIStreamAsyncIterator<TRoot>::operator!=(
1349 const CObjectIStreamAsyncIterator& v) const {
1350 return m_Data.get() != v.m_Data.get();
1351 }
1352
1353 template<typename TRoot>
IsValid() const1354 bool CObjectIStreamAsyncIterator<TRoot>::IsValid() const {
1355 return m_Data.get() != nullptr && !m_Data->m_ObjectsQueue.empty();
1356 }
1357
1358 template<typename TRoot>
1359 TRoot&
operator *()1360 CObjectIStreamAsyncIterator<TRoot>::operator*() {
1361 return m_Data->m_ObjectsQueue.front().GetObject();
1362 }
1363
1364 template<typename TRoot>
1365 TRoot*
operator ->()1366 CObjectIStreamAsyncIterator<TRoot>::operator->() {
1367 return IsValid() ? m_Data->m_ObjectsQueue.front().GetPointer() : nullptr;
1368 }
1369
1370 template<typename TRoot>
1371 CObjectIStreamAsyncIterator<TRoot>&
begin(void)1372 CObjectIStreamAsyncIterator<TRoot>::begin(void) {
1373 return *this;
1374 }
1375
1376 template<typename TRoot>
1377 CObjectIStreamAsyncIterator<TRoot>
end(void)1378 CObjectIStreamAsyncIterator<TRoot>::end(void) {
1379 return CObjectIStreamAsyncIterator<TRoot>();
1380 }
1381
1382
1383 template<typename TRoot>
1384 typename CObjectIStreamAsyncIterator<TRoot>::TObjectsQueue
sx_ClearGarbageAndParse(CRef<CByteSource> bytesource,ESerialDataFormat format,const CParams & params,TObjectsQueue garbage)1385 CObjectIStreamAsyncIterator<TRoot>::sx_ClearGarbageAndParse(
1386 CRef<CByteSource> bytesource,
1387 ESerialDataFormat format,
1388 const CParams& params,
1389 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1390 TObjectsQueue garbage
1391 #else
1392 TObjectsQueue&& garbage
1393 #endif
1394 )
1395 {
1396 {{
1397 TObjectsQueue dummy;
1398 swap(garbage, dummy);
1399 // garbage now gets destroyed, if last-reference
1400 }}
1401
1402 // deserialize objects from bytesource
1403 unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1404
1405 TObjectsQueue queue;
1406 if (params.m_FnFilter) {
1407 for (TRoot& object : CObjectIStreamIterator<TRoot>( *istr, eNoOwnership, params)) {
1408 queue.push( CRef<TRoot>(&object));
1409 }
1410 } else {
1411 while(!istr->EndOfData()) {
1412 CRef<TRoot> object(new TRoot);
1413 istr->Read(&*object, object->GetThisTypeInfo());
1414 queue.push(object);
1415 }
1416 }
1417 return queue;
1418 }
1419
1420 template<typename TRoot>
CData(CObjectIStream & istr,EOwnership deleteInStream,FParserFunction parser,const CParams & params)1421 CObjectIStreamAsyncIterator<TRoot>::CData::CData(
1422 CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
1423 const CParams& params)
1424 : m_Istr(&istr)
1425 , m_Own(deleteInStream)
1426 , m_Parser(parser)
1427 , m_ParserCount( params.m_MaxParserThreads != 0 ? params.m_MaxParserThreads : 16)
1428 , m_RawBufferSize( params.m_MinRawBufferSize)
1429 , m_MaxRawSize( params.m_SameThread ? 0 : params.m_MaxTotalRawSize)
1430 , m_CurrentRawSize(0)
1431 , m_Policy(params.m_ThreadPolicy)
1432 , m_EndOfData(m_Istr->EndOfData())
1433 , m_Params(params)
1434 {
1435 if (m_MaxRawSize != 0 && !m_EndOfData) {
1436 m_Reader = thread(
1437 mem_fun<void, CObjectIStreamAsyncIterator<TRoot>::CData >(
1438 &CObjectIStreamAsyncIterator<TRoot>::CData::x_ReaderThread), this);
1439 }
1440 }
1441
1442 template<typename TRoot>
~CData()1443 CObjectIStreamAsyncIterator<TRoot>::CData::~CData() {
1444 if (m_Reader.joinable()) {
1445 m_EndOfData = true;
1446 m_ReaderCv.notify_all();
1447 m_Reader.join();
1448 }
1449 if (m_Istr && m_Own == eTakeOwnership) {
1450 delete m_Istr;
1451 }
1452 }
1453
1454 // m_GarbageQueue processing:
1455 //
1456 // When the current object (the one returned by operator*())
1457 // goes out of scope, if it is the last reference, the
1458 // destructor of the object will be called from main
1459 // thread, which is an expensive operation, which
1460 // we want to offload to a different thread - "here are some
1461 // objects - just let them go out of scope"
1462 //
1463 // So before calling m_ObjectsQueue pop we'll save the
1464 // current object in the garbage-queue to prevent it from being
1465 // destructed at this time, and will pass the garbage
// queue to sx_ClearGarbageAndParse (executed asynchronously),
// where the destructors of the garbage-objects will be called
// (as appropriate, as determined by CRefs going out of scope)
1469
1470 template<typename TRoot>
1471 void
x_UpdateObjectsQueue()1472 CObjectIStreamAsyncIterator<TRoot>::CData::x_UpdateObjectsQueue()
1473 {
1474 // bring the next objects up front; save the garbage
1475 if(!m_ObjectsQueue.empty()) {
1476 m_GarbageQueue.push( m_ObjectsQueue.front());
1477 m_ObjectsQueue.pop();
1478 }
1479
1480 // unpack the next objects-queue from futures-queue if empty
1481 if( m_ObjectsQueue.empty()
1482 && !m_FuturesQueue.empty())
1483 {
1484 m_ObjectsQueue = m_FuturesQueue.front().get();
1485 m_FuturesQueue.pop();
1486 }
1487 }
1488
1489 template<typename TRoot>
1490 void
x_UpdateFuturesQueue()1491 CObjectIStreamAsyncIterator<TRoot>::CData::x_UpdateFuturesQueue()
1492 {
1493 // nothing to deserialize, or already full
1494 if( m_FuturesQueue.size() >= m_ParserCount) {
1495 return;
1496 }
1497 if (m_EndOfData ||
1498 // no raw data ready yet, but we still have work to do
1499 (m_MaxRawSize != 0 && m_ReaderData.empty() && !m_FuturesQueue.empty())) {
1500 return;
1501 }
1502 CRef< CByteSource > data = x_GetNextData();
1503 if (data.IsNull()) {
1504 m_EndOfData = true;
1505 return;
1506 }
1507
1508 #if 0
1509 // for reference / profiling: clearing the garbage-queue
1510 // from this thread will make the processing considerably slower.
1511 // Instead, we'll pass the garbage to the async call below.
1512 if(false) {
1513 TObjectsQueue dummy;
1514 swap(m_GarbageQueue, dummy);
1515 }
1516 #endif
1517
1518 // launch async task to deserialize objects
1519 // from the skipped bytes in delay-buffer, and
1520 // also pass the garbage queue for destruction
1521 // of contents.
1522 // note: we can't move m_GarbageQueue directly
1523 // as it lacks ::clear() method that could restore
1524 // it to a valid empty state after move.
1525
1526 TObjectsQueue tmp_garbage_queue;
1527 swap(m_GarbageQueue, tmp_garbage_queue);
1528
1529 m_FuturesQueue.push( async( m_Policy, m_Parser,
1530 data, m_Istr->GetDataFormat(), m_Params, move(tmp_garbage_queue)));
1531 }
1532
1533 template<typename TRoot>
1534 CRef< CByteSource >
x_GetNextData(void)1535 CObjectIStreamAsyncIterator<TRoot>::CData::x_GetNextData(void)
1536 {
1537 if (m_MaxRawSize == 0) {
1538 // read raw data in this (main) thread
1539 if (m_Istr->EndOfData()) {
1540 return CRef< CByteSource >();
1541 }
1542 const CNcbiStreampos endpos =
1543 m_Istr->GetStreamPos() + (CNcbiStreampos)(m_RawBufferSize);
1544 CStreamDelayBufferGuard guard(*m_Istr);
1545 do {
1546 m_Istr->SkipAnyContentObject();
1547 } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1548 return guard.EndDelayBuffer();
1549 }
1550
1551 // get raw data prepared by reader
1552 unique_lock<mutex> lck(m_ReaderMutex);
1553 while (m_ReaderData.empty()) {
1554 m_ReaderCv.wait(lck);
1555 }
1556 CRef< CByteSource > data = m_ReaderData.front();
1557 m_ReaderData.pop();
1558 m_CurrentRawSize -= m_ReaderDataSize.front();
1559 m_ReaderDataSize.pop();
1560 m_ReaderCv.notify_one();
1561 return data;
1562 }
1563
1564 template<typename TRoot>
1565 void
x_ReaderThread(void)1566 CObjectIStreamAsyncIterator<TRoot>::CData::x_ReaderThread(void)
1567 {
1568 // Skip over some objects in stream without parsing, up to buffer_size.
1569 while (!m_Istr->EndOfData()) {
1570 const CNcbiStreampos startpos = m_Istr->GetStreamPos();
1571 const CNcbiStreampos endpos =
1572 startpos + (CNcbiStreampos)(m_RawBufferSize);
1573
1574 CStreamDelayBufferGuard guard(*(m_Istr));
1575 try {
1576 do {
1577 m_Istr->SkipAnyContentObject();
1578 } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1579 } catch (...) {
1580 }
1581
1582 size_t this_buffer_size = m_Istr->GetStreamPos() - startpos;
1583 CRef< CByteSource > data = guard.EndDelayBuffer();
1584 {
1585 unique_lock<mutex> lck(m_ReaderMutex);
1586 // make sure we do not consume too much memory
1587 while (!m_EndOfData && m_CurrentRawSize >= m_MaxRawSize) {
1588 m_ReaderCv.wait(lck);
1589 }
1590 if (m_EndOfData) {
1591 break;
1592 }
1593 m_ReaderData.push( data);
1594 m_ReaderDataSize.push( this_buffer_size);
1595 m_CurrentRawSize += this_buffer_size;
1596 m_ReaderCv.notify_one();
1597 }
1598 }
1599 CRef< CByteSource > data;
1600 m_ReaderMutex.lock();
1601 m_ReaderData.push( data);
1602 m_ReaderDataSize.push(0);
1603 m_ReaderMutex.unlock();
1604 m_ReaderCv.notify_one();
1605 }
1606
1607
1608 /////////////////////////////////////////////////////////////////////////////
1609 /// CObjectIStreamAsyncIterator<TRoot,TChild> implementation
1610
1611 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(void)1612 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(void)
1613 : CParent()
1614 {
1615 }
1616
1617 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(CObjectIStream & istr,EOwnership deleteInStream,const CParams & params)1618 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(
1619 CObjectIStream& istr, EOwnership deleteInStream,
1620 const CParams& params)
1621 : CParent(istr, deleteInStream,
1622 &CObjectIStreamAsyncIterator<TRoot, TChild>::sx_ClearGarbageAndParse, params)
1623 {
1624 }
1625
1626 template<typename TRoot, typename TChild>
CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator & v)1627 CObjectIStreamAsyncIterator<TRoot, TChild>::CObjectIStreamAsyncIterator(
1628 const CObjectIStreamAsyncIterator& v)
1629 : CParent(v)
1630 {
1631 }
1632
1633 template<typename TRoot, typename TChild>
1634 CObjectIStreamAsyncIterator<TRoot, TChild>&
operator =(const CObjectIStreamAsyncIterator & v)1635 CObjectIStreamAsyncIterator<TRoot, TChild>::operator=(
1636 const CObjectIStreamAsyncIterator& v) {
1637 CParent::operator=(v);
1638 return *this;
1639 }
1640
1641 template<typename TRoot, typename TChild>
~CObjectIStreamAsyncIterator()1642 CObjectIStreamAsyncIterator<TRoot, TChild>::~CObjectIStreamAsyncIterator() {
1643 }
1644
1645 template<typename TRoot, typename TChild>
1646 CObjectIStreamAsyncIterator<TRoot, TChild>&
operator ++()1647 CObjectIStreamAsyncIterator<TRoot, TChild>::operator++() {
1648 CParent::operator++();
1649 return *this;
1650 }
1651
1652 template<typename TRoot, typename TChild>
1653 CObjectIStreamAsyncIterator<TRoot, TChild>&
begin(void)1654 CObjectIStreamAsyncIterator<TRoot, TChild>::begin(void) {
1655 return *this;
1656 }
1657
1658 template<typename TRoot, typename TChild>
1659 CObjectIStreamAsyncIterator<TRoot, TChild>
end(void)1660 CObjectIStreamAsyncIterator<TRoot, TChild>::end(void) {
1661 return CObjectIStreamAsyncIterator<TRoot, TChild>();
1662 }
1663
1664 template<typename TRoot, typename TChild>
1665 typename CObjectIStreamAsyncIterator<TRoot, TChild>::TObjectsQueue
sx_ClearGarbageAndParse(CRef<CByteSource> bytesource,ESerialDataFormat format,const CParams & params,TObjectsQueue garbage)1666 CObjectIStreamAsyncIterator<TRoot, TChild>::sx_ClearGarbageAndParse(
1667 CRef<CByteSource> bytesource,
1668 ESerialDataFormat format,
1669 const CParams& params,
1670 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1671 TObjectsQueue garbage
1672 #else
1673 TObjectsQueue&& garbage
1674 #endif
1675 )
1676 {
1677 {{
1678 TObjectsQueue dummy;
1679 swap(garbage, dummy);
1680 // garbage now gets destroyed, if last-reference
1681 }}
1682
1683 // deserialize objects from bytesource
1684 unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1685 TObjectsQueue queue;
1686 for (TChild& object : CObjectIStreamIterator<TRoot, TChild>( *istr, eNoOwnership, params)) {
1687 queue.push( CRef<TChild>(&object));
1688 }
1689 return queue;
1690 }
1691
1692
1693
1694
1695 /////////////////////////////////////////////////////////////////////////////
1696 /////////////////////////////////////////////////////////////////////////////
1697 // Iterate over objects in input stream
1698 //
1699 // IMPORTANT: the following API requires multi-threading
1700 //
1701 // IMPORTANT: this API is deprecated, use CObjectIStreamIterator instead (defined above)
1702
1703
1704 template<typename TRoot, typename TObject>
1705 class CIStreamIteratorThread_Base;
1706
1707 // Helper hook class
1708 template<typename TRoot, typename TObject>
1709 class CIStreamObjectHook : public CSerial_FilterObjectsHook<TObject>
1710 {
1711 public:
CIStreamObjectHook(CIStreamIteratorThread_Base<TRoot,TObject> & thr)1712 CIStreamObjectHook(CIStreamIteratorThread_Base<TRoot,TObject>& thr)
1713 : m_Reader(thr)
1714 {
1715 }
1716 virtual void Process(const TObject& obj) override;
1717 private:
1718 CIStreamIteratorThread_Base<TRoot,TObject>& m_Reader;
1719 };
1720
1721 // Helper thread class
1722 template<typename TRoot, typename TObject>
1723 class CIStreamIteratorThread_Base : public CThread
1724 {
1725 public:
CIStreamIteratorThread_Base(CObjectIStream & in,EOwnership deleteInStream)1726 CIStreamIteratorThread_Base(CObjectIStream& in, EOwnership deleteInStream)
1727 : m_In(in), m_Resume(0,1), m_Ready(0,1), m_Obj(0),
1728 m_Ownership(deleteInStream), m_Stop(false), m_Failed(false)
1729 {
1730 }
1731 // Resume thread, wait for the next object
Next(void)1732 void Next(void)
1733 {
1734 m_Obj = 0;
1735 if (!m_Stop && !m_In.EndOfData()) {
1736 m_Resume.Post();
1737 m_Ready.Wait();
1738 if (m_Failed) {
1739 NCBI_THROW(CSerialException,eFail,
1740 "invalid data object received");
1741 }
1742 }
1743 }
1744 // Request stop: thread is no longer needed
Stop(void)1745 void Stop(void)
1746 {
1747 m_Stop = true;
1748 m_Resume.Post();
1749 Join(0);
1750 }
Fail(void)1751 void Fail(void)
1752 {
1753 m_Failed = true;
1754 SetObject(0);
1755 }
1756 // Object is ready: suspend thread
SetObject(const TObject * obj)1757 void SetObject(const TObject* obj)
1758 {
1759 m_Obj = obj;
1760 m_Ready.Post();
1761 m_Resume.Wait();
1762 if (m_Stop) {
1763 Exit(0);
1764 }
1765 }
GetObject(void) const1766 const TObject* GetObject(void) const
1767 {
1768 return m_Obj;
1769 }
1770 protected:
~CIStreamIteratorThread_Base(void)1771 ~CIStreamIteratorThread_Base(void)
1772 {
1773 if (m_Ownership == eTakeOwnership) {
1774 delete &m_In;
1775 }
1776 }
Main(void)1777 virtual void* Main(void)
1778 {
1779 return 0;
1780 }
1781 protected:
1782 CObjectIStream& m_In;
1783 CSemaphore m_Resume;
1784 CSemaphore m_Ready;
1785 const TObject* m_Obj;
1786 EOwnership m_Ownership;
1787 bool m_Stop;
1788 bool m_Failed;
1789 };
1790
1791 // Reading thread for serial objects
1792 template<typename TRoot, typename TObject>
1793 class CIStreamObjectIteratorThread
1794 : public CIStreamIteratorThread_Base< TRoot,TObject >
1795 {
1796 public:
CIStreamObjectIteratorThread(CObjectIStream & in,EOwnership deleteInStream)1797 CIStreamObjectIteratorThread(CObjectIStream& in, EOwnership deleteInStream)
1798 : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1799 {
1800 }
1801 protected:
~CIStreamObjectIteratorThread(void)1802 ~CIStreamObjectIteratorThread(void)
1803 {
1804 }
Main(void)1805 virtual void* Main(void) override
1806 {
1807 this->m_Resume.Wait();
1808 // Enumerate objects of requested type
1809 try {
1810 Serial_FilterObjects< TRoot >( this->m_In,
1811 new CIStreamObjectHook< TRoot, TObject >(*this));
1812 this->SetObject(0);
1813 } catch (CException& e) {
1814 NCBI_REPORT_EXCEPTION("In CIStreamObjectIteratorThread",e);
1815 this->Fail();
1816 }
1817 return 0;
1818 }
1819 };
1820
1821 // Reading thread for std objects
1822 template<typename TRoot, typename TObject>
1823 class CIStreamStdIteratorThread
1824 : public CIStreamIteratorThread_Base< TRoot,TObject >
1825 {
1826 public:
CIStreamStdIteratorThread(CObjectIStream & in,EOwnership deleteInStream)1827 CIStreamStdIteratorThread(CObjectIStream& in, EOwnership deleteInStream)
1828 : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1829 {
1830 }
1831 protected:
~CIStreamStdIteratorThread(void)1832 ~CIStreamStdIteratorThread(void)
1833 {
1834 }
Main(void)1835 virtual void* Main(void) override
1836 {
1837 this->m_Resume.Wait();
1838 // Enumerate objects of requested type
1839 try {
1840 Serial_FilterStdObjects< TRoot >( this->m_In,
1841 new CIStreamObjectHook< TRoot, TObject >(*this));
1842 this->SetObject(0);
1843 } catch (CException& e) {
1844 NCBI_REPORT_EXCEPTION("In CIStreamStdIteratorThread",e);
1845 this->Fail();
1846 }
1847 return 0;
1848 }
1849 };
1850
1851 template<typename TRoot, typename TObject>
1852 inline
Process(const TObject & obj)1853 void CIStreamObjectHook<TRoot,TObject>::Process(const TObject& obj)
1854 {
1855 m_Reader.SetObject(&obj);
1856 }
1857
1858 // Stream iterator base class
1859 template<typename TRoot, typename TObject>
1860 class CIStreamIterator_Base
1861 {
1862 public:
operator ++()1863 void operator++()
1864 {
1865 m_Reader->Next();
1866 }
operator ++(int)1867 void operator++(int)
1868 {
1869 m_Reader->Next();
1870 }
operator *(void) const1871 const TObject& operator* (void) const
1872 {
1873 return *(m_Reader->GetObject());
1874 }
operator ->(void) const1875 const TObject* operator-> (void) const
1876 {
1877 return m_Reader->GetObject();
1878 }
IsValid(void) const1879 bool IsValid(void) const
1880 {
1881 return m_Reader->GetObject() != 0;
1882 }
1883 protected:
CIStreamIterator_Base()1884 CIStreamIterator_Base()
1885 : m_Reader(nullptr)
1886 {
1887 }
~CIStreamIterator_Base(void)1888 ~CIStreamIterator_Base(void)
1889 {
1890 if (m_Reader) {
1891 m_Reader->Stop();
1892 }
1893 }
1894 private:
1895 // prohibit copy
1896 CIStreamIterator_Base(const CIStreamIterator_Base<TRoot,TObject>& v);
1897 // prohibit assignment
1898 CIStreamIterator_Base<TRoot,TObject>& operator=(
1899 const CIStreamIterator_Base<TRoot,TObject>& v);
1900
1901 protected:
1902 CIStreamIteratorThread_Base< TRoot, TObject > *m_Reader;
1903 };
1904
1905 /// Stream iterator for serial objects
1906 ///
1907 /// Usage:
1908 /// CObjectIStream* is = CObjectIStream::Open(...);
1909 /// CIStreamObjectIterator<CRootClass,CObjectClass> i(*is);
1910 /// for ( ; i.IsValid(); ++i) {
1911 /// const CObjectClass& obj = *i;
1912 /// ...
1913 /// }
1914 /// IMPORTANT:
1915 /// This API requires multi-threading!
1916
1917 template<typename TRoot, typename TObject>
1918 class CIStreamObjectIterator
1919 : public CIStreamIterator_Base< TRoot, TObject>
1920 {
1921 public:
CIStreamObjectIterator(CObjectIStream & in,EOwnership deleteInStream=eNoOwnership)1922 CIStreamObjectIterator(CObjectIStream& in, EOwnership deleteInStream = eNoOwnership)
1923 {
1924 // Create reading thread, wait until it finds the next object
1925 this->m_Reader =
1926 new CIStreamObjectIteratorThread< TRoot, TObject >(in, deleteInStream);
1927 this->m_Reader->Run();
1928 this->m_Reader->Next();
1929 }
~CIStreamObjectIterator(void)1930 ~CIStreamObjectIterator(void)
1931 {
1932 }
1933 };
1934
1935 /// Stream iterator for standard type objects
1936 ///
1937 /// Usage:
1938 /// CObjectIStream* is = CObjectIStream::Open(...);
1939 /// CIStreamStdIterator<CRootClass,string> i(*is);
1940 /// for ( ; i.IsValid(); ++i) {
1941 /// const string& obj = *i;
1942 /// ...
1943 /// }
1944 /// IMPORTANT:
1945 /// This API requires multi-threading!
1946
1947 template<typename TRoot, typename TObject>
1948 class CIStreamStdIterator
1949 : public CIStreamIterator_Base< TRoot, TObject>
1950 {
1951 public:
CIStreamStdIterator(CObjectIStream & in,EOwnership deleteInStream=eNoOwnership)1952 CIStreamStdIterator(CObjectIStream& in, EOwnership deleteInStream = eNoOwnership)
1953 {
1954 // Create reading thread, wait until it finds the next object
1955 this->m_Reader =
1956 new CIStreamStdIteratorThread< TRoot, TObject >(in,deleteInStream);
1957 this->m_Reader->Run();
1958 this->m_Reader->Next();
1959 }
~CIStreamStdIterator(void)1960 ~CIStreamStdIterator(void)
1961 {
1962 }
1963 };
1964
1965 #endif // NCBI_THREADS
1966
1967
1968 /* @} */
1969
1970 END_NCBI_SCOPE
1971
1972 #endif /* STREAMITER__HPP */
1973